
Commit ac0685b

[feature request] Issue 44 (#46)
* refactor inconsistent/improper namings of write model strategies
* add UpdateOneTimestamps strategy with basic test case
* add documentation and fix variable naming

this solves #44
1 parent d84303e commit ac0685b

File tree: 11 files changed, +264 / -85 lines


README.md

Lines changed: 96 additions & 25 deletions
@@ -362,24 +362,27 @@ These settings cause:
 
 Note the use of the **"." character** as navigational operator in both examples. It's used in order to refer to nested fields in sub documents of the record structure. The prefix at the very beginning is used as a simple convention to distinguish between the _key_ and _value_ structure of a document.
 
-### Custom Write Model Filters
-The default behaviour for the connector whenever documents are written to MongoDB collections is to make use of a proper [ReplaceOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/ReplaceOneModel.html) with [upsert mode](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/UpdateOptions.html) and **create the filter documents based on the _id field** as it is given in the value structure of the sink record.
-However, there are other use cases which need a different approach and the **customization option for generating filter documents** can support these.
-A new configuration option (_mongodb.replace.one.strategy_) allows for such customizations. Currently, the following two strategies are implemented:
+### Custom Write Models
+The default behaviour for the connector whenever documents are written to MongoDB collections is to make use of a proper [ReplaceOneModel](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/ReplaceOneModel.html) with [upsert mode](http://mongodb.github.io/mongo-java-driver/3.6/javadoc/com/mongodb/client/model/UpdateOptions.html) and **create the filter document based on the _id field** which results from applying the configured DocumentIdAdder in the value structure of the sink document.
 
-* **default behaviour** at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.**ReplaceOneDefaultFilterStrategy**
-* **business key** (see description of use case below) at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.**ReplaceOneBusinessKeyFilterStrategy**
+However, there are other use cases which need different approaches and the **customization option for generating custom write models** can support these. The configuration entry (_mongodb.writemodel.strategy_) allows for such customizations. Currently, the following strategies are implemented:
 
-##### Use Case: Employing Business Keys
+* **default behaviour** at.grahsl.kafka.connect.mongodb.writemodel.strategy.**ReplaceOneDefaultStrategy**
+* **business key** (-> see [use case 1](https://github.com/hpgrahsl/kafka-connect-mongodb#use-case-1-employing-business-keys)) at.grahsl.kafka.connect.mongodb.writemodel.strategy.**ReplaceOneBusinessKeyStrategy**
+* **delete on null values** at.grahsl.kafka.connect.mongodb.writemodel.strategy.**DeleteOneDefaultStrategy** implicitly used when config option _mongodb.delete.on.null.values=true_ for [convention-based deletion](https://github.com/hpgrahsl/kafka-connect-mongodb#convention-based-deletion-on-null-values)
+* **add inserted/modified timestamps** (-> see [use case 2](https://github.com/hpgrahsl/kafka-connect-mongodb#use-case-2-add-inserted-and-modified-timestamps)) at.grahsl.kafka.connect.mongodb.writemodel.strategy.**UpdateOneTimestampsStrategy**
+
+_NOTE:_ Future versions will allow to make use of arbitrary, individual strategies that can be registered and easily used as _mongodb.writemodel.strategy_ configuration setting.
+
+##### Use Case 1: Employing Business Keys
 Let's say you want to re-use a unique business key found in your sink records while at the same time have _BSON ObjectIds_ created for the resulting MongoDB documents.
 To achieve this a few simple configuration steps are necessary:
 
 1) make sure to **create a unique key constraint** for the business key of your target MongoDB collection
 2) use the **PartialValueStrategy** as the DocumentIdAdder's strategy in order to let the connector know which fields belong to the business key
-3) use the **ReplaceOneBusinessKeyFilterStrategy** instead of the default behaviour
+3) use the **ReplaceOneBusinessKeyStrategy** instead of the default behaviour
 
-These configuration settings then allow to have **filter documents based on the original business key but still have _BSON ObjectIds_ created for the _id field** during the first upsert into your target MongoDB target collection.
-Find below how such a setup might look like:
+These configuration settings then allow to have **filter documents based on the original business key but still have _BSON ObjectIds_ created for the _id field** during the first upsert into your target MongoDB target collection. Find below how such a setup might look like:
 
 Given the following fictional Kafka record

@@ -398,23 +401,16 @@ together with the sink connector config below
 {
   "name": "mdb-sink",
   "config": {
-    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
-    "key.converter.schemas.enable": false,
-    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
-    "value.converter.schemas.enable": false,
-    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
-    "topics": "mytopic",
-    "mongodb.connection.uri": "mongodb://mongodb:27017/kafkaconnect?w=1&journal=true",
+    ...
     "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy",
     "mongodb.key.projection.list": "fieldA,fieldB",
     "mongodb.key.projection.type": "whitelist",
-    "mongodb.collection": "mycollection",
-    "mongodb.replace.one.strategy":"at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneBusinessKeyFilterStrategy"
+    "mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneBusinessKeyStrategy"
   }
 }
 ```
 
-will eventually result in MongoDB documents looking like:
+will eventually result in a MongoDB document looking like:
 
 ```json
 {
@@ -426,17 +422,91 @@ will eventually result in MongoDB documents looking like:
 }
 ```
 
-All upsert operations are done based on the unique business key which for this example is a compound one that consists of the two fields _(fieldA,fieldB)_.
+All upsert operations are done based on the unique business key which for this example is a compound one that consists of the two fields _(fieldA,fieldB)_.
+
+##### Use Case 2: Add Inserted and Modified Timestamps
+Let's say you want to attach timestamps to the resulting MongoDB documents such that you can store the point in time of the document insertion and at the same time maintain a second timestamp reflecting when a document was modified.
+
+All that needs to be done is use the **UpdateOneTimestampsStrategy** instead of the default behaviour. What results from this is that
+the custom write model will take care of attaching two timestamps to MongoDB documents:
+
+1) **_insertedTS**: will only be set once in case the upsert operation results in a new MongoDB document being inserted into the corresponding collection
+2) **_modifiedTS**: will be set each time the upsert operation
+results in an existing MongoDB document being updated in the corresponding collection
+
+Given the following fictional Kafka record
+
+```json
+{
+  "_id": "ABCD-1234",
+  "fieldA": "Anonymous",
+  "fieldB": 42,
+  "active": true,
+  "values": [12.34, 23.45, 34.56, 45.67]
+}
+```
+
+together with the sink connector config below
 
-NOTE: Future versions will allow to make use of arbitrary, individual strategies that can be registered and as used for the _mongodb.replace.one.strategy_ configuration setting.
+```json
+{
+  "name": "mdb-sink",
+  "config": {
+    ...
+    "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInValueStrategy",
+    "mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy"
+  }
+}
+```
+
+will result in a new MongoDB document looking like:
+
+```json
+{
+  "_id": "ABCD-1234",
+  "_insertedTS": ISODate("2018-07-22T09:19:000Z"),
+  "_modifiedTS": ISODate("2018-07-22T09:19:000Z"),
+  "fieldA": "Anonymous",
+  "fieldB": 42,
+  "active": true,
+  "values": [12.34, 23.45, 34.56, 45.67]
+}
+```
+
+If at some point in time later there is a Kafka record referring to the same _id but containing updated data
+
+```json
+{
+  "_id": "ABCD-1234",
+  "fieldA": "anonymous",
+  "fieldB": -23,
+  "active": false,
+  "values": [12.34, 23.45]
+}
+```
+
+then the existing MongoDB document will get updated together with a fresh timestamp for the **_modifiedTS** value:
+
+```json
+{
+  "_id": "ABCD-1234",
+  "_insertedTS": ISODate("2018-07-22T09:19:000Z"),
+  "_modifiedTS": ISODate("2018-07-31T19:09:000Z"),
+  "fieldA": "anonymous",
+  "fieldB": -23,
+  "active": false,
+  "values": [12.34, 23.45]
+}
+```
 
 ### Change Data Capture Mode
 The sink connector can also be used in a different operation mode in order to handle change data capture (CDC) events. Currently, the following CDC events from [Debezium](http://debezium.io/) can be processed:
 
 * [MongoDB](http://debezium.io/docs/connectors/mongodb/)
 * [MySQL](http://debezium.io/docs/connectors/mysql/)
 * [PostgreSQL](http://debezium.io/docs/connectors/postgresql/)
-* Oracle (not yet finished at Debezium Project)
+* **Oracle** _coming soon!_ ([early preview at Debezium Project](http://debezium.io/docs/connectors/oracle/))
+* **SQL Server** ([not yet finished at Debezium Project](http://debezium.io/docs/connectors/sqlserver/))
 
 This effectively allows to replicate all state changes within the source databases into MongoDB collections. Debezium produces very similar CDC events for MySQL and PostgreSQL. The so far addressed use cases worked fine based on the same code which is why there is only one _RdbmsHandler_ implementation to support them both at the moment. Debezium Oracle CDC format will be integrated in a future release.

@@ -462,7 +532,8 @@ The sink connector configuration offers a property called *mongodb.change.data.c
 }
 ```
 
-**NOTE:** There are scenarios in which there is no CDC enabled source connector in place. However, it might be required to still be able to handle record deletions. For these cases the sink connector can be configured to delete records in MongoDB whenever it encounters sink records which exhibit _null_ values. This is a simple convention that can be activated by setting the following configuration option:
+##### Convention-based deletion on null values
+There are scenarios in which there is no CDC enabled source connector in place. However, it might be required to still be able to handle record deletions. For these cases the sink connector can be configured to delete records in MongoDB whenever it encounters sink records which exhibit _null_ values. This is a simple convention that can be activated by setting the following configuration option:
 
 ```properties
 mongodb.delete.on.null.values=true
@@ -494,7 +565,7 @@ At the moment the following settings can be configured by means of the *connecto
 | mongodb.key.projection.list | comma separated list of field names for key projection | string | "" | | low |
 | mongodb.key.projection.type | whether or not and which key projection to use | string | none | [none, blacklist, whitelist] | low |
 | mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder | | low |
-| mongodb.replace.one.strategy | how to build the filter doc for the replaceOne write model | string | at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneDefaultFilterStrategy | | low |
+| mongodb.replace.one.strategy | how to build the filter doc for the replaceOne write model | string | ReplaceOneDefaultFilterStrategy | | low |
 | mongodb.value.projection.list | comma separated list of field names for value projection | string | "" | | low |
 | mongodb.value.projection.type | whether or not and which value projection to use | string | none | [none, blacklist, whitelist] | low |
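To make the behaviour documented in use case 2 above more concrete, here is a minimal, hypothetical sketch of how such a timestamped upsert can be expressed with the MongoDB Java driver's UpdateOneModel. The class, method, and field-handling details are invented for illustration and are not taken from the connector's actual UpdateOneTimestampsStrategy source, which is not shown in this diff:

```java
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
import org.bson.BsonValue;

// Hypothetical helper illustrating the $setOnInsert / $set split described in use case 2.
public class TimestampUpsertSketch {

    public static WriteModel<BsonDocument> timestampedUpsert(BsonDocument valueDoc) {
        BsonDateTime now = new BsonDateTime(System.currentTimeMillis());

        // keep the _id for the filter, but strip it from the $set payload since
        // updates are not allowed to touch the immutable _id field
        BsonValue id = valueDoc.get("_id");
        BsonDocument fieldsToSet = valueDoc.clone();
        fieldsToSet.remove("_id");
        fieldsToSet.append("_modifiedTS", now); // refreshed on every upsert

        BsonDocument update = new BsonDocument("$set", fieldsToSet)
                // only written when the upsert actually inserts a new document
                .append("$setOnInsert", new BsonDocument("_insertedTS", now));

        return new UpdateOneModel<>(Filters.eq("_id", id), update, new UpdateOptions().upsert(true));
    }
}
```

The key design point matching the README text is the split between $setOnInsert (applied only when the upsert inserts a new document, hence a stable _insertedTS) and $set (applied on every upsert, hence a fresh _modifiedTS).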

config/MongoDbSinkConnector.properties

Lines changed: 1 addition & 1 deletion
@@ -42,4 +42,4 @@ mongodb.field.renamer.regexp=[]
 mongodb.post.processor.chain=at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder
 mongodb.change.data.capture.handler=
 mongodb.delete.on.null.values=false
-mongodb.replace.one.strategy=at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneDefaultFilterStrategy
+mongodb.writemodel.strategy=at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy
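For completeness, a hedged usage sketch that is not part of this commit: selecting one of the non-default strategies documented in the README with the renamed property would look as follows in the same properties file, here using the new timestamps strategy as an example.

```properties
# illustrative override only; omitting the entry keeps the default ReplaceOneDefaultStrategy
mongodb.writemodel.strategy=at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy
```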

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
 
     <groupId>at.grahsl.kafka.connect</groupId>
     <artifactId>kafka-connect-mongodb</artifactId>
-    <version>1.1.0</version>
+    <version>1.2.0-SNAPSHOT</version>
     <packaging>jar</packaging>
 
     <name>kafka-connect-mongodb</name>

src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkConnectorConfig.java

Lines changed: 9 additions & 9 deletions
@@ -26,7 +26,7 @@
 import at.grahsl.kafka.connect.mongodb.processor.field.renaming.RegExpSettings;
 import at.grahsl.kafka.connect.mongodb.processor.field.renaming.RenameByRegExp;
 import at.grahsl.kafka.connect.mongodb.processor.id.strategy.*;
-import at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.WriteModelFilterStrategy;
+import at.grahsl.kafka.connect.mongodb.writemodel.strategy.WriteModelStrategy;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.mongodb.MongoClientURI;
@@ -74,7 +74,7 @@ public enum FieldProjectionTypes {
     public static final String MONGODB_POST_PROCESSOR_CHAIN_DEFAULT = "at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder";
     public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT = "";
     public static final boolean MONGODB_DELETE_ON_NULL_VALUES_DEFAULT = false;
-    public static final String MONGODB_REPLACE_ONE_STRATEGY_DEFAULT = "at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneDefaultFilterStrategy";
+    public static final String MONGODB_WRITEMODEL_STRATEGY_DEFAULT = "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy";
 
     public static final String MONGODB_CONNECTION_URI_CONF = "mongodb.connection.uri";
     private static final String MONGODB_CONNECTION_URI_DOC = "the monogdb connection URI as supported by the offical drivers";
@@ -121,8 +121,8 @@ public enum FieldProjectionTypes {
     public static final String MONGODB_DELETE_ON_NULL_VALUES = "mongodb.delete.on.null.values";
     private static final String MONGODB_DELETE_ON_NULL_VALUES_DOC = "whether or not the connector tries to delete documents based on key when value is null";
 
-    public static final String MONGODB_REPLACE_ONE_STRATEGY = "mongodb.replace.one.strategy";
-    private static final String MONGODB_REPLACE_ONE_STRATEGY_DOC = "how to build the filter doc for the replaceOne write model";
+    public static final String MONGODB_WRITEMODEL_STRATEGY = "mongodb.writemodel.strategy";
+    private static final String MONGODB_WRITEMODEL_STRATEGY_DOC = "how to build the write models for the sink documents";
 
     private static ObjectMapper objectMapper = new ObjectMapper();
 
@@ -199,7 +199,7 @@ public Map<String, ConfigValue> validateAll(Map<String, String> props) {
                 .define(MONGODB_POST_PROCESSOR_CHAIN, Type.STRING, MONGODB_POST_PROCESSOR_CHAIN_DEFAULT, emptyString().or(matching(FULLY_QUALIFIED_CLASS_NAME_LIST)), Importance.LOW, MONGODB_POST_PROCESSOR_CHAIN_DOC)
                 .define(MONGODB_CHANGE_DATA_CAPTURE_HANDLER, Type.STRING, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT, emptyString().or(matching(FULLY_QUALIFIED_CLASS_NAME)), Importance.LOW, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DOC)
                 .define(MONGODB_DELETE_ON_NULL_VALUES, Type.BOOLEAN, MONGODB_DELETE_ON_NULL_VALUES_DEFAULT, Importance.MEDIUM, MONGODB_DELETE_ON_NULL_VALUES_DOC)
-                .define(MONGODB_REPLACE_ONE_STRATEGY, Type.STRING, MONGODB_REPLACE_ONE_STRATEGY_DEFAULT, Importance.LOW, MONGODB_REPLACE_ONE_STRATEGY_DOC)
+                .define(MONGODB_WRITEMODEL_STRATEGY, Type.STRING, MONGODB_WRITEMODEL_STRATEGY_DEFAULT, Importance.LOW, MONGODB_WRITEMODEL_STRATEGY_DOC)
                 ;
     }
 
@@ -377,17 +377,17 @@ public boolean isDeleteOnNullValues() {
         return getBoolean(MONGODB_DELETE_ON_NULL_VALUES);
     }
 
-    public WriteModelFilterStrategy getReplaceOneFilterStrategy() {
-        String replaceOneFilterStrategy = getString(MONGODB_REPLACE_ONE_STRATEGY);
+    public WriteModelStrategy getWriteModelStrategy() {
+        String replaceOneFilterStrategy = getString(MONGODB_WRITEMODEL_STRATEGY);
         try {
-            return (WriteModelFilterStrategy) Class.forName(replaceOneFilterStrategy)
+            return (WriteModelStrategy) Class.forName(replaceOneFilterStrategy)
                     .getConstructor().newInstance();
         } catch (ReflectiveOperationException e) {
             throw new ConfigException(e.getMessage(),e);
         } catch (ClassCastException e) {
             throw new ConfigException("error: specified class "+ replaceOneFilterStrategy
                     + " violates the contract since it doesn't implement " +
-                    WriteModelFilterStrategy.class);
+                    WriteModelStrategy.class);
         }
     }
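The reflective lookup in getWriteModelStrategy() above implies two requirements for whatever class is configured via mongodb.writemodel.strategy: it must be assignable to WriteModelStrategy and it must expose a public no-argument constructor, otherwise a ConfigException is thrown. A minimal skeleton meeting those requirements might look like the sketch below; the createWriteModel method name and its parameter type are assumptions made for illustration only, since the WriteModelStrategy interface itself is not part of this excerpt:

```java
import at.grahsl.kafka.connect.mongodb.writemodel.strategy.WriteModelStrategy;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;

// Sketch of a pluggable strategy: a public no-arg constructor plus the WriteModelStrategy
// contract is all that getWriteModelStrategy() needs in order to instantiate it via reflection.
public class MyReplaceOneSketchStrategy implements WriteModelStrategy {

    public MyReplaceOneSketchStrategy() {
        // must remain parameterless: the config class calls getConstructor().newInstance()
    }

    // NOTE: assumed signature for illustration; consult the WriteModelStrategy interface
    // in the writemodel.strategy package for the actual method to implement.
    public WriteModel<BsonDocument> createWriteModel(BsonDocument valueDoc) {
        // replace the whole document, filtering on its _id, with upsert semantics
        return new ReplaceOneModel<>(
                new BsonDocument("_id", valueDoc.get("_id")),
                valueDoc,
                new UpdateOptions().upsert(true));
    }
}
```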
