Skip to content

Commit 352def8

Browse files
committed
add new prop slot and dropSolt for kingbase and postgrtesql source connector
1 parent 0ed34e1 commit 352def8

File tree

9 files changed

+255
-51
lines changed

9 files changed

+255
-51
lines changed

tis-incr/tis-flink-cdc-kafka-plugin/src/main/java/com/qlangtech/tis/plugin/kafka/consumer/FlinkKafkaFunction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ public AsyncMsg<List<ReaderSource>> start(IConsumerRateLimiter streamFactory,
9292
, (tabName) -> createDispatched(tabName, sourceFactory.independentBinLogMonitor));
9393

9494
return sourceChannel;
95-
//return (JobExecutionResult) this.getConsumerHandle().consume(dataxName, sourceChannel, dataXProcessor);
9695
} catch (Exception e) {
9796
throw new RuntimeException(e);
9897
}

tis-incr/tis-flink-cdc-kingbase-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/kingbase/FlinkCDCKingBaseSourceFunction.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
2222
import com.qlangtech.plugins.incr.flink.cdc.kingbase.source.KingBaseDialect;
23+
import com.qlangtech.plugins.incr.flink.cdc.pglike.FlinkCDCPGLikeSourceFactory;
2324
import com.qlangtech.plugins.incr.flink.cdc.pglike.FlinkCDCPGLikeSourceFunction;
2425
import com.qlangtech.plugins.incr.flink.cdc.pglike.PostgreSQLDeserializationSchema;
2526
import com.qlangtech.plugins.incr.flink.cdc.pglike.StartupOptionUtils;
@@ -106,7 +107,17 @@ public boolean visit(DBConfig config, String jdbcUrl, String ip, String dbName)
106107
configFactory.decodingPluginName(sourceFactory.decodingPluginName);
107108
configFactory.password(dsFactory.password);
108109
configFactory.debeziumProperties(debeziumProperties);
110+
// configFactory.slotName(sourceFactory.slotName);
109111

112+
FlinkCDCPGLikeSourceFactory.debeziumProps.forEach((trip) -> {
113+
// debeziumProperties.setProperty(trip.getMiddle().name(), String.valueOf(trip.getRight().apply(sourceFactory)));
114+
115+
trip.getValue().accept(debeziumProperties,sourceFactory);
116+
});
117+
118+
// debeziumProperties.setProperty(
119+
// io.debezium.connector.postgresql.PostgresConnectorConfig.DROP_SLOT_ON_STOP.name(), String.valueOf(sourceFactory.dropSolt));
120+
// configFactory.slotName();
110121
configFactory.startupOptions(StartupOptionUtils.getStartupOptions(sourceFactory.startupOptions));
111122
// .deserializer(new PostgreSQLDeserializationSchema(tabs, flinkColCreator, contextParamValsGetterMapper, sourceFactory.getRepIdentity()))
112123
// .build();

tis-incr/tis-flink-cdc-kingbase-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/pglike/FlinkCDCPGLikeSourceFactory.java

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,34 @@
1919
package com.qlangtech.plugins.incr.flink.cdc.pglike;
2020

2121
import com.alibaba.citrus.turbine.Context;
22+
import com.google.common.collect.Lists;
23+
import com.qlangtech.plugins.incr.debuzium.DebuziumPropAssist;
2224
import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
2325
import com.qlangtech.plugins.incr.flink.cdc.pglike.PGDTOColValProcess.PGCDCTypeVisitor;
2426
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
2527
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
2628
import com.qlangtech.tis.datax.DataXName;
2729
import com.qlangtech.tis.datax.impl.DataxReader;
30+
import com.qlangtech.tis.extension.util.AbstractPropAssist;
31+
import com.qlangtech.tis.extension.util.OverwriteProps;
2832
import com.qlangtech.tis.plugin.annotation.FormField;
2933
import com.qlangtech.tis.plugin.annotation.FormFieldType;
3034
import com.qlangtech.tis.plugin.annotation.Validator;
3135
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
3236
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
3337
import com.qlangtech.tis.plugin.ds.ISelectedTab;
3438
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
39+
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
3540
import com.qlangtech.tis.util.IPluginContext;
41+
import io.debezium.config.Field;
42+
import io.debezium.connector.postgresql.PostgresConnectorConfig;
43+
import org.apache.commons.lang3.tuple.Pair;
3644

