Skip to content

Commit 4be78e9

Browse files
committed
[Improvement-17057][Master]Check if the WorkflowGrapth is a dag at constructor(#17057)
1 parent 2dcea84 commit 4be78e9

File tree

2 files changed

+15
-50
lines changed

2 files changed

+15
-50
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java

Lines changed: 10 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,15 @@
1919

2020
import static com.google.common.base.Preconditions.checkNotNull;
2121

22+
import org.apache.dolphinscheduler.common.graph.DAG;
2223
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
2324
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelation;
2425

25-
import java.util.ArrayDeque;
2626
import java.util.ArrayList;
2727
import java.util.HashMap;
2828
import java.util.HashSet;
2929
import java.util.List;
3030
import java.util.Map;
31-
import java.util.Queue;
3231
import java.util.Set;
3332
import java.util.function.Function;
3433
import java.util.stream.Collectors;
@@ -61,57 +60,18 @@ public WorkflowGraph(List<WorkflowTaskRelation> workflowTaskRelations, List<Task
6160
}
6261

6362
private void checkIfDAG(List<WorkflowTaskRelation> workflowTaskRelations, List<TaskDefinition> taskDefinitions) {
64-
// If topology-sort-result`s size less than taskDefinitions`s size, then not a DAG
65-
Map<Long, List<Long>> preTaskCodeMap = workflowTaskRelations
66-
.stream()
67-
.collect(Collectors.groupingBy(WorkflowTaskRelation::getPostTaskCode,
68-
Collectors.mapping(WorkflowTaskRelation::getPreTaskCode, Collectors.toList())));
69-
Map<Long, List<Long>> postTaskCodeMap = workflowTaskRelations
70-
.stream()
71-
.collect(Collectors.groupingBy(WorkflowTaskRelation::getPreTaskCode,
72-
Collectors.mapping(WorkflowTaskRelation::getPostTaskCode, Collectors.toList())));
73-
74-
// build in-degree count
75-
Map<Long, Integer> inDegreeCount = new HashMap<>();
63+
DAG<Long, TaskDefinition, WorkflowTaskRelation> graph = new DAG<>();
64+
// Fill the vertices
7665
for (TaskDefinition taskDefinition : taskDefinitions) {
77-
List<Long> preTasks = preTaskCodeMap.get(taskDefinition.getCode());
78-
if (preTasks == null) {
79-
inDegreeCount.put(taskDefinition.getCode(), 0);
80-
} else {
81-
inDegreeCount.put(taskDefinition.getCode(), preTasks.size());
82-
}
83-
}
84-
85-
// Adds the task with zero-in-degree to the queue
86-
Set<Long> visitTable = new HashSet<>();
87-
Queue<Long> queue = new ArrayDeque<>();
88-
for (Map.Entry<Long, Integer> entry : inDegreeCount.entrySet()) {
89-
if (entry.getValue() == 0 && visitTable.add(entry.getKey())) {
90-
queue.offer(entry.getKey());
91-
}
66+
graph.addNode(taskDefinition.getCode(), taskDefinition);
9267
}
93-
94-
// topology sort
95-
Set<Long> resultTable = new HashSet<>();
96-
while (!queue.isEmpty()) {
97-
Long taskCode = queue.poll();
98-
resultTable.add(taskCode);
99-
100-
List<Long> postCodes = postTaskCodeMap.get(taskCode);
101-
if (postCodes == null) {
102-
continue;
68+
// Fill edge relations
69+
for (WorkflowTaskRelation relation : workflowTaskRelations) {
70+
long preTaskCode = relation.getPreTaskCode();
71+
// When exist a ring cycle, then not a DAG.
72+
if (preTaskCode != 0 && !graph.addEdge(preTaskCode, relation.getPostTaskCode())) {
73+
throw new IllegalArgumentException("The workflow graph is not a DAG");
10374
}
104-
for (Long postCode : postCodes) {
105-
inDegreeCount.put(postCode, inDegreeCount.get(postCode) - 1);
106-
107-
if (inDegreeCount.get(postCode) == 0) {
108-
queue.offer(postCode);
109-
}
110-
}
111-
}
112-
113-
if (resultTable.size() < taskDefinitions.size()) {
114-
throw new IllegalArgumentException("The workflow task relation is not a DAG");
11575
}
11676
}
11777

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraphCheckIfDAGTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ public void checkIfDAGSingleTaskNoException() {
4343
TaskDefinition taskDefinition = new TaskDefinition(1L, 1);
4444
taskDefinitions.add(taskDefinition);
4545

46+
WorkflowTaskRelation relation = new WorkflowTaskRelation();
47+
relation.setPreTaskCode(0);
48+
relation.setPostTaskCode(1);
49+
workflowTaskRelations.add(relation);
50+
4651
Assertions.assertDoesNotThrow(() -> new WorkflowGraph(workflowTaskRelations, taskDefinitions));
4752
}
4853

0 commit comments

Comments
 (0)