33import at .grahsl .kafka .connect .mongodb .MongoDbSinkConnectorConfig ;
44import at .grahsl .kafka .connect .mongodb .cdc .debezium .OperationType ;
55import at .grahsl .kafka .connect .mongodb .converter .SinkDocument ;
6+ import com .mongodb .client .model .DeleteOneModel ;
7+ import com .mongodb .client .model .ReplaceOneModel ;
8+ import com .mongodb .client .model .UpdateOneModel ;
9+ import com .mongodb .client .model .WriteModel ;
610import org .apache .kafka .connect .errors .DataException ;
711import org .bson .BsonDocument ;
812import org .bson .BsonInt32 ;
2529@ RunWith (JUnitPlatform .class )
2630public class MongoDbHandlerTest {
2731
28- public static final MongoDbHandler MONGODB_HANDLER =
32+ public static final MongoDbHandler MONGODB_HANDLER_DEFAULT_MAPPING =
2933 new MongoDbHandler (new MongoDbSinkConnectorConfig (new HashMap <>()));
3034
35+ public static final MongoDbHandler MONGODB_HANDLER_EMPTY_MAPPING =
36+ new MongoDbHandler (new MongoDbSinkConnectorConfig (new HashMap <>()),
37+ new HashMap <>());
38+
39+ @ Test
40+ @ DisplayName ("verify existing default config from base class" )
41+ public void testExistingDefaultConfig () {
42+ assertAll (
43+ () -> assertNotNull (MONGODB_HANDLER_DEFAULT_MAPPING .getConfig (),
44+ () -> "default config for handler must not be null" ),
45+ () -> assertNotNull (MONGODB_HANDLER_EMPTY_MAPPING .getConfig (),
46+ () -> "default config for handler must not be null" )
47+ );
48+ }
49+
3150 @ Test
3251 @ DisplayName ("when key document missing then DataException" )
3352 public void testMissingKeyDocument () {
3453 assertThrows (DataException .class , () ->
35- MONGODB_HANDLER .handle (new SinkDocument (null ,null ))
54+ MONGODB_HANDLER_DEFAULT_MAPPING .handle (new SinkDocument (null ,null ))
3655 );
3756 }
3857
3958 @ Test
4059 @ DisplayName ("when key doc contains 'id' field but value is empty then null due to tombstone" )
4160 public void testTombstoneEvent () {
4261 assertEquals (Optional .empty (),
43- MONGODB_HANDLER .handle (new SinkDocument (
62+ MONGODB_HANDLER_DEFAULT_MAPPING .handle (new SinkDocument (
4463 new BsonDocument ("id" ,new BsonInt32 (1234 )),
4564 new BsonDocument ())
4665 ),
@@ -56,7 +75,20 @@ public void testUnkownCdcOperationType() {
5675 new BsonDocument ("op" ,new BsonString ("x" ))
5776 );
5877 assertThrows (DataException .class , () ->
59- MONGODB_HANDLER .handle (cdcEvent )
78+ MONGODB_HANDLER_DEFAULT_MAPPING .handle (cdcEvent )
79+ );
80+ }
81+
82+ @ Test
83+ @ DisplayName ("when value doc contains unmapped operation type then DataException" )
84+ public void testUnmappedCdcOperationType () {
85+ SinkDocument cdcEvent = new SinkDocument (
86+ new BsonDocument ("_id" ,new BsonInt32 (1234 )),
87+ new BsonDocument ("op" ,new BsonString ("c" ))
88+ .append ("after" ,new BsonString ("{_id:1234,foo:\" blah\" }" ))
89+ );
90+ assertThrows (DataException .class , () ->
91+ MONGODB_HANDLER_EMPTY_MAPPING .handle (cdcEvent )
6092 );
6193 }
6294
@@ -68,7 +100,7 @@ public void testInvalidCdcOperationType() {
68100 new BsonDocument ("op" ,new BsonInt32 ('c' ))
69101 );
70102 assertThrows (DataException .class , () ->
71- MONGODB_HANDLER .handle (cdcEvent )
103+ MONGODB_HANDLER_DEFAULT_MAPPING .handle (cdcEvent )
72104 );
73105 }
74106
@@ -80,32 +112,96 @@ public void testMissingCdcOperationType() {
80112 new BsonDocument ("po" , BsonNull .VALUE )
81113 );
82114 assertThrows (DataException .class , () ->
83- MONGODB_HANDLER .handle (cdcEvent )
115+ MONGODB_HANDLER_DEFAULT_MAPPING .handle (cdcEvent )
84116 );
85117 }
86118
87119 @ TestFactory
88- @ DisplayName ("when valid cdc operation type then correct MongoDB CdcOperation" )
89- public Stream <DynamicTest > testValidCdcOpertionTypes () {
120+ @ DisplayName ("when valid CDC event then correct WriteModel" )
121+ public Stream <DynamicTest > testValidCdcDocument () {
122+
123+ return Stream .of (
124+ dynamicTest ("test operation " +OperationType .CREATE , () -> {
125+ Optional <WriteModel <BsonDocument >> result =
126+ MONGODB_HANDLER_DEFAULT_MAPPING .handle (
127+ new SinkDocument (
128+ new BsonDocument ("_id" ,new BsonString ("1234" )),
129+ new BsonDocument ("op" ,new BsonString ("c" ))
130+ .append ("after" ,new BsonString ("{_id:1234,foo:\" blah\" }" ))
131+ )
132+ );
133+ assertTrue (result .isPresent ());
134+ assertTrue (result .get () instanceof ReplaceOneModel ,
135+ () -> "result expected to be of type ReplaceOneModel" );
136+
137+ }),
138+ dynamicTest ("test operation " +OperationType .READ , () -> {
139+ Optional <WriteModel <BsonDocument >> result =
140+ MONGODB_HANDLER_DEFAULT_MAPPING .handle (
141+ new SinkDocument (
142+ new BsonDocument ("_id" ,new BsonString ("1234" )),
143+ new BsonDocument ("op" ,new BsonString ("r" ))
144+ .append ("after" ,new BsonString ("{_id:1234,foo:\" blah\" }" ))
145+ )
146+ );
147+ assertTrue (result .isPresent ());
148+ assertTrue (result .get () instanceof ReplaceOneModel ,
149+ () -> "result expected to be of type ReplaceOneModel" );
150+
151+ }),
152+ dynamicTest ("test operation " +OperationType .UPDATE , () -> {
153+ Optional <WriteModel <BsonDocument >> result =
154+ MONGODB_HANDLER_DEFAULT_MAPPING .handle (
155+ new SinkDocument (
156+ new BsonDocument ("id" ,new BsonString ("1234" )),
157+ new BsonDocument ("op" ,new BsonString ("u" ))
158+ .append ("patch" ,new BsonString ("{\" $set\" :{foo:\" blah\" }}" ))
159+ )
160+ );
161+ assertTrue (result .isPresent ());
162+ assertTrue (result .get () instanceof UpdateOneModel ,
163+ () -> "result expected to be of type UpdateOneModel" );
164+
165+ }),
166+ dynamicTest ("test operation " +OperationType .DELETE , () -> {
167+ Optional <WriteModel <BsonDocument >> result =
168+ MONGODB_HANDLER_DEFAULT_MAPPING .handle (
169+ new SinkDocument (
170+ new BsonDocument ("id" ,new BsonString ("1234" )),
171+ new BsonDocument ("op" ,new BsonString ("d" ))
172+ )
173+ );
174+ assertTrue (result .isPresent (), () -> "write model result must be present" );
175+ assertTrue (result .get () instanceof DeleteOneModel ,
176+ () -> "result expected to be of type DeleteOneModel" );
177+
178+ })
179+ );
180+
181+ }
182+
183+ @ TestFactory
184+ @ DisplayName ("when valid cdc operation type then correct MongoDB CdcOperation" )
185+ public Stream <DynamicTest > testValidCdcOpertionTypes () {
90186
91187 return Stream .of (
92188 dynamicTest ("test operation " +OperationType .CREATE , () ->
93- assertTrue (MONGODB_HANDLER .getCdcOperation (
94- new BsonDocument ("op" ,new BsonString ("c" )))
95- instanceof MongoDbInsert )
96- ),
189+ assertTrue (MONGODB_HANDLER_DEFAULT_MAPPING .getCdcOperation (
190+ new BsonDocument ("op" ,new BsonString ("c" )))
191+ instanceof MongoDbInsert )
192+ ),
97193 dynamicTest ("test operation " +OperationType .READ , () ->
98- assertTrue (MONGODB_HANDLER .getCdcOperation (
194+ assertTrue (MONGODB_HANDLER_DEFAULT_MAPPING .getCdcOperation (
99195 new BsonDocument ("op" ,new BsonString ("r" )))
100196 instanceof MongoDbInsert )
101197 ),
102198 dynamicTest ("test operation " +OperationType .UPDATE , () ->
103- assertTrue (MONGODB_HANDLER .getCdcOperation (
199+ assertTrue (MONGODB_HANDLER_DEFAULT_MAPPING .getCdcOperation (
104200 new BsonDocument ("op" ,new BsonString ("u" )))
105201 instanceof MongoDbUpdate )
106202 ),
107203 dynamicTest ("test operation " +OperationType .DELETE , () ->
108- assertTrue (MONGODB_HANDLER .getCdcOperation (
204+ assertTrue (MONGODB_HANDLER_DEFAULT_MAPPING .getCdcOperation (
109205 new BsonDocument ("op" ,new BsonString ("d" )))
110206 instanceof MongoDbDelete )
111207 )
0 commit comments