[opt](cloud) Add request-coalescing version cache for point queries #61111
eldenmoon wants to merge 3 commits into apache:master
Conversation
run buildall
Pull request overview
Adds a request-coalescing, short-TTL version cache to reduce MetaService RPC fan-out for concurrent snapshot point queries in cloud mode.
Changes:
- Introduce `PointQueryVersionCache` singleton to cache partition visible versions and coalesce concurrent refreshes via `CompletableFuture`.
- Wire `PointQueryExecutor.updateCloudPartitionVersions()` to use the new cache and add session variable `point_query_version_cache_ttl_ms`.
- Add unit tests for cache behavior and an end-to-end concurrency/perf-style test for the executor path.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java | Implements request-coalescing TTL cache and MetaService version fetch logic. |
| fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java | Switches snapshot point query version fetch to use the new cache. |
| fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | Adds point_query_version_cache_ttl_ms session variable for enabling/configuring the cache. |
| fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/PointQueryVersionCacheTest.java | Unit tests for TTL hits/expiry, coalescing, and failure behavior. |
| fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/PointQueryExecutorPerfTest.java | Concurrency/perf-style test exercising the real executor method under mocked RPC latency. |
```java
myFuture.completeExceptionally(e);
if (e instanceof RpcException) {
    throw (RpcException) e;
}
throw new RpcException("get version", e.getMessage());
```
Wrapping non-RpcException failures into a new RpcException drops the original exception as the cause, which makes debugging harder. Consider using the RpcException(host, message, Exception) constructor (or otherwise attaching the cause) so stack traces aren’t lost.
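A minimal sketch of the cause-preserving pattern this comment suggests. `RpcException` here is a stand-in with the `(host, message, Exception)` constructor shape the reviewer mentions, not Doris's actual class:

```java
// Stand-in for Doris's RpcException; only the constructor shapes matter here.
class RpcException extends Exception {
    RpcException(String host, String message) {
        super(host + ": " + message);
    }

    RpcException(String host, String message, Exception cause) {
        super(host + ": " + message, cause); // keeps the original stack trace
    }
}

class VersionFetchSketch {
    // Rethrows RpcException as-is and wraps anything else WITH its cause,
    // so the original failure is still visible in stack traces and logs.
    static long fetchOrWrap() throws RpcException {
        try {
            throw new IllegalStateException("meta service unavailable"); // simulated failure
        } catch (Exception e) {
            if (e instanceof RpcException) {
                throw (RpcException) e;
            }
            throw new RpcException("get version", e.getMessage(), e);
        }
    }
}
```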
```java
 * <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration
 *     ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window,
 *     concurrent queries reuse the cached version.</li>
```
The class Javadoc claims the default TTL is 500ms, but the new session variable defaults to 0 (disabled). Please update the Javadoc to reflect the actual default/behavior to avoid misleading operators.
```diff
- * <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration
- *     ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window,
- *     concurrent queries reuse the cached version.</li>
+ * <li><b>Short TTL caching</b>: When enabled, partition versions are cached for a
+ *     configurable duration ({@code point_query_version_cache_ttl_ms}). The default
+ *     value is {@code 0}, which disables the cache; setting a positive value (for
+ *     example, {@code 500} milliseconds) allows concurrent queries within the TTL
+ *     window to reuse the cached version.</li>
```
```java
// Try to become the leader request for this partition
CompletableFuture<Long> myFuture = new CompletableFuture<>();
CompletableFuture<Long> existingFuture = inflightRequests.putIfAbsent(partitionId, myFuture);

if (existingFuture != null) {
```
A late-arriving thread can miss the cache, then find no inflight future (after the previous leader removed it) and become the new leader without re-checking the now-populated cache, causing an unnecessary extra MetaService RPC. Consider re-checking the cache after winning leadership (before calling fetchVersionFromMs) or using an atomic computeIfAbsent-based future to avoid this race.
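One way to close that race is the `computeIfAbsent` variant the comment suggests, sketched below with illustrative names (not the PR's actual API). The future is installed atomically, so a late arrival either joins the inflight RPC or starts a fresh one, never a redundant second leader:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative coalescer: computeIfAbsent installs the future atomically,
// so threads arriving during an inflight RPC join it instead of racing a
// separate cache check followed by putIfAbsent.
class CoalescingFetcher {
    private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflight =
            new ConcurrentHashMap<>();
    final AtomicInteger rpcCount = new AtomicInteger();

    long getVersion(long partitionId) {
        CompletableFuture<Long> f = inflight.computeIfAbsent(partitionId, id ->
                CompletableFuture.supplyAsync(() -> fetchFromMs(id))
                        // Self-clean so the next cache miss starts a fresh RPC.
                        .whenComplete((v, t) -> inflight.remove(id)));
        return f.join();
    }

    // Simulated MetaService RPC; counts calls so coalescing is observable.
    private long fetchFromMs(long partitionId) {
        rpcCount.incrementAndGet();
        return 42L;
    }
}
```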
```java
cache.put(partitionId, new VersionEntry(version, System.currentTimeMillis()));
// Also update the partition's cached version
partition.setCachedVisibleVersion(version, System.currentTimeMillis());
// Complete the future so waiting requests get the result
```
Passing System.currentTimeMillis() as the partition visibleVersionTime changes the meaning of Partition.visibleVersionTime (CloudPartition normally uses MetaService’s versionUpdateTimeMs). This can make visibleVersionTime inaccurate. Either extract and propagate versionUpdateTimeMs from the MetaService response, or avoid calling setCachedVisibleVersion here.
```java
Throwable cause = e.getCause();
if (cause instanceof RpcException) {
    throw (RpcException) cause;
}
throw new RpcException("get version", cause != null ? cause.getMessage() : e.getMessage());
```
When converting ExecutionException into RpcException, the original cause is not preserved, which makes debugging harder. Prefer using RpcException(host, message, Exception) (or otherwise attaching the cause) so the stack trace isn’t lost.
```java
private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>();

// partitionId -> inflight RPC future (for request coalescing)
private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflightRequests = new ConcurrentHashMap<>();
```
Expired entries are never evicted from this cache: VersionEntry expiration is checked on read, but stale entries remain in the map indefinitely, so memory usage can grow with the number of partitions ever point-queried. Consider a bounded/expiring cache (e.g., Caffeine with maximumSize + expireAfterWrite) or explicit cleanup/removal.
```java
Assertions.assertTrue(noCacheRpcs >= numThreads,
        "Without cache, expected >= " + numThreads + " MS RPCs, got " + noCacheRpcs);
Assertions.assertTrue(cachedRpcs <= 5,
        "With cache, expected <= 5 MS RPCs, got " + cachedRpcs);
Assertions.assertTrue(reductionFactor >= 20,
```
The assertions here use strict absolute thresholds (e.g., cachedRpcs <= 5 and >= 20x reduction) under heavy concurrency, which can be flaky on contended/slow CI hosts. Consider asserting a looser relative improvement (e.g., cachedRpcs is significantly less than noCacheRpcs) and/or adding timeouts to avoid hangs.
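A softer formulation of that check could compare the two runs relatively, keeping the variable names from the excerpt above (the 2x margin is an arbitrary assumption, not from the PR):

```java
// Relative check: the absolute floor stays (no cache => one RPC per query),
// but the cached path is judged against the uncached baseline instead of a
// hard "<= 5" that can fail on contended CI hosts.
class CoalescingAsserts {
    static void assertCoalesced(int noCacheRpcs, int cachedRpcs, int numThreads) {
        if (noCacheRpcs < numThreads) {
            throw new AssertionError("Without cache, expected >= " + numThreads
                    + " MS RPCs, got " + noCacheRpcs);
        }
        if (cachedRpcs * 2 >= noCacheRpcs) {
            throw new AssertionError("With cache, expected well under "
                    + noCacheRpcs + " MS RPCs, got " + cachedRpcs);
        }
    }
}
```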
When enable_snapshot_point_query=true in cloud mode, every point query triggers an independent RPC to MetaService to fetch the partition visible version. Under high concurrency, this creates N RPCs for N concurrent queries on the same partition, becoming a significant bottleneck.

This patch introduces PointQueryVersionCache, a request-coalescing cache that reduces MetaService RPCs by:
- Short TTL caching: partition versions are cached for a configurable duration (point_query_version_cache_ttl_ms, default 0 = disabled). Within the TTL window, concurrent queries reuse the cached version.
- Request coalescing: when the cache expires, only the first request issues the RPC; concurrent requests wait on the inflight result via CompletableFuture.

Under 100 concurrent point queries with 5ms RPC latency, the cache reduces MetaService RPCs from 100+ to 1-2 (>= 20x reduction).

Changes:
- New: PointQueryVersionCache.java - singleton request-coalescing cache
- Modified: PointQueryExecutor.updateCloudPartitionVersions() to use cache
- Modified: SessionVariable.java - add point_query_version_cache_ttl_ms
- New: PointQueryVersionCacheTest.java - 11 unit tests
- New: PointQueryExecutorPerfTest.java - E2E performance benchmark
Force-pushed from 6da9fb1 to bf046d6
run buildall

TPC-H: Total hot run time: 27474 ms
TPC-DS: Total hot run time: 152655 ms
FE Regression Coverage Report: Increment line coverage
/review |
Code Review Summary
This PR adds a PointQueryVersionCache that coalesces concurrent MetaService RPC calls for partition version fetching in cloud-mode point queries. The overall approach is sound — request coalescing via CompletableFuture is a well-known pattern. However, there are several issues that should be addressed before merging.
Critical Checkpoints
Goal and correctness: The goal is to reduce MetaService RPCs for concurrent point queries on the same partition. The code achieves this via TTL caching + request coalescing. The default TTL is 0 (disabled), which is safe. Tests cover the happy path well.
Modification scope: The change is focused and minimal — one new cache class, one modified executor method, one session variable. Good.
Concurrency: The coalescing logic using putIfAbsent + CompletableFuture is correct for the leader/follower pattern. However:
- CompletableFuture.get() has no timeout, creating a risk of indefinite blocking if the leader thread is killed.
- There is a TOCTOU window between the cache staleness check and putIfAbsent that can cause redundant RPCs (not a correctness bug, but it defeats coalescing).
Memory/lifecycle: This is the primary concern. The ConcurrentHashMap<Long, VersionEntry> cache is unbounded with no eviction mechanism. Every other FE cache in the codebase uses Caffeine or Guava CacheBuilder with maximumSize and/or expireAfterWrite. The EvictableCacheBuilder even throws an exception for unbounded caches. Dropped partitions leave permanent stale entries. This should be replaced with a bounded cache (e.g., Caffeine with maximumSize + expireAfterWrite).
Configuration: Session variable pointQueryVersionCacheTtlMs defaults to 0 (disabled), which is safe. The variable is dynamically changeable (no needForward), which is correct since it's read per-query.
Parallel code paths: CloudPartition.getSnapshotVisibleVersion() already has its own TTL-based caching mechanism via cloudPartitionVersionCacheTtlMs. The new cache is a separate layer that only applies to point queries. This is acceptable but creates two overlapping caches — worth documenting.
Javadoc inconsistency: The class Javadoc says "default 500ms" but the actual default is 0 (disabled).
Test coverage: 11 unit tests + 1 perf test. Good coverage of cache hit/miss/expiry/coalescing/error propagation. Missing: test for version monotonicity enforcement (what happens if MS returns a lower version than cached?).
Observability: DEBUG-level logging is present for cache hits and RPC fetches. No metrics are added for cache hit rate or coalescing count, which would be useful for tuning the TTL in production.
Incompatible changes: None — this is purely additive with a disabled-by-default session variable.
Performance: When enabled, this significantly reduces MetaService RPCs under high concurrency. No performance regression when disabled (ttl=0 bypasses cache entirely).
Issues Found
- [High] Unbounded cache — potential memory leak (PointQueryVersionCache.java:78)
- [Medium] No timeout on CompletableFuture.get() (PointQueryVersionCache.java:145)
- [Low] Javadoc says default 500ms but actual default is 0 (PointQueryVersionCache.java:44)
- [Low] No version monotonicity enforcement in cache (PointQueryVersionCache.java:162)
```java
// partitionId -> cached VersionEntry
private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>();
```
[High] Unbounded cache — potential memory leak
This ConcurrentHashMap has no size limit and no eviction mechanism. Expired entries are never removed — they stay in the map forever (only isExpired() prevents serving them). Every partition ever queried adds a permanent entry, and dropped partitions are never cleaned up.
Every other FE cache in the codebase uses Caffeine or Guava CacheBuilder with maximumSize and/or expireAfterWrite. The EvictableCacheBuilder even throws an exception for unbounded caches ("Unbounded cache is not supported").
In a cluster with thousands of tables and many partitions, this will grow monotonically without bound.
Suggestion: Replace with Caffeine.newBuilder().maximumSize(N).expireAfterWrite(ttl, TimeUnit.MILLISECONDS).build(). This also eliminates the need for the manual VersionEntry.isExpired() check. The request coalescing via inflightRequests ConcurrentHashMap is fine since entries are self-cleaning (removed in finally block).
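The review's other option, "explicit cleanup/removal", can be sketched with just the JDK (a Caffeine maximumSize + expireAfterWrite cache would be the more idiomatic fix, as suggested above). Names mirror the shapes seen in the excerpts but are illustrative, and time is passed explicitly so the behavior is deterministic:

```java
import java.util.concurrent.ConcurrentHashMap;

// Stdlib-only sketch: expired entries are swept opportunistically on each
// write, so the map cannot grow monotonically with every partition queried.
class ExpiringVersionCache {
    static final class VersionEntry {
        final long version;
        final long storedAtMs;

        VersionEntry(long version, long storedAtMs) {
            this.version = version;
            this.storedAtMs = storedAtMs;
        }

        boolean isExpired(long ttlMs, long nowMs) {
            return nowMs - storedAtMs > ttlMs;
        }
    }

    private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>();
    private final long ttlMs;

    ExpiringVersionCache(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void put(long partitionId, long version, long nowMs) {
        cache.put(partitionId, new VersionEntry(version, nowMs));
        // Explicit cleanup: drop everything that has already expired.
        cache.entrySet().removeIf(e -> e.getValue().isExpired(ttlMs, nowMs));
    }

    Long get(long partitionId, long nowMs) {
        VersionEntry e = cache.get(partitionId);
        return (e == null || e.isExpired(ttlMs, nowMs)) ? null : e.version;
    }

    int size() {
        return cache.size();
    }
}
```

The sweep is O(entries) per write; for the point-query workload described here that is likely acceptable, but a Caffeine-backed cache avoids the cost entirely.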
```java
}
try {
    return existingFuture.get();
} catch (InterruptedException e) {
```
[Medium] No timeout on CompletableFuture.get() — risk of indefinite blocking
existingFuture.get() blocks without timeout. If the leader thread is unexpectedly killed (e.g., OOM, Thread.stop(), or other abnormal termination before completing the future), all follower threads will block forever.
While fetchVersionFromMs() has its own retry/timeout logic inside VersionHelper, the overall wait here could be very long (retry_times × timeout_per_attempt), and in pathological cases, infinite.
Suggestion: Use existingFuture.get(timeout, TimeUnit.MILLISECONDS) with a reasonable timeout (e.g., 30s or configurable), and throw RpcException on TimeoutException.
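A sketch of that bounded wait; the helper name is hypothetical and the real code would throw Doris's RpcException rather than a plain Exception:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Followers wait on the leader's future with a deadline, so an abnormally
// terminated leader cannot block them forever.
class BoundedWait {
    static long waitForLeader(CompletableFuture<Long> future, long timeoutMs)
            throws Exception {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Surface the hang as a failure (Doris would use RpcException here).
            throw new Exception("get version timed out after " + timeoutMs + " ms", e);
        }
    }
}
```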
```java
 * <ul>
 * <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration
 *     ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window,
 *     concurrent queries reuse the cached version.</li>
```
[Low] Javadoc inconsistency: The comment says default 500ms but the actual default value of pointQueryVersionCacheTtlMs is 0 (disabled). Please update the Javadoc to match.
```java
long version = fetchVersionFromMs(partition);
// Update cache
cache.put(partitionId, new VersionEntry(version, System.currentTimeMillis()));
// Also update the partition's cached version
```
[Low] No version monotonicity enforcement in cache
The cache blindly stores whatever version is returned by MetaService, even if it's lower than a previously cached version. While CloudPartition.setCachedVisibleVersion() (line 164) enforces monotonicity for the partition object itself (via lock + version comparison), the PointQueryVersionCache can serve a stale/lower version if MetaService transiently returns a lower value.
This is unlikely in practice but could happen during MetaService failover or network partitions.
Suggestion: Before cache.put(), check if (existingEntry == null || version >= existingEntry.version) to ensure the cache never serves a version regression. Alternatively, use cache.compute() with a version comparison.
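The compute() variant could look like this; class and field names are illustrative, mirroring the shapes seen in the excerpts rather than the PR's actual code:

```java
import java.util.concurrent.ConcurrentHashMap;

// Monotonic cache updates via compute(): a lower version returned by a
// transiently inconsistent MetaService never overwrites a newer cached one.
class MonotonicVersionCache {
    static final class VersionEntry {
        final long version;
        final long storedAtMs;

        VersionEntry(long version, long storedAtMs) {
            this.version = version;
            this.storedAtMs = storedAtMs;
        }
    }

    private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>();

    void update(long partitionId, long version, long nowMs) {
        // compute() runs atomically per key, so the comparison and the write
        // cannot interleave with a concurrent update to the same partition.
        cache.compute(partitionId, (id, old) ->
                (old != null && old.version > version)
                        ? old // keep the newer version; ignore the regression
                        : new VersionEntry(version, nowMs));
    }

    long getVersion(long partitionId) {
        VersionEntry e = cache.get(partitionId);
        return e == null ? -1L : e.version;
    }
}
```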
run buildall

run buildall

run buildall