[opt](cloud) Add request-coalescing version cache for point queries #61111

Open
eldenmoon wants to merge 3 commits into apache:master from eldenmoon:optimize-point-query-batch-read-version

Conversation

@eldenmoon
Member

When enable_snapshot_point_query=true in cloud mode, every point query triggers an independent RPC to MetaService to fetch the partition visible version. Under high concurrency, this creates N RPCs for N concurrent queries on the same partition, becoming a significant bottleneck.

This patch introduces PointQueryVersionCache, a request-coalescing cache that reduces MetaService RPCs by:

  • Short TTL caching: partition versions are cached for a configurable duration (point_query_version_cache_ttl_ms, default 0 = disabled). Within the TTL window, concurrent queries reuse the cached version.
  • Request coalescing: when cache expires, only the first request issues the RPC; concurrent requests wait on the inflight result via CompletableFuture.
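The two mechanisms above can be sketched in isolation as follows. This is an illustrative JDK-only model, not the PR's actual code: the class name, the Entry type, and the fetchFromMetaService stub are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of TTL caching plus request coalescing. The real
// PointQueryVersionCache in this PR differs in detail.
public class CoalescingVersionCacheSketch {
    static final class Entry {
        final long version;
        final long fetchedAtMs;
        Entry(long version, long fetchedAtMs) {
            this.version = version;
            this.fetchedAtMs = fetchedAtMs;
        }
    }

    static final AtomicInteger rpcCount = new AtomicInteger();  // for the demo only
    private final ConcurrentHashMap<Long, Entry> cache = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflight = new ConcurrentHashMap<>();

    long getVersion(long partitionId, long ttlMs) throws Exception {
        Entry e = cache.get(partitionId);
        if (e != null && System.currentTimeMillis() - e.fetchedAtMs < ttlMs) {
            return e.version;                       // TTL hit: no RPC at all
        }
        CompletableFuture<Long> mine = new CompletableFuture<>();
        CompletableFuture<Long> existing = inflight.putIfAbsent(partitionId, mine);
        if (existing != null) {
            return existing.get();                  // follower: wait on the leader's RPC
        }
        try {                                       // leader: the single RPC
            long version = fetchFromMetaService(partitionId);
            cache.put(partitionId, new Entry(version, System.currentTimeMillis()));
            mine.complete(version);
            return version;
        } catch (Exception ex) {
            mine.completeExceptionally(ex);
            throw ex;
        } finally {
            inflight.remove(partitionId);
        }
    }

    // Stand-in for the MetaService RPC, with the ~5ms latency from the benchmark.
    private long fetchFromMetaService(long partitionId) throws InterruptedException {
        rpcCount.incrementAndGet();
        Thread.sleep(5);
        return 42L;
    }
}
```

Running 100 concurrent getVersion calls against this sketch yields only a handful of simulated RPCs, matching the fan-out reduction described above.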

Under 100 concurrent point queries with 5ms RPC latency, the cache reduces MetaService RPCs from 100+ to 1-2 (>= 20x reduction).

Changes:

  • New: PointQueryVersionCache.java - singleton request-coalescing cache
  • Modified: PointQueryExecutor.updateCloudPartitionVersions() to use cache
  • Modified: SessionVariable.java - add point_query_version_cache_ttl_ms
  • New: PointQueryVersionCacheTest.java - 11 unit tests
  • New: PointQueryExecutorPerfTest.java - E2E performance benchmark

Copilot AI review requested due to automatic review settings March 6, 2026 08:55
@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Not sure what to do next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (ideally including the specific error message) and how it was fixed.
  2. Which behaviors were modified: what was the previous behavior, what is it now, why was it modified, and what are the possible impacts?
  3. What features were added and why.
  4. Which code was refactored and why.
  5. Which functions were optimized and what is the difference before and after the optimization?

@eldenmoon
Member Author

run buildall

Copilot AI left a comment

Pull request overview

Adds a request-coalescing, short-TTL version cache to reduce MetaService RPC fan-out for concurrent snapshot point queries in cloud mode.

Changes:

  • Introduce PointQueryVersionCache singleton to cache partition visible versions and coalesce concurrent refreshes via CompletableFuture.
  • Wire PointQueryExecutor.updateCloudPartitionVersions() to use the new cache and add session variable point_query_version_cache_ttl_ms.
  • Add unit tests for cache behavior and an end-to-end concurrency/perf-style test for the executor path.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java Implements request-coalescing TTL cache and MetaService version fetch logic.
fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java Switches snapshot point query version fetch to use the new cache.
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java Adds point_query_version_cache_ttl_ms session variable for enabling/configuring the cache.
fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/PointQueryVersionCacheTest.java Unit tests for TTL hits/expiry, coalescing, and failure behavior.
fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/PointQueryExecutorPerfTest.java Concurrency/perf-style test exercising the real executor method under mocked RPC latency.

Comment on lines +174 to +178
myFuture.completeExceptionally(e);
if (e instanceof RpcException) {
    throw (RpcException) e;
}
throw new RpcException("get version", e.getMessage());

Copilot AI Mar 6, 2026

Wrapping non-RpcException failures into a new RpcException drops the original exception as the cause, which makes debugging harder. Consider using the RpcException(host, message, Exception) constructor (or otherwise attaching the cause) so stack traces aren’t lost.

Comment on lines +43 to +45
* <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration
* ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window,
* concurrent queries reuse the cached version.</li>

Copilot AI Mar 6, 2026

The class Javadoc claims the default TTL is 500ms, but the new session variable defaults to 0 (disabled). Please update the Javadoc to reflect the actual default/behavior to avoid misleading operators.

Suggested change
* <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration
* ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window,
* concurrent queries reuse the cached version.</li>
* <li><b>Short TTL caching</b>: When enabled, partition versions are cached for a
* configurable duration ({@code point_query_version_cache_ttl_ms}). The default
* value is {@code 0}, which disables the cache; setting a positive value (for
* example, {@code 500} milliseconds) allows concurrent queries within the TTL
* window to reuse the cached version.</li>

Comment on lines +134 to +138
// Try to become the leader request for this partition
CompletableFuture<Long> myFuture = new CompletableFuture<>();
CompletableFuture<Long> existingFuture = inflightRequests.putIfAbsent(partitionId, myFuture);

if (existingFuture != null) {

Copilot AI Mar 6, 2026

A late-arriving thread can miss the cache, then find no inflight future (after the previous leader removed it) and become the new leader without re-checking the now-populated cache, causing an unnecessary extra MetaService RPC. Consider re-checking the cache after winning leadership (before calling fetchVersionFromMs) or using an atomic computeIfAbsent-based future to avoid this race.
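The computeIfAbsent alternative mentioned here can be sketched as follows: leadership is decided atomically inside the map, so there is no window in which a thread misses the cache, finds no inflight future, and re-fetches redundantly. Class and method names are illustrative, not the PR's actual code; fetchVersion stands in for the MetaService RPC.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Atomic leader election via computeIfAbsent: the mapping function runs under
// the map's internal lock for that key, so exactly one future is created per
// refresh, and it removes itself once complete.
public class AtomicCoalescerSketch {
    static final AtomicInteger rpcCount = new AtomicInteger();  // demo instrumentation
    private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflight = new ConcurrentHashMap<>();

    long getVersion(long partitionId) throws Exception {
        CompletableFuture<Long> future = inflight.computeIfAbsent(partitionId,
                id -> CompletableFuture.supplyAsync(() -> fetchVersion(id))
                        .whenComplete((v, t) -> inflight.remove(id)));  // self-cleaning
        return future.get();
    }

    private static long fetchVersion(long partitionId) {
        rpcCount.incrementAndGet();
        try {
            Thread.sleep(5);                        // simulated RPC latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return 7L;
    }
}
```

The mapping function only constructs the future (the RPC itself runs asynchronously), so it stays cheap, as ConcurrentHashMap's contract requires.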

Comment on lines +162 to +165
cache.put(partitionId, new VersionEntry(version, System.currentTimeMillis()));
// Also update the partition's cached version
partition.setCachedVisibleVersion(version, System.currentTimeMillis());
// Complete the future so waiting requests get the result

Copilot AI Mar 6, 2026

Passing System.currentTimeMillis() as the partition visibleVersionTime changes the meaning of Partition.visibleVersionTime (CloudPartition normally uses MetaService’s versionUpdateTimeMs). This can make visibleVersionTime inaccurate. Either extract and propagate versionUpdateTimeMs from the MetaService response, or avoid calling setCachedVisibleVersion here.

Comment on lines +150 to +154
Throwable cause = e.getCause();
if (cause instanceof RpcException) {
    throw (RpcException) cause;
}
throw new RpcException("get version", cause != null ? cause.getMessage() : e.getMessage());

Copilot AI Mar 6, 2026

When converting ExecutionException into RpcException, the original cause is not preserved, which makes debugging harder. Prefer using RpcException(host, message, Exception) (or otherwise attaching the cause) so the stack trace isn’t lost.

Comment on lines +78 to +82
private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>();

// partitionId -> inflight RPC future (for request coalescing)
private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflightRequests = new ConcurrentHashMap<>();


Copilot AI Mar 6, 2026

Expired entries are never evicted from this cache: VersionEntry expiration is checked on read, but stale entries remain in the map indefinitely, so memory usage can grow with the number of partitions ever point-queried. Consider a bounded/expiring cache (e.g., Caffeine with maximumSize + expireAfterWrite) or explicit cleanup/removal.

Comment on lines +219 to +223
Assertions.assertTrue(noCacheRpcs >= numThreads,
"Without cache, expected >= " + numThreads + " MS RPCs, got " + noCacheRpcs);
Assertions.assertTrue(cachedRpcs <= 5,
"With cache, expected <= 5 MS RPCs, got " + cachedRpcs);
Assertions.assertTrue(reductionFactor >= 20,

Copilot AI Mar 6, 2026

The assertions here use strict absolute thresholds (e.g., cachedRpcs <= 5 and >= 20x reduction) under heavy concurrency, which can be flaky on contended/slow CI hosts. Consider asserting a looser relative improvement (e.g., cachedRpcs is significantly less than noCacheRpcs) and/or adding timeouts to avoid hangs.

@eldenmoon eldenmoon force-pushed the optimize-point-query-batch-read-version branch from 6da9fb1 to bf046d6 on March 6, 2026 09:16
@eldenmoon
Member Author

run buildall

@doris-robot

TPC-H: Total hot run time: 27474 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
TPC-H sf100 test result on commit bf046d63ceb5c454705d8048b52e00501e655dd3, data reload: false

------ Round 1 ----------------------------------
============================================
q1	17670	4494	4283	4283
q2
q3	10641	762	506	506
q4	4680	357	249	249
q5	7559	1200	1047	1047
q6	173	175	144	144
q7	814	860	675	675
q8	9647	1451	1285	1285
q9	4844	4736	4625	4625
q10	6335	1914	1655	1655
q11	457	267	236	236
q12	735	571	473	473
q13	18056	2897	2183	2183
q14	230	238	220	220
q15	933	793	815	793
q16	754	713	688	688
q17	708	852	395	395
q18	5917	5513	5214	5214
q19	1315	993	612	612
q20	533	513	397	397
q21	4481	2128	1532	1532
q22	381	315	262	262
Total cold run time: 96863 ms
Total hot run time: 27474 ms

----- Round 2, with runtime_filter_mode=off -----
============================================
q1	4636	4586	4517	4517
q2
q3	3892	4397	3853	3853
q4	913	1191	784	784
q5	4017	4380	4274	4274
q6	178	170	137	137
q7	1761	1617	1525	1525
q8	2470	2704	2570	2570
q9	7554	7374	7428	7374
q10	3932	3969	3546	3546
q11	514	428	409	409
q12	500	595	438	438
q13	2847	3219	2686	2686
q14	290	295	269	269
q15	839	820	847	820
q16	733	775	727	727
q17	1182	1393	1284	1284
q18	7106	6772	6568	6568
q19	883	916	889	889
q20	2090	2144	1992	1992
q21	3933	3447	3410	3410
q22	457	411	379	379
Total cold run time: 50727 ms
Total hot run time: 48451 ms

@doris-robot

TPC-DS: Total hot run time: 152655 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit bf046d63ceb5c454705d8048b52e00501e655dd3, data reload: false

query5	4369	643	509	509
query6	324	228	210	210
query7	4219	462	267	267
query8	330	265	232	232
query9	8672	2731	2711	2711
query10	499	369	349	349
query11	7312	5823	5623	5623
query12	200	128	132	128
query13	1266	452	358	358
query14	5775	3798	3572	3572
query14_1	2790	2806	2830	2806
query15	209	192	176	176
query16	1002	464	443	443
query17	944	716	614	614
query18	2452	458	349	349
query19	223	210	185	185
query20	139	128	133	128
query21	235	156	121	121
query22	4945	4963	4854	4854
query23	16543	15855	15703	15703
query23_1	15701	15762	15892	15762
query24	7976	1708	1249	1249
query24_1	1322	1339	1282	1282
query25	596	515	474	474
query26	1234	273	158	158
query27	2764	476	331	331
query28	4467	1843	1865	1843
query29	850	556	476	476
query30	316	250	210	210
query31	1360	1277	1225	1225
query32	77	72	72	72
query33	509	327	265	265
query34	931	912	564	564
query35	631	692	599	599
query36	1053	1132	983	983
query37	140	98	82	82
query38	2901	2880	2839	2839
query39	882	918	843	843
query39_1	870	838	820	820
query40	239	154	134	134
query41	62	61	58	58
query42	300	309	308	308
query43	256	257	218	218
query44	
query45	200	195	181	181
query46	888	974	598	598
query47	2116	2076	2040	2040
query48	323	316	228	228
query49	629	470	376	376
query50	666	284	212	212
query51	4086	4095	4092	4092
query52	286	296	283	283
query53	292	333	280	280
query54	287	262	267	262
query55	103	92	83	83
query56	309	315	317	315
query57	1361	1348	1280	1280
query58	291	284	263	263
query59	1375	1421	1301	1301
query60	337	328	305	305
query61	184	145	147	145
query62	642	582	532	532
query63	311	277	278	277
query64	5081	1264	986	986
query65	
query66	1453	451	376	376
query67	16300	16365	16106	16106
query68	
query69	374	307	281	281
query70	985	965	955	955
query71	339	306	294	294
query72	2752	2668	2442	2442
query73	536	561	317	317
query74	9982	9907	9759	9759
query75	2834	2755	2451	2451
query76	2294	1026	670	670
query77	350	382	314	314
query78	11156	11355	10628	10628
query79	2442	796	610	610
query80	1735	624	533	533
query81	560	282	251	251
query82	1012	159	118	118
query83	334	266	243	243
query84	291	121	105	105
query85	907	525	476	476
query86	417	287	321	287
query87	3131	3108	2989	2989
query88	3562	2654	2636	2636
query89	424	367	348	348
query90	2031	193	176	176
query91	159	159	129	129
query92	79	74	74	74
query93	978	838	509	509
query94	641	327	281	281
query95	570	337	367	337
query96	635	509	224	224
query97	2497	2518	2386	2386
query98	230	220	214	214
query99	1002	995	906	906
Total cold run time: 235711 ms
Total hot run time: 152655 ms

@hello-stephen
Contributor

FE Regression Coverage Report

Increment line coverage 32.93% (27/82) 🎉
Increment coverage report
Complete coverage report

@eldenmoon
Member Author

/review

Contributor

@github-actions github-actions bot left a comment

Code Review Summary

This PR adds a PointQueryVersionCache that coalesces concurrent MetaService RPC calls for partition version fetching in cloud-mode point queries. The overall approach is sound — request coalescing via CompletableFuture is a well-known pattern. However, there are several issues that should be addressed before merging.

Critical Checkpoints

Goal and correctness: The goal is to reduce MetaService RPCs for concurrent point queries on the same partition. The code achieves this via TTL caching + request coalescing. The default TTL is 0 (disabled), which is safe. Tests cover the happy path well.

Modification scope: The change is focused and minimal — one new cache class, one modified executor method, one session variable. Good.

Concurrency: The coalescing logic using putIfAbsent + CompletableFuture is correct for the leader/follower pattern. However:

  • CompletableFuture.get() has no timeout, creating a risk of indefinite blocking if the leader thread is killed.
  • There is a TOCTOU window between the cache staleness check and putIfAbsent that can cause redundant RPCs (not a correctness bug, but defeats coalescing).

Memory/lifecycle: This is the primary concern. The ConcurrentHashMap<Long, VersionEntry> cache is unbounded with no eviction mechanism. Every other FE cache in the codebase uses Caffeine or Guava CacheBuilder with maximumSize and/or expireAfterWrite. The EvictableCacheBuilder even throws an exception for unbounded caches. Dropped partitions leave permanent stale entries. This should be replaced with a bounded cache (e.g., Caffeine with maximumSize + expireAfterWrite).

Configuration: Session variable pointQueryVersionCacheTtlMs defaults to 0 (disabled), which is safe. The variable is dynamically changeable (no needForward), which is correct since it's read per-query.

Parallel code paths: CloudPartition.getSnapshotVisibleVersion() already has its own TTL-based caching mechanism via cloudPartitionVersionCacheTtlMs. The new cache is a separate layer that only applies to point queries. This is acceptable but creates two overlapping caches — worth documenting.

Javadoc inconsistency: The class Javadoc says "default 500ms" but the actual default is 0 (disabled).

Test coverage: 11 unit tests + 1 perf test. Good coverage of cache hit/miss/expiry/coalescing/error propagation. Missing: test for version monotonicity enforcement (what happens if MS returns a lower version than cached?).

Observability: DEBUG-level logging is present for cache hits and RPC fetches. No metrics are added for cache hit rate or coalescing count, which would be useful for tuning the TTL in production.

Incompatible changes: None — this is purely additive with a disabled-by-default session variable.

Performance: When enabled, this significantly reduces MetaService RPCs under high concurrency. No performance regression when disabled (ttl=0 bypasses cache entirely).

Issues Found

  1. [High] Unbounded cache — potential memory leak (PointQueryVersionCache.java:78)
  2. [Medium] No timeout on CompletableFuture.get() (PointQueryVersionCache.java:145)
  3. [Low] Javadoc says default 500ms but actual default is 0 (PointQueryVersionCache.java:44)
  4. [Low] No version monotonicity enforcement in cache (PointQueryVersionCache.java:162)


// partitionId -> cached VersionEntry
private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>();

Contributor

[High] Unbounded cache — potential memory leak

This ConcurrentHashMap has no size limit and no eviction mechanism. Expired entries are never removed — they stay in the map forever (only isExpired() prevents serving them). Every partition ever queried adds a permanent entry, and dropped partitions are never cleaned up.

Every other FE cache in the codebase uses Caffeine or Guava CacheBuilder with maximumSize and/or expireAfterWrite. The EvictableCacheBuilder even throws an exception for unbounded caches ("Unbounded cache is not supported").

In a cluster with thousands of tables and many partitions, this will grow monotonically without bound.

Suggestion: Replace with Caffeine.newBuilder().maximumSize(N).expireAfterWrite(ttl, TimeUnit.MILLISECONDS).build(). This also eliminates the need for the manual VersionEntry.isExpired() check. The request coalescing via inflightRequests ConcurrentHashMap is fine since entries are self-cleaning (removed in finally block).
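The Caffeine replacement suggested here could look like the sketch below. It requires the Caffeine dependency (per this review, already used by other FE caches); the size and TTL values are illustrative, and note that the TTL becomes a build-time property of the cache rather than a per-query session value, which is a design trade-off to weigh.

```java
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

// Bounded, self-expiring version cache: maximumSize caps memory growth across
// partitions, and expireAfterWrite replaces the manual isExpired() check.
public class CaffeineVersionCacheSketch {
    private final Cache<Long, Long> cache = Caffeine.newBuilder()
            .maximumSize(100_000)                          // bound on retained entries
            .expireAfterWrite(500, TimeUnit.MILLISECONDS)  // illustrative TTL
            .build();

    void put(long partitionId, long version) {
        cache.put(partitionId, version);
    }

    Long getIfFresh(long partitionId) {
        return cache.getIfPresent(partitionId);            // null once expired or evicted
    }
}
```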

}
try {
    return existingFuture.get();
} catch (InterruptedException e) {
Contributor

[Medium] No timeout on CompletableFuture.get() — risk of indefinite blocking

existingFuture.get() blocks without timeout. If the leader thread is unexpectedly killed (e.g., OOM, Thread.stop(), or other abnormal termination before completing the future), all follower threads will block forever.

While fetchVersionFromMs() has its own retry/timeout logic inside VersionHelper, the overall wait here could be very long (retry_times × timeout_per_attempt), and in pathological cases, infinite.

Suggestion: Use existingFuture.get(timeout, TimeUnit.MILLISECONDS) with a reasonable timeout (e.g., 30s or configurable), and throw RpcException on TimeoutException.
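A bounded wait along the lines suggested here might look like the following. The timeout value is illustrative, and a plain RuntimeException stands in for Doris's RpcException to keep the sketch self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Follower-side wait with a hard upper bound: if the leader never completes
// the future, followers fail fast instead of blocking forever.
public class BoundedWaitSketch {
    static long waitForLeader(CompletableFuture<Long> existingFuture, long timeoutMs) {
        try {
            return existingFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Leader thread died or is pathologically slow: surface an error.
            throw new RuntimeException("timed out waiting for inflight version fetch", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while waiting for version fetch", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("inflight version fetch failed", e.getCause());
        }
    }
}
```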

* <ul>
* <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration
* ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window,
* concurrent queries reuse the cached version.</li>
Contributor

[Low] Javadoc inconsistency: The comment says default 500ms but the actual default value of pointQueryVersionCacheTtlMs is 0 (disabled). Please update the Javadoc to match.

long version = fetchVersionFromMs(partition);
// Update cache
cache.put(partitionId, new VersionEntry(version, System.currentTimeMillis()));
// Also update the partition's cached version
Contributor

[Low] No version monotonicity enforcement in cache

The cache blindly stores whatever version is returned by MetaService, even if it's lower than a previously cached version. While CloudPartition.setCachedVisibleVersion() (line 164) enforces monotonicity for the partition object itself (via lock + version comparison), the PointQueryVersionCache can serve a stale/lower version if MetaService transiently returns a lower value.

This is unlikely in practice but could happen during MetaService failover or network partitions.

Suggestion: Before cache.put(), check if (existingEntry == null || version >= existingEntry.version) to ensure the cache never serves a version regression. Alternatively, use cache.compute() with a version comparison.
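The compute-based variant suggested here can be expressed with ConcurrentHashMap.merge and Math::max, so the stored version can never regress. This is a minimal sketch with invented names; for brevity it stores only the version, while the real entry would also carry a fetch timestamp for TTL checks.

```java
import java.util.concurrent.ConcurrentHashMap;

// Monotonic version store: merge with Math::max atomically keeps the larger of
// the existing and incoming versions, ignoring transient regressions from a
// MetaService failover.
public class MonotonicVersionMapSketch {
    private final ConcurrentHashMap<Long, Long> versions = new ConcurrentHashMap<>();

    void record(long partitionId, long version) {
        versions.merge(partitionId, version, Math::max);  // regressions are dropped
    }

    Long get(long partitionId) {
        return versions.get(partitionId);
    }
}
```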

@eldenmoon
Member Author

run buildall

@eldenmoon
Member Author

run buildall

@eldenmoon eldenmoon closed this Mar 7, 2026
@eldenmoon eldenmoon reopened this Mar 7, 2026
@eldenmoon
Member Author

run buildall