3745
import java.util.List;
3846
import java.util.Objects;
47+
import java.util.Properties;
48+
import java.util.function.BiConsumer;
49+
import java.util.function.Consumer;
3950

4051
/**
4152
* @author: 百岁(baisui@qlangtech.com)
@@ -44,20 +55,27 @@
4455
public abstract class FlinkCDCPGLikeSourceFactory extends MQListenerFactory {
4556

4657
public static final String FIELD_REPLICA_RULE = "replicaRule";
47-
58+
public static final String FIELD_KEY_SLOT_NAME = "slotName";
4859
/**
4960
* The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.
5061
*/
5162
@FormField(ordinal = 0, type = FormFieldType.ENUM, validate = {Validator.require})
5263
public String decodingPluginName;
5364

65+
@FormField(ordinal = 1, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.db_col_name})
66+
public String slotName;
67+
68+
//DROP_SLOT_ON_STOP
69+
@FormField(ordinal = 2, type = FormFieldType.ENUM, validate = {Validator.require, Validator.db_col_name})
70+
public Boolean dropSolt;
71+
5472
/**
5573
* https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-optionshttps://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-options
5674
*/
57-
@FormField(ordinal = 1, type = FormFieldType.ENUM, validate = {Validator.require})
75+
@FormField(ordinal = 3, type = FormFieldType.ENUM, validate = {Validator.require})
5876
public String startupOptions;
5977
// REPLICA IDENTITY
60-
@FormField(ordinal = 2, advance = false, validate = {Validator.require})
78+
@FormField(ordinal = 4, advance = false, validate = {Validator.require})
6179
public PGLikeReplicaIdentity replicaRule;
6280

6381

@@ -74,14 +92,58 @@ public IFlinkColCreator<FlinkCol> createFlinkColCreator(DataSourceMeta sourceMet
7492
return flinkColCreator;
7593
}
7694

