Skip to content

Commit 9dfdb6c

Browse files
committed
make PhysicsTable2LogicalTableMapper as a Serializeable interface
1 parent bf6f097 commit 9dfdb6c

File tree

10 files changed

+208
-187
lines changed

10 files changed

+208
-187
lines changed

tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/format/guesstype/GuessOn.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public class GuessOn extends GuessFieldType {
7979
}
8080
tabName = p2lMapper.parseLogicalTableName(row.tabName);
8181
if (tabName == null) {
82-
throw new IllegalStateException("tableName can not be empty");
82+
throw new IllegalStateException("tableName:“" + row.tabName + "” can not be empty");
8383
}
8484
if ((priorityResult = result.get(tabName)) == null) {
8585
priorityResult = Maps.newHashMap();
@@ -91,11 +91,11 @@ public class GuessOn extends GuessFieldType {
9191
throw TisException.create("has not find any record, can not invoke guess field types process,priorityResult.size:"
9292
+ result.size() + ",lineIndex:" + lineIndex);
9393
}
94-
return result.entrySet().stream().collect(Collectors.toMap((e) -> e.getKey()
94+
return result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey
9595
, (e) -> {
9696
return e.getValue().entrySet().stream().collect(
9797
Collectors.toMap(
98-
(col) -> col.getKey()
98+
Map.Entry::getKey
9999
, (col) -> {
100100
DataType type = null;
101101
if ((type = col.getValue().type) != null) {

tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/ds/doris/DorisSourceFactory.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949

5050
import java.io.IOException;
5151
import java.io.InputStream;
52-
import java.net.HttpURLConnection;
5352
import java.net.URL;
5453
import java.sql.ResultSet;
5554
import java.sql.SQLException;
@@ -246,13 +245,16 @@ protected boolean validateDSFactory(final IControlMsgHandler msgHandler, final C
246245
final DorisSourceFactory dorisDS = (DorisSourceFactory) dsFactory;
247246
final Duration socketReadTimeout = Duration.ofSeconds(5);
248247
for (String feLoadHost : dorisDS.getLoadUrls()) {
249-
//利用doris的clusterAction: https://doris.apache.org/zh-CN/docs/1.2/admin-manual/http-actions/fe/cluster-action
248+
249+
//利用doris的clusterAction: https://doris.apache.org/zh-CN/docs/4.x/admin-manual/open-api/fe-http/cluster-action
250250
// 对:{"msg":"success","code":0,"data":{"http":["192.168.28.200:8030"],"mysql":["192.168.28.200:9030"]},"count":0}
251251
StringBuffer clusterInfoApiUrl = new StringBuffer("http://");
252252
clusterInfoApiUrl.append(feLoadHost).append("/rest/v2/manager/cluster/cluster_info/conn_info");
253253

254254
try {
255-
Boolean success = HttpUtils.get(new URL(clusterInfoApiUrl.toString()), new PostFormStreamProcess<Boolean>() {
255+
Boolean success = HttpUtils.get(new URL(clusterInfoApiUrl.toString()) //
256+
, new PostFormStreamProcess<Boolean>( //
257+
ConfigFileContext.setAuthorizationHeader( dorisDS.getUserName(), dorisDS.getPassword())) { //
256258
@Override
257259
public ContentType getContentType() {
258260
return ContentType.JSON;
@@ -262,13 +264,7 @@ public ContentType getContentType() {
262264
public Duration getSocketReadTimeout() {
263265
return socketReadTimeout;
264266
}
265-
266-
@Override
267-
public void preSet(HttpURLConnection conn) throws IOException {
268-
super.preSet(conn);
269-
ConfigFileContext.StreamProcess.setAuthorization(conn, dorisDS.getUserName(), dorisDS.getPassword());
270-
}
271-
267+
272268
@Override
273269
public Boolean p(int status, InputStream stream, Map<String, List<String>> headerFields) {
274270
try {

tis-datax/tis-ds-mysql-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataxMySQLReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ public void setUp() throws Exception {
9494
public void testPluginDescPropsRefs() {
9595
DataxMySQLReader mySQLReader = new DataxMySQLReader();
9696
Descriptor<DataxReader> descriptor = mySQLReader.getDescriptor();
97-
Map<String, Descriptor.PropsImplRefs> propsImplRefs = descriptor.getPropsImplRefs();
98-
Descriptor.PropsImplRefs dbNameRef = propsImplRefs.get(BasicDataXRdbmsWriter.KEY_DB_NAME_FIELD_NAME);
97+
Map<String, PluginExtraProps.FieldRefCreateor> propsImplRefs = descriptor.getPropsImplRefs();
98+
PluginExtraProps.FieldRefCreateor dbNameRef = propsImplRefs.get(BasicDataXRdbmsWriter.KEY_DB_NAME_FIELD_NAME);
9999
Assert.assertNotNull(
100100
BasicDataXRdbmsWriter.KEY_DB_NAME_FIELD_NAME + " relevant ref can not be null", dbNameRef);
101101

tis-incr/tis-flink-cdc-common/src/main/java/com/qlangtech/plugins/incr/flink/cdc/pglike/FlinkCDCPGLikeSourceFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,10 @@ public BasePGLikeDescriptor() {
114114
this.debeziumProps = Lists.newArrayList(
115115
Pair.of((opts) -> {
116116
opts.add(FIELD_KEY_SLOT_NAME, getSoltNameField()
117-
, new OverwriteProps().setDftVal(new UnCacheString<>(() -> IPluginContext.getThreadLocalInstance().getCollectionName().getPipelineName())));
117+
, new OverwriteProps().setDftVal(new UnCacheString<>(() -> {
118+
return IPluginContext.getThreadLocalInstance().getCollectionName().getPipelineName();
119+
}))
120+
);
118121
}
119122
, (debeziumProperties, sourceFactory) -> {
120123
debeziumProperties.setProperty(getSoltNameField().name(), sourceFactory.slotName);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.qlangtech.plugins.incr.flink.cdc.kingbase;
2+
3+
import com.qlangtech.tis.plugin.common.PluginDesc;
4+
import com.qlangtech.tis.util.IPluginContext;
5+
import org.junit.Test;
6+
7+
/**
8+
*
9+
* @author 百岁 (baisui@qlangtech.com)
10+
* @date 2025/12/22
11+
*/
12+
public class FlinkCDCKingBaseSourceFactoryTest {
13+
14+
@Test
15+
public void testDescGenerate() throws Exception {
16+
IPluginContext.setPluginContext(IPluginContext.namedContext("test_pipeline"));
17+
PluginDesc.testDescGenerate(FlinkCDCKingBaseSourceFactory.class
18+
, "flink-cdc-kingbase-source-factory-descriptor.json");
19+
}
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
{
2+
"com.qlangtech.plugins.incr.flink.cdc.kingbase.FlinkCDCKingBaseSourceFactory":{
3+
"attrs":[
4+
{
5+
"describable":false,
6+
"eprops":{
7+
"dftVal":"decoderbufs",
8+
"enum":[
9+
{
10+
"label":"decoderbufs",
11+
"val":"decoderbufs"
12+
}
13+
],
14+
"help":"The name of the KingBase logical decoding plug-in installed on the server. Supported values are decoderbufs",
15+
"label":"解码器"
16+
},
17+
"key":"decodingPluginName",
18+
"ord":0,
19+
"pk":false,
20+
"required":true,
21+
"type":5
22+
},
23+
{
24+
"describable":false,
25+
"eprops":{
26+
"asyncHelp":true,
27+
"dftVal":"test_pipeline",
28+
"label":"Slot"
29+
},
30+
"key":"slotName",
31+
"ord":1,
32+
"pk":false,
33+
"required":true,
34+
"type":1
35+
},
36+
{
37+
"describable":false,
38+
"eprops":{
39+
"asyncHelp":true,
40+
"dftVal":false,
41+
"enum":[
42+
{
43+
"label":"",
44+
"val":true
45+
},
46+
{
47+
"label":"",
48+
"val":false
49+
}
50+
],
51+
"label":"Drop slot on stop"
52+
},
53+
"key":"dropSolt",
54+
"ord":2,
55+
"pk":false,
56+
"required":true,
57+
"type":5
58+
},
59+
{
60+
"describable":false,
61+
"eprops":{
62+
"asyncHelp":true,
63+
"dftVal":"latest",
64+
"enum":[
65+
{
66+
"label":"Latest",
67+
"val":"latest"
68+
},
69+
{
70+
"label":"Initial",
71+
"val":"initial"
72+
}
73+
],
74+
"label":"起始位点"
75+
},
76+
"key":"startupOptions",
77+
"ord":3,
78+
"pk":false,
79+
"required":true,
80+
"type":5
81+
},
82+
{
83+
"describable":true,
84+
"descriptors":{
85+
"com.qlangtech.plugins.incr.flink.cdc.pglike.replica.DefaultPGLikeReplicaIdentity":{
86+
"attrs":[
87+
88+
],
89+
"containAdvance":false,
90+
"displayName":"DEFAULT",
91+
"extendPoint":"com.qlangtech.plugins.incr.flink.cdc.pglike.PGLikeReplicaIdentity",
92+
"extractProps":{
93+
94+
},
95+
"impl":"com.qlangtech.plugins.incr.flink.cdc.pglike.replica.DefaultPGLikeReplicaIdentity",
96+
"implUrl":"http://tis.pub/docs/plugin/plugins/#comqlangtechpluginsincrflinkcdcpglikereplicadefaultpglikereplicaidentity",
97+
"veriflable":false
98+
},
99+
"com.qlangtech.plugins.incr.flink.cdc.pglike.replica.FullPGLikeReplicaIdentity":{
100+
"attrs":[
101+
102+
],
103+
"containAdvance":false,
104+
"displayName":"FULL",
105+
"extendPoint":"com.qlangtech.plugins.incr.flink.cdc.pglike.PGLikeReplicaIdentity",
106+
"extractProps":{
107+
108+
},
109+
"impl":"com.qlangtech.plugins.incr.flink.cdc.pglike.replica.FullPGLikeReplicaIdentity",
110+
"implUrl":"http://tis.pub/docs/plugin/plugins/#comqlangtechpluginsincrflinkcdcpglikereplicafullpglikereplicaidentity",
111+
"veriflable":false
112+
}
113+
},
114+
"eprops":{
115+
"asyncHelp":true,
116+
"dftVal":"FULL",
117+
"label":"Replica Rule"
118+
},
119+
"extendPoint":"com.qlangtech.plugins.incr.flink.cdc.pglike.PGLikeReplicaIdentity",
120+
"extensible":true,
121+
"key":"replicaRule",
122+
"ord":4,
123+
"pk":false,
124+
"required":true,
125+
"type":1
126+
},
127+
{
128+
"advance":true,
129+
"describable":false,
130+
"eprops":{
131+
"enumMode":"multiple",
132+
"help":"可以将数据流中将某一种事件类型的事件过滤掉,有以下几种类型可以选择:INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE",
133+
"label":"过滤"
134+
},
135+
"key":"filterRowKind",
136+
"ord":10,
137+
"pk":false,
138+
"required":false,
139+
"type":5
140+
}
141+
],
142+
"containAdvance":true,
143+
"displayName":"Flink-CDC-KingBase",
144+
"extendPoint":"com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory",
145+
"extractProps":{
146+
"endType":"kingbase",
147+
"endTypeDesc":"人大金仓国产数据库,官网:https://www.kingbase.com.cn/",
148+
"extendSelectedTabProp":false,
149+
"helpPath":"docs/integrations/data/KingBase/#实时读",
150+
"supportIcon":true,
151+
"targetType":"kingbase"
152+
},
153+
"impl":"com.qlangtech.plugins.incr.flink.cdc.kingbase.FlinkCDCKingBaseSourceFactory",
154+
"implUrl":"http://tis.pub/docs/plugin/plugins/#comqlangtechpluginsincrflinkcdckingbaseflinkcdckingbasesourcefactory",
155+
"veriflable":false
156+
}
157+
}

tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/plugins/incr/flink/launch/CheckpointFactory.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,26 @@
2020

2121
import com.qlangtech.tis.annotation.Public;
2222
import com.qlangtech.tis.extension.Describable;
23+
import com.qlangtech.tis.extension.TISExtensible;
2324
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2425

2526
/**
2627
* @author: 百岁(baisui@qlangtech.com)
2728
* @create: 2022-03-01 16:15
2829
* @see com.qlangtech.plugins.incr.flink.launch.ckpt.CKOff
29-
* @see com.qlangtech.plugins.incr.flink.launch.ckpt.CKOn
30+
* //@see com.qlangtech.plugins.incr.flink.launch.ckpt.CKOn
3031
**/
32+
@TISExtensible
3133
@Public
3234
public abstract class CheckpointFactory implements Describable<CheckpointFactory> {
3335

36+
/**
37+
* 是否已经开启
38+
*
39+
* @return
40+
*/
41+
public abstract boolean isOn();
42+
3443
public abstract void setProps(StreamExecutionEnvironment env);
3544

3645
}

tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/plugins/incr/flink/launch/TISFlinkCDCStreamFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121

2222
import com.alibaba.citrus.turbine.Context;
23-
import com.qlangtech.plugins.incr.flink.launch.ckpt.CKOn;
2423
import com.qlangtech.plugins.incr.flink.launch.clustertype.ClusterType;
2524
import com.qlangtech.tis.annotation.Public;
2625
import com.qlangtech.tis.config.k8s.ReplicasSpec;
@@ -149,7 +148,7 @@ public Integer getParallelism() {
149148
}
150149

151150
private boolean isCheckpointEnable() {
152-
return checkpoint instanceof CKOn;
151+
return this.checkpoint.isOn();
153152
}
154153

155154
@Override

tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/plugins/incr/flink/launch/ckpt/CKOff.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ public void setProps(StreamExecutionEnvironment env) {
3535

3636
}
3737

38+
@Override
39+
public boolean isOn() {
40+
return false;
41+
}
42+
3843
@TISExtension()
3944
public static class DefaultDescriptor extends Descriptor<CheckpointFactory> {
4045
@Override

0 commit comments

Comments
 (0)