Skip to content

Commit 47accef

Browse files
committed
add support for DBZ postgresql CDC
initial support for postgres is based on the observation that DBZ basically produces similar CDC events for mysql and postgres. thus both can for now be supported based on the same code as written for mysql. refactorings have been made to avoid redundancy and allow for specific sub-classes of RdbmsHandler
1 parent bbd6aaa commit 47accef

File tree

12 files changed

+165
-82
lines changed

12 files changed

+165
-82
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,10 +360,10 @@ The sink connector can also be used in a different operation mode in order to ha
360360

361361
* [MongoDB](http://debezium.io/docs/connectors/mongodb/)
362362
* [MySQL](http://debezium.io/docs/connectors/mysql/)
363-
* PostgreSQL (coming later)
363+
* [PostgreSQL](http://debezium.io/docs/connectors/postgresql/)
364364
* Oracle (not yet finished at Debezium Project)
365365

366-
This effectively allows to replicate all state changes within the source databases into MongoDB collections. Further Debezium formats - namely PostgreSQL and Oracle - will probably get integrated in future releases.
366+
This effectively allows to replicate all state changes within the source databases into MongoDB collections. Debezium produces very similar CDC events for MySQL and PostgreSQL. The so far addressed use cases worked fine based on the same code which is why there is only one _RdbmsHandler_ implementation to support them both at the moment. Debezium Oracle CDC format will be integrated in a future release.
367367

368368
Also note that **both serialization formats (JSON+Schema & AVRO) can be used** depending on which configuration is a better fit for your use case.
369369

src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkConnectorConfig.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
import at.grahsl.kafka.connect.mongodb.cdc.CdcHandler;
2020
import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler;
21-
import at.grahsl.kafka.connect.mongodb.cdc.debezium.mysql.MysqlHandler;
21+
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.mysql.MysqlHandler;
22+
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.postgres.PostgresHandler;
2223
import at.grahsl.kafka.connect.mongodb.processor.*;
2324
import at.grahsl.kafka.connect.mongodb.processor.field.projection.FieldProjector;
2425
import at.grahsl.kafka.connect.mongodb.processor.field.renaming.FieldnameMapping;
@@ -313,6 +314,7 @@ public static Set<String> getPredefinedCdcHandlerClassNames() {
313314
Set<String> cdcHandlers = new HashSet<String>();
314315
cdcHandlers.add(MongoDbHandler.class.getName());
315316
cdcHandlers.add(MysqlHandler.class.getName());
317+
cdcHandlers.add(PostgresHandler.class.getName());
316318
return cdcHandlers;
317319
}
318320

src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mysql/MysqlDelete.java renamed to src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsDelete.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package at.grahsl.kafka.connect.mongodb.cdc.debezium.mysql;
17+
package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms;
1818

1919
import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
2020
import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
@@ -24,7 +24,7 @@
2424
import org.apache.kafka.connect.errors.DataException;
2525
import org.bson.BsonDocument;
2626

27-
public class MysqlDelete implements CdcOperation {
27+
public class RdbmsDelete implements CdcOperation {
2828

2929
@Override
3030
public WriteModel<BsonDocument> perform(SinkDocument doc) {
@@ -38,7 +38,7 @@ public WriteModel<BsonDocument> perform(SinkDocument doc) {
3838
);
3939

4040
try {
41-
BsonDocument filterDoc = MysqlHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.DELETE);
41+
BsonDocument filterDoc = RdbmsHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.DELETE);
4242
return new DeleteOneModel<>(filterDoc);
4343
} catch(Exception exc) {
4444
throw new DataException(exc);

src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mysql/MysqlHandler.java renamed to src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsHandler.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package at.grahsl.kafka.connect.mongodb.cdc.debezium.mysql;
17+
package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms;
1818

1919
import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
2020
import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
@@ -34,25 +34,25 @@
3434
import java.util.Map;
3535
import java.util.Optional;
3636

37-
public class MysqlHandler extends DebeziumCdcHandler {
37+
public class RdbmsHandler extends DebeziumCdcHandler {
3838

3939
public static final String JSON_DOC_BEFORE_FIELD = "before";
4040
public static final String JSON_DOC_AFTER_FIELD = "after";
4141

42-
private static Logger logger = LoggerFactory.getLogger(MysqlHandler.class);
42+
private static Logger logger = LoggerFactory.getLogger(RdbmsHandler.class);
4343

44-
public MysqlHandler(MongoDbSinkConnectorConfig config) {
44+
public RdbmsHandler(MongoDbSinkConnectorConfig config) {
4545
super(config);
4646
final Map<OperationType,CdcOperation> operations = new HashMap<>();
47-
operations.put(OperationType.CREATE,new MysqlInsert());
48-
operations.put(OperationType.READ,new MysqlInsert());
49-
operations.put(OperationType.UPDATE,new MysqlUpdate());
50-
operations.put(OperationType.DELETE,new MysqlDelete());
47+
operations.put(OperationType.CREATE,new RdbmsInsert());
48+
operations.put(OperationType.READ,new RdbmsInsert());
49+
operations.put(OperationType.UPDATE,new RdbmsUpdate());
50+
operations.put(OperationType.DELETE,new RdbmsDelete());
5151
registerOperations(operations);
5252
}
5353

54-
public MysqlHandler(MongoDbSinkConnectorConfig config,
55-
Map<OperationType,CdcOperation> operations) {
54+
public RdbmsHandler(MongoDbSinkConnectorConfig config,
55+
Map<OperationType,CdcOperation> operations) {
5656
super(config);
5757
registerOperations(operations);
5858
}

src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mysql/MysqlInsert.java renamed to src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsInsert.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package at.grahsl.kafka.connect.mongodb.cdc.debezium.mysql;
17+
package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms;
1818

1919
import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
2020
import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
@@ -25,7 +25,7 @@
2525
import org.apache.kafka.connect.errors.DataException;
2626
import org.bson.BsonDocument;
2727

28-
public class MysqlInsert implements CdcOperation {
28+
public class RdbmsInsert implements CdcOperation {
2929

3030
private static final UpdateOptions UPDATE_OPTIONS =
3131
new UpdateOptions().upsert(true);
@@ -42,8 +42,8 @@ public WriteModel<BsonDocument> perform(SinkDocument doc) {
4242
);
4343

4444
try {
45-
BsonDocument filterDoc = MysqlHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.CREATE);
46-
BsonDocument upsertDoc = MysqlHandler.generateUpsertOrReplaceDoc(keyDoc, valueDoc, filterDoc);
45+
BsonDocument filterDoc = RdbmsHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.CREATE);
46+
BsonDocument upsertDoc = RdbmsHandler.generateUpsertOrReplaceDoc(keyDoc, valueDoc, filterDoc);
4747
return new ReplaceOneModel<>(filterDoc, upsertDoc, UPDATE_OPTIONS);
4848
} catch (Exception exc) {
4949
throw new DataException(exc);

src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mysql/MysqlUpdate.java renamed to src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsUpdate.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package at.grahsl.kafka.connect.mongodb.cdc.debezium.mysql;
17+
package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms;
1818

1919
import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
2020
import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
@@ -25,7 +25,7 @@
2525
import org.apache.kafka.connect.errors.DataException;
2626
import org.bson.BsonDocument;
2727

28-
public class MysqlUpdate implements CdcOperation {
28+
public class RdbmsUpdate implements CdcOperation {
2929

3030
private static final UpdateOptions UPDATE_OPTIONS =
3131
new UpdateOptions().upsert(true);
@@ -42,8 +42,8 @@ public WriteModel<BsonDocument> perform(SinkDocument doc) {
4242
);
4343

4444
try {
45-
BsonDocument filterDoc = MysqlHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.UPDATE);
46-
BsonDocument replaceDoc = MysqlHandler.generateUpsertOrReplaceDoc(keyDoc, valueDoc, filterDoc);
45+
BsonDocument filterDoc = RdbmsHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.UPDATE);
46+
BsonDocument replaceDoc = RdbmsHandler.generateUpsertOrReplaceDoc(keyDoc, valueDoc, filterDoc);
4747
return new ReplaceOneModel<>(filterDoc, replaceDoc, UPDATE_OPTIONS);
4848
} catch (Exception exc) {
4949
throw new DataException(exc);
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright (c) 2017. Hans-Peter Grahsl ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.mysql;
18+
19+
import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
20+
import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
21+
import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
22+
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler;
23+
24+
import java.util.Map;
25+
26+
public class MysqlHandler extends RdbmsHandler {
27+
28+
//NOTE: this class is prepared in case there are
29+
//mysql specific differences to be considered
30+
//and the CDC handling deviates from the standard
31+
//behaviour as implemented in RdbmsHandler.class
32+
33+
public MysqlHandler(MongoDbSinkConnectorConfig config) {
34+
super(config);
35+
}
36+
37+
public MysqlHandler(MongoDbSinkConnectorConfig config, Map<OperationType, CdcOperation> operations) {
38+
super(config, operations);
39+
}
40+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright (c) 2017. Hans-Peter Grahsl ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.postgres;
18+
19+
import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
20+
import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
21+
import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
22+
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler;
23+
24+
import java.util.Map;
25+
26+
public class PostgresHandler extends RdbmsHandler {
27+
28+
//NOTE: this class is prepared in case there are
29+
//postgres specific differences to be considered
30+
//and the CDC handling deviates from the standard
31+
//behaviour as implemented in RdbmsHandler.class
32+
33+
public PostgresHandler(MongoDbSinkConnectorConfig config) {
34+
super(config);
35+
}
36+
37+
public PostgresHandler(MongoDbSinkConnectorConfig config, Map<OperationType, CdcOperation> operations) {
38+
super(config, operations);
39+
}
40+
41+
}

src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mysql/MysqlDeleteTest.java renamed to src/test/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsDeleteTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package at.grahsl.kafka.connect.mongodb.cdc.debezium.mysql;
1+
package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms;
22

33
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
44
import com.mongodb.DBCollection;
@@ -17,9 +17,9 @@
1717
import static org.junit.jupiter.api.Assertions.*;
1818

1919
@RunWith(JUnitPlatform.class)
20-
public class MysqlDeleteTest {
20+
public class RdbmsDeleteTest {
2121

22-
public static final MysqlDelete MYSQL_DELETE = new MysqlDelete();
22+
public static final RdbmsDelete RDBMS_DELETE = new RdbmsDelete();
2323

2424
@Test
2525
@DisplayName("when valid cdc event with single field PK then correct DeleteOneModel")
@@ -33,7 +33,7 @@ public void testValidSinkDocumentSingleFieldPK() {
3333
BsonDocument valueDoc = new BsonDocument("op",new BsonString("d"));
3434

3535
WriteModel<BsonDocument> result =
36-
MYSQL_DELETE.perform(new SinkDocument(keyDoc,valueDoc));
36+
RDBMS_DELETE.perform(new SinkDocument(keyDoc,valueDoc));
3737

3838
assertTrue(result instanceof DeleteOneModel,
3939
() -> "result expected to be of type DeleteOneModel");
@@ -62,7 +62,7 @@ public void testValidSinkDocumentCompoundPK() {
6262
BsonDocument valueDoc = new BsonDocument("op",new BsonString("d"));
6363

6464
WriteModel<BsonDocument> result =
65-
MYSQL_DELETE.perform(new SinkDocument(keyDoc,valueDoc));
65+
RDBMS_DELETE.perform(new SinkDocument(keyDoc,valueDoc));
6666

6767
assertTrue(result instanceof DeleteOneModel,
6868
() -> "result expected to be of type DeleteOneModel");
@@ -93,7 +93,7 @@ public void testValidSinkDocumentNoPK() {
9393
.append("active", new BsonBoolean(true)));
9494

9595
WriteModel<BsonDocument> result =
96-
MYSQL_DELETE.perform(new SinkDocument(keyDoc,valueDoc));
96+
RDBMS_DELETE.perform(new SinkDocument(keyDoc,valueDoc));
9797

9898
assertTrue(result instanceof DeleteOneModel,
9999
() -> "result expected to be of type DeleteOneModel");
@@ -112,23 +112,23 @@ public void testValidSinkDocumentNoPK() {
112112
@DisplayName("when missing key doc then DataException")
113113
public void testMissingKeyDocument() {
114114
assertThrows(DataException.class,() ->
115-
MYSQL_DELETE.perform(new SinkDocument(null,new BsonDocument()))
115+
RDBMS_DELETE.perform(new SinkDocument(null,new BsonDocument()))
116116
);
117117
}
118118

119119
@Test
120120
@DisplayName("when missing value doc then DataException")
121121
public void testMissingValueDocument() {
122122
assertThrows(DataException.class,() ->
123-
MYSQL_DELETE.perform(new SinkDocument(new BsonDocument(),null))
123+
RDBMS_DELETE.perform(new SinkDocument(new BsonDocument(),null))
124124
);
125125
}
126126

127127
@Test
128128
@DisplayName("when key doc and value 'before' field both empty then DataException")
129129
public void testEmptyKeyDocAndEmptyValueBeforeField() {
130130
assertThrows(DataException.class,() ->
131-
MYSQL_DELETE.perform(new SinkDocument(new BsonDocument(),
131+
RDBMS_DELETE.perform(new SinkDocument(new BsonDocument(),
132132
new BsonDocument("op",new BsonString("d")).append("before",new BsonDocument())))
133133
);
134134
}

0 commit comments

Comments
 (0)