95+
// public static List<Triple<String, Field, Function<FlinkCDCPGLikeSourceFactory, Object>>> debeziumProps
96+
// = Lists.newArrayList(
97+
// Triple.of(FIELD_KEY_SLOT_NAME, PostgresConnectorConfig.SLOT_NAME, (sf) -> sf.slotName)
98+
// , Triple.of("dropSolt", PostgresConnectorConfig.DROP_SLOT_ON_STOP, (sf) -> sf.dropSolt));
99+
100+
// Properties debeziumProperties
101+
public static List<Pair<Consumer<AbstractPropAssist.Options<MQListenerFactory, Field>>, BiConsumer<Properties, FlinkCDCPGLikeSourceFactory>>> debeziumProps
102+
= Lists.newArrayList(
103+
Pair.of((opts) -> {
104+
opts.add(FIELD_KEY_SLOT_NAME, PostgresConnectorConfig.SLOT_NAME
105+
, new OverwriteProps().setDftVal(IPluginContext.getThreadLocalInstance().getCollectionName().getPipelineName()));
106+
}
107+
, (debeziumProperties, sourceFactory) -> {
108+
debeziumProperties.setProperty(PostgresConnectorConfig.SLOT_NAME.name(), sourceFactory.slotName);
109+
})
110+
, //
111+
Pair.of((opts) -> {
112+
opts.add("dropSolt", PostgresConnectorConfig.DROP_SLOT_ON_STOP);
113+
}, (debeziumProperties, sourceFactory) -> {
114+
debeziumProperties.setProperty(PostgresConnectorConfig.DROP_SLOT_ON_STOP.name(), String.valueOf(sourceFactory.dropSolt));
115+
}));
116+
77117
// @TISExtension()
78118
public static class BasePGLikeDescriptor extends MQListenerFactory.BaseDescriptor {
79119

120+
121+
public BasePGLikeDescriptor() {
122+
super();
123+
AbstractPropAssist.Options<MQListenerFactory, Field> opts = DebuziumPropAssist.createOpts(this);
124+
debeziumProps.forEach((pair) -> {
125+
//opts.add(trip.getLeft(), trip.getMiddle());
126+
pair.getKey().accept(opts);
127+
});
128+
// this.opts.add("slotName", PostgresConnectorConfig.SLOT_NAME);
129+
// this.opts.add("dropSolt", PostgresConnectorConfig.DROP_SLOT_ON_STOP);
130+
}
131+
80132
@Override
81133
public PluginVender getVender() {
82134
return PluginVender.FLINK_CDC;
83135
}
84136

137+
// @Override
138+
public boolean validateSlotName(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) {
139+
if (!value.matches("[a-z0-9_]{1,63}")) {
140+
msgHandler.addFieldError(context, fieldName, "must contain only digits, lowercase characters and underscores with length <= 63");
141+
return false;
142+
}
143+
144+
return true;
145+
}
146+
85147
@Override
86148
protected boolean validateMQListenerForm(
87149
IControlMsgHandler msgHandler, Context context, MQListenerFactory sourceFactory) {
@@ -90,7 +152,48 @@ protected boolean validateMQListenerForm(
90152
DataxReader dataxReader = DataxReader.load((IPluginContext) msgHandler, pipeline.getPipelineName());
91153
IDataSourceFactoryGetter dataSourceGetter = (IDataSourceFactoryGetter) dataxReader;
92154
List<ISelectedTab> selectedTabs = dataxReader.getSelectedTabs();
93-
return incrSource.replicaRule.validateSelectedTabs(msgHandler, context, dataSourceGetter, selectedTabs);
155+
156+
if (!incrSource.replicaRule.validateSelectedTabs(msgHandler, context, dataSourceGetter, selectedTabs)) {
157+
return false;
158+
}
159+
160+
final boolean[] validationPassed = {true};
161+
162+
dataSourceGetter.getDataSourceFactory().visitFirstConnection((conn) -> {
163+
// Validate replication slot availability
164+
String slotName = incrSource.slotName;
165+
166+
String checkSlotSql = "SELECT active FROM pg_replication_slots WHERE slot_name = ?";
167+
try (java.sql.PreparedStatement pstmt = conn.preparedStatement(checkSlotSql)) {
168+
pstmt.setString(1, slotName);
169+
170+
try (java.sql.ResultSet rs = pstmt.executeQuery()) {
171+
if (rs.next()) {
172+
boolean isActive = rs.getBoolean("active");
173+
174+
if (isActive) {
175+
// Slot is currently active and being used by another task
176+
msgHandler.addFieldError(context, FIELD_KEY_SLOT_NAME,
177+
"Replication slot '" + slotName + "' is already active and in use by another CDC task. " +
178+
"Please use a different slot name or stop the task currently using this slot.");
179+
validationPassed[0] = false;
180+
}
181+
// If active=false, the slot exists but is idle, which is acceptable
182+
// The task can reuse this slot
183+
}
184+
// If no result, the slot doesn't exist yet, which is fine
185+
// Flink CDC will create the slot automatically on startup
186+
}
187+
} catch (java.sql.SQLException e) {
188+
// Log warning but don't fail validation if we can't check the slot
189+
// This could happen if pg_replication_slots view doesn't exist or permission issues
190+
msgHandler.addErrorMessage(context,
191+
"Unable to validate replication slot status: " + e.getMessage());
192+
validationPassed[0] = false;
193+
}
194+
});
195+
196+
return validationPassed[0];
94197
}
95198
}
96199
}

