Skip to content

Commit d296064

Browse files
author
luxl
committed
Remove unnecessary resource cleanup
1 parent 9f90590 commit d296064

File tree

3 files changed

+98
-93
lines changed

3 files changed

+98
-93
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
@Slf4j
4949
public class WorkerGroupTaskDispatcherManager implements AutoCloseable, WorkerGroupChangeNotifier.WorkerGroupListener {
5050

51-
private static final int SHUTDOWN_WAIT_TIME = 5;
51+
private static final long CHECK_DELETE_DISPATCH_WORKER_PERIOD_SECONDS = 5;
5252

5353
@Autowired
5454
private ITaskExecutorClient taskExecutorClient;
@@ -58,13 +58,11 @@ public class WorkerGroupTaskDispatcherManager implements AutoCloseable, WorkerGr
5858

5959
private final ScheduledExecutorService scheduler;
6060

61-
private boolean shutDownFlag;
62-
6361
public WorkerGroupTaskDispatcherManager() {
6462
dispatchWorkerMap = new ConcurrentHashMap<>();
6563
scheduler = MasterThreadFactory.getDefaultSchedulerThreadExecutor();
66-
shutDownFlag = false;
67-
scheduler.scheduleAtFixedRate(this::checkDeleteDispatchWorkerComplete, 0, 1, TimeUnit.SECONDS);
64+
scheduler.scheduleAtFixedRate(this::checkDeleteDispatchWorkerComplete, 0,
65+
CHECK_DELETE_DISPATCH_WORKER_PERIOD_SECONDS, TimeUnit.SECONDS);
6866
}
6967

7068
@PostConstruct
@@ -122,15 +120,15 @@ public synchronized void addWorkerGroup(String workerGroup) {
122120
*/
123121
@Override
124122
public void close() throws Exception {
125-
log.info("WorkerGroupTaskDispatcherManager ready close...");
126-
shutDownFlag = true;
123+
log.info("WorkerGroupTaskDispatcherManager start close");
127124
for (Map.Entry<String, WorkerGroupTaskDispatcher> entry : dispatchWorkerMap.entrySet()) {
128125
try {
129126
entry.getValue().close();
130127
} catch (Exception e) {
131128
log.error("stop worker group error", e);
132129
}
133130
}
131+
log.info("WorkerGroupTaskDispatcherManager closed");
134132
}
135133

136134
@Override
@@ -160,7 +158,6 @@ public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
160158
}
161159

162160
private void checkDeleteDispatchWorkerComplete() {
163-
boolean complete = true;
164161
for (Map.Entry<String, WorkerGroupTaskDispatcher> entry : dispatchWorkerMap.entrySet()) {
165162
String workerGroup = entry.getKey();
166163
WorkerGroupTaskDispatcher workerGroupTaskDispatcher = entry.getValue();
@@ -171,7 +168,6 @@ private void checkDeleteDispatchWorkerComplete() {
171168
} catch (Exception e) {
172169
log.error("stop worker group error", e);
173170
}
174-
complete = false;
175171
break;
176172
case DELETE_SUCCESS:
177173
try (WorkerGroupTaskDispatcher ignored = dispatchWorkerMap.remove(workerGroup)) {
@@ -181,31 +177,9 @@ private void checkDeleteDispatchWorkerComplete() {
181177
}
182178
break;
183179
default:
184-
complete = false;
185180
log.debug("worker group {} status {}", workerGroup, workerGroupTaskDispatcher.getStatus());
186181
break;
187182
}
188183
}
189-
if (shutDownFlag && complete) {
190-
this.shutdown();
191-
}
192184
}
193-
194-
private void shutdown() {
195-
log.info("WorkerGroupTaskDispatcherManager start close...");
196-
dispatchWorkerMap.clear();
197-
scheduler.shutdown();
198-
try {
199-
if (!scheduler.awaitTermination(SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS)) {
200-
log.warn(
201-
"WorkerGroupTaskDispatcherManager did not terminate within SHUTDOWN_WAIT_TIME seconds, shutting down now");
202-
scheduler.shutdownNow();
203-
Thread.currentThread().interrupt();
204-
}
205-
} catch (InterruptedException e) {
206-
log.info("WorkerGroupTaskDispatcherManager error: ", e);
207-
}
208-
log.info("WorkerGroupTaskDispatcherManager closed");
209-
}
210-
211185
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java

Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
import static org.junit.jupiter.api.Assertions.assertEquals;
2121
import static org.junit.jupiter.api.Assertions.assertFalse;
2222
import static org.junit.jupiter.api.Assertions.assertTrue;
23-
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
2424

25+
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2526
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
27+
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
28+
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
2629
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2730

2831
import java.time.Duration;
@@ -33,49 +36,38 @@
3336
import org.junit.jupiter.api.Test;
3437
import org.junit.jupiter.api.extension.ExtendWith;
3538
import org.mockito.InjectMocks;
39+
import org.mockito.Mock;
3640
import org.mockito.junit.jupiter.MockitoExtension;
3741

38-
import com.google.common.truth.Truth;
39-
4042
@ExtendWith(MockitoExtension.class)
4143
public class WorkerGroupTaskDispatcherManagerTest {
4244

4345
@InjectMocks
4446
private WorkerGroupTaskDispatcherManager manager;
4547

48+
@Mock
49+
private ITaskExecutionRunnable taskExecutionRunnable;
50+
51+
@Mock
52+
private ITaskExecutorClient taskExecutorClient;
53+
4654
@Test
4755
public void testAddTaskToExistingWorkerGroup_ShouldReturnTrue() {
4856
String workerGroupName = "testGroup";
4957
manager.addWorkerGroup(workerGroupName);
50-
ITaskExecutionRunnable mockTask = mock(ITaskExecutionRunnable.class);
5158

52-
boolean result = manager.add(workerGroupName, mockTask, 0L);
59+
boolean result = manager.add(workerGroupName, taskExecutionRunnable, 0L);
5360

5461
assertTrue(result);
5562
}
5663

5764
@Test
5865
public void testAddTaskToNonExistingWorkerGroup_ShouldReturnFalse() {
5966
String workerGroupName = "nonExistingGroup";
60-
ITaskExecutionRunnable mockTask = mock(ITaskExecutionRunnable.class);
61-
boolean result = manager.add(workerGroupName, mockTask, 0L);
67+
boolean result = manager.add(workerGroupName, taskExecutionRunnable, 0L);
6268
assertFalse(result);
6369
}
6470

65-
@Test
66-
public void testDeleteExistingWorkerGroup_ShouldRemoveGroup() throws Exception {
67-
String workerGroupName = "testGroup";
68-
manager.addWorkerGroup(workerGroupName);
69-
70-
manager.deleteWorkerGroup(workerGroupName);
71-
72-
Awaitility.await()
73-
.atMost(Duration.ofSeconds(5))
74-
.untilAsserted(() -> {
75-
Truth.assertThat(manager.getDispatchWorkerMap().isEmpty()).isTrue();
76-
});
77-
}
78-
7971
@Test
8072
public void testAddNewWorkerGroup_ShouldAddGroup() {
8173
String workerGroupName = "newGroup";
@@ -84,18 +76,18 @@ public void testAddNewWorkerGroup_ShouldAddGroup() {
8476
}
8577

8678
@Test
87-
public void testClose_ShouldShutdownScheduler() throws Exception {
79+
public void testDeleteWorkerGroup_ShouldMapEmpty() throws Exception {
8880
manager.addWorkerGroup("testGroup");
89-
manager.add("testGroup", mock(ITaskExecutionRunnable.class), 0);
90-
WorkerGroupTaskDispatcher dispatcher = manager.getDispatchWorkerMap().get("testGroup");
81+
TaskInstance taskInstance = new TaskInstance();
82+
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
83+
when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
84+
manager.add("testGroup", taskExecutionRunnable, 0);
9185

9286
manager.deleteWorkerGroup("testGroup");
9387

9488
Awaitility.await()
95-
.atMost(Duration.ofSeconds(5))
96-
.untilAsserted(() -> {
97-
Truth.assertThat(dispatcher.getStatus() == DispatchWorkerStatus.DELETE_SUCCESS).isTrue();
98-
});
89+
.atMost(Duration.ofSeconds(6))
90+
.untilAsserted(() -> assertEquals(0, manager.getDispatchWorkerMap().size()));
9991

10092
}
10193

@@ -122,10 +114,8 @@ public void testOnWorkerGroupDelete_ShouldDeleteWorkerGroups() {
122114
manager.onWorkerGroupDelete(workerGroups);
123115

124116
Awaitility.await()
125-
.atMost(Duration.ofSeconds(5))
126-
.untilAsserted(() -> {
127-
Truth.assertThat(manager.getDispatchWorkerMap().isEmpty()).isTrue();
128-
});
117+
.atMost(Duration.ofSeconds(6))
118+
.untilAsserted(() -> assertEquals(0, manager.getDispatchWorkerMap().size()));
129119

130120
}
131121

@@ -139,10 +129,8 @@ public void testOnCloseWorkerGroupTaskDispatcherManager() throws Exception {
139129
workerGroups.forEach(workerGroup -> manager.addWorkerGroup(workerGroup.getName()));
140130

141131
manager.close();
142-
Awaitility.await()
143-
.atMost(Duration.ofSeconds(5))
144-
.untilAsserted(() -> {
145-
Truth.assertThat(manager.getDispatchWorkerMap().isEmpty()).isTrue();
146-
});
132+
workerGroups.forEach(workerGroup -> {
133+
assertEquals(DispatchWorkerStatus.DELETE_SUCCESS, manager.getDispatchWorkerMap().get(workerGroup.getName()).getStatus());
134+
});
147135
}
148136
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java

Lines changed: 68 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package org.apache.dolphinscheduler.server.master.runner;
1919

20-
import static org.mockito.ArgumentMatchers.any;
20+
import static org.awaitility.Awaitility.await;
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
2122
import static org.mockito.Mockito.doThrow;
22-
import static org.mockito.Mockito.mock;
2323
import static org.mockito.Mockito.times;
2424
import static org.mockito.Mockito.verify;
2525
import static org.mockito.Mockito.when;
@@ -29,10 +29,10 @@
2929
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
3030
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
3131
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
32-
import org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
33-
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
34-
import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
3532

33+
import java.time.Duration;
34+
35+
import org.junit.jupiter.api.BeforeEach;
3636
import org.junit.jupiter.api.Test;
3737
import org.junit.jupiter.api.extension.ExtendWith;
3838
import org.mockito.InjectMocks;
@@ -46,43 +46,86 @@ public class WorkerGroupTaskDispatcherTest {
4646
private ITaskExecutorClient taskExecutorClient;
4747

4848
@Mock
49-
private PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> workerGroupQueue;
49+
private ITaskExecutionRunnable taskExecutionRunnable;
5050

5151
@InjectMocks
5252
private WorkerGroupTaskDispatcher workerGroupTaskDispatcher;
5353

54-
@Test
55-
public void dispatch_TaskStatusEligible_ShouldDispatchTask() throws TaskDispatchException {
56-
ITaskExecutionRunnable taskExecutionRunnable = mock(ITaskExecutionRunnable.class);
57-
TaskInstance taskInstance = mock(TaskInstance.class);
54+
@BeforeEach
55+
public void setUp() {
56+
workerGroupTaskDispatcher = new WorkerGroupTaskDispatcher("testWorkerGroup", taskExecutorClient);
57+
}
5858

59-
when(workerGroupQueue.take()).thenReturn(new PriorityAndDelayBasedTaskEntry<>(0, taskExecutionRunnable));
59+
@Test
60+
public void testDispatch_Success() throws Exception {
61+
TaskInstance taskInstance = new TaskInstance();
62+
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
6063
when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
61-
when(taskInstance.getState()).thenReturn(TaskExecutionStatus.SUBMITTED_SUCCESS);
6264

63-
workerGroupTaskDispatcher.dispatch();
65+
workerGroupTaskDispatcher.add(taskExecutionRunnable, 0L);
6466

65-
verify(workerGroupQueue, times(1)).take();
66-
verify(taskExecutorClient, times(1)).dispatch(taskExecutionRunnable);
67+
workerGroupTaskDispatcher.start();
68+
await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> {
69+
workerGroupTaskDispatcher.close();
70+
verify(taskExecutorClient, times(1)).dispatch(taskExecutionRunnable);
71+
});
6772
}
6873

6974
@Test
70-
public void dispatch_TaskDispatchFails_ShouldRetryTask() throws TaskDispatchException {
71-
ITaskExecutionRunnable taskExecutionRunnable = mock(ITaskExecutionRunnable.class);
72-
TaskInstance taskInstance = mock(TaskInstance.class);
75+
public void testDispatch_FailureAndRetry() throws Exception {
7376

74-
when(workerGroupQueue.take()).thenReturn(new PriorityAndDelayBasedTaskEntry<>(0, taskExecutionRunnable));
75-
when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
76-
when(taskInstance.getState()).thenReturn(TaskExecutionStatus.SUBMITTED_SUCCESS);
7777
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
7878
taskExecutionContext.setDispatchFailTimes(1);
79+
TaskInstance taskInstance = new TaskInstance();
80+
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
81+
when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
7982
when(taskExecutionRunnable.getTaskExecutionContext()).thenReturn(taskExecutionContext);
8083
doThrow(new RuntimeException("Dispatch failed")).when(taskExecutorClient).dispatch(taskExecutionRunnable);
8184

82-
workerGroupTaskDispatcher.dispatch();
85+
workerGroupTaskDispatcher.add(taskExecutionRunnable, 0L);
86+
87+
workerGroupTaskDispatcher.start();
88+
await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
89+
workerGroupTaskDispatcher.close();
90+
verify(taskExecutorClient, times(2)).dispatch(taskExecutionRunnable);
91+
});
8392

84-
verify(workerGroupQueue, times(1)).take();
85-
verify(taskExecutorClient, times(1)).dispatch(taskExecutionRunnable);
86-
verify(workerGroupQueue, times(1)).add(any(PriorityAndDelayBasedTaskEntry.class));
8793
}
94+
95+
@Test
96+
public void testDispatch_TaskStatusCheck() throws Exception {
97+
TaskInstance taskInstance = new TaskInstance();
98+
taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
99+
when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
100+
101+
workerGroupTaskDispatcher.add(taskExecutionRunnable, 0L);
102+
103+
workerGroupTaskDispatcher.start();
104+
await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> {
105+
workerGroupTaskDispatcher.close();
106+
});
107+
108+
verify(taskExecutorClient, times(0)).dispatch(taskExecutionRunnable);
109+
}
110+
111+
@Test
112+
public void testClose_QueueEmpty() throws Exception {
113+
workerGroupTaskDispatcher.start();
114+
workerGroupTaskDispatcher.close();
115+
await().atMost(Duration.ofSeconds(1)).until(
116+
() -> workerGroupTaskDispatcher.getStatus().equals(DispatchWorkerStatus.DELETE_SUCCESS));
117+
118+
}
119+
120+
@Test
121+
public void testClose_QueueNotEmpty() throws Exception {
122+
TaskInstance taskInstance = new TaskInstance();
123+
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
124+
when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
125+
workerGroupTaskDispatcher.add(taskExecutionRunnable, 1000);
126+
workerGroupTaskDispatcher.start();
127+
workerGroupTaskDispatcher.close();
128+
assertEquals(DispatchWorkerStatus.DELETING, workerGroupTaskDispatcher.getStatus());
129+
}
130+
88131
}

0 commit comments

Comments
 (0)