Skip to content

Commit 2b066d2

Browse files
committed
make SqlServer reader splitTableStrategy enable
1 parent c01090b commit 2b066d2

File tree

18 files changed

+262
-411
lines changed

18 files changed

+262
-411
lines changed

tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/RdbmsReaderContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.qlangtech.tis.plugin.datax.common;
2020

2121
import com.qlangtech.tis.datax.IDataxReaderContext;
22+
import com.qlangtech.tis.plugin.datax.common.RdbmsReaderContext.ISplitTableContext;
2223
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
2324
import com.qlangtech.tis.plugin.ds.IDataSourceDumper;
2425
import org.apache.commons.lang.StringUtils;
@@ -90,4 +91,9 @@ public String getTaskName() {
9091
}
9192

9293

94+
public interface ISplitTableContext {
95+
public boolean isSplitTable();
96+
97+
public String getSplitTabs();
98+
}
9399
}

tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXSqlserverWriter.java

Lines changed: 13 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,18 @@
1818

1919
package com.qlangtech.tis.plugin.datax;
2020

21+
import com.alibaba.citrus.turbine.Context;
2122
import com.qlangtech.tis.annotation.Public;
2223
import com.qlangtech.tis.datax.IDataxContext;
2324
import com.qlangtech.tis.datax.IDataxProcessor;
24-
import com.qlangtech.tis.datax.SourceColMetaGetter;
2525
import com.qlangtech.tis.extension.TISExtension;
2626
import com.qlangtech.tis.extension.impl.IOUtils;
27-
import com.qlangtech.tis.plugin.datax.CreateTableSqlBuilder.ColWrapper;
2827
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter;
2928
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
30-
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
31-
import com.qlangtech.tis.plugin.ds.DataSourceFactory.ISchemaSupported;
32-
import com.qlangtech.tis.plugin.ds.DataType;
33-
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
3429
import com.qlangtech.tis.plugin.ds.sqlserver.SqlServerDatasourceFactory;
35-
import org.apache.commons.lang.StringUtils;
30+
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
3631

37-
import java.util.List;
3832
import java.util.Optional;
39-
import java.util.stream.Collectors;
4033