tis-incr/tis-flink-cdc-kingbase-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/pglike/KingBaseDateTimeConverter.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,17 +76,23 @@ protected String convertDateTime(Object input) {
7676
@Override
7777
protected String convertTimestamp(Object input) {
7878
if (input != null) {
79-
// if (input instanceof Instant) {
80-
//
81-
// }
8279

83-
// System.out.println("timestampZoneId:" + timestampZoneId);
84-
return timestampFormatter.format(LocalDateTime.ofInstant((Instant) input, this.timestampZoneId));
80+
try {
81+
Instant instant = null;
82+
if (input instanceof Instant) {
83+
instant = (Instant) input;
84+
} else if (input instanceof java.lang.Long) {
85+
// Convert millisecond timestamp to Instant
86+
long timestamp = (Long) input;
87+
instant = Instant.ofEpochMilli(timestamp);
88+
} else {
89+
throw new UnsupportedOperationException("convertTimestamp:" + input.getClass() + ",val:" + String.valueOf(input));
90+
}
91+
return timestampFormatter.format(LocalDateTime.ofInstant(instant, this.timestampZoneId));
92+
} catch (Exception e) {
93+
throw new RuntimeException("input val:" + String.valueOf(input), e);
94+
}
8595

86-
// System.out.println(">>>>convertTimestamp:" + input.getClass().getName());
87-
// System.out.println(">>>>convertTimestamp:" + input.getClass() + ",time:" + timestampFormatter.format((Instant) input));
88-
//// return timestampFormatter.format((Instant) input);
89-
// return null;
9096

9197
}
9298
return null;

tis-incr/tis-flink-cdc-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/mysql/FlinkCDCMysqlSourceFunction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,9 @@ public MySQLReaderSourceCreator(DataXName dataXName, IConsumerRateLimiter stream
262262
@Override
263263
public List<ReaderSource> create(String dbHost, HostDBs dbs, Set<String> tbs, Properties debeziumProperties) {
264264

265-
DateTimeConverter.setDatetimeConverters(MySqlDateTimeConverter.class.getName(), debeziumProperties);
265+
DateTimeConverter.setDatetimeConverters(
266+
MySqlDateTimeConverter.class.getName()
267+
, debeziumProperties,dsFactory.getTimeZone().map(ZoneId::getId));
266268

267269
debeziumProperties.setProperty(
268270
CommonConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE.name()

tis-incr/tis-flink-cdc-postgresql-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/pglike/FlinkCDCPGLikeSourceFactory.java

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,38 +18,57 @@
1818

1919
package com.qlangtech.plugins.incr.flink.cdc.pglike;
2020

21+
import com.alibaba.citrus.turbine.Context;
22+
import com.google.common.collect.Lists;
23+
import com.qlangtech.plugins.incr.debuzium.DebuziumPropAssist;
2124
import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
2225
import com.qlangtech.plugins.incr.flink.cdc.pglike.PGDTOColValProcess.PGCDCTypeVisitor;
23-
import com.qlangtech.tis.async.message.client.consumer.IConsumerHandle;
2426
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
2527
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
28+
import com.qlangtech.tis.extension.util.AbstractPropAssist;
29+
import com.qlangtech.tis.extension.util.OverwriteProps;
2630
import com.qlangtech.tis.plugin.annotation.FormField;
2731
import com.qlangtech.tis.plugin.annotation.FormFieldType;
2832
import com.qlangtech.tis.plugin.annotation.Validator;
2933
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
34+
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
35+
import com.qlangtech.tis.util.IPluginContext;
36+
import io.debezium.config.Field;
37+
import io.debezium.connector.postgresql.PostgresConnectorConfig;
38+
import org.apache.commons.lang3.tuple.Pair;
3039

31-
import java.util.Objects;
40+
import java.util.List;
41+
import java.util.Properties;
42+
import java.util.function.BiConsumer;
43+
import java.util.function.Consumer;
3244

3345
/**
3446
* @author: 百岁(baisui@qlangtech.com)
3547
* @create: 2025-01-19 18:14
3648
**/
3749
public abstract class FlinkCDCPGLikeSourceFactory extends MQListenerFactory {
3850
//private transient IConsumerHandle consumerHandle;
39-
51+
public static final String FIELD_KEY_SLOT_NAME = "slotName";
4052
/**
4153
* The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.
4254
*/
4355
@FormField(ordinal = 0, type = FormFieldType.ENUM, validate = {Validator.require})
4456
public String decodingPluginName;
4557

58+
@FormField(ordinal = 1, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.db_col_name})
59+
public String slotName;
60+
61+
//DROP_SLOT_ON_STOP
62+
@FormField(ordinal = 2, type = FormFieldType.ENUM, validate = {Validator.require, Validator.db_col_name})
63+
public Boolean dropSolt;
64+
4665
/**
4766
* https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-optionshttps://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-options
4867
*/
49-
@FormField(ordinal = 1, type = FormFieldType.ENUM, validate = {Validator.require})
68+
@FormField(ordinal = 3, type = FormFieldType.ENUM, validate = {Validator.require})
5069
public String startupOptions;
5170
// REPLICA IDENTITY
52-
@FormField(ordinal = 2, advance = true, type = FormFieldType.ENUM, validate = {Validator.require})
71+
@FormField(ordinal = 4, advance = true, type = FormFieldType.ENUM, validate = {Validator.require})
5372
public String replicaIdentity;
5473

5574

@@ -74,9 +93,40 @@ public IFlinkColCreator<FlinkCol> createFlinkColCreator(DataSourceMeta sourceMet
7493
return flinkColCreator;
7594
}
7695

96+
public static List<Pair<Consumer<AbstractPropAssist.Options<MQListenerFactory, Field>>, BiConsumer<Properties, FlinkCDCPGLikeSourceFactory>>> debeziumProps
97+
= Lists.newArrayList(
98+
Pair.of((opts) -> {
99+
opts.add(FIELD_KEY_SLOT_NAME, PostgresConnectorConfig.SLOT_NAME
100+
, new OverwriteProps().setDftVal(IPluginContext.getThreadLocalInstance().getCollectionName().getPipelineName()));
101+
}
102+
, (debeziumProperties, sourceFactory) -> {
103+
debeziumProperties.setProperty(PostgresConnectorConfig.SLOT_NAME.name(), sourceFactory.slotName);
104+
})
105+
, //
106+
Pair.of((opts) -> {
107+
opts.add("dropSolt", PostgresConnectorConfig.DROP_SLOT_ON_STOP);
108+
}, (debeziumProperties, sourceFactory) -> {
109+
debeziumProperties.setProperty(PostgresConnectorConfig.DROP_SLOT_ON_STOP.name(), String.valueOf(sourceFactory.dropSolt));
110+
}));
111+
112+
77113
// @TISExtension()
78114
public static class BasePGLikeDescriptor extends BaseDescriptor {
79-
115+
public BasePGLikeDescriptor() {
116+
super();
117+
AbstractPropAssist.Options<MQListenerFactory, Field> opts = DebuziumPropAssist.createOpts(this);
118+
debeziumProps.forEach((pair) -> {
119+
//opts.add(trip.getLeft(), trip.getMiddle());
120+
pair.getKey().accept(opts);
121+
});
122+
}
123+
public boolean validateSlotName(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) {
124+
if (!value.matches("[a-z0-9_]{1,63}")) {
125+
msgHandler.addFieldError(context, fieldName, "must contain only digits, lowercase characters and underscores with length <= 63");
126+
return false;
127+
}
128+
return true;
129+
}
80130
@Override
81131
public PluginVender getVender() {
82132
return PluginVender.FLINK_CDC;

0 commit comments

Comments
 (0)