Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/en/guide/project/workflow-definition.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ After selecting multiple workflows, you can perform batch operations at the bott

Description of workflow operating parameters:

* Failure strategy: When a task node fails to execute, other parallel task nodes need to execute this strategy. "Continue" means: after a certain task fails, other task nodes execute normally; "End" means: terminate all tasks execution, and terminate the entire process.
* Failure strategy: When a task node fails to execute, other parallel task nodes need to execute this strategy. "Continue" means: after a certain task fails, end the workflow after waiting for other task nodes at the same level to execute normally.; "End" means: terminate all tasks execution, and terminate the entire workflow.
* Notification strategy: When the process is over, send the process execution result notification email according to the process status, options including no send, send if success, send of failure, send whatever result.
* Process priority: The priority of process operation, divide into five levels: highest (HIGHEST), high (HIGH), medium (MEDIUM), low (LOW), and lowest (LOWEST). When the number of master threads is insufficient, high priority processes will execute first in the execution queue, and processes with the same priority will execute in the order of first in, first out.
* Worker group: The process can only be executed in the specified worker machine group. The default is `Default`, which can execute on any worker.
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/zh/guide/project/workflow-definition.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@

工作流运行参数说明:

* 失败策略:当某一个任务节点执行失败时,其他并行的任务节点需要执行的策略。”继续“表示:某一任务失败后,其他任务节点正常执行;”结束“表示:终止所有正在执行的任务,并终止整个流程
* 失败策略:当某一个任务节点执行失败时,其他并行的任务节点需要执行的策略。”继续“表示:某一任务失败后,等待其他同级任务节点正常执行完毕后结束工作流;”结束“表示:终止所有正在执行的任务,并终止整个工作流
* 通知策略:当流程结束,根据流程状态发送流程执行信息通知邮件,包含任何状态都不发,成功发,失败发,成功或失败都发。
* 流程优先级:流程运行的优先级,分五个等级:最高(HIGHEST),高(HIGH),中(MEDIUM),低(LOW),最低(LOWEST)。当 master 线程数不足时,级别高的流程在执行队列中会优先执行,相同优先级的流程按照先进先出的顺序执行。
* Worker 分组:该流程只能在指定的 worker 机器组里执行。默认是 Default,可以在任一 worker 上执行。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ protected void triggerTasks(final IWorkflowExecutionRunnable workflowExecutionRu
.sorted(Comparator.comparing(ITaskExecutionRunnable::getName))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(readyToTriggerTasks)) {
final boolean isAllCandidateTaskPredecessorsInActive = triggerCandidateTasks.stream()
.flatMap(taskExecutionRunnable -> workflowExecutionGraph
.getPredecessors(taskExecutionRunnable.getName())
.stream())
.allMatch(workflowExecutionGraph::isTaskExecutionRunnableInActive);
if (isAllCandidateTaskPredecessorsInActive) {
emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
}
return;
}
final WorkflowEventBus workflowEventBus = workflowExecutionRunnable.getWorkflowEventBus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ public void testStartWorkflow_with_oneSuccessSwitch_twoFakeTask() {
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.failureStrategy(FailureStrategy.CONTINUE)
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

Expand Down Expand Up @@ -610,6 +611,7 @@ void testStartWorkflow_with_oneTaskWithMultiplePredecessors_runFailed() {
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.failureStrategy(FailureStrategy.CONTINUE)
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

Expand Down Expand Up @@ -640,6 +642,45 @@ void testStartWorkflow_with_oneTaskWithMultiplePredecessors_runFailed() {
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with shared downstream task when failed predecessor finishes first using continue failure strategy")
void testStartWorkflow_with_sharedDownstreamTask_whenFailedPredecessorFinishFirst_usingFailureStrategyContinue() {
final String yaml =
"/it/start/workflow_with_shared_downstream_task_when_failed_predecessor_finish_first.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.failureStrategy(FailureStrategy.CONTINUE)
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE)
.matches(workflowInstance -> workflowInstance.getEndTime() != null);

Assertions
.assertThat(repository.queryTaskInstance(workflowInstanceId))
.hasSize(2)
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("B");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
});
});
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one sub workflow task(A) failed")
public void testStartWorkflow_with_subWorkflowTask_failed() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A(failed) -> C(not triggered)
# B(success) -> C(not triggered)
# A finishes before B.
project:
name: MasterIntegrationTest
code: 1
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00

workflows:
- name: workflow_with_shared_downstream_task_when_failed_predecessor_finish_first
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with a shared downstream task
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
userId: 1
executionType: PARALLEL

tasks:
- name: A
code: 1
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"xx"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
- name: B
code: 2
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 2"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
- name: C
code: 3
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo success"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH

taskRelations:
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 2
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 1
preTaskVersion: 1
postTaskCode: 3
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 2
preTaskVersion: 1
postTaskCode: 3
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
Loading