Skip to content

Commit 6f31d1b

Browse files
committed
[opt](TabletScheduler) introduce TableDispatchScheduler for table-level tablet scheduling
This PR optimizes the TabletScheduler by introducing a table-level dispatching mechanism. Instead of processing tablets sequentially from a single global queue, the scheduler now dispatches tablets to table-specific queues handled by a pool of worker threads. This optimization aims to prevent a long-held lock or a potential deadlock on a specific table from blocking the entire TabletScheduler and enhance overall scheduling throughput by allowing multiple tables to be processed concurrently.
1 parent b076ae9 commit 6f31d1b

File tree

2 files changed

+302
-68
lines changed

2 files changed

+302
-68
lines changed
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.clone;
19+
20+
import org.apache.doris.common.ThreadPoolManager;
21+
22+
import com.google.common.collect.Lists;
23+
import com.google.common.collect.Maps;
24+
import com.google.common.collect.Sets;
25+
import org.apache.logging.log4j.LogManager;
26+
import org.apache.logging.log4j.Logger;
27+
28+
import java.util.Deque;
29+
import java.util.LinkedList;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Set;
33+
import java.util.concurrent.LinkedBlockingQueue;
34+
import java.util.concurrent.ThreadPoolExecutor;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.function.Consumer;
37+
38+
class TableDispatchScheduler {
39+
private static final Logger LOG = LogManager.getLogger(TableDispatchScheduler.class);
40+
private static final int MAX_ACTIVE_TABLE_WORKERS = 16;
41+
private static final long WORKER_KEEP_ALIVE_SECONDS = 60;
42+
43+
// table id -> tablets queued for worker consumption (worker-queued state)
44+
private final Map<Long, Deque<TabletSchedCtx>> workerQueuedTabletsByTable = Maps.newHashMap();
45+
// one table can only be handled by one worker at a time
46+
private final Set<Long> activeTableWorkers = Sets.newHashSet();
47+
private final ThreadPoolExecutor tableWorkerPool;
48+
private final Consumer<List<TabletSchedCtx>> tabletBatchProcessor;
49+
private final int workerBatchSize;
50+
51+
TableDispatchScheduler(Consumer<List<TabletSchedCtx>> tabletBatchProcessor, int workerBatchSize) {
52+
this.tabletBatchProcessor = tabletBatchProcessor;
53+
this.workerBatchSize = workerBatchSize;
54+
this.tableWorkerPool = ThreadPoolManager.newDaemonThreadPool(0, MAX_ACTIVE_TABLE_WORKERS,
55+
WORKER_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
56+
new ThreadPoolExecutor.AbortPolicy(), "table-dispatch-scheduler-worker", true);
57+
this.tableWorkerPool.allowCoreThreadTimeOut(true);
58+
}
59+
60+
synchronized void clear() {
61+
workerQueuedTabletsByTable.clear();
62+
activeTableWorkers.clear();
63+
}
64+
65+
void enqueueTablets(List<TabletSchedCtx> tabletCtxs) {
66+
synchronized (this) {
67+
for (TabletSchedCtx tabletCtx : tabletCtxs) {
68+
long tableId = tabletCtx.getTblId();
69+
Deque<TabletSchedCtx> queue = workerQueuedTabletsByTable.get(tableId);
70+
if (queue == null) {
71+
queue = new LinkedList<>();
72+
workerQueuedTabletsByTable.put(tableId, queue);
73+
}
74+
queue.addLast(tabletCtx);
75+
}
76+
}
77+
78+
triggerTableWorkers();
79+
}
80+
81+
synchronized void appendWorkerQueuedTablets(List<TabletSchedCtx> target, int limit) {
82+
for (Deque<TabletSchedCtx> queue : workerQueuedTabletsByTable.values()) {
83+
if (target.size() >= limit) {
84+
return;
85+
}
86+
queue.stream().limit(limit - target.size()).forEach(target::add);
87+
}
88+
}
89+
90+
synchronized List<TabletSchedCtx> getWorkerQueuedTablets() {
91+
List<TabletSchedCtx> queuedTablets = Lists.newArrayList();
92+
for (Deque<TabletSchedCtx> queue : workerQueuedTabletsByTable.values()) {
93+
queuedTablets.addAll(queue);
94+
}
95+
return queuedTablets;
96+
}
97+
98+
private void triggerTableWorkers() {
99+
while (true) {
100+
long tableIdToSchedule;
101+
synchronized (this) {
102+
if (activeTableWorkers.size() >= MAX_ACTIVE_TABLE_WORKERS) {
103+
return;
104+
}
105+
106+
tableIdToSchedule = pickNextTableIdToActivate();
107+
if (tableIdToSchedule == -1) {
108+
return;
109+
}
110+
111+
activeTableWorkers.add(tableIdToSchedule);
112+
}
113+
try {
114+
tableWorkerPool.execute(new TableScheduleWorker(tableIdToSchedule));
115+
} catch (RuntimeException e) {
116+
synchronized (this) {
117+
activeTableWorkers.remove(tableIdToSchedule);
118+
}
119+
LOG.warn("failed to submit table schedule worker for table {}", tableIdToSchedule, e);
120+
return;
121+
}
122+
}
123+
}
124+
125+
private long pickNextTableIdToActivate() {
126+
long tableId = -1;
127+
TabletSchedCtx candidate = null;
128+
for (Map.Entry<Long, Deque<TabletSchedCtx>> entry : workerQueuedTabletsByTable.entrySet()) {
129+
if (activeTableWorkers.contains(entry.getKey()) || entry.getValue().isEmpty()) {
130+
continue;
131+
}
132+
TabletSchedCtx head = entry.getValue().peekFirst();
133+
if (head == null) {
134+
continue;
135+
}
136+
137+
// higher priority tablet
138+
if (candidate == null || head.compareTo(candidate) < 0) {
139+
candidate = head;
140+
tableId = entry.getKey();
141+
}
142+
}
143+
return tableId;
144+
}
145+
146+
private synchronized List<TabletSchedCtx> pollNextTabletCtxBatch(long tableId) {
147+
List<TabletSchedCtx> tabletCtxs = Lists.newArrayList();
148+
Deque<TabletSchedCtx> queue = workerQueuedTabletsByTable.get(tableId);
149+
if (queue == null) {
150+
return tabletCtxs;
151+
}
152+
while (tabletCtxs.size() < workerBatchSize) {
153+
TabletSchedCtx tabletCtx = queue.pollFirst();
154+
if (tabletCtx == null) {
155+
break;
156+
}
157+
tabletCtxs.add(tabletCtx);
158+
}
159+
if (queue.isEmpty()) {
160+
workerQueuedTabletsByTable.remove(tableId);
161+
}
162+
return tabletCtxs;
163+
}
164+
165+
private void onTableWorkerDone(long tableId) {
166+
synchronized (this) {
167+
activeTableWorkers.remove(tableId);
168+
}
169+
triggerTableWorkers();
170+
}
171+
172+
private class TableScheduleWorker implements Runnable {
173+
private final long tableId;
174+
175+
private TableScheduleWorker(long tableId) {
176+
this.tableId = tableId;
177+
}
178+
179+
@Override
180+
public void run() {
181+
try {
182+
while (true) {
183+
List<TabletSchedCtx> tabletCtxBatch = pollNextTabletCtxBatch(tableId);
184+
if (tabletCtxBatch.isEmpty()) {
185+
return;
186+
}
187+
tabletBatchProcessor.accept(tabletCtxBatch);
188+
}
189+
} finally {
190+
onTableWorkerDone(tableId);
191+
}
192+
}
193+
}
194+
}

0 commit comments

Comments
 (0)