Skip to content

Commit c01090b

Browse files
committed
add kingbase connector datavane/tis#408
1 parent ec28030 commit c01090b

File tree

45 files changed

+1126
-544
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1126
-544
lines changed

tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/impl/ParamsAutoCreateTable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* @author: 百岁(baisui@qlangtech.com)
3131
* @create: 2024-12-14 11:32
3232
**/
33-
public abstract class ParamsAutoCreateTable<COL_WRAPPER extends ColWrapper> extends AutoCreateTable<COL_WRAPPER> {
33+
public abstract class ParamsAutoCreateTable<COL_WRAPPER extends ColWrapper> extends AutoCreateTable<COL_WRAPPER> {
3434
/**
3535
* 添加列注释
3636
*/
@@ -62,5 +62,7 @@ public DftDesc() {
6262
public final String getDisplayName() {
6363
return SWITCH_ON;
6464
}
65+
66+
6567
}
6668
}

tis-datax/tis-datax-kingbase-plugin/src/main/java/com/qlangtech/tis/plugin/datax/kingbase/KingBaseCompatibleMode.java renamed to tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/kingbase/KingBaseCompatibleMode.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.qlangtech.tis.datax.SourceColMetaGetter;
2323
import com.qlangtech.tis.datax.impl.DataxWriter;
2424
import com.qlangtech.tis.extension.Describable;
25+
import com.qlangtech.tis.extension.TISExtensible;
2526
import com.qlangtech.tis.plugin.IEndTypeGetter.EndType;
2627
import com.qlangtech.tis.plugin.annotation.FormField;
2728
import com.qlangtech.tis.plugin.annotation.FormFieldType;
@@ -37,20 +38,48 @@
3738
/**
3839
* @author: 百岁(baisui@qlangtech.com)
3940
* @create: 2025-01-14 15:23
40-
* @see com.qlangtech.tis.plugin.datax.kingbase.mode.MySQLMode
41-
* @see com.qlangtech.tis.plugin.datax.kingbase.mode.OracleMode
42-
* @see com.qlangtech.tis.plugin.datax.kingbase.mode.PGMode
41+
* // @see com.qlangtech.tis.plugin.datax.kingbase.mode.MySQLMode
42+
* // @see com.qlangtech.tis.plugin.datax.kingbase.mode.OracleMode
43+
* // @see com.qlangtech.tis.plugin.datax.kingbase.mode.PGMode
4344
**/
45+
@TISExtensible
4446
public abstract class KingBaseCompatibleMode implements Describable<KingBaseCompatibleMode>, Serializable {
4547

4648
public abstract EndType getEndType();
4749

50+
public boolean isEndTypeMatch(final String dbMode) {
51+
// final String dbMode = result.getString("database_mode");
52+
EndType expectEndType = null;
53+
switch (dbMode) {
54+
case "oracle":
55+
expectEndType = EndType.Oracle;
56+
break;
57+
case "mysql":
58+
expectEndType = EndType.MySQL;
59+
break;
60+
case "pg":
61+
expectEndType = EndType.Postgres;
62+
break;
63+
default:
64+
throw new IllegalStateException("unsupported dbMode:" + dbMode);
65+
}
66+
return this.getEndType() == expectEndType;
67+
}
68+
69+
4870
public abstract Optional<String> getEscapeChar();
4971

5072
@FormField(ordinal = 10, type = FormFieldType.ENUM, validate = {Validator.require})
5173
// 目标源中是否自动创建表,这样会方便不少
5274
public AutoCreateTable autoCreateTable;
5375

76+
/**
77+
* 需要是继承 <b>com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect</b> 的类
78+
*
79+
* @return
80+
*/
81+
public abstract Class<?> getJdbcDialectClass();
82+
5483
public final CreateTableSqlBuilder<ColWrapper> createSQLDDLBuilder(
5584
DataxWriter rdbmsWriter, SourceColMetaGetter sourceColMetaGetter, TableMap tableMapper, Optional<RecordTransformerRules> transformers) {
5685
return autoCreateTable.createSQLDDLBuilder(rdbmsWriter, sourceColMetaGetter, tableMapper, transformers);

tis-datax/tis-datax-kingbase-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/kingbase/KingBaseCompatibleMode.json renamed to tis-datax/tis-datax-common-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/kingbase/KingBaseCompatibleMode.json

File renamed without changes.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.qlangtech.tis.plugin.datax.kingbase;
20+
21+
import com.qlangtech.tis.extension.Describable;
22+
23+
import java.io.Serializable;
24+
import java.util.Properties;
25+
26+
/**
27+
* https://bbs.kingbase.com.cn/docHtml?recId=218c307e5f3d72bf20bb84a51859344a&url=aHR0cHM6Ly9iYnMua2luZ2Jhc2UuY29tLmNuL2tpbmdiYXNlLWRvYy92OS4xLjEuMjQvZmFxL2ZhcS1uZXcvaW50ZXJmYWNlL2pkYmMuaHRtbCNpZDQ
28+
*
29+
* @author: 百岁(baisui@qlangtech.com)
30+
* @create: 2025-02-06 18:34
31+
**/
32+
public abstract class KingBaseDispatch implements Describable<KingBaseDispatch>, Serializable {
33+
public abstract void extractSetJdbcProps(Properties props);
34+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.qlangtech.tis.plugin.datax.kingbase.dispatch;
20+
21+
import com.kingbase8.KBProperty;
22+
import com.qlangtech.tis.extension.Descriptor;
23+
import com.qlangtech.tis.extension.TISExtension;
24+
import com.qlangtech.tis.plugin.datax.kingbase.KingBaseDispatch;
25+
26+
import java.util.Properties;
27+
28+
/**
29+
* @author: 百岁(baisui@qlangtech.com)
30+
* @create: 2025-02-06 18:36
31+
**/
32+
public class Off extends KingBaseDispatch {
33+
@Override
34+
public void extractSetJdbcProps(Properties props) {
35+
props.setProperty(KBProperty.USEDISPATCH.getName(), String.valueOf(Boolean.FALSE));
36+
}
37+
38+
@TISExtension
39+
public static class DftDesc extends Descriptor<KingBaseDispatch> {
40+
public DftDesc() {
41+
super();
42+
}
43+
44+
@Override
45+
public String getDisplayName() {
46+
return SWITCH_OFF;
47+
}
48+
}
49+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.qlangtech.tis.plugin.datax.kingbase.dispatch;
20+
21+
import com.alibaba.citrus.turbine.Context;
22+
import com.google.common.collect.Lists;
23+
import com.kingbase8.KBProperty;
24+
import com.qlangtech.tis.extension.Descriptor;
25+
import com.qlangtech.tis.extension.TISExtension;
26+
import com.qlangtech.tis.plugin.annotation.FormField;
27+
import com.qlangtech.tis.plugin.annotation.FormFieldType;
28+
import com.qlangtech.tis.plugin.annotation.Validator;
29+
import com.qlangtech.tis.plugin.datax.kingbase.KingBaseDispatch;
30+
import com.qlangtech.tis.realtime.utils.NetUtils;
31+
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
32+
import com.qlangtech.tis.runtime.module.misc.impl.AdapterFieldErrorHandler;
33+
import org.apache.commons.io.IOUtils;
34+
import org.apache.commons.io.LineIterator;
35+
import org.apache.commons.lang.StringUtils;
36+
37+
import java.io.StringReader;
38+
import java.util.List;
39+
import java.util.Properties;
40+
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.stream.Collectors;
42+
43+
/**
44+
* @author: 百岁(baisui@qlangtech.com)
45+
* @create: 2025-02-06 18:35
46+
**/
47+
public class On extends KingBaseDispatch {
48+
49+
@FormField(ordinal = 0, type = FormFieldType.TEXTAREA, validate = {Validator.require})
50+
public String slavers;
51+
52+
//LOADRATE
53+
@FormField(ordinal = 1, type = FormFieldType.INT_NUMBER, validate = {Validator.require, Validator.integer})
54+
public Integer loadRate;
55+
56+
57+
@Override
58+
public void extractSetJdbcProps(Properties props) {
59+
60+
List<KingBaseSlaver> nodes = parseSlaverNodes();
61+
62+
props.setProperty(KBProperty.USEDISPATCH.getName(), String.valueOf(Boolean.TRUE));
63+
props.setProperty(KBProperty.SLAVE_ADD.getName()
64+
, nodes.stream().map((node) -> node.addr).collect(Collectors.joining(",")));
65+
props.setProperty(KBProperty.SLAVE_PORT.getName()
66+
, nodes.stream().map((node) -> String.valueOf(node.port)).collect(Collectors.joining(",")));
67+
props.setProperty(KBProperty.HOSTLOADRATE.getName(), String.valueOf(this.loadRate));
68+
props.setProperty(KBProperty.USECONNECT_POOL.getName(), String.valueOf(Boolean.FALSE));
69+
}
70+
71+
private List<KingBaseSlaver> parseSlaverNodes() {
72+
List<KingBaseSlaver> nodes = Lists.newArrayList();
73+
String[] tuple;
74+
try (StringReader lines = (new StringReader(this.slavers))) {
75+
LineIterator lineIt = IOUtils.lineIterator(lines);
76+
while (lineIt.hasNext()) {
77+
final String host = lineIt.nextLine();
78+
tuple = StringUtils.split(host, ":");
79+
if (tuple.length != 2) {
80+
throw new IllegalStateException("illegal host address:" + host);
81+
}
82+
nodes.add(new KingBaseSlaver(tuple[0], Integer.parseInt(tuple[1])));
83+
}
84+
}
85+
return nodes;
86+
}
87+
88+
private static class KingBaseSlaver {
89+
private final String addr;
90+
private final Integer port;
91+
92+
public KingBaseSlaver(String addr, Integer port) {
93+
this.addr = addr;
94+
this.port = port;
95+
}
96+
}
97+
98+
@TISExtension
99+
public static class DftDesc extends Descriptor<KingBaseDispatch> {
100+
public DftDesc() {
101+
super();
102+
}
103+
104+
public boolean validateLoadRate(IFieldErrorHandler msgHandler, Context context, String fieldName, String val) {
105+
int rate = Integer.parseInt(val);
106+
if (rate > 100 || rate < 0) {
107+
msgHandler.addFieldError(context, fieldName, "必须为1到100之间");
108+
return false;
109+
}
110+
return true;
111+
}
112+
113+
public boolean validateSlavers(IFieldErrorHandler msgHandler, Context context, String fieldName, String val) {
114+
final AtomicInteger lineNum = new AtomicInteger(1);
115+
try (StringReader lines = (new StringReader(val))) {
116+
LineIterator lineIt = IOUtils.lineIterator(lines);
117+
while (lineIt.hasNext()) {
118+
final String host = lineIt.nextLine();
119+
if (!Validator.host.validate(new AdapterFieldErrorHandler(msgHandler) {
120+
@Override
121+
public void addFieldError(Context context, String fieldName, String msg, Object... params) {
122+
super.addFieldError(context, fieldName, "第" + lineNum.get() + "行," + msg, params);
123+
}
124+
}, context, fieldName, host)) {
125+
return false;
126+
}
127+
if (!NetUtils.isReachable(host)) {
128+
msgHandler.addFieldError(context, fieldName, "第" + lineNum.get() + "行host:" + host + "不可连通");
129+
return false;
130+
}
131+
lineNum.incrementAndGet();
132+
}
133+
} catch (Exception e) {
134+
throw new RuntimeException(e);
135+
}
136+
137+
return true;
138+
}
139+
140+
@Override
141+
public String getDisplayName() {
142+
return SWITCH_ON;
143+
}
144+
}
145+
}

0 commit comments

Comments
 (0)