
Commit bd836a5

[feature request] Issue 52 (#53)

* add support to map multiple topics to collections
  - full configuration customization for all relevant properties
  - test cases for the most vital parts
  - TODO documentation
* add documentation for topic/collection-aware processing needs

This PR closes #52

1 parent 636710b commit bd836a5

26 files changed, +1426 −250 lines

README.md

Lines changed: 98 additions & 0 deletions
@@ -570,6 +570,104 @@ At the moment the following settings can be configured by means of the *connector.properties*

| mongodb.value.projection.type | whether or not and which value projection to use | string | none | [none, blacklist, whitelist] | low |
| mongodb.writemodel.strategy | how to build the write models for the sink documents | string | at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy | | low |

The above listed *connector.properties* are the 'original' (still valid and supported) way to configure the sink connector. Their main drawback is that, so far, data from one or several Kafka topics could only be sunk into a single MongoDB collection.
### Collection-aware Configuration Settings

In the past, several sink connector instances had to be configured and run separately, one for each topic / collection that needed individual processing settings. Starting with version 1.2.0 it is possible to **configure multiple Kafka topic <-> MongoDB collection mappings.** This allows for a lot more flexibility and **supports complex data processing needs within one and the same sink connector instance.**

Essentially all relevant *connector.properties* can now be defined individually for each topic / collection.

##### Topic <-> Collection Mappings

The most important change in the configuration options is the ability to define named relations between configured Kafka topics and MongoDB collections, like so:
```properties
#Kafka topics to consume from
topics=foo-t,blah-t

#MongoDB collections to write to
mongodb.collections=foo-c,blah-c

#Named topic <-> collection mappings
mongodb.collection.foo-t=foo-c
mongodb.collection.blah-t=blah-c
```
##### Individual Settings for each Collection

Configuration properties can then be defined specifically for any collection that has a named mapping. The following configuration fragments show how to apply different settings to the *foo-c* and *blah-c* MongoDB sink collections.
```properties
#specific processing settings for topic 'foo-t' -> collection 'foo-c'
mongodb.document.id.strategy.foo-c=at.grahsl.kafka.connect.mongodb.processor.id.strategy.UuidStrategy
mongodb.post.processor.chain.foo-c=at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder,at.grahsl.kafka.connect.mongodb.processor.BlacklistValueProjector
mongodb.value.projection.type.foo-c=blacklist
mongodb.value.projection.list.foo-c=k2,k4
mongodb.max.batch.size.foo-c=100
```
These properties result in the following actions for messages originating from Kafka topic 'foo-t':

* document identity (*_id* field) will be given by a generated UUID
* value projection will be done using a blacklist approach in order to remove fields *k2* and *k4*
* at most 100 documents will be written to the MongoDB collection 'foo-c' in one bulk write operation

Then there are also individual settings for collection 'blah-c':
```properties
#specific processing settings for topic 'blah-t' -> collection 'blah-c'
mongodb.document.id.strategy.blah-c=at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInValueStrategy
mongodb.post.processor.chain.blah-c=at.grahsl.kafka.connect.mongodb.processor.WhitelistValueProjector
mongodb.value.projection.type.blah-c=whitelist
mongodb.value.projection.list.blah-c=k3,k5
mongodb.writemodel.strategy.blah-c=at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy
```
These settings result in the following actions for messages originating from Kafka topic 'blah-t':

* document identity (*_id* field) will be taken from the value structure of the message
* value projection will be done using a whitelist approach in order to only retain fields *k3* and *k5*
* the chosen write model strategy will keep track of inserted and modified timestamps for each written document

##### Fallback to Defaults

Whenever the sink connector tries to apply collection-specific settings where none are in place, it automatically falls back to either:

* what was explicitly configured for the same collection-agnostic property

or

* what is implicitly defined as the default of the same collection-agnostic property

For instance, given the following configuration fragment:
```properties
#explicitly defined fallback for document identity
mongodb.document.id.strategy=at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy

#collection-specific override for document identity
mongodb.document.id.strategy.foo-c=at.grahsl.kafka.connect.mongodb.processor.id.strategy.UuidStrategy

#collection-specific override for write model
mongodb.writemodel.strategy.blah-c=at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy
```
this means that:

* **document identity falls back to the explicitly given default, the *FullKeyStrategy***, for all collections other than 'foo-c', which uses the specified *UuidStrategy*
* **write model strategy falls back to the implicitly defined *ReplaceOneDefaultStrategy*** for all collections other than 'blah-c', which uses the specified *UpdateOneTimestampsStrategy*

### Running in development
Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
```java
package at.grahsl.kafka.connect.mongodb;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import java.util.HashMap;
import java.util.Map;

public class CollectionAwareConfig extends AbstractConfig {

    //NOTE: the merging of values() and originals() is a workaround
    //in order to allow for properties not given in the ConfigDef
    //at compile time to be picked up and available as well...
    private final Map<String, Object> collectionAwareSettings;

    public CollectionAwareConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
        super(definition, originals, doLog);
        collectionAwareSettings = new HashMap<>(256);
        collectionAwareSettings.putAll(values());
        collectionAwareSettings.putAll(originals());
    }

    public CollectionAwareConfig(ConfigDef definition, Map<?, ?> originals) {
        super(definition, originals);
        collectionAwareSettings = new HashMap<>(256);
        collectionAwareSettings.putAll(values());
        collectionAwareSettings.putAll(originals());
    }

    protected Object get(String property, String collection) {
        String fullProperty = property + "." + collection;
        if (collectionAwareSettings.containsKey(fullProperty)) {
            return collectionAwareSettings.get(fullProperty);
        }
        return collectionAwareSettings.get(property);
    }

    public String getString(String property, String collection) {
        if (collection == null || collection.isEmpty()) {
            return (String) get(property);
        }
        return (String) get(property, collection);
    }

    //NOTE: in this topic-aware map everything is currently stored as
    //type String, so direct casting won't work, which is why the
    //*.parse*(String value) methods are used for now.
    public Boolean getBoolean(String property, String collection) {
        Object obj;
        if (collection == null || collection.isEmpty()) {
            obj = get(property);
        } else {
            obj = get(property, collection);
        }

        if (obj instanceof Boolean)
            return (Boolean) obj;

        if (obj instanceof String)
            return Boolean.parseBoolean((String) obj);

        throw new ConfigException("error: unsupported property type for '" + obj + "' where Boolean expected");
    }

    public Integer getInt(String property, String collection) {
        Object obj;
        if (collection == null || collection.isEmpty()) {
            obj = get(property);
        } else {
            obj = get(property, collection);
        }

        if (obj instanceof Integer)
            return (Integer) obj;

        if (obj instanceof String)
            return Integer.parseInt((String) obj);

        throw new ConfigException("error: unsupported property type for '" + obj + "' where Integer expected");
    }

}
```
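The fallback behavior documented above hinges on the lookup precedence in `get(property, collection)`: a collection-suffixed key wins over the plain, collection-agnostic key. The following is a minimal, Kafka-independent sketch of just that lookup; the class name `FallbackLookupSketch` and the map contents are illustrative stand-ins, not part of the connector.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch (independent of the Kafka Connect classes) of the
// lookup precedence used by CollectionAwareConfig.get(property, collection):
// "<property>.<collection>" wins over the collection-agnostic "<property>".
public class FallbackLookupSketch {

    private final Map<String, Object> settings = new HashMap<>();

    public FallbackLookupSketch(Map<String, Object> originals) {
        settings.putAll(originals);
    }

    public Object get(String property, String collection) {
        String fullProperty = property + "." + collection;
        if (settings.containsKey(fullProperty)) {
            return settings.get(fullProperty);
        }
        // no collection-specific entry -> fall back to the agnostic default
        return settings.get(property);
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("mongodb.document.id.strategy", "FullKeyStrategy");
        props.put("mongodb.document.id.strategy.foo-c", "UuidStrategy");

        FallbackLookupSketch cfg = new FallbackLookupSketch(props);
        // 'foo-c' has an override; 'blah-c' falls back to the default
        System.out.println(cfg.get("mongodb.document.id.strategy", "foo-c"));
        System.out.println(cfg.get("mongodb.document.id.strategy", "blah-c"));
    }
}
```

This mirrors why, in the fragment above, only 'foo-c' ends up with the *UuidStrategy* while every other collection resolves to the configured *FullKeyStrategy*.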