4134
/**
4235
* @author: baisui 百岁
@@ -49,12 +42,6 @@ public class DataXSqlserverWriter extends BasicDataXRdbmsWriter<SqlServerDatasou
4942
public static String getDftTemplate() {
5043
return IOUtils.loadResourceFromClasspath(DataXSqlserverWriter.class, "DataXSqlserverWriter-tpl.json");
5144
}
52-
53-
// @Override
54-
// public void initWriterTable(String targetTabName, List<String> jdbcUrls) throws Exception {
55-
// InitWriterTable.process(this.dataXName, targetTabName, jdbcUrls);
56-
// }
57-
5845
@Override
5946
public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap, Optional<RecordTransformerRules> transformerRules) {
6047
if (!tableMap.isPresent()) {
@@ -65,142 +52,6 @@ public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap, Opt
6552
}
6653

6754

68-
// @Override
69-
// public CreateTableSqlBuilder.CreateDDL generateCreateDDL(
70-
// SourceColMetaGetter sourceColMetaGetter,
71-
// IDataxProcessor.TableMap tableMapper, Optional<RecordTransformerRules> transformers) {
72-
//// if (!this.autoCreateTable) {
73-
//// return null;
74-
//// }
75-
//
76-
// // https://www.cnblogs.com/mingfei200169/articles/427591.html
77-
// final CreateTableSqlBuilder createTableSqlBuilder
78-
// = new CreateTableSqlBuilder<ColWrapper>(tableMapper, this.getDataSourceFactory(), transformers) {
79-
//
80-
// private boolean isMulitPks() {
81-
// return this.pks.size() > 1;
82-
// }
83-
//
84-
// private String convertType(DataType type, boolean isPk) {
85-
// //https://www.cnblogs.com/liberty777/p/10748570.html
86-
// StringBuffer createSql = new StringBuffer(getSqlServerType(type));
87-
//
88-
// if (!this.isMulitPks() && isPk) {
89-
// createSql.append(" primary key ");
90-
// }
91-
// return createSql.toString();
92-
// }
93-
//
94-
// @Override
95-
// protected ColWrapper createColWrapper(IColMetaGetter c) {
96-
// return new ColWrapper(c, this.pks) {
97-
// @Override
98-
// public String getMapperType() {
99-
// return convertType(this.getType(), this.isPk());
100-
// }
101-
// };
102-
// }
103-
//
104-
// private String getSqlServerType(DataType type) {
105-
//
106-
// switch (type.getJdbcType()) {
107-
// case INTEGER:
108-
// case TINYINT:
109-
// case SMALLINT:
110-
// return "int";
111-
// case BIGINT:
112-
// return "bigint";
113-
// case FLOAT:
114-
// case DOUBLE:
115-
// case DECIMAL:
116-
// return "decimal(" + type.getColumnSize() + ", " + type.getDecimalDigits() + ")";
117-
// case DATE:
118-
// return "date";
119-
// case TIME:
120-
// case TIMESTAMP:
121-
// return "datetime";
122-
// case BIT:
123-
// case BOOLEAN:
124-
// return "bit";
125-
// case BLOB:
126-
// case BINARY:
127-
// case LONGVARBINARY:
128-
// case VARBINARY:
129-
// //https://learn.microsoft.com/en-us/sql/t-sql/data-types/binary-and-varbinary-transact-sql?view=sql-server-ver16
130-
// // Variable-length binary data. n can be a value from 1 through 8,000.
131-
// // type.columnSize 可能为0 所以要用Math.max() 调整一下
132-
// return "varbinary(" + Math.min(Math.max(type.getColumnSize(), 300), 8000) + ")";
133-
// case LONGVARCHAR:
134-
// return "text";
135-
// default:
136-
// return "varchar(" + type.getColumnSize() + ")";
137-
// }
138-
// }
139-
//
140-
//
141-
// @Override
142-
// protected void appendExtraColDef(List<String> pk) {
143-
// if (this.isMulitPks()) {
144-
// /**
145-
// * 建表语句中不能有超过一个列的修饰符为 “primary key”
146-
// * <pre>
147-
// * CREATE TABLE "base"
148-
// * (
149-
// * "base_id" int primary key ,
150-
// * "start_time" datetime,
151-
// * "update_date" datetime primary key ,
152-
// * "update_time" datetime,
153-
// * "price" decimal(5, 2),
154-
// * "json_content" varchar(2000),
155-
// * "col_blob" varbinary(8000),
156-
// * "col_text" text
157-
// * )
158-
// * </pre>
159-
// * 应该改为:
160-
// * <pre>
161-
// * CREATE TABLE "base"
162-
// * (
163-
// * "base_id" int ,
164-
// * "start_time" datetime,
165-
// * "update_date" datetime ,
166-
// * "update_time" datetime,
167-
// * "price" decimal(5, 2),
168-
// * "json_content" varchar(2000),
169-
// * "col_blob" varbinary(8000),
170-
// * "col_text" text
171-
// * ,PRIMARY KEY ( "base_id" , "update_date")
172-
// * )
173-
// * </pre>
174-
// */
175-
// script.appendLine(",PRIMARY KEY ( " + pk.stream().map((key) -> this.dsMeta.getEscapedEntity(key)).collect(Collectors.joining(",")) + " )");
176-
// }
177-
// }
178-
//
179-
// @Override
180-
// protected void appendTabMeta(List<String> pk) {
181-
// if (autoCreateTable.enabledColumnComment()) {
182-
// final ISchemaSupported schemaSupported = (ISchemaSupported) dsMeta;
183-
// for (ColWrapper col : this.getCols()) {
184-
//
185-
// ColumnMetaData colMeta = sourceColMetaGetter.getColMeta(tableMapper, col.getName());
186-
// if (colMeta != null && StringUtils.isNotEmpty(colMeta.getComment())) {
187-
// script.blockBody(true, new String[]{StringUtils.EMPTY, ";"}
188-
// , "EXEC sp_addextendedproperty", (buffer) -> {
189-
// buffer.appendLine("@name = N'MS_Description'");
190-
// buffer.appendLine("@value = '" + colMeta.getComment() + "'");
191-
// buffer.appendLine("@level0type = N'Schema', @level0name = '" + schemaSupported.getDBSchema() + "',");
192-
// buffer.appendLine("@level1type = N'Table', @level1name = '" + tableMapper.getTo() + "',");
193-
// buffer.appendLine("@level2type = N'Column', @level2name = '" + col.getName() + "'");
194-
// });
195-
// }
196-
// }
197-
// }
198-
// }
199-
// };
200-
//
201-
// return createTableSqlBuilder.build();
202-
// }
203-
20455
@TISExtension()
20556
public static class DefaultDescriptor extends RdbmsWriterDescriptor {
20657
public DefaultDescriptor() {
@@ -217,6 +68,17 @@ public EndType getEndType() {
21768
return EndType.SqlServer;
21869
}
21970

71+
@Override
72+
protected boolean validatePostForm(IControlMsgHandler msgHandler, Context context, BasicDataXRdbmsWriter form) {
73+
DataXSqlserverWriter dataxWriter = (DataXSqlserverWriter) form;
74+
SqlServerDatasourceFactory dsFactory = dataxWriter.getDataSourceFactory();
75+
if (dsFactory.splitTableStrategy.isSplittable()) {
76+
msgHandler.addFieldError(context, KEY_DB_NAME_FIELD_NAME, "Writer端不能使用带有分表策略的数据源");
77+
return false;
78+
}
79+
return super.validatePostForm(msgHandler, context, dataxWriter);
80+
}
81+
22082
@Override
22183
public boolean isSupportTabCreate() {
22284
return true;

tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/SqlServerReaderContext.java

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,44 @@
11
/**
2-
* Licensed to the Apache Software Foundation (ASF) under one
3-
* or more contributor license agreements. See the NOTICE file
4-
* distributed with this work for additional information
5-
* regarding copyright ownership. The ASF licenses this file
6-
* to you under the Apache License, Version 2.0 (the
7-
* "License"); you may not use this file except in compliance
8-
* with the License. You may obtain a copy of the License at
9-
*
10-
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
12-
* Unless required by applicable law or agreed to in writing, software
13-
* distributed under the License is distributed on an "AS IS" BASIS,
14-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15-
* See the License for the specific language governing permissions and
16-
* limitations under the License.
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
1717
*/
1818

1919
package com.qlangtech.tis.plugin.datax;
2020

2121
import com.qlangtech.tis.plugin.datax.common.RdbmsReaderContext;
22+
import com.qlangtech.tis.plugin.datax.common.RdbmsReaderContext.ISplitTableContext;
2223
import com.qlangtech.tis.plugin.ds.IDataSourceDumper;
24+
import com.qlangtech.tis.plugin.ds.SplitTableStrategy;
2325
import com.qlangtech.tis.plugin.ds.sqlserver.SqlServerDatasourceFactory;
2426

27+
import java.util.List;
28+
import java.util.Objects;
29+
2530

2631
/**
2732
* @author: 百岁([email protected]
2833
* @create: 2021-06-06 14:59
2934
**/
30-
public class SqlServerReaderContext extends RdbmsReaderContext<DataXSqlserverReader, SqlServerDatasourceFactory> {
35+
public class SqlServerReaderContext
36+
extends RdbmsReaderContext<DataXSqlserverReader, SqlServerDatasourceFactory> implements ISplitTableContext {
37+
private final SplitTableStrategy splitTableStrategy;
38+
3139
public SqlServerReaderContext(String jobName, String sourceTableName, IDataSourceDumper dumper, DataXSqlserverReader reader) {
3240
super(jobName, sourceTableName, dumper, reader);
41+
this.splitTableStrategy = Objects.requireNonNull(dsFactory.splitTableStrategy, "splitTableStrategy can not be null");
3342
}
3443

3544
@Override
@@ -61,6 +70,18 @@ public int getFetchSize() {
6170
return this.plugin.fetchSize;
6271
}
6372

73+
74+
@Override
75+
public boolean isSplitTable() {
76+
return splitTableStrategy.isSplittable();
77+
}
78+
79+
@Override
80+
public String getSplitTabs() {
81+
List<String> allPhysicsTabs = this.splitTableStrategy.getAllPhysicsTabs(dsFactory, this.getJdbcUrl(), this.sourceTableName);
82+
return getEntitiesWithQuotation(allPhysicsTabs);
83+
}
84+
6485
public static void main(String[] args) {
6586

6687

tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/ds/sqlserver/SqlServerDatasourceFactory.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,23 @@
2020

2121
import com.alibaba.citrus.turbine.Context;
2222
import com.qlangtech.tis.annotation.Public;
23+
import com.qlangtech.tis.datax.DataXJobSubmit;
2324
import com.qlangtech.tis.plugin.annotation.FormField;
2425
import com.qlangtech.tis.plugin.annotation.FormFieldType;
2526
import com.qlangtech.tis.plugin.annotation.Validator;
2627
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
2728
import com.qlangtech.tis.plugin.ds.DBConfig;
2829
import com.qlangtech.tis.plugin.ds.JDBCConnection;
30+
import com.qlangtech.tis.plugin.ds.SplitTableStrategy;
31+
import com.qlangtech.tis.plugin.ds.TableInDB;
2932
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
3033
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
3134
import org.apache.commons.lang.StringUtils;
3235

3336
import java.sql.ResultSet;
3437
import java.sql.SQLException;
38+
import java.util.List;
39+
import java.util.Objects;
3540
import java.util.Optional;
3641
import java.util.Properties;
3742
import java.util.Set;
@@ -47,6 +52,11 @@ public abstract class SqlServerDatasourceFactory extends BasicDataSourceFactory
4752
private static final String DS_TYPE_SQL_SERVER = "SqlServer";
4853
@FormField(ordinal = 4, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.db_col_name})
4954
public String tabSchema;
55+
/**
56+
* 分表策略
57+
*/
58+
@FormField(ordinal = 1, validate = {Validator.require})
59+
public SplitTableStrategy splitTableStrategy;
5060

5161
@Override
5262
public String buidJdbcUrl(DBConfig db, String ip, String dbName) {
@@ -57,6 +67,35 @@ public String buidJdbcUrl(DBConfig db, String ip, String dbName) {
5767
return jdbcUrl;
5868
}
5969

70+
@Override
71+
protected TableInDB createTableInDB() {
72+
return Objects.requireNonNull(this.splitTableStrategy, "SqlServer DataSourceFactory:" + this.identityValue() + " "
73+
+ "relevant prop splitTableStrategy can not be null").createTableInDB(this);
74+
}
75+
76+
@Override
77+
public List<String> getAllPhysicsTabs(DataXJobSubmit.TableDataXEntity tabEntity) {
78+
// return super.getAllPhysicsTabs(tabEntity);
79+
return this.splitTableStrategy.getAllPhysicsTabs(this, tabEntity);
80+
}
81+
82+
@Override
83+
protected EntityName logicTable2PhysicsTable(String jdbcUrl, EntityName table) {
84+
if (table.isPhysics()) {
85+
return table;
86+
}
87+
// return super.logicTable2PhysicsTable(table);
88+
SplitTableStrategy.DBPhysicsTable physicsTable = Objects.requireNonNull(this.splitTableStrategy,
89+
"splitTableStrategy can not be null").getMatchedPhysicsTable(this, jdbcUrl, table);
90+
return physicsTable.getPhysicsTab();
91+
}
92+
93+
@Override
94+
protected String getNodeDesc() {
95+
return Objects.requireNonNull(
96+
this.splitTableStrategy, "splitTableStrategy can not be null").getNodeDesc();
97+
}
98+
6099
@Override
61100
protected CreateColumnMeta createColumnMetaBuilder(
62101
EntityName table, ResultSet columns1, Set<String> pkCols, JDBCConnection conn) {

tis-datax/tis-datax-sqlserver-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataXSqlserverReader-tpl.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,17 @@
1414
"jdbcUrl": [
1515
"${reader.jdbcUrl}"
1616
],
17+
#if($reader.splitTable)
18+
"table": [${reader.splitTabs}]
19+
#else
1720
"querySql": [
1821
"SELECT ${reader.cols} FROM ${reader.sourceEntityName} #if($reader.containWhere) WHERE ${reader.where} #end"
1922
]
23+
#end
2024
}
2125
]
26+
#if($reader.splitTable)
27+
,"column": [${reader.colsQuotes}]
28+
#end
2229
}
2330
}

0 commit comments

Comments
 (0)