Commit 636710b
add simple batch support for sink record processing (#51)
There is a new configuration parameter mongodb.max.batch.size=N to control the batching behaviour. For backwards compatibility the default value N=0 means that no batching is applied. For values N>=1 the connector guarantees that at most N records are written to the configured MongoDB instance within a single bulk write operation. This resolves #49
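For illustration, the new setting can be supplied alongside the other sink options in the connector's properties file. A minimal sketch, assuming the sink connector class is named MongoDbSinkConnector; connector name, topic, and the batch size value are made-up examples:

```properties
name=mongodb-sink
connector.class=at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector
topics=kafkatopic
mongodb.connection.uri=mongodb://localhost:27017/kafkaconnect?w=1&journal=true
# write at most 100 records per bulk write; 0 (the default) disables batching
mongodb.max.batch.size=100
```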
1 parent d5e06f0 commit 636710b

File tree

6 files changed: +197 -49 lines changed
README.md

Lines changed: 8 additions & 7 deletions

@@ -546,25 +546,26 @@ The sink records are converted to BSON documents which are in turn inserted into

  Data is written using acknowledged writes and the configured write concern level of the connection as specified in the connection URI. If the bulk write fails (totally or partially), errors are logged and a simple retry logic is in place. More robust/sophisticated failure mode handling has yet to be implemented.

- ### Sink Connector Properties
+ ### Sink Connector Configuration Properties

  At the moment the following settings can be configured by means of the *connector.properties* file. For a config file containing default settings see [this example](https://github.com/hpgrahsl/kafka-connect-mongodb/blob/master/config/MongoDbSinkConnector.properties).

  | Name | Description | Type | Default | Valid Values | Importance |
  |------|-------------|------|---------|--------------|------------|
  | mongodb.collection | single sink collection name to write to | string | kafkatopic | | high |
  | mongodb.connection.uri | the mongodb connection URI as supported by the official drivers | string | mongodb://localhost:27017/kafkaconnect?w=1&journal=true | | high |
- | mongodb.document.id.strategy | class name of strategy to use for generating a unique document id (_id) | string | at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy | at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig$ValidatorWithOperators$$Lambda$193/910091170@3f191845 | high |
+ | mongodb.document.id.strategy | class name of strategy to use for generating a unique document id (_id) | string | at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy | valid fully-qualified class name which implements IdStrategy and is available on the classpath | high |
  | mongodb.delete.on.null.values | whether or not the connector tries to delete documents based on key when value is null | boolean | false | | medium |
+ | mongodb.max.batch.size | maximum number of sink records to possibly batch together for processing | int | 0 | [0,...] | medium |
  | mongodb.max.num.retries | how often a retry should be done on write errors | int | 3 | [0,...] | medium |
  | mongodb.retries.defer.timeout | how long in ms a retry should get deferred | int | 5000 | [0,...] | medium |
- | mongodb.change.data.capture.handler | class name of CDC handler to use for processing | string | "" | at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig$ValidatorWithOperators$$Lambda$193/910091170@5f049ea1 | low |
+ | mongodb.change.data.capture.handler | class name of CDC handler to use for processing | string | "" | valid fully-qualified class name which extends CdcHandler and is available on the classpath | low |
- | mongodb.document.id.strategies | comma separated list of custom strategy classes to register for usage | string | "" | at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig$ValidatorWithOperators$$Lambda$193/910091170@72cc7e6f | low |
+ | mongodb.document.id.strategies | comma separated list of custom strategy classes to register for usage | string | "" | list of valid fully-qualified class names which implement IdStrategy and are available on the classpath | low |
- | mongodb.field.renamer.mapping | inline JSON array with objects describing field name mappings (see docs) | string | [] | | low |
+ | mongodb.field.renamer.mapping | inline JSON array with objects describing field name mappings | string | [] | | low |
- | mongodb.field.renamer.regexp | inline JSON array with objects describing regexp settings (see docs) | string | [] | | low |
+ | mongodb.field.renamer.regexp | inline JSON array with objects describing regexp settings | string | [] | | low |
  | mongodb.key.projection.list | comma separated list of field names for key projection | string | "" | | low |
  | mongodb.key.projection.type | whether or not and which key projection to use | string | none | [none, blacklist, whitelist] | low |
- | mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder | at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig$ValidatorWithOperators$$Lambda$193/910091170@5afa3c9 | low |
+ | mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder | list of valid fully-qualified class names which extend PostProcessor and are available on the classpath | low |
  | mongodb.value.projection.list | comma separated list of field names for value projection | string | "" | | low |
  | mongodb.value.projection.type | whether or not and which value projection to use | string | none | [none, blacklist, whitelist] | low |
  | mongodb.writemodel.strategy | how to build the write models for the sink documents | string | at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy | | low |

config/MongoDbSinkConnector.properties

Lines changed: 1 addition & 0 deletions

@@ -43,3 +43,4 @@ mongodb.post.processor.chain=at.grahsl.kafka.connect.mongodb.processor.DocumentI

  mongodb.change.data.capture.handler=
  mongodb.delete.on.null.values=false
  mongodb.writemodel.strategy=at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy
+ mongodb.max.batch.size=0

src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkConnectorConfig.java

Lines changed: 7 additions & 2 deletions

@@ -75,6 +75,7 @@ public enum FieldProjectionTypes {

      public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT = "";
      public static final boolean MONGODB_DELETE_ON_NULL_VALUES_DEFAULT = false;
      public static final String MONGODB_WRITEMODEL_STRATEGY_DEFAULT = "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy";
+     public static final int MONGODB_MAX_BATCH_SIZE_DEFAULT = 0;

      public static final String MONGODB_CONNECTION_URI_CONF = "mongodb.connection.uri";
      private static final String MONGODB_CONNECTION_URI_DOC = "the mongodb connection URI as supported by the official drivers";

@@ -107,10 +108,10 @@ public enum FieldProjectionTypes {

      private static final String MONGODB_KEY_PROJECTION_LIST_DOC = "comma separated list of field names for key projection";

      public static final String MONGODB_FIELD_RENAMER_MAPPING = "mongodb.field.renamer.mapping";
-     private static final String MONGODB_FIELD_RENAMER_MAPPING_DOC = "inline JSON array with objects describing field name mappings (see docs)";
+     private static final String MONGODB_FIELD_RENAMER_MAPPING_DOC = "inline JSON array with objects describing field name mappings";

      public static final String MONGODB_FIELD_RENAMER_REGEXP = "mongodb.field.renamer.regexp";
-     private static final String MONGODB_FIELD_RENAMER_REGEXP_DOC = "inline JSON array with objects describing regexp settings (see docs)";
+     private static final String MONGODB_FIELD_RENAMER_REGEXP_DOC = "inline JSON array with objects describing regexp settings";

      public static final String MONGODB_POST_PROCESSOR_CHAIN = "mongodb.post.processor.chain";
      private static final String MONGODB_POST_PROCESSOR_CHAIN_DOC = "comma separated list of post processor classes to build the chain with";

@@ -124,6 +125,9 @@ public enum FieldProjectionTypes {

      public static final String MONGODB_WRITEMODEL_STRATEGY = "mongodb.writemodel.strategy";
      private static final String MONGODB_WRITEMODEL_STRATEGY_DOC = "how to build the write models for the sink documents";

+     public static final String MONGODB_MAX_BATCH_SIZE = "mongodb.max.batch.size";
+     private static final String MONGODB_MAX_BATCH_SIZE_DOC = "maximum number of sink records to possibly batch together for processing";
+
      private static ObjectMapper objectMapper = new ObjectMapper();

      public MongoDbSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {

@@ -200,6 +204,7 @@ public Map<String, ConfigValue> validateAll(Map<String, String> props) {

          .define(MONGODB_CHANGE_DATA_CAPTURE_HANDLER, Type.STRING, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT, emptyString().or(matching(FULLY_QUALIFIED_CLASS_NAME)), Importance.LOW, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DOC)
          .define(MONGODB_DELETE_ON_NULL_VALUES, Type.BOOLEAN, MONGODB_DELETE_ON_NULL_VALUES_DEFAULT, Importance.MEDIUM, MONGODB_DELETE_ON_NULL_VALUES_DOC)
          .define(MONGODB_WRITEMODEL_STRATEGY, Type.STRING, MONGODB_WRITEMODEL_STRATEGY_DEFAULT, Importance.LOW, MONGODB_WRITEMODEL_STRATEGY_DOC)
+         .define(MONGODB_MAX_BATCH_SIZE, Type.INT, MONGODB_MAX_BATCH_SIZE_DEFAULT, ConfigDef.Range.atLeast(0), Importance.MEDIUM, MONGODB_MAX_BATCH_SIZE_DOC)
          ;
      }

src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkRecordBatches.java

Lines changed: 52 additions & 0 deletions

/*
 * Copyright (c) 2018. Hans-Peter Grahsl ([email protected])
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package at.grahsl.kafka.connect.mongodb;

import org.apache.kafka.connect.sink.SinkRecord;

import java.util.ArrayList;
import java.util.List;

public class MongoDbSinkRecordBatches {

    private int batchSize;
    private int currentBatch = 0;
    private List<List<SinkRecord>> bufferedBatches = new ArrayList<>();

    public MongoDbSinkRecordBatches(int batchSize, int records) {
        this.batchSize = batchSize;
        bufferedBatches.add(batchSize > 0 ? new ArrayList<>(batchSize) : new ArrayList<>(records));
    }

    public void buffer(SinkRecord record) {
        if (batchSize > 0) {
            if (bufferedBatches.get(currentBatch).size() < batchSize) {
                bufferedBatches.get(currentBatch).add(record);
            } else {
                bufferedBatches.add(new ArrayList<>(batchSize));
                bufferedBatches.get(++currentBatch).add(record);
            }
        } else {
            bufferedBatches.get(0).add(record);
        }
    }

    public List<List<SinkRecord>> getBufferedBatches() {
        return bufferedBatches;
    }

}
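To show how the batching above behaves, here is a self-contained sketch that mirrors the buffering logic of MongoDbSinkRecordBatches, with String standing in for org.apache.kafka.connect.sink.SinkRecord so it runs without the Kafka Connect jars on the classpath (class and variable names are illustrative, not part of the commit):

```java
import java.util.ArrayList;
import java.util.List;

public class RecordBatchesDemo {

    private final int batchSize;
    private int currentBatch = 0;
    private final List<List<String>> bufferedBatches = new ArrayList<>();

    public RecordBatchesDemo(int batchSize, int records) {
        this.batchSize = batchSize;
        // batchSize == 0 means "no batching": pre-size one list for all records
        bufferedBatches.add(batchSize > 0 ? new ArrayList<>(batchSize) : new ArrayList<>(records));
    }

    public void buffer(String record) {
        if (batchSize > 0) {
            if (bufferedBatches.get(currentBatch).size() < batchSize) {
                bufferedBatches.get(currentBatch).add(record);
            } else {
                // current batch is full -> start a new batch
                bufferedBatches.add(new ArrayList<>(batchSize));
                bufferedBatches.get(++currentBatch).add(record);
            }
        } else {
            // no batching: everything goes into the single list
            bufferedBatches.get(0).add(record);
        }
    }

    public List<List<String>> getBufferedBatches() {
        return bufferedBatches;
    }

    public static void main(String[] args) {
        RecordBatchesDemo batches = new RecordBatchesDemo(2, 5);
        for (String r : new String[]{"r1", "r2", "r3", "r4", "r5"}) {
            batches.buffer(r);
        }
        // 5 records with batchSize=2 -> 3 batches: [r1, r2], [r3, r4], [r5]
        System.out.println(batches.getBufferedBatches());
    }
}
```

Each batch produced this way is then small enough to honour the "at most N records per bulk write" guarantee described in the commit message.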
