Skip to content

Commit 9f90590

Browse files
author
luxl
committed
PriorityDelayQueue move to WorkerGroupTaskDispatcher,Simplify the WorkerGroupTaskDispatcherManager logic
1 parent 90672bf commit 9f90590

File tree

3 files changed

+151
-79
lines changed

3 files changed

+151
-79
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,19 @@ public class WorkerGroupTaskDispatcher extends BaseDaemonThread implements AutoC
5353
@Getter
5454
private DispatchWorkerStatus status;
5555

56-
public WorkerGroupTaskDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient,
57-
PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> workerGroupQueue) {
56+
public WorkerGroupTaskDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) {
5857
super("WorkerGroupTaskDispatcher-" + workerGroupName);
5958
this.taskExecutorClient = taskExecutorClient;
60-
this.workerGroupQueue = workerGroupQueue;
59+
this.workerGroupQueue = new PriorityDelayQueue<>();
60+
status = DispatchWorkerStatus.DEFAULT;
61+
}
62+
63+
public void add(ITaskExecutionRunnable taskExecutionRunnable, long delayTimeMills) {
64+
workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry(delayTimeMills, taskExecutionRunnable));
65+
}
66+
67+
public int size() {
68+
return workerGroupQueue.size();
6169
}
6270

6371
@Override
@@ -78,8 +86,6 @@ public void close() throws Exception {
7886
if (RUNNING_FLAG.compareAndSet(true, false)) {
7987
log.info("{} stopping...", this.getName());
8088
log.info("{} stopped...", this.getName());
81-
} else {
82-
log.error("{} is not started", this.getName());
8389
}
8490
} else {
8591
log.warn("The {} queue is not empty, will not stop", this.getName());

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier;
2323
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
2424
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
25-
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
26-
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
2725
import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
2826

2927
import java.util.List;
@@ -57,17 +55,16 @@ public class WorkerGroupTaskDispatcherManager implements AutoCloseable, WorkerGr
5755

5856
@Getter
5957
private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap;
60-
@Getter
61-
private final ConcurrentHashMap<String, PriorityDelayQueue<PriorityAndDelayBasedTaskEntry>> workerGroupPriorityDelayQueueMap;
6258

6359
private final ScheduledExecutorService scheduler;
6460

61+
private boolean shutDownFlag;
62+
6563
public WorkerGroupTaskDispatcherManager() {
6664
dispatchWorkerMap = new ConcurrentHashMap<>();
67-
workerGroupPriorityDelayQueueMap = new ConcurrentHashMap<>();
6865
scheduler = MasterThreadFactory.getDefaultSchedulerThreadExecutor();
69-
70-
scheduler.scheduleAtFixedRate(this::checkDeleteDispatchWorker, 0, 1, TimeUnit.SECONDS);
66+
shutDownFlag = false;
67+
scheduler.scheduleAtFixedRate(this::checkDeleteDispatchWorkerComplete, 0, 1, TimeUnit.SECONDS);
7168
}
7269

7370
@PostConstruct
@@ -82,15 +79,16 @@ public void init() {
8279
* @param taskExecutionRunnable an instance of ITaskExecutionRunnable representing the task to be executed
8380
* @param delayTimeMills the delay time before the task is executed, in milliseconds
8481
*/
85-
public void add(String workerGroup, ITaskExecutionRunnable taskExecutionRunnable, long delayTimeMills) {
86-
PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> workerGroupQueue =
87-
workerGroupPriorityDelayQueueMap.get(workerGroup);
88-
if (workerGroupQueue != null) {
89-
workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
90-
log.info("queue size {}", workerGroupQueue.size());
82+
public Boolean add(String workerGroup, ITaskExecutionRunnable taskExecutionRunnable, long delayTimeMills) {
83+
WorkerGroupTaskDispatcher workerGroupTaskDispatcher = dispatchWorkerMap.get(workerGroup);
84+
if (workerGroupTaskDispatcher != null) {
85+
workerGroupTaskDispatcher.add(taskExecutionRunnable, delayTimeMills);
86+
log.info("queue size {}", workerGroupTaskDispatcher.size());
87+
return true;
9188
} else {
92-
log.error("workerGroup {} not found", workerGroup);
89+
log.error("workerGroupTaskDispatcher {} not found", workerGroup);
9390
}
91+
return false;
9492
}
9593

9694
/**
@@ -102,6 +100,8 @@ public synchronized void deleteWorkerGroup(String workerGroup) throws Exception
102100
WorkerGroupTaskDispatcher workerGroupTaskDispatcher = dispatchWorkerMap.get(workerGroup);
103101
if (workerGroupTaskDispatcher != null) {
104102
workerGroupTaskDispatcher.close();
103+
} else {
104+
log.warn("workerGroupTaskDispatcher {} not found", workerGroup);
105105
}
106106
}
107107

@@ -111,12 +111,9 @@ public synchronized void deleteWorkerGroup(String workerGroup) throws Exception
111111
* @param workerGroup the identifier for the worker group
112112
*/
113113
public synchronized void addWorkerGroup(String workerGroup) {
114-
PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> workerGroupQueue =
115-
workerGroupPriorityDelayQueueMap.computeIfAbsent(workerGroup, k -> new PriorityDelayQueue<>());
116114
WorkerGroupTaskDispatcher looper =
117115
dispatchWorkerMap.computeIfAbsent(workerGroup,
118-
k -> new WorkerGroupTaskDispatcher(workerGroup, taskExecutorClient,
119-
workerGroupQueue));
116+
k -> new WorkerGroupTaskDispatcher(workerGroup, taskExecutorClient));
120117
looper.start();
121118
}
122119

@@ -125,13 +122,15 @@ public synchronized void addWorkerGroup(String workerGroup) {
125122
*/
126123
@Override
127124
public void close() throws Exception {
128-
log.info("WorkerGroupTaskDispatcherManager stopping...");
129-
scheduler.shutdown();
130-
if (!scheduler.awaitTermination(SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS)) {
131-
log.warn("WorkerGroupTaskDispatcherManager did not terminate within 10 seconds, shutting down now");
132-
scheduler.shutdownNow();
125+
log.info("WorkerGroupTaskDispatcherManager ready close...");
126+
shutDownFlag = true;
127+
for (Map.Entry<String, WorkerGroupTaskDispatcher> entry : dispatchWorkerMap.entrySet()) {
128+
try {
129+
entry.getValue().close();
130+
} catch (Exception e) {
131+
log.error("stop worker group error", e);
132+
}
133133
}
134-
log.info("WorkerGroupTaskDispatcherManager stopped");
135134
}
136135

137136
@Override
@@ -160,7 +159,8 @@ public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
160159
}
161160
}
162161

163-
private void checkDeleteDispatchWorker() {
162+
private void checkDeleteDispatchWorkerComplete() {
163+
boolean complete = true;
164164
for (Map.Entry<String, WorkerGroupTaskDispatcher> entry : dispatchWorkerMap.entrySet()) {
165165
String workerGroup = entry.getKey();
166166
WorkerGroupTaskDispatcher workerGroupTaskDispatcher = entry.getValue();
@@ -171,6 +171,7 @@ private void checkDeleteDispatchWorker() {
171171
} catch (Exception e) {
172172
log.error("stop worker group error", e);
173173
}
174+
complete = false;
174175
break;
175176
case DELETE_SUCCESS:
176177
try (WorkerGroupTaskDispatcher ignored = dispatchWorkerMap.remove(workerGroup)) {
@@ -180,10 +181,31 @@ private void checkDeleteDispatchWorker() {
180181
}
181182
break;
182183
default:
184+
complete = false;
183185
log.debug("worker group {} status {}", workerGroup, workerGroupTaskDispatcher.getStatus());
184186
break;
185187
}
186188
}
189+
if (shutDownFlag && complete) {
190+
this.shutdown();
191+
}
192+
}
193+
194+
private void shutdown() {
195+
log.info("WorkerGroupTaskDispatcherManager start close...");
196+
dispatchWorkerMap.clear();
197+
scheduler.shutdown();
198+
try {
199+
if (!scheduler.awaitTermination(SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS)) {
200+
log.warn(
201+
"WorkerGroupTaskDispatcherManager did not terminate within SHUTDOWN_WAIT_TIME seconds, shutting down now");
202+
scheduler.shutdownNow();
203+
Thread.currentThread().interrupt();
204+
}
205+
} catch (InterruptedException e) {
206+
log.info("WorkerGroupTaskDispatcherManager error: ", e);
207+
}
208+
log.info("WorkerGroupTaskDispatcherManager closed");
187209
}
188210

189211
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java

Lines changed: 93 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,88 +17,132 @@
1717

1818
package org.apache.dolphinscheduler.server.master.runner;
1919

20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertFalse;
22+
import static org.junit.jupiter.api.Assertions.assertTrue;
2023
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.verify;
2224

25+
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
2326
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2427

25-
import org.junit.jupiter.api.Assertions;
26-
import org.junit.jupiter.api.BeforeEach;
28+
import java.time.Duration;
29+
import java.util.Arrays;
30+
import java.util.List;
31+
32+
import org.awaitility.Awaitility;
2733
import org.junit.jupiter.api.Test;
2834
import org.junit.jupiter.api.extension.ExtendWith;
2935
import org.mockito.InjectMocks;
30-
import org.mockito.MockitoAnnotations;
3136
import org.mockito.junit.jupiter.MockitoExtension;
3237

38+
import com.google.common.truth.Truth;
39+
3340
@ExtendWith(MockitoExtension.class)
3441
public class WorkerGroupTaskDispatcherManagerTest {
3542

3643
@InjectMocks
37-
private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
38-
39-
@BeforeEach
40-
public void setUp() {
41-
MockitoAnnotations.openMocks(this);
42-
}
44+
private WorkerGroupTaskDispatcherManager manager;
4345

4446
@Test
45-
public void testAddTaskToExistingWorkerGroup() {
46-
String workerGroup = "testWorkerGroup";
47-
ITaskExecutionRunnable task = mock(ITaskExecutionRunnable.class);
48-
long delay = 1000L;
47+
public void testAddTaskToExistingWorkerGroup_ShouldReturnTrue() {
48+
String workerGroupName = "testGroup";
49+
manager.addWorkerGroup(workerGroupName);
50+
ITaskExecutionRunnable mockTask = mock(ITaskExecutionRunnable.class);
4951

50-
workerGroupTaskDispatcherManager.add(workerGroup, task, delay);
52+
boolean result = manager.add(workerGroupName, mockTask, 0L);
5153

52-
// not have workerGroup queue,cannot add
53-
Assertions.assertEquals(0,
54-
workerGroupTaskDispatcherManager.getDispatchWorkerMap().size());
54+
assertTrue(result);
55+
}
56+
57+
@Test
58+
public void testAddTaskToNonExistingWorkerGroup_ShouldReturnFalse() {
59+
String workerGroupName = "nonExistingGroup";
60+
ITaskExecutionRunnable mockTask = mock(ITaskExecutionRunnable.class);
61+
boolean result = manager.add(workerGroupName, mockTask, 0L);
62+
assertFalse(result);
5563
}
5664

5765
@Test
58-
public void testAddTaskToNonExistingWorkerGroup() {
59-
String workerGroup = "nonExistingWorkerGroup";
60-
ITaskExecutionRunnable task = mock(ITaskExecutionRunnable.class);
61-
long delay = 1000L;
62-
workerGroupTaskDispatcherManager.addWorkerGroup(workerGroup);
63-
workerGroupTaskDispatcherManager.add(workerGroup, task, delay);
64-
65-
Assertions.assertTrue(
66-
workerGroupTaskDispatcherManager.getWorkerGroupPriorityDelayQueueMap().containsKey(workerGroup));
66+
public void testDeleteExistingWorkerGroup_ShouldRemoveGroup() throws Exception {
67+
String workerGroupName = "testGroup";
68+
manager.addWorkerGroup(workerGroupName);
69+
70+
manager.deleteWorkerGroup(workerGroupName);
71+
72+
Awaitility.await()
73+
.atMost(Duration.ofSeconds(5))
74+
.untilAsserted(() -> {
75+
Truth.assertThat(manager.getDispatchWorkerMap().isEmpty()).isTrue();
76+
});
6777
}
6878

6979
@Test
70-
public void testStopWorkerGroup() {
71-
String workerGroup = "testWorkerGroup";
72-
ITaskExecutionRunnable task = mock(ITaskExecutionRunnable.class);
73-
long delay = 1000L;
74-
75-
workerGroupTaskDispatcherManager.addWorkerGroup(workerGroup);
76-
workerGroupTaskDispatcherManager.add(workerGroup, task, delay);
77-
Assertions.assertTrue(
78-
workerGroupTaskDispatcherManager.getWorkerGroupPriorityDelayQueueMap().get(workerGroup).size() > 0);
80+
public void testAddNewWorkerGroup_ShouldAddGroup() {
81+
String workerGroupName = "newGroup";
82+
manager.addWorkerGroup(workerGroupName);
83+
assertFalse(manager.getDispatchWorkerMap().isEmpty());
7984
}
8085

8186
@Test
82-
public void testAddWorkerGroup() {
83-
String workerGroup = "newWorkerGroup";
87+
public void testClose_ShouldShutdownScheduler() throws Exception {
88+
manager.addWorkerGroup("testGroup");
89+
manager.add("testGroup", mock(ITaskExecutionRunnable.class), 0);
90+
WorkerGroupTaskDispatcher dispatcher = manager.getDispatchWorkerMap().get("testGroup");
8491

85-
workerGroupTaskDispatcherManager.addWorkerGroup(workerGroup);
92+
manager.deleteWorkerGroup("testGroup");
93+
94+
Awaitility.await()
95+
.atMost(Duration.ofSeconds(5))
96+
.untilAsserted(() -> {
97+
Truth.assertThat(dispatcher.getStatus() == DispatchWorkerStatus.DELETE_SUCCESS).isTrue();
98+
});
8699

87-
Assertions.assertTrue(
88-
workerGroupTaskDispatcherManager.getWorkerGroupPriorityDelayQueueMap().containsKey(workerGroup));
89-
Assertions.assertTrue(workerGroupTaskDispatcherManager.getWorkerGroupPriorityDelayQueueMap()
90-
.containsKey(workerGroup));
91100
}
92101

93102
@Test
94-
public void testClose() throws Exception {
95-
String workerGroup = "testWorkerGroup";
96-
WorkerGroupTaskDispatcher looper = mock(WorkerGroupTaskDispatcher.class);
103+
public void testOnWorkerGroupAdd_ShouldAddWorkerGroups() {
104+
WorkerGroup group1 = new WorkerGroup();
105+
WorkerGroup group2 = new WorkerGroup();
106+
group1.setName("testGroup1");
107+
group2.setName("testGroup2");
108+
List<WorkerGroup> workerGroups = Arrays.asList(group1, group2);
109+
manager.onWorkerGroupAdd(workerGroups);
110+
assertEquals(2, manager.getDispatchWorkerMap().size());
111+
}
97112

98-
workerGroupTaskDispatcherManager.getDispatchWorkerMap().put(workerGroup, looper);
113+
@Test
114+
public void testOnWorkerGroupDelete_ShouldDeleteWorkerGroups() {
115+
WorkerGroup group1 = new WorkerGroup();
116+
WorkerGroup group2 = new WorkerGroup();
117+
group1.setName("testGroup1");
118+
group2.setName("testGroup2");
119+
List<WorkerGroup> workerGroups = Arrays.asList(group1, group2);
120+
workerGroups.forEach(workerGroup -> manager.addWorkerGroup(workerGroup.getName()));
121+
122+
manager.onWorkerGroupDelete(workerGroups);
123+
124+
Awaitility.await()
125+
.atMost(Duration.ofSeconds(5))
126+
.untilAsserted(() -> {
127+
Truth.assertThat(manager.getDispatchWorkerMap().isEmpty()).isTrue();
128+
});
99129

100-
workerGroupTaskDispatcherManager.deleteWorkerGroup(workerGroup);
130+
}
101131

102-
verify(looper).close();
132+
@Test
133+
public void testOnCloseWorkerGroupTaskDispatcherManager() throws Exception {
134+
WorkerGroup group1 = new WorkerGroup();
135+
WorkerGroup group2 = new WorkerGroup();
136+
group1.setName("testGroup1");
137+
group2.setName("testGroup2");
138+
List<WorkerGroup> workerGroups = Arrays.asList(group1, group2);
139+
workerGroups.forEach(workerGroup -> manager.addWorkerGroup(workerGroup.getName()));
140+
141+
manager.close();
142+
Awaitility.await()
143+
.atMost(Duration.ofSeconds(5))
144+
.untilAsserted(() -> {
145+
Truth.assertThat(manager.getDispatchWorkerMap().isEmpty()).isTrue();
146+
});
103147
}
104148
}

0 commit comments

Comments
 (0)