Skip to content

Commit 76dc9e7

Browse files
committed
[Fix-16991] Missing environmentConfig when retry/failover/recover task instance
1 parent 879e966 commit 76dc9e7

File tree

16 files changed

+440
-211
lines changed

16 files changed

+440
-211
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.repository;
19+
20+
import org.apache.dolphinscheduler.dao.entity.Environment;
21+
22+
import java.util.Optional;
23+
24+
public interface IEnvironmentDao extends IDao<Environment> {
25+
26+
Optional<Environment> queryByEnvironmentCode(Long environmentCode);
27+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.repository.impl;
19+
20+
import org.apache.dolphinscheduler.dao.entity.Environment;
21+
import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
22+
import org.apache.dolphinscheduler.dao.repository.BaseDao;
23+
import org.apache.dolphinscheduler.dao.repository.IEnvironmentDao;
24+
25+
import java.util.Optional;
26+
27+
import lombok.NonNull;
28+
29+
import org.springframework.stereotype.Repository;
30+
31+
@Repository
32+
public class EnvironmentDaoImpl extends BaseDao<Environment, EnvironmentMapper> implements IEnvironmentDao {
33+
34+
public EnvironmentDaoImpl(@NonNull EnvironmentMapper environmentMapper) {
35+
super(environmentMapper);
36+
}
37+
38+
@Override
39+
public Optional<Environment> queryByEnvironmentCode(Long environmentCode) {
40+
return Optional.ofNullable(mybatisMapper.queryByEnvironmentCode(environmentCode));
41+
}
42+
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java

Lines changed: 0 additions & 80 deletions
This file was deleted.

dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java

Lines changed: 0 additions & 43 deletions
This file was deleted.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/fake/LogicFakeTask.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.ITaskParameterDeserializer;
2727
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
2828

29+
import org.apache.commons.lang3.StringUtils;
30+
2931
import lombok.extern.slf4j.Slf4j;
3032

3133
import com.fasterxml.jackson.core.type.TypeReference;
@@ -51,9 +53,13 @@ public void start() throws MasterTaskExecuteException {
5153
try {
5254
log.info("Begin to execute LogicFakeTask: {}", taskExecutionContext.getTaskName());
5355

54-
final String shellScript = ParameterUtils.convertParameterPlaceholders(
56+
String shellScript = ParameterUtils.convertParameterPlaceholders(
5557
taskParameters.getShellScript(),
5658
ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()));
59+
60+
if (StringUtils.isNotEmpty(taskExecutionContext.getEnvironmentConfig())) {
61+
shellScript = taskExecutionContext.getEnvironmentConfig() + "\n" + shellScript;
62+
}
5763
final String[] cmd = {"/bin/sh", "-c", shellScript};
5864
process = Runtime.getRuntime().exec(cmd);
5965
int exitCode = process.waitFor();

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/AbstractTaskInstanceFactory.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.dolphinscheduler.server.master.engine.task.runnable;
1919

20-
import org.apache.dolphinscheduler.dao.entity.Environment;
2120
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
2221
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2322
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
@@ -62,7 +61,6 @@ protected TaskInstance cloneTaskInstance(TaskInstance originTaskInstance) {
6261
result.setTaskInstancePriority(originTaskInstance.getTaskInstancePriority());
6362
result.setWorkerGroup(originTaskInstance.getWorkerGroup());
6463
result.setEnvironmentCode(originTaskInstance.getEnvironmentCode());
65-
result.setEnvironmentConfig(originTaskInstance.getEnvironmentConfig());
6664
result.setExecutorId(originTaskInstance.getExecutorId());
6765
result.setVarPool(originTaskInstance.getVarPool());
6866
result.setExecutorName(originTaskInstance.getExecutorName());
@@ -116,14 +114,4 @@ protected void injectMetadataFromWorkflowInstance(TaskInstance taskInstance, Wor
116114
taskInstance.setTestFlag(workflowInstance.getTestFlag());
117115
}
118116

119-
protected void injectEnvironmentConfigFromDB(TaskInstance taskInstance) {
120-
if (EnvironmentUtils.isEnvironmentCodeEmpty(taskInstance.getEnvironmentCode())) {
121-
return;
122-
}
123-
Environment environment = environmentMapper.queryByEnvironmentCode(taskInstance.getEnvironmentCode());
124-
if (environment == null) {
125-
throw new IllegalArgumentException("Cannot find the environment: " + taskInstance.getEnvironmentCode());
126-
}
127-
taskInstance.setEnvironmentConfig(environment.getConfig());
128-
}
129117
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/FirstRunTaskInstanceFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public TaskInstance createTaskInstance(FirstRunTaskInstanceBuilder builder) {
5151
TaskInstance taskInstance = new TaskInstance();
5252
injectMetadataFromTaskDefinition(taskInstance, taskDefinition);
5353
injectMetadataFromWorkflowInstance(taskInstance, workflowInstance);
54-
injectEnvironmentConfigFromDB(taskInstance);
5554

5655
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
5756
taskInstance.setFirstSubmitTime(new Date());

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionContextBuilder.java

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,9 @@
2323
import org.apache.dolphinscheduler.common.utils.DateUtils;
2424
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
2525
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
26-
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
2726
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2827
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
2928
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
30-
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
3129
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
3230
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
3331
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
@@ -48,35 +46,37 @@ public static TaskExecutionContextBuilder get() {
4846
return new TaskExecutionContextBuilder();
4947
}
5048

51-
private TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
49+
private final TaskExecutionContext taskExecutionContext;
50+
51+
public TaskExecutionContextBuilder() {
52+
this.taskExecutionContext = new TaskExecutionContext();
53+
}
5254

5355
/**
5456
* build taskInstance related info
5557
*
5658
* @param taskInstance taskInstance
5759
* @return TaskExecutionContextBuilder
5860
*/
59-
public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance) {
61+
public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(final TaskInstance taskInstance) {
6062
taskExecutionContext.setTaskInstanceId(taskInstance.getId());
6163
taskExecutionContext.setTaskName(taskInstance.getName());
6264
taskExecutionContext.setFirstSubmitTime(DateUtils.dateToTimeStamp(taskInstance.getFirstSubmitTime()));
6365
taskExecutionContext.setStartTime(DateUtils.dateToTimeStamp(taskInstance.getStartTime()));
6466
taskExecutionContext.setTaskType(taskInstance.getTaskType());
6567
taskExecutionContext.setLogPath(taskInstance.getLogPath());
6668
taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
67-
taskExecutionContext.setEnvironmentConfig(taskInstance.getEnvironmentConfig());
6869
taskExecutionContext.setHost(taskInstance.getHost());
6970
taskExecutionContext.setVarPool(taskInstance.getVarPool());
7071
taskExecutionContext.setDryRun(taskInstance.getDryRun());
7172
taskExecutionContext.setTestFlag(taskInstance.getTestFlag());
72-
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUBMITTED_SUCCESS);
7373
taskExecutionContext.setCpuQuota(taskInstance.getCpuQuota());
7474
taskExecutionContext.setMemoryMax(taskInstance.getMemoryMax());
7575
taskExecutionContext.setAppIds(taskInstance.getAppLink());
7676
return this;
7777
}
7878

79-
public TaskExecutionContextBuilder buildTaskDefinitionRelatedInfo(TaskDefinition taskDefinition) {
79+
public TaskExecutionContextBuilder buildTaskDefinitionRelatedInfo(final TaskDefinition taskDefinition) {
8080
// todo: remove the timeout setting here the timeout strategy should be used at master
8181
taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
8282
if (taskDefinition.getTimeoutFlag() == TimeoutFlag.OPEN) {
@@ -97,7 +97,7 @@ public TaskExecutionContextBuilder buildTaskDefinitionRelatedInfo(TaskDefinition
9797
* @param workflowInstance processInstance
9898
* @return TaskExecutionContextBuilder
9999
*/
100-
public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(WorkflowInstance workflowInstance) {
100+
public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(final WorkflowInstance workflowInstance) {
101101
taskExecutionContext.setWorkflowInstanceId(workflowInstance.getId());
102102
taskExecutionContext.setScheduleTime(DateUtils.dateToTimeStamp(workflowInstance.getScheduleTime()));
103103
taskExecutionContext.setGlobalParams(workflowInstance.getGlobalParams());
@@ -110,20 +110,7 @@ public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(WorkflowInsta
110110
return this;
111111
}
112112

113-
/**
114-
* build processDefinition related info
115-
*
116-
* @param workflowDefinition processDefinition
117-
* @return TaskExecutionContextBuilder
118-
*/
119-
public TaskExecutionContextBuilder buildProcessDefinitionRelatedInfo(WorkflowDefinition workflowDefinition) {
120-
taskExecutionContext.setWorkflowDefinitionCode(workflowDefinition.getCode());
121-
taskExecutionContext.setWorkflowDefinitionVersion(workflowDefinition.getVersion());
122-
taskExecutionContext.setProjectCode(workflowDefinition.getProjectCode());
123-
return this;
124-
}
125-
126-
public TaskExecutionContextBuilder buildResourceParametersInfo(ResourceParametersHelper parametersHelper) {
113+
public TaskExecutionContextBuilder buildResourceParameters(final ResourceParametersHelper parametersHelper) {
127114
taskExecutionContext.setResourceParametersHelper(parametersHelper);
128115
return this;
129116
}
@@ -135,7 +122,7 @@ public TaskExecutionContextBuilder buildResourceParametersInfo(ResourceParameter
135122
* @return TaskExecutionContextBuilder
136123
*/
137124

138-
public TaskExecutionContextBuilder buildK8sTaskRelatedInfo(K8sTaskExecutionContext k8sTaskExecutionContext) {
125+
public TaskExecutionContextBuilder buildK8sTaskRelatedInfo(final K8sTaskExecutionContext k8sTaskExecutionContext) {
139126
taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext);
140127
return this;
141128
}
@@ -146,7 +133,7 @@ public TaskExecutionContextBuilder buildK8sTaskRelatedInfo(K8sTaskExecutionConte
146133
* @param propertyMap
147134
* @return
148135
*/
149-
public TaskExecutionContextBuilder buildParamInfo(Map<String, Property> propertyMap) {
136+
public TaskExecutionContextBuilder buildPrepareParams(final Map<String, Property> propertyMap) {
150137
taskExecutionContext.setPrepareParamsMap(propertyMap);
151138
return this;
152139
}
@@ -157,16 +144,21 @@ public TaskExecutionContextBuilder buildParamInfo(Map<String, Property> property
157144
* @param businessParamsMap
158145
* @return
159146
*/
160-
public TaskExecutionContextBuilder buildBusinessParamsMap(Map<String, Property> businessParamsMap) {
147+
public TaskExecutionContextBuilder buildBusinessParams(final Map<String, Property> businessParamsMap) {
161148
taskExecutionContext.setParamsMap(businessParamsMap);
162149
return this;
163150
}
164151

165-
public TaskExecutionContextBuilder buildWorkflowInstanceHost(String masterHost) {
152+
public TaskExecutionContextBuilder buildWorkflowInstanceHost(final String masterHost) {
166153
taskExecutionContext.setWorkflowInstanceHost(masterHost);
167154
return this;
168155
}
169156

157+
public TaskExecutionContextBuilder buildEnvironmentConfig(final String environmentConfig) {
158+
taskExecutionContext.setEnvironmentConfig(environmentConfig);
159+
return this;
160+
}
161+
170162
/**
171163
* create
172164
*

0 commit comments

Comments
 (0)