Skip to content

Commit 7213958

Browse files
committed
add pre validating process for postgreSQL cdc launching:datavane/tis#470
1 parent 115f51f commit 7213958

File tree

12 files changed

+2362
-80
lines changed

12 files changed

+2362
-80
lines changed

requirment/add-validator-before-pg-cdc-launching.md

Lines changed: 1466 additions & 0 deletions
Large diffs are not rendered by default.

tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DefaultDataxProcessor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,10 @@ public Optional<IPluginStore<DefaultDataXProcessorManipulate>> getManipulateStor
242242
if (appName == null) {
243243
return Optional.empty();
244244
}
245-
return Optional.of(DefaultDataXProcessorManipulate.loadPlugins(null, DefaultDataXProcessorManipulate.class, appName).getValue());
245+
246+
return Optional.of(DefaultDataXProcessorManipulate.getPluginStore(null, appName));
247+
248+
// return Optional.of(DefaultDataXProcessorManipulate.loadPlugins(null, DefaultDataXProcessorManipulate.class, appName).getValue());
246249
}
247250
}
248251

tis-datax/tis-datax-dolphinscheduler-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doplinscheduler/DolphinschedulerDistributedSPIDataXJobSubmit.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,12 @@
2323
import com.qlangtech.tis.build.task.IBuildHistory;
2424
import com.qlangtech.tis.coredefine.module.action.TriggerBuildResult;
2525
import com.qlangtech.tis.dao.ICommonDAOContext;
26-
import com.qlangtech.tis.datax.DataXJobSubmit.InstanceType;
2726
import com.qlangtech.tis.datax.DataXName;
2827
import com.qlangtech.tis.datax.DefaultDataXProcessorManipulate;
2928
import com.qlangtech.tis.datax.impl.DataxProcessor;
29+
import com.qlangtech.tis.extension.IDescribableManipulate;
3030
import com.qlangtech.tis.extension.TISExtension;
3131
import com.qlangtech.tis.lang.TisException;
32-
import com.qlangtech.tis.plugin.IPluginStore;
3332
import com.qlangtech.tis.plugin.datax.BasicDistributedSPIDataXJobSubmit;
3433
import com.qlangtech.tis.plugin.datax.BasicWorkflowPayload;
3534
import com.qlangtech.tis.plugin.datax.doplinscheduler.export.ExportTISPipelineToDolphinscheduler;
@@ -39,7 +38,6 @@
3938
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
4039
import com.qlangtech.tis.sql.parser.ISqlTask;
4140
import com.qlangtech.tis.sql.parser.SqlTaskNodeMeta.SqlDataFlowTopology;
42-
import com.qlangtech.tis.workflow.pojo.IWorkflow;
4341
import com.qlangtech.tis.workflow.pojo.WorkFlowBuildHistory;
4442
import org.apache.commons.lang3.tuple.Pair;
4543

@@ -88,16 +86,24 @@ private static BasicWorkflowPayload<DSWorkflowInstance> createPayload(BasicDistr
8886
// if (!(module instanceof IPluginContext)) {
8987
// throw new IllegalStateException("type of module:" + module.getClass() + " must be type of " + IPluginContext.class);
9088
// }
91-
Pair<List<ExportTISPipelineToDolphinscheduler>, IPluginStore<DefaultDataXProcessorManipulate>> pair
92-
= DefaultDataXProcessorManipulate.loadPlugins(null, ExportTISPipelineToDolphinscheduler.class, appName);
89+
90+
91+
DefaultDataXProcessorManipulate.ProcessorManipulateManager<ExportTISPipelineToDolphinscheduler> pair
92+
= DefaultDataXProcessorManipulate.loadPlugins(null, ExportTISPipelineToDolphinscheduler.class, appName, new IDescribableManipulate.IManipulateStorable() {
93+
@Override
94+
public boolean isManipulateStorable() {
95+
return true;
96+
}
97+
});
9398
ExportTISPipelineToDolphinscheduler exportCfg = null;
94-
for (ExportTISPipelineToDolphinscheduler cfg : pair.getKey()) {
99+
List<ExportTISPipelineToDolphinscheduler> existPlugins = pair.getTargetInstancePlugin();
100+
for (ExportTISPipelineToDolphinscheduler cfg : existPlugins) {
95101
exportCfg = cfg;
96102
}
97103

98104
DataxProcessor dataxProcessor = (DataxProcessor) DataxProcessor.load(null, appName);
99105
return new DSWorkflowPayload(Objects.requireNonNull(exportCfg
100-
, "load export ds scheduler instance size:" + pair.getKey().size() + ",appName:" + appName)
106+
, "load export ds scheduler instance size:" + existPlugins.size() + ",appName:" + appName)
101107
, dataxProcessor, commonDAOContext, submit) {
102108
@Override
103109
public void innerCreatePowerjobWorkflow(boolean updateProcess, Optional<Pair<Map<ISelectedTab, SelectedTabTriggers>

tis-datax/tis-datax-dolphinscheduler-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doplinscheduler/export/ExportTISPipelineToDolphinscheduler.java

Lines changed: 67 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,15 @@
2424
import com.google.common.collect.Lists;
2525
import com.google.common.collect.Sets;
2626
import com.qlangtech.tis.config.ParamsConfig;
27-
import com.qlangtech.tis.datax.DataXName;
2827
import com.qlangtech.tis.datax.DefaultDataXProcessorManipulate;
2928
import com.qlangtech.tis.datax.IDataXTaskRelevant;
3029
import com.qlangtech.tis.datax.IDataxProcessor;
3130
import com.qlangtech.tis.datax.impl.DataxProcessor;
32-
import com.qlangtech.tis.extension.Descriptor.ParseDescribable;
33-
import com.qlangtech.tis.extension.IDescribableManipulate.IManipulateStorable;
3431
import com.qlangtech.tis.extension.TISExtension;
3532
import com.qlangtech.tis.manage.common.AppAndRuntime;
3633
import com.qlangtech.tis.manage.common.Config;
3734
import com.qlangtech.tis.manage.common.HttpUtils.PostParam;
3835
import com.qlangtech.tis.plugin.IEndTypeGetter;
39-
import com.qlangtech.tis.plugin.IPluginStore;
4036
import com.qlangtech.tis.plugin.IdentityName;
4137
import com.qlangtech.tis.plugin.MemorySpecification;
4238
import com.qlangtech.tis.plugin.annotation.FormField;
@@ -53,9 +49,7 @@
5349
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
5450
import com.qlangtech.tis.util.IPluginContext;
5551
import org.apache.commons.lang.StringUtils;
56-
import org.apache.commons.lang3.tuple.Pair;
5752

58-
import java.util.Collections;
5953
import java.util.List;
6054
import java.util.Objects;
6155
import java.util.Optional;
@@ -121,7 +115,8 @@ public static final String dftProcessName() {
121115
*/
122116
public DolphinSchedulerURLBuilder processDefinition() {
123117
return getDSEndpoint().createSchedulerURLBuilder()
124-
.appendSubPath("projects", Objects.requireNonNull(this.projectCode, "projectCode can not be null"), "process-definition");
118+
.appendSubPath("projects", Objects.requireNonNull(this.projectCode
119+
, "projectCode can not be null"), "process-definition");
125120
// boolean endWithSlash = StringUtils.endsWith(getDSEndpoint().serverPath, "/");
126121
// StringBuffer url = new StringBuffer(getDSEndpoint().serverPath);
127122
// if (!endWithSlash) {
@@ -143,63 +138,77 @@ public DolphinSchedulerEndpoint getDSEndpoint() {
143138
}
144139

145140

146-
@Override
147-
public void manipuldateProcess(IPluginContext pluginContext, Optional<Context> context) {
148-
// 将TIS的数据同步通道配置同步到DS中
149-
// String[] originId = new String[1];
150-
/**
151-
* 校验
152-
*/
153-
ManipulateItemsProcessor itemsProcessor
154-
= ManipuldateUtils.instance(pluginContext, context.get(), null
155-
, (meta) -> {
156-
});
157-
if (StringUtils.isEmpty(itemsProcessor.getOriginIdentityId())) {
158-
throw new IllegalStateException("originId can not be null");
159-
}
160-
if (itemsProcessor == null) {
161-
return;
162-
}
163-
164-
Pair<List<ExportTISPipelineToDolphinscheduler>, IPluginStore<DefaultDataXProcessorManipulate>>
165-
pair = DefaultDataXProcessorManipulate.loadPlugins(pluginContext
166-
, ExportTISPipelineToDolphinscheduler.class, DataXName.createDataXPipeline(itemsProcessor.getOriginIdentityId()));
167-
168-
/**
169-
* 是否需要删除
170-
*/
171-
if (itemsProcessor.isDeleteProcess()) {
172-
// 只删除TIS本地端配置,dolphinscheduler端不进行任何操作
173-
pair.getRight().setPlugins(pluginContext, context, Collections.emptyList());
174-
return;
175-
}
176-
141+
// @Override
142+
// public void manipuldateProcess(IPluginContext pluginContext, Optional<Context> context) {
143+
// /**
144+
// * 校验
145+
// */
146+
// ManipulateItemsProcessor itemsProcessor
147+
// = ManipuldateUtils.instance(pluginContext, context.get(), null
148+
// , (meta) -> {
149+
// });
150+
// if (StringUtils.isEmpty(itemsProcessor.getOriginIdentityId())) {
151+
// throw new IllegalStateException("originId can not be null");
152+
// }
153+
// if (itemsProcessor == null) {
154+
// return;
155+
// }
156+
//
157+
// ProcessorManipulateManager<ExportTISPipelineToDolphinscheduler>
158+
// store = DefaultDataXProcessorManipulate.loadPlugins(pluginContext
159+
// , ExportTISPipelineToDolphinscheduler.class, DataXName.createDataXPipeline(itemsProcessor.getOriginIdentityId()));
160+
// /**
161+
// * 是否需要删除
162+
// */
163+
// if (itemsProcessor.isDeleteProcess()) {
164+
// // 只删除TIS本地端配置,dolphinscheduler端不进行任何操作
165+
// // store.setPlugins(pluginContext, context, Collections.emptyList());
166+
// store.delete(pluginContext, context, this);
167+
// return;
168+
// }
169+
//
170+
//
171+
// if (!itemsProcessor.isUpdateProcess()) {
172+
// List<ExportTISPipelineToDolphinscheduler> existPlugins = store.getTargetInstancePlugin();
173+
// // 添加操作
174+
// if (CollectionUtils.isNotEmpty(existPlugins)) {
175+
// for (ExportTISPipelineToDolphinscheduler i : existPlugins) {
176+
// pluginContext.addErrorMessage(context.get(), "实例'" + i.identityValue() + "'已经配置,不能再创建新实例");
177+
// }
178+
// return;
179+
// }
180+
// }
181+
//
182+
// /**===============================
183+
// * 添加项目参数
184+
// ===============================*/
185+
// addProjectParameters();
186+
//
187+
//
188+
// IDataxProcessor dataxProcessor = DataxProcessor.load(pluginContext, itemsProcessor.getOriginIdentityId());
189+
// DSWorkflowPayload workflowPayload = new DSWorkflowPayload(this, dataxProcessor
190+
// , BasicDistributedSPIDataXJobSubmit.getCommonDAOContext(pluginContext), new DolphinschedulerDistributedSPIDataXJobSubmit());
191+
// WorkflowSPIInitializer<DSWorkflowInstance> workflowSPIInitializer = new WorkflowSPIInitializer<>(workflowPayload);
192+
// /**
193+
// * 1. 将配置同步到DS
194+
// */
195+
// DSWorkflowInstance wfInstance = workflowSPIInitializer.initialize(true, itemsProcessor.isUpdateProcess());
196+
//
197+
//
198+
// /**
199+
// *2. 并且将实例持久化在app管道下,当DS端触发会调用 DolphinschedulerDistributedSPIDataXJobSubmit.createPayload()方法获取DS端的WorkflowDAG拓扑视图
200+
// */
201+
// store.replace(pluginContext, context, this);
202+
// }
177203

178-
if (!itemsProcessor.isUpdateProcess()) {
179-
// 添加操作
180-
if (pair.getLeft().size() > 0) {
181-
for (ExportTISPipelineToDolphinscheduler i : pair.getLeft()) {
182-
pluginContext.addErrorMessage(context.get(), "实例'" + i.processName + "'已经配置,不能再创建新实例");
183-
}
184-
// throw TisException.create("实例已经配置不能重复创建");
185-
return;
186-
}
187-
}
188204

205+
@Override
206+
protected void afterManipuldateProcess(IPluginContext pluginContext, Optional<Context> context, ManipulateItemsProcessor itemsProcessor) {
189207
/**===============================
190208
* 添加项目参数
191209
===============================*/
192210
addProjectParameters();
193211

194-
// /**===============================
195-
// * 为执行任务添加task工作组
196-
// ===============================*/
197-
// TaskGroup taskGroup = taskGroup.addTaskGroup(this);
198-
199-
// IPluginStore<DefaultDataXProcessorManipulate> pluginStore = getPluginStore(pluginContext, originId[0]);
200-
// 查看是否已经有保存的实例
201-
// List<ExportTISPipelineToDolphinscheduler> export2DSCfgs = pair.getLeft();
202-
203212

204213
IDataxProcessor dataxProcessor = DataxProcessor.load(pluginContext, itemsProcessor.getOriginIdentityId());
205214
DSWorkflowPayload workflowPayload = new DSWorkflowPayload(this, dataxProcessor
@@ -209,15 +218,8 @@ public void manipuldateProcess(IPluginContext pluginContext, Optional<Context> c
209218
* 1. 将配置同步到DS
210219
*/
211220
DSWorkflowInstance wfInstance = workflowSPIInitializer.initialize(true, itemsProcessor.isUpdateProcess());
212-
213-
214-
/**
215-
*2. 并且将实例持久化在app管道下,当DS端触发会调用 DolphinschedulerDistributedSPIDataXJobSubmit.createPayload()方法获取DS端的WorkflowDAG拓扑视图
216-
*/
217-
pair.getRight().setPlugins(pluginContext, context, Collections.singletonList(new ParseDescribable(this)));
218221
}
219222

220-
221223
void addProjectParameters() {
222224
DolphinSchedulerEndpoint endpoint = this.getDSEndpoint();
223225
DolphinSchedulerResponse response = endpoint.createSchedulerURLBuilder()
@@ -316,7 +318,7 @@ private void createProjectParam(DolphinSchedulerEndpoint endpoint, String keyNam
316318

317319
@TISExtension
318320
public static class DefaultDesc extends DefaultDataXProcessorManipulate.BasicDesc
319-
implements IEndTypeGetter, IManipulateStorable {
321+
implements IEndTypeGetter {
320322
public DefaultDesc() {
321323
super();
322324
this.registerSelectOptions(FIELD_DS_ENDPOINT, () -> ParamsConfig.getItems(DolphinSchedulerEndpoint.DISPLAY_NAME));

tis-datax/tis-datax-dolphinscheduler-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doplinscheduler/history/DSWorkFlowBuildHistoryPayload.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,13 @@
2323
import com.qlangtech.tis.dao.ICommonDAOContext;
2424
import com.qlangtech.tis.datax.DefaultDataXProcessorManipulate;
2525
import com.qlangtech.tis.datax.IDataxProcessor;
26+
import com.qlangtech.tis.extension.IDescribableManipulate;
2627
import com.qlangtech.tis.manage.IAppSource;
27-
import com.qlangtech.tis.plugin.IPluginStore;
2828
import com.qlangtech.tis.plugin.datax.WorkFlowBuildHistoryPayload;
2929
import com.qlangtech.tis.plugin.datax.doplinscheduler.export.DolphinSchedulerURLBuilder.DolphinSchedulerResponse;
3030
import com.qlangtech.tis.plugin.datax.doplinscheduler.export.ExportTISPipelineToDolphinscheduler;
31-
import org.apache.commons.lang3.tuple.Pair;
3231
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
3332

34-
import java.util.List;
35-
3633
/**
3734
* DS 触发的任务执行状态追踪
3835
*
@@ -45,9 +42,16 @@ public class DSWorkFlowBuildHistoryPayload extends WorkFlowBuildHistoryPayload {
4542

4643
public DSWorkFlowBuildHistoryPayload(IDataxProcessor dataxProcessor, Integer tisTaskId, ICommonDAOContext daoContext) {
4744
super(dataxProcessor, tisTaskId, daoContext);
48-
Pair<List<ExportTISPipelineToDolphinscheduler>, IPluginStore<DefaultDataXProcessorManipulate>> pluginStorePair
49-
= DefaultDataXProcessorManipulate.loadPlugins(null, ExportTISPipelineToDolphinscheduler.class, ((IAppSource) this.dataxProcessor).getDataXName());
50-
for (ExportTISPipelineToDolphinscheduler exportDSCfg : pluginStorePair.getLeft()) {
45+
//Pair<List<ExportTISPipelineToDolphinscheduler>, IPluginStore<DefaultDataXProcessorManipulate>>
46+
DefaultDataXProcessorManipulate.ProcessorManipulateManager<ExportTISPipelineToDolphinscheduler> pluginStorePair
47+
= DefaultDataXProcessorManipulate.loadPlugins(null, ExportTISPipelineToDolphinscheduler.class, ((IAppSource) this.dataxProcessor).getDataXName()
48+
, new IDescribableManipulate.IManipulateStorable() {
49+
@Override
50+
public boolean isManipulateStorable() {
51+
return true;
52+
}
53+
});
54+
for (ExportTISPipelineToDolphinscheduler exportDSCfg : pluginStorePair.getTargetInstancePlugin()) {
5155
this.exportDSCfg = exportDSCfg;
5256
return;
5357
}

0 commit comments

Comments
 (0)