You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Based on this setting the sink connector tries to delete a MongoDB document from the corresponding collection based on the sink record's key or actually the resulting *_id* value thereof, which is generated according to the specified [DocumentIdAdder](https://github.com/hpgrahsl/kafka-connect-mongodb#documentidadder-mandatory).
543
543
544
544
### MongoDB Persistence
545
-
The sink records are converted to BSON documents which are in turn inserted into the corresponding MongoDB target collection. The implementation uses unorderd bulk writes based on the [ReplaceOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/ReplaceOneModel.html)together with [upsert mode](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/UpdateOptions.html) whenever inserts or updates are handled. If the connector is configured to process deletes when _null_ values of sink records are discovered then it uses a [DeleteOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/DeleteOneModel.html) respectively.
545
+
The sink records are converted to BSON documents which are in turn inserted into the corresponding MongoDB target collection. The implementation uses unorderd bulk writes. According to the chosen write model strategy either a [ReplaceOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/ReplaceOneModel.html)or an [UpdateOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/UpdateOneModel.html) - both of which are run in [upsert mode](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/UpdateOptions.html)- is used whenever inserts or updates are handled. If the connector is configured to process convention-based deletes when _null_ values of sink records are discovered then it uses a [DeleteOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/DeleteOneModel.html) respectively.
546
546
547
547
Data is written using acknowledged writes and the configured write concern level of the connection as specified in the connection URI. If the bulk write fails (totally or partially) errors are logged and a simple retry logic is in place. More robust/sophisticated failure mode handling has yet to be implemented.
548
548
549
549
### Sink Connector Properties
550
550
551
551
At the moment the following settings can be configured by means of the *connector.properties* file. For a config file containing default settings see [this example](https://github.com/hpgrahsl/kafka-connect-mongodb/blob/master/config/MongoDbSinkConnector.properties).
552
552
553
-
| Name | Description | Type | Default | Valid Values| Importance |
| mongodb.collection | single sink collection name to write to | string | kafkatopic || high |
556
-
| mongodb.connection.uri | the monogdb connection URI as supported by the offical drivers | string | mongodb://localhost:27017/kafkaconnect?w=1&journal=true || high |
557
-
| mongodb.document.id.strategy | class name of strategy to use for generating a unique document id (_id) | string | at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy || high |
558
-
| mongodb.delete.on.null.values | whether or not the connector tries to delete documents based on key when value is null | boolean | false || medium |
559
-
| mongodb.max.num.retries | how often a retry should be done on write errors | int | 3 |[0,...]| medium |
560
-
| mongodb.retries.defer.timeout | how long in ms a retry should get deferred | int | 5000 |[0,...]| medium |
561
-
| mongodb.change.data.capture.handler | class name of CDC handler to use for processing | string | "" || low |
562
-
| mongodb.document.id.strategies | comma separated list of custom strategy classes to register for usage | string | "" || low |
563
-
| mongodb.field.renamer.mapping | inline JSON array with objects describing field name mappings (see docs) | string |[]|| low |
564
-
| mongodb.field.renamer.regexp | inline JSON array with objects describing regexp settings (see docs) | string |[]|| low |
565
-
| mongodb.key.projection.list | comma separated list of field names for key projection | string | "" || low |
566
-
| mongodb.key.projection.type | whether or not and which key projection to use | string | none |[none, blacklist, whitelist]| low |
567
-
| mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder || low |
568
-
| mongodb.replace.one.strategy| how to build the filter doc for the replaceOne write model| string |ReplaceOneDefaultFilterStrategy || low |
569
-
| mongodb.value.projection.list|comma separated list of field names for value projection | string |""|| low |
570
-
| mongodb.value.projection.type| whether or not and which value projection to use | string |none |[none, blacklist, whitelist]| low |
553
+
| Name | Description | Type | Default | Valid Values| Importance |
| mongodb.collection | single sink collection name to write to | string | kafkatopic || high |
556
+
| mongodb.connection.uri | the monogdb connection URI as supported by the offical drivers | string | mongodb://localhost:27017/kafkaconnect?w=1&journal=true || high |
557
+
| mongodb.document.id.strategy | class name of strategy to use for generating a unique document id (_id) | string | at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy | at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig$ValidatorWithOperators$$Lambda$193/910091170@3f191845| high |
558
+
| mongodb.delete.on.null.values | whether or not the connector tries to delete documents based on key when value is null | boolean | false || medium |
559
+
| mongodb.max.num.retries | how often a retry should be done on write errors | int | 3 |[0,...]| medium |
560
+
| mongodb.retries.defer.timeout | how long in ms a retry should get deferred | int | 5000 |[0,...]| medium |
561
+
| mongodb.change.data.capture.handler | class name of CDC handler to use for processing | string | "" | at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig$ValidatorWithOperators$$Lambda$193/910091170@5f049ea1| low |
562
+
| mongodb.document.id.strategies | comma separated list of custom strategy classes to register for usage | string | "" | at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig$ValidatorWithOperators$$Lambda$193/910091170@72cc7e6f| low |
563
+
| mongodb.field.renamer.mapping | inline JSON array with objects describing field name mappings (see docs) | string |[]|| low |
564
+
| mongodb.field.renamer.regexp | inline JSON array with objects describing regexp settings (see docs) | string |[]|| low |
565
+
| mongodb.key.projection.list | comma separated list of field names for key projection | string | "" || low |
566
+
| mongodb.key.projection.type | whether or not and which key projection to use | string | none |[none, blacklist, whitelist]| low |
567
+
| mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder | at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig$ValidatorWithOperators$$Lambda$193/910091170@5afa3c9| low |
568
+
| mongodb.value.projection.list| comma separated list of field names for value projection | string |"" || low |
569
+
| mongodb.value.projection.type|whether or not and which value projection to use| string |none|[none, blacklist, whitelist]| low |
570
+
| mongodb.writemodel.strategy| how to build the write models for the sink documents| string |at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy || low |
0 commit comments