Skip to content

Commit 087488c

Browse files
committed
set new version:v5.0.0-SNAPSHOT
1 parent 9ce9ccd commit 087488c

File tree

7 files changed

+30
-17
lines changed

7 files changed

+30
-17
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
<parent>
2626
<groupId>com.qlangtech.tis</groupId>
2727
<artifactId>tis</artifactId>
28-
<version>4.3.0</version>
28+
<version>5.0.0-SNAPSHOT</version>
2929
</parent>
3030
<groupId>com.qlangtech.tis.plugins</groupId>
3131
<artifactId>tis-plugin-parent</artifactId>

pom.xml.versionBackup

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
<parent>
2626
<groupId>com.qlangtech.tis</groupId>
2727
<artifactId>tis</artifactId>
28-
<version>4.3.0-SNAPSHOT</version>
28+
<version>4.3.0</version>
2929
</parent>
3030
<groupId>com.qlangtech.tis.plugins</groupId>
3131
<artifactId>tis-plugin-parent</artifactId>

tis-datax/tis-datax-kingbase-plugin/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
<dependency>
3939
<groupId>cn.com.kingbase</groupId>
4040
<artifactId>kingbase8</artifactId>
41-
<version>8.6.1</version>
41+
<!-- <version>8.6.1</version>-->
42+
<version>9.0.1</version>
4243
</dependency>
4344

4445

tis-incr/tis-flink-cdc-kingbase-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/pglike/FlinkCDCPGLikeSourceFunction.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,34 +21,29 @@
2121
import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
2222
import com.qlangtech.plugins.incr.flink.cdc.SourceChannel;
2323
import com.qlangtech.plugins.incr.flink.cdc.valconvert.DateTimeConverter;
24-
import com.qlangtech.plugins.incr.flink.launch.TISFlinkCDCStreamFactory;
2524
import com.qlangtech.tis.async.message.client.consumer.AsyncMsg;
26-
import com.qlangtech.tis.async.message.client.consumer.IConsumerHandle;
2725
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
2826
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
2927
import com.qlangtech.tis.async.message.client.consumer.MQConsumeException;
30-
import com.qlangtech.tis.coredefine.module.action.TargetResName;
3128
import com.qlangtech.tis.datax.DataXName;
3229
import com.qlangtech.tis.datax.IDataxProcessor;
3330
import com.qlangtech.tis.datax.IDataxReader;
34-
import com.qlangtech.tis.datax.StoreResourceType;
3531
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsReader;
3632
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
3733
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
3834
import com.qlangtech.tis.plugin.ds.DataSourceFactory.ISchemaSupported;
3935
import com.qlangtech.tis.plugin.ds.ISelectedTab;
4036
import com.qlangtech.tis.plugin.ds.RunningContext;
4137
import com.qlangtech.tis.plugin.incr.IConsumerRateLimiter;
42-
import com.qlangtech.tis.plugin.incr.IncrStreamFactory;
4338
import com.qlangtech.tis.realtime.ReaderSource;
4439
import com.qlangtech.tis.realtime.dto.DTOStream;
4540
import com.qlangtech.tis.realtime.transfer.DTO;
4641
import com.qlangtech.tis.util.IPluginContext;
4742
import org.apache.commons.lang.StringUtils;
48-
import org.apache.flink.api.common.JobExecutionResult;
4943
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
5044
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
5145

46+
import java.time.ZoneId;
5247
import java.util.List;
5348
import java.util.Map;
5449
import java.util.Properties;
@@ -99,7 +94,8 @@ public AsyncMsg<List<ReaderSource>> start(IConsumerRateLimiter streamFactory, bo
9994
* https://stackoverflow.com/questions/59978213/debezium-could-not-access-file-decoderbufs-using-postgres-11-with-default-plug
10095
*/
10196
debeziumProperties.put("plugin.name", getWALDecoderPluginName());
102-
DateTimeConverter.setDatetimeConverters(KingBaseDateTimeConverter.class.getName(), debeziumProperties);
97+
DateTimeConverter.setDatetimeConverters(
98+
KingBaseDateTimeConverter.class.getName(), debeziumProperties, dsFactory.getTimeZone().map(ZoneId::getId));
10399

104100
return dbs.getDbStream().map((dbname) -> {
105101

tis-incr/tis-flink-cdc-kingbase-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/pglike/KingBaseDateTimeConverter.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121
import com.qlangtech.plugins.incr.flink.cdc.valconvert.DateTimeConverter;
2222

2323
import java.time.Instant;
24-
import java.time.LocalDate;
2524
import java.time.LocalDateTime;
26-
import java.time.ZoneId;
2725

2826
/**
2927
* @author: 百岁(baisui@qlangtech.com)
@@ -53,20 +51,23 @@ protected String convertDate(Object input) {
5351
protected String convertTime(Object input) {
5452
if (input != null) {
5553
System.out.println("convertTime:" + input.getClass());
54+
throw new UnsupportedOperationException(input.getClass().getName() + ",value:" + input);
55+
5656
}
5757
return null;
5858
}
5959

6060
@Override
6161
protected String convertDateTime(Object input) {
6262
if (input != null) {
63-
System.out.println("convertDateTime:" + input.getClass());
64-
throw new UnsupportedOperationException();
63+
// System.out.println("convertDateTime:" + input.getClass());
64+
// throw new UnsupportedOperationException();
65+
return datetimeFormatter.format(LocalDateTime.ofInstant((Instant) input, this.timestampZoneId));
6566
}
6667
return null;
6768
}
6869

69-
private static final ZoneId zof = ZoneId.of("Z");
70+
// private static final ZoneId zof = ZoneId.of("Z");
7071

7172
@Override
7273
protected String convertTimestamp(Object input) {
@@ -76,7 +77,7 @@ protected String convertTimestamp(Object input) {
7677
// }
7778

7879
// System.out.println("timestampZoneId:" + timestampZoneId);
79-
return timestampFormatter.format(LocalDateTime.ofInstant((Instant) input, zof));
80+
return timestampFormatter.format(LocalDateTime.ofInstant((Instant) input, this.timestampZoneId));
8081

8182
// System.out.println(">>>>convertTimestamp:" + input.getClass().getName());
8283
// System.out.println(">>>>convertTimestamp:" + input.getClass() + ",time:" + timestampFormatter.format((Instant) input));

tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/plugins/incr/flink/launch/TISFlinkCDCStreamFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,8 @@ protected boolean validateAll(IControlMsgHandler msgHandler, Context context, Po
328328
}
329329
}
330330

331+
plugin.checkUseable(TargetResName.createTargetName(msgHandler.getCollectionName()), true);
332+
331333

332334
return super.validateAll(msgHandler, context, postFormVals);
333335
}

tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/plugins/incr/flink/launch/clustertype/ClusterType.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@ public ServerLaunchToken getLaunchToken(TargetResName collection) {
7474
}
7575

7676

77+
/**
78+
*
79+
* @param collection
80+
* @param deploying
81+
* @param requiredSlots
82+
* @return skip flowing check?
83+
*/
84+
protected boolean skipFlowingCheck(TargetResName collection, boolean deploying, int requiredSlots) {
85+
return false;
86+
}
87+
7788
/**
7889
*
7990
* @param collection
@@ -82,8 +93,10 @@ public ServerLaunchToken getLaunchToken(TargetResName collection) {
8293
*/
8394
public final void checkUseable(TargetResName collection, boolean deploying, int requiredSlots) throws TisException {
8495

96+
if (this.skipFlowingCheck(collection, deploying, requiredSlots)) {
97+
return;
98+
}
8599

86-
// String webInterfaceURL = null;
87100
try {
88101
try (ClusterClient restClient = createRestClusterClient()) {
89102
// webInterfaceURL = restClient.getWebInterfaceURL();

0 commit comments

Comments
 (0)