Skip to content

Commit a146f8e

Browse files
committed
reface DAG execution with AKKA,for issue: datavane/tis#486
1 parent ff822ed commit a146f8e

File tree

87 files changed

+6455
-710
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+6455
-710
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
<module>tis-kerberos-plugin</module>
6565

6666
<module>tis-powerjob-common-plugin</module>
67-
<module>tis-datax/tis-datax-local-embedded-executor</module>
67+
6868
<module>tis-transformer</module>
6969
<module>tis-datax/tis-datax-dolphinscheduler-plugin</module>
7070
<module>tis-datax/tis-hive-shim-common</module>
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"permissions": {
3+
"allow": [
4+
"Bash(mvn test -pl tis-datax-local-akka-executor -Dtest=com.qlangtech.tis.dag.actor.TestWorkflowInstanceActor)",
5+
"Bash(find /Users/mozhenghua/j2ee_solution/project -type f -name \"*.java\" -exec grep -l \"class WorkflowDAGFileManager\" {} ;)",
6+
"Bash(find /Users/mozhenghua/j2ee_solution/project -type f -name \"*.java\" -exec grep -l \"enum InstanceStatus\" {} ;)",
7+
"Bash(find /Users/mozhenghua/j2ee_solution/project -type f -name \"*.java\" -exec grep -l \"enum WorkflowNodeType\" {} ;)"
8+
]
9+
}
10+
}

tis-datax/executor/dolphinscheduler-task-tis-datasync/src/main/java/com/qlangtech/tis/plugin/dolphinscheduler/task/impl/DSTaskContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public File getSpecifiedLocalLoggerPath() {
7878

7979
@Override
8080
public InstanceType getJobTriggerType() {
81-
return InstanceType.DS;
81+
// return InstanceType.DS;
82+
throw new UnsupportedOperationException();
8283
}
8384

8485
@Override

tis-datax/executor/powerjob-worker-samples/src/main/java/com/qlangtech/tis/datax/powerjob/impl/PowerJobTaskContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public PowerJobTaskContext(TaskContext context) {
3939

4040
@Override
4141
public InstanceType getJobTriggerType() {
42-
return InstanceType.DISTRIBUTE;
42+
return InstanceType.AKKA;
4343
}
4444

4545
public static ExecPhase parse(MapProcessor processor, TaskContext context) {

tis-datax/executor/powerjob-worker-samples/src/test/java/com/qlangtech/tis/datax/powerjob/TestSplitTabSync.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.qlangtech.tis.datax.powerjob;
22

33
import com.qlangtech.tis.cloud.ITISCoordinator;
4-
import com.qlangtech.tis.datax.CuratorDataXTaskMessage;
54
import com.qlangtech.tis.datax.executor.BasicTISTableDumpProcessor;
65
import com.qlangtech.tis.datax.powerjob.impl.PowerJobTaskContext;
76
import com.qlangtech.tis.exec.AbstractExecContext;
@@ -44,12 +43,12 @@ public void testExecSync() throws Exception {
4443
pair.getLeft().setCoordinator(ITISCoordinator.create());
4544
// IDataxProcessor processor = DataxProcessor.load(null, pair.getRight().getDataXName());
4645

47-
for (CuratorDataXTaskMessage tskMsg : pair.getRight().getSplitTabsCfg()) {
48-
SplitTabSync tabSync = new SplitTabSync(tskMsg);
49-
tabSync.execSync(pair.getLeft(), rpcSvc);
50-
this.verifyAll();
51-
return;
52-
}
46+
// for (CuratorDataXTaskMessage tskMsg : pair.getRight().getSplitTabsCfg()) {
47+
// SplitTabSync tabSync = new SplitTabSync(tskMsg);
48+
// tabSync.execSync(pair.getLeft(), rpcSvc);
49+
// this.verifyAll();
50+
// return;
51+
// }
5352

5453
Assert.fail("have not process tskMsg ");
5554

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/DataXJobSingleProcessorExecutor.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package com.qlangtech.tis.datax;
2020

21-
import com.qlangtech.tis.job.common.JobCommon;
2221
import com.qlangtech.tis.manage.common.Config;
2322
import com.qlangtech.tis.manage.common.TisUTF8;
2423
import com.qlangtech.tis.offline.DataxUtils;
@@ -33,11 +32,9 @@
3332

3433
import java.io.File;
3534
import java.io.IOException;
36-
import java.util.Arrays;
35+
import java.time.Duration;
3736
import java.util.Objects;
3837
import java.util.concurrent.ConcurrentHashMap;
39-
import java.util.concurrent.TimeUnit;
40-
import java.util.stream.Collectors;
4138

4239
/**
4340
* 独立进程中执行DataX任务,这样可以有效避免每次执行DataX任务由于ClassLoader的冲突导致的错误
@@ -51,20 +48,20 @@ public abstract class DataXJobSingleProcessorExecutor<T extends IDataXTaskReleva
5148
// 记录当前正在执行的任务<taskid,ExecuteWatchdog>
5249
public final ConcurrentHashMap<Integer, ExecuteWatchdog> runningTask = new ConcurrentHashMap<>();
5350

54-
// @Override
5551
public void consumeMessage(T msg) throws Exception {
5652
//MDC.put();
57-
Integer taskId = msg.getTaskId();
58-
String jobName = msg.getJobName();
59-
String dataxName = msg.getDataXName();
60-
// StoreResourceType resType = Objects.requireNonNull(msg.getResType(), "resType can not be null");
61-
// MDC.put(JobCommon.KEY_TASK_ID, String.valueOf(jobId));
62-
// MDC.put(JobCommon.KEY_COLLECTION, dataxName);
63-
JobCommon.setMDC(taskId, dataxName);
64-
65-
66-
// 查看当前任务是否正在进行中,如果已经终止则要退出
67-
execSystemTask(msg, taskId, jobName, dataxName);
53+
throw new UnsupportedOperationException();
54+
// Integer taskId = msg.getTaskId();
55+
// String jobName = msg.getJobName();
56+
// String dataxName = msg.getDataXName();
57+
// // StoreResourceType resType = Objects.requireNonNull(msg.getResType(), "resType can not be null");
58+
// // MDC.put(JobCommon.KEY_TASK_ID, String.valueOf(jobId));
59+
// // MDC.put(JobCommon.KEY_COLLECTION, dataxName);
60+
// JobCommon.setMDC(taskId, dataxName);
61+
//
62+
//
63+
// // 查看当前任务是否正在进行中,如果已经终止则要退出
64+
// execSystemTask(msg, taskId, jobName, dataxName);
6865
}
6966

7067
protected void execSystemTask(T msg, Integer jobId, String jobName, String dataxName) throws IOException,
@@ -114,7 +111,7 @@ protected void execSystemTask(T msg, Integer jobId, String jobName, String datax
114111
executor.setStreamHandler(new PumpStreamHandler(System.out));
115112
executor.setExitValue(0);
116113
executor.setWatchdog(watchdog);
117-
String command = Arrays.stream(cmdLine.toStrings()).collect(Collectors.joining(" "));
114+
String command = String.join(" ", cmdLine.toStrings());
118115
logger.info("workDir:{},command:{}", workingDir.getAbsolutePath(), command);
119116
if (DataxUtils.localDataXCommandConsumer != null) {
120117
DataxUtils.localDataXCommandConsumer.accept(command);
@@ -140,12 +137,13 @@ protected void execSystemTask(T msg, Integer jobId, String jobName, String datax
140137
* @throws InterruptedException
141138
* @throws DataXJobSingleProcessorException
142139
*/
143-
protected void waitForTerminator(Integer jobId, String dataxName, final Integer taskExpireHours, DefaultExecuteResultHandler resultHandler) throws InterruptedException, DataXJobSingleProcessorException {
140+
protected void waitForTerminator(Integer jobId, String dataxName, final Duration taskExpireHours, DefaultExecuteResultHandler resultHandler) throws InterruptedException, DataXJobSingleProcessorException {
144141

145-
int timeout = Objects.requireNonNull(taskExpireHours, "taskExpireHours can not be null");
142+
Duration timeout = Objects.requireNonNull(taskExpireHours, "taskExpireHours can not be null");
146143
try {
144+
147145
// 等待5个小时
148-
resultHandler.waitFor(TimeUnit.HOURS.toMillis(timeout));
146+
resultHandler.waitFor(timeout.toMillis());
149147

150148
if (resultHandler.getExitValue() != DataXJobInfo.DATAX_THREAD_PROCESSING_CANCAL_EXITCODE) {
151149
if ( //resultHandler.hasResult() &&

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/DataXLifecycleHookMsg.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
*/
1111
public class DataXLifecycleHookMsg extends JobHookMsg {
1212

13-
private IDataXBatchPost.LifeCycleHook lifeCycleHook;
13+
private LifeCycleHook lifeCycleHook;
1414
private String tableName;
1515

1616
public DataXLifecycleHookMsg() {
@@ -20,7 +20,7 @@ public DataXLifecycleHookMsg() {
2020
public static DataXLifecycleHookMsg createDataXLifecycleHookMsg(IDataxProcessor processor, String tableName,
2121
Integer taskId, String jobName,
2222
Long currentTimeStamp,
23-
IDataXBatchPost.LifeCycleHook lifeCycleHook,
23+
LifeCycleHook lifeCycleHook,
2424
Boolean dryRun) {
2525
if (StringUtils.isEmpty(tableName)) {
2626
throw new IllegalArgumentException("tableName can not be null");
@@ -43,11 +43,11 @@ public void setTableName(String tableName) {
4343
this.tableName = tableName;
4444
}
4545

46-
public IDataXBatchPost.LifeCycleHook getLifeCycleHook() {
46+
public LifeCycleHook getLifeCycleHook() {
4747
return Objects.requireNonNull(this.lifeCycleHook, "lifeCycleHook can not be null");
4848
}
4949

50-
public void setLifeCycleHook(IDataXBatchPost.LifeCycleHook lifeCycleHook) {
50+
public void setLifeCycleHook(LifeCycleHook lifeCycleHook) {
5151
this.lifeCycleHook = lifeCycleHook;
5252
}
5353

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/DataxPrePostConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ protected boolean isCurrentJobProcessing(Integer jobId) {
134134

135135
@Override
136136
protected DataXJobSubmit.InstanceType getExecMode() {
137-
return DataXJobSubmit.InstanceType.DISTRIBUTE;
137+
return DataXJobSubmit.InstanceType.AKKA;
138138
}
139139

140140
protected String getIncrStateCollectAddress() {

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/DataxPrePostExecutor.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,10 @@ public static void main(String[] args) throws Exception {
108108

109109
IDataXBatchPost.process(dataxProcessor, tab, (batchPostTask, entryName) -> {
110110
IRemoteTaskTrigger hookTrigger = null;
111-
if (IDataXBatchPost.KEY_POST.equalsIgnoreCase(lifecycleHookName)) {
112-
hookTrigger = batchPost.createPostTask(
113-
execContext, entryName, tab, dataxProcessor.getDataxCfgFileNames(null, Optional.empty()));
114-
} else if (IDataXBatchPost.KEY_PREP.equalsIgnoreCase(lifecycleHookName)) {
111+
if (LifeCycleHook.Post.getToken().equalsIgnoreCase(lifecycleHookName)) {
112+
hookTrigger = batchPost.createPostTask(execContext,
113+
entryName, tab, dataxProcessor.getDataxCfgFileNames(null, Optional.empty()));
114+
} else if (LifeCycleHook.Prep.getToken().equalsIgnoreCase(lifecycleHookName)) {
115115
hookTrigger = batchPost.createPreExecuteTask(execContext, entryName, tab);
116116
} else {
117117
throw new IllegalArgumentException("illegal lifecycleHookName:" + lifecycleHookName);
@@ -133,13 +133,13 @@ public static void main(String[] args) throws Exception {
133133
if (statusRpc != null) {
134134
// StatusRpcClientFactory.AssembleSvcCompsite svc =
135135
// (StatusRpcClientFactory.AssembleSvcCompsite) statusRpc.get();
136-
if (IDataXBatchPost.KEY_POST.equalsIgnoreCase(lifecycleHookName)) {
136+
if (LifeCycleHook.Post.getToken().equalsIgnoreCase(lifecycleHookName)) {
137137
JoinPhaseStatus.JoinTaskStatus joinStatus = new JoinPhaseStatus.JoinTaskStatus(jobName);
138138
joinStatus.setFaild(true);
139139
joinStatus.setComplete(true);
140140
joinStatus.setStart();
141141
statusRpc.reportJoinStatus(taskId, joinStatus);
142-
} else if (IDataXBatchPost.KEY_PREP.equalsIgnoreCase(lifecycleHookName)) {
142+
} else if (LifeCycleHook.Prep.getToken().equalsIgnoreCase(lifecycleHookName)) {
143143
statusRpc.reportDumpJobStatus(true, true, false, taskId, jobName, -1, -1);
144144
} else {
145145
throw new IllegalArgumentException("illegal lifecycleHookName:" + lifecycleHookName);

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/JobHookMsg.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,19 @@ public void setResType(StoreResourceType resType) {
108108
// this.lifeCycleHook = lifeCycleHook;
109109
// }
110110

111-
@Override
112-
public Integer getTaskId() {
113-
return Objects.requireNonNull(this.taskId, "taskid can not be null");
114-
}
111+
// @Override
112+
// public Integer getTaskId() {
113+
// return Objects.requireNonNull(this.taskId, "taskid can not be null");
114+
// }
115115

116116
@Override
117117
public String getJobName() {
118118
return this.jobName;
119119
}
120120

121121
@Override
122-
public String getDataXName() {
123-
return this.resName;
122+
public DataXName getDataXName() {
123+
return new DataXName(this.resName, this.resType);
124124
}
125125

126126
@Override

0 commit comments

Comments
 (0)