* add feature to be able to configure filter document for replace one model
introduced a write model filter strategy to support different use cases. besides the default option there is now a new filter strategy which allows working with unique business keys while still having e.g. a MongoDB ObjectId inserted once during the first upsert per document. this strategy only behaves correctly if the business key fields (expressed by means of the PartialValueStrategy for the _id field) are guaranteed to be unique in the original Kafka records.
see config option mongodb.replace.one.strategy with two pre-defined strategy options
- at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneDefaultFilterStrategy
- at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneBusinessKeyFilterStrategy
* document new feature allowing to configure write model filters
* add basic unit tests for write model strategies
* fix delete one model default strategy
README.md: 86 additions & 17 deletions
@@ -355,6 +355,74 @@ These settings cause:
Note the use of the **"." character** as navigational operator in both examples. It's used in order to refer to nested fields in sub documents of the record structure. The prefix at the very beginning is used as a simple convention to distinguish between the _key_ and _value_ structure of a document.

+### Custom Write Model Filters
+The connector's default behaviour whenever documents are written to MongoDB collections is to use a [ReplaceOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/ReplaceOneModel.html) with [upsert mode](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/UpdateOptions.html) and to **create the filter documents based on the _id field** as given in the value structure of the sink record.
+However, other use cases need a different approach, and the **customization option for generating filter documents** can support these.
+A new configuration option (_mongodb.replace.one.strategy_) allows for such customizations. Currently, the following two strategies are implemented:
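+
+* **default behaviour** at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.**ReplaceOneDefaultFilterStrategy**
+* **business key** (see description of use case below) at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.**ReplaceOneBusinessKeyFilterStrategy**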
+
+##### Use Case: Employing Business Keys
+Let's say you want to re-use a unique business key found in your sink records while at the same time having _BSON ObjectIds_ created for the resulting MongoDB documents.
+To achieve this, a few simple configuration steps are necessary:
+
+1) make sure to **create a unique key constraint** for the business key of your target MongoDB collection
+2) use the **PartialValueStrategy** as the DocumentIdAdder's strategy in order to let the connector know which fields belong to the business key
+3) use the **ReplaceOneBusinessKeyFilterStrategy** instead of the default behaviour
+
+These configuration settings then make it possible to have **filter documents based on the original business key but still have _BSON ObjectIds_ created for the _id field** during the first upsert into your target MongoDB collection.
+will eventually result in MongoDB documents looking like:
+
+```json
+{
+  "_id": ObjectId("5abf52cc97e51aae0679d237"),
+  "fieldA": "Anonymous",
+  "fieldB": 42,
+  "active": true,
+  "values": [12.34, 23.45, 34.56, 45.67]
+}
+```
+
+All upsert operations are done based on the unique business key, which for this example is a compound one consisting of the two fields _(fieldA,fieldB)_.
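
The following is a minimal sketch of the idea behind the business key filter, not the connector's actual code. It assumes the _id field holds a sub-document with the business key fields (as produced by the PartialValueStrategy) and uses plain `java.util` maps in place of BSON documents. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: plain maps stand in for BSON documents.
class BusinessKeyFilterSketch {

    // Filter document: the fields of the _id sub-document, i.e. the business key.
    @SuppressWarnings("unchecked")
    static Map<String, Object> businessKeyFilter(Map<String, Object> doc) {
        Object id = doc.get("_id");
        if (!(id instanceof Map)) {
            throw new IllegalArgumentException(
                "_id must be a sub-document holding the business key fields");
        }
        return new LinkedHashMap<>((Map<String, Object>) id);
    }

    // Replacement document: a copy without _id, so that MongoDB assigns
    // an ObjectId when the upsert inserts the document for the first time.
    static Map<String, Object> businessKeyReplacement(Map<String, Object> doc) {
        Map<String, Object> replacement = new LinkedHashMap<>(doc);
        replacement.remove("_id");
        return replacement;
    }

    public static void main(String[] args) {
        // Compound business key (fieldA, fieldB) as in the example above.
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("fieldA", "Anonymous");
        key.put("fieldB", 42);

        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", key);
        doc.put("fieldA", "Anonymous");
        doc.put("fieldB", 42);
        doc.put("active", true);
        doc.put("values", Arrays.asList(12.34, 23.45, 34.56, 45.67));

        System.out.println("filter:      " + businessKeyFilter(doc));
        System.out.println("replacement: " + businessKeyReplacement(doc));
    }
}
```

With the real driver, the two maps would correspond to the filter and replacement arguments of a ReplaceOneModel in upsert mode. Later records carrying the same business key match the existing document instead of inserting a new one, which is why the key has to be unique.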
+
+NOTE: Future versions will allow arbitrary, individual strategies to be registered and used for the _mongodb.replace.one.strategy_ configuration setting.
+
### Change Data Capture Mode
The sink connector can also be used in a different operation mode in order to handle change data capture (CDC) events. Currently, the following CDC events from [Debezium](http://debezium.io/) can be processed:
@@ -404,23 +472,24 @@ Data is written using acknowledged writes and the configured write concern level
At the moment the following settings can be configured by means of the *connector.properties* file. For a config file containing default settings see [this example](https://github.com/hpgrahsl/kafka-connect-mongodb/blob/master/config/MongoDbSinkConnector.properties).

-| Name | Description | Type | Default | Valid Values | Importance |
-| ---- | ---- | ---- | ---- | ---- | ---- |
-| mongodb.collection | single sink collection name to write to | string | kafkatopic | | high |
-| mongodb.connection.uri | the mongodb connection URI as supported by the official drivers | string | mongodb://localhost:27017/kafkaconnect?w=1&journal=true | | high |
-| mongodb.document.id.strategy | class name of strategy to use for generating a unique document id (_id) | string | at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy | | high |
-| mongodb.delete.on.null.values | whether or not the connector tries to delete documents based on key when value is null | boolean | false | | medium |
-| mongodb.max.num.retries | how often a retry should be done on write errors | int | 3 | [0,...] | medium |
-| mongodb.retries.defer.timeout | how long in ms a retry should get deferred | int | 5000 | [0,...] | medium |
-| mongodb.change.data.capture.handler | class name of CDC handler to use for processing | string | "" | | low |
-| mongodb.document.id.strategies | comma separated list of custom strategy classes to register for usage | string | "" | | low |
-| mongodb.field.renamer.mapping | inline JSON array with objects describing field name mappings (see docs) | string | [] | | low |
-| mongodb.field.renamer.regexp | inline JSON array with objects describing regexp settings (see docs) | string | [] | | low |
-| mongodb.key.projection.list | comma separated list of field names for key projection | string | "" | | low |
-| mongodb.key.projection.type | whether or not and which key projection to use | string | none | [none, blacklist, whitelist] | low |
-| mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder | | low |
-| mongodb.value.projection.list | comma separated list of field names for value projection | string | "" | | low |
-| mongodb.value.projection.type | whether or not and which value projection to use | string | none | [none, blacklist, whitelist] | low |
+| Name | Description | Type | Default | Valid Values | Importance |
+| ---- | ---- | ---- | ---- | ---- | ---- |
+| mongodb.collection | single sink collection name to write to | string | kafkatopic | | high |
+| mongodb.connection.uri | the mongodb connection URI as supported by the official drivers | string | mongodb://localhost:27017/kafkaconnect?w=1&journal=true | | high |
+| mongodb.document.id.strategy | class name of strategy to use for generating a unique document id (_id) | string | at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy | | high |
+| mongodb.delete.on.null.values | whether or not the connector tries to delete documents based on key when value is null | boolean | false | | medium |
+| mongodb.max.num.retries | how often a retry should be done on write errors | int | 3 | [0,...] | medium |
+| mongodb.retries.defer.timeout | how long in ms a retry should get deferred | int | 5000 | [0,...] | medium |
+| mongodb.change.data.capture.handler | class name of CDC handler to use for processing | string | "" | | low |
+| mongodb.document.id.strategies | comma separated list of custom strategy classes to register for usage | string | "" | | low |
+| mongodb.field.renamer.mapping | inline JSON array with objects describing field name mappings (see docs) | string | [] | | low |
+| mongodb.field.renamer.regexp | inline JSON array with objects describing regexp settings (see docs) | string | [] | | low |
+| mongodb.key.projection.list | comma separated list of field names for key projection | string | "" | | low |
+| mongodb.key.projection.type | whether or not and which key projection to use | string | none | [none, blacklist, whitelist] | low |
+| mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder | | low |
+| mongodb.replace.one.strategy | how to build the filter doc for the replaceOne write model | string | at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneDefaultFilterStrategy | | low |
+| mongodb.value.projection.list | comma separated list of field names for value projection | string | "" | | low |
+| mongodb.value.projection.type | whether or not and which value projection to use | string | none | [none, blacklist, whitelist] | low |