Skip to content

Commit bf046d6

Browse files
committed
[opt](cloud) Add request-coalescing version cache for point queries
When enable_snapshot_point_query=true in cloud mode, every point query triggers an independent RPC to MetaService to fetch the partition visible version. Under high concurrency, this creates N RPCs for N concurrent queries on the same partition, becoming a significant bottleneck. This patch introduces PointQueryVersionCache, a request-coalescing cache that reduces MetaService RPCs by: - Short TTL caching: partition versions are cached for a configurable duration (point_query_version_cache_ttl_ms, default 0 = disabled). Within the TTL window, concurrent queries reuse the cached version. - Request coalescing: when cache expires, only the first request issues the RPC; concurrent requests wait on the inflight result via CompletableFuture. Under 100 concurrent point queries with 5ms RPC latency, the cache reduces MetaService RPCs from 100+ to 1-2 (>= 20x reduction). Changes: - New: PointQueryVersionCache.java - singleton request-coalescing cache - Modified: PointQueryExecutor.updateCloudPartitionVersions() to use cache - Modified: SessionVariable.java - add point_query_version_cache_ttl_ms - New: PointQueryVersionCacheTest.java - 11 unit tests - New: PointQueryExecutorPerfTest.java - E2E performance benchmark
1 parent bb3205c commit bf046d6

File tree

5 files changed

+802
-13
lines changed

5 files changed

+802
-13
lines changed
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.cloud.catalog;
19+
20+
import org.apache.doris.catalog.Partition;
21+
import org.apache.doris.cloud.proto.Cloud;
22+
import org.apache.doris.cloud.rpc.VersionHelper;
23+
import org.apache.doris.rpc.RpcException;
24+
import org.apache.doris.service.FrontendOptions;
25+
26+
import com.google.common.annotations.VisibleForTesting;
27+
import org.apache.logging.log4j.LogManager;
28+
import org.apache.logging.log4j.Logger;
29+
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.ExecutionException;
33+
34+
/**
35+
* A request-coalescing version cache for point queries in cloud mode.
36+
*
37+
* <p>When {@code enable_snapshot_point_query=true}, every point query needs to fetch
38+
* the partition's visible version from MetaService. Under high concurrency, this causes
39+
* N RPCs for N concurrent point queries on the same partition.</p>
40+
*
41+
* <p>This cache optimizes the version fetching by:
42+
* <ul>
43+
* <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration
44+
* ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window,
45+
* concurrent queries reuse the cached version.</li>
46+
* <li><b>Request coalescing</b>: When the cache expires, only the first request issues
47+
* the MetaService RPC. Concurrent requests for the same partition wait on the inflight
48+
* result via a {@link CompletableFuture}.</li>
49+
* </ul>
50+
* </p>
51+
*/
52+
public class PointQueryVersionCache {
53+
private static final Logger LOG = LogManager.getLogger(PointQueryVersionCache.class);
54+
55+
private static volatile PointQueryVersionCache instance;
56+
57+
/**
58+
* Cache entry holding the version and the timestamp when it was cached.
59+
*/
60+
static class VersionEntry {
61+
final long version;
62+
final long cachedTimeMs;
63+
64+
VersionEntry(long version, long cachedTimeMs) {
65+
this.version = version;
66+
this.cachedTimeMs = cachedTimeMs;
67+
}
68+
69+
boolean isExpired(long ttlMs) {
70+
if (ttlMs <= 0) {
71+
return true;
72+
}
73+
return System.currentTimeMillis() - cachedTimeMs > ttlMs;
74+
}
75+
}
76+
77+
// partitionId -> cached VersionEntry
78+
private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>();
79+
80+
// partitionId -> inflight RPC future (for request coalescing)
81+
private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflightRequests = new ConcurrentHashMap<>();
82+
83+
@VisibleForTesting
84+
public PointQueryVersionCache() {
85+
}
86+
87+
public static PointQueryVersionCache getInstance() {
88+
if (instance == null) {
89+
synchronized (PointQueryVersionCache.class) {
90+
if (instance == null) {
91+
instance = new PointQueryVersionCache();
92+
}
93+
}
94+
}
95+
return instance;
96+
}
97+
98+
@VisibleForTesting
99+
public static void setInstance(PointQueryVersionCache cache) {
100+
instance = cache;
101+
}
102+
103+
/**
104+
* Get the visible version for a partition, using TTL-based caching and request coalescing.
105+
*
106+
* @param partition the cloud partition to get version for
107+
* @param ttlMs TTL in milliseconds; 0 or negative disables caching
108+
* @return the visible version
109+
* @throws RpcException if the MetaService RPC fails
110+
*/
111+
public long getVersion(CloudPartition partition, long ttlMs) throws RpcException {
112+
long partitionId = partition.getId();
113+
114+
// If cache is disabled, fetch directly
115+
if (ttlMs <= 0) {
116+
return fetchVersionFromMs(partition);
117+
}
118+
119+
// Check cache first
120+
VersionEntry entry = cache.get(partitionId);
121+
if (entry != null && !entry.isExpired(ttlMs)) {
122+
if (LOG.isDebugEnabled()) {
123+
LOG.debug("point query version cache hit, partition={}, version={}", partitionId, entry.version);
124+
}
125+
return entry.version;
126+
}
127+
128+
// Cache miss or expired: use request coalescing
129+
return getVersionWithCoalescing(partition, partitionId, ttlMs);
130+
}
131+
132+
private long getVersionWithCoalescing(CloudPartition partition, long partitionId, long ttlMs)
133+
throws RpcException {
134+
// Try to become the leader request for this partition
135+
CompletableFuture<Long> myFuture = new CompletableFuture<>();
136+
CompletableFuture<Long> existingFuture = inflightRequests.putIfAbsent(partitionId, myFuture);
137+
138+
if (existingFuture != null) {
139+
// Another request is already in flight — wait for its result
140+
if (LOG.isDebugEnabled()) {
141+
LOG.debug("point query version coalescing, waiting for inflight request, partition={}",
142+
partitionId);
143+
}
144+
try {
145+
return existingFuture.get();
146+
} catch (InterruptedException e) {
147+
Thread.currentThread().interrupt();
148+
throw new RpcException("get version", "interrupted while waiting for coalesced request");
149+
} catch (ExecutionException e) {
150+
Throwable cause = e.getCause();
151+
if (cause instanceof RpcException) {
152+
throw (RpcException) cause;
153+
}
154+
throw new RpcException("get version", cause != null ? cause.getMessage() : e.getMessage());
155+
}
156+
}
157+
158+
// We are the leader — fetch version from MetaService
159+
try {
160+
long version = fetchVersionFromMs(partition);
161+
// Update cache
162+
cache.put(partitionId, new VersionEntry(version, System.currentTimeMillis()));
163+
// Also update the partition's cached version
164+
partition.setCachedVisibleVersion(version, System.currentTimeMillis());
165+
// Complete the future so waiting requests get the result
166+
myFuture.complete(version);
167+
if (LOG.isDebugEnabled()) {
168+
LOG.debug("point query version fetched from MS, partition={}, version={}",
169+
partitionId, version);
170+
}
171+
return version;
172+
} catch (Exception e) {
173+
// Complete exceptionally so waiting requests also get the error
174+
myFuture.completeExceptionally(e);
175+
if (e instanceof RpcException) {
176+
throw (RpcException) e;
177+
}
178+
throw new RpcException("get version", e.getMessage());
179+
} finally {
180+
// Remove the inflight request entry
181+
inflightRequests.remove(partitionId, myFuture);
182+
}
183+
}
184+
185+
/**
186+
* Fetch visible version from MetaService for a single partition.
187+
* This method is package-private to allow mocking in tests.
188+
*/
189+
@VisibleForTesting
190+
protected long fetchVersionFromMs(CloudPartition partition) throws RpcException {
191+
Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
192+
.setRequestIp(FrontendOptions.getLocalHostAddressCached())
193+
.setDbId(partition.getDbId())
194+
.setTableId(partition.getTableId())
195+
.setPartitionId(partition.getId())
196+
.setBatchMode(false)
197+
.build();
198+
199+
Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request);
200+
if (resp.getStatus().getCode() == Cloud.MetaServiceCode.OK) {
201+
return resp.getVersion();
202+
} else if (resp.getStatus().getCode() == Cloud.MetaServiceCode.VERSION_NOT_FOUND) {
203+
return Partition.PARTITION_INIT_VERSION;
204+
} else {
205+
throw new RpcException("get version", "unexpected status " + resp.getStatus());
206+
}
207+
}
208+
209+
/**
210+
* Clear all cached entries. Primarily for testing.
211+
*/
212+
@VisibleForTesting
213+
public void clear() {
214+
cache.clear();
215+
inflightRequests.clear();
216+
}
217+
218+
/**
219+
* Get the number of cached entries. Primarily for testing.
220+
*/
221+
@VisibleForTesting
222+
public int cacheSize() {
223+
return cache.size();
224+
}
225+
}

fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.doris.catalog.Env;
2626
import org.apache.doris.catalog.OlapTable;
2727
import org.apache.doris.cloud.catalog.CloudPartition;
28+
import org.apache.doris.cloud.catalog.PointQueryVersionCache;
2829
import org.apache.doris.common.Config;
2930
import org.apache.doris.common.Status;
3031
import org.apache.doris.common.UserException;
@@ -57,12 +58,10 @@
5758

5859
import java.util.ArrayList;
5960
import java.util.Collections;
60-
import java.util.HashSet;
6161
import java.util.Iterator;
6262
import java.util.List;
6363
import java.util.Map;
6464
import java.util.Map.Entry;
65-
import java.util.Set;
6665
import java.util.concurrent.ExecutionException;
6766
import java.util.concurrent.Future;
6867
import java.util.concurrent.TimeUnit;
@@ -90,18 +89,14 @@ public PointQueryExecutor(ShortCircuitQueryContext ctx, int maxMessageSize) {
9089

9190
private void updateCloudPartitionVersions() throws RpcException {
9291
OlapScanNode scanNode = shortCircuitQueryContext.scanNode;
93-
List<CloudPartition> partitions = new ArrayList<>();
94-
Set<Long> partitionSet = new HashSet<>();
9592
OlapTable table = scanNode.getOlapTable();
96-
for (Long id : scanNode.getSelectedPartitionIds()) {
97-
if (!partitionSet.contains(id)) {
98-
partitionSet.add(id);
99-
partitions.add((CloudPartition) table.getPartition(id));
100-
}
101-
}
102-
snapshotVisibleVersions = CloudPartition.getSnapshotVisibleVersion(partitions);
10393
// Only support single partition at present
104-
Preconditions.checkState(snapshotVisibleVersions.size() == 1);
94+
Preconditions.checkState(scanNode.getSelectedPartitionIds().size() == 1);
95+
Long partitionId = scanNode.getSelectedPartitionIds().iterator().next();
96+
CloudPartition partition = (CloudPartition) table.getPartition(partitionId);
97+
long ttlMs = ConnectContext.get().getSessionVariable().pointQueryVersionCacheTtlMs;
98+
long version = PointQueryVersionCache.getInstance().getVersion(partition, ttlMs);
99+
snapshotVisibleVersions = Lists.newArrayList(version);
105100
LOG.debug("set cloud version {}", snapshotVisibleVersions.get(0));
106101
}
107102

@@ -119,7 +114,6 @@ void setScanRangeLocations() throws Exception {
119114
// update partition version if cloud mode
120115
if (Config.isCloudMode()
121116
&& ConnectContext.get().getSessionVariable().enableSnapshotPointQuery) {
122-
// TODO: Optimize to reduce the frequency of version checks in the meta service.
123117
updateCloudPartitionVersions();
124118
}
125119

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ public class SessionVariable implements Serializable, Writable {
482482
public static final String TOPN_OPT_LIMIT_THRESHOLD = "topn_opt_limit_threshold";
483483
public static final String TOPN_FILTER_RATIO = "topn_filter_ratio";
484484
public static final String ENABLE_SNAPSHOT_POINT_QUERY = "enable_snapshot_point_query";
485+
public static final String POINT_QUERY_VERSION_CACHE_TTL_MS = "point_query_version_cache_ttl_ms";
485486

486487
public static final String ENABLE_FILE_CACHE = "enable_file_cache";
487488

@@ -2158,6 +2159,14 @@ public boolean isEnableHboNonStrictMatchingMode() {
21582159
@VariableMgr.VarAttr(name = ENABLE_SNAPSHOT_POINT_QUERY)
21592160
public boolean enableSnapshotPointQuery = true;
21602161

2162+
@VariableMgr.VarAttr(name = POINT_QUERY_VERSION_CACHE_TTL_MS, description = {
2163+
"点查版本缓存的 TTL(毫秒)。在云模式下,点查需要从 MetaService 获取分区版本,"
2164+
+ "该缓存通过合并并发请求来减少 RPC 次数。设为 0 禁用缓存。",
2165+
"TTL in milliseconds for point query version cache. In cloud mode, point queries need "
2166+
+ "to fetch partition versions from MetaService. This cache reduces RPC calls by "
2167+
+ "coalescing concurrent requests. Set to 0 to disable."})
2168+
public long pointQueryVersionCacheTtlMs = 0;
2169+
21612170
@VariableMgr.VarAttr(name = ENABLE_SERVER_SIDE_PREPARED_STATEMENT, needForward = true, description = {
21622171
"是否启用开启服务端 prepared statement", "Set whether to enable server side prepared statement."})
21632172
public boolean enableServeSidePreparedStatement = true;

0 commit comments

Comments
 (0)