DRAFT: Implement unified blob cache plugin SPI #29184
Conversation
Force-pushed from 8fae954 to f5c3b56
The purpose of the unified cache is to move cache control to the engine, including cache invalidation, sizing, etc. Right now every catalog creates and manages its own cache, which is suboptimal and hard to manage when dynamic catalogs are used. Instead of having multiple caches, create a single, engine-level cache that is leased to connectors.
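The lease model described above can be sketched with a toy example. `EngineCache`, `Blobs`, and the string-keyed store below are invented for illustration and are not the PR's actual SPI; they only show the idea of one engine-owned store handing out catalog-scoped handles that the engine can invalidate centrally:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an engine-level cache leased to connectors.
// The real SPI in this PR (BlobCache, BlobCacheManager, ...) differs.
interface Blobs
{
    byte[] get(String key);

    void put(String key, byte[] value);
}

final class EngineCache
{
    // One shared store; each catalog gets a namespaced lease into it.
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();

    Blobs leaseFor(String catalog)
    {
        return new Blobs() {
            @Override
            public byte[] get(String key)
            {
                return store.get(catalog + "/" + key);
            }

            @Override
            public void put(String key, byte[] value)
            {
                store.put(catalog + "/" + key, value);
            }
        };
    }

    // Engine-side control: drop every entry leased to a catalog,
    // e.g. when a dynamic catalog is removed.
    void dropCatalog(String catalog)
    {
        store.keySet().removeIf(key -> key.startsWith(catalog + "/"));
    }
}
```

Because the engine owns the store, dropping a dynamic catalog invalidates all of its cached blobs in one place instead of relying on each connector's private cache.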
Force-pushed from f5c3b56 to 09ec89f
@coderabbitai review

✅ Actions performed: review triggered.

@coderabbitai full review

✅ Actions performed: full review triggered.
📝 Walkthrough

This pull request replaces filesystem-based caching with a blob-cache architecture. It introduces new SPI types (BlobCache, BlobSource, BlobCacheManager, BlobCacheManagerFactory, ConnectorCacheFactory, CacheKey, CacheManagerContext), a CacheManagerRegistry, and Guice wiring for cache manager configuration. Legacy filesystem cache modules and classes were removed. Two new plugins were added: trino-blob-cache-alluxio and trino-blob-cache-memory, with corresponding managers and factories. Cache integration was added to connector context, catalog lifecycle, server startup, testing utilities, and product-test configurations; multiple tests and build files were updated accordingly.

Possibly related PRs

No PRs from the provided list have a strong code-level connection to this change. The single referenced PR (#28988) addresses Iceberg split/partitioning logic, which operates independently from the cache architecture refactoring introduced here.

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.42.1): plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
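The walkthrough mentions a CacheManagerRegistry that hands catalogs a cache for a requested tier. A simplified sketch of that dispatch, including the no-op fallback used when no manager is configured for a tier, might look like this (`CacheTier`, `TierCache`, and `TierCacheFactory` are stand-ins, not the real SPI types):

```java
import java.util.Map;

// Stand-in for the SPI's cache tiers.
enum CacheTier { MEMORY, DISK }

@FunctionalInterface
interface TierCache
{
    byte[] get(String key);
}

final class NoopTierCache
        implements TierCache
{
    @Override
    public byte[] get(String key)
    {
        return null; // always a miss; callers fall back to the remote source
    }
}

// Sketch of registry-style dispatch: look up the manager registered for
// the tier, or degrade gracefully to a no-op cache.
final class TierCacheFactory
{
    private final Map<CacheTier, TierCache> managers;

    TierCacheFactory(Map<CacheTier, TierCache> managers)
    {
        this.managers = managers;
    }

    TierCache forTier(CacheTier tier)
    {
        TierCache cache = managers.get(tier);
        if (cache == null) {
            return new NoopTierCache(); // no manager registered for this tier
        }
        return cache;
    }
}
```

The no-op fallback lets connectors request a cache unconditionally without checking server configuration first.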
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/TracingCacheManager.java (2)
144-145: ⚠️ Potential issue | 🟡 Minor

Duplicate CACHE_FILE_READ_POSITION attribute — the second call overwrites the position with length.

Line 145 sets CACHE_FILE_READ_POSITION again with (long) length, clobbering the position set on line 144 and never recording the read size. The second attribute should be CACHE_FILE_READ_SIZE.

Proposed fix

```diff
-                .setAttribute(CACHE_FILE_READ_POSITION, positionInFile(pageId, position))
-                .setAttribute(CACHE_FILE_READ_POSITION, (long) length)
+                .setAttribute(CACHE_FILE_READ_POSITION, positionInFile(pageId, position))
+                .setAttribute(CACHE_FILE_READ_SIZE, (long) length)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/TracingCacheManager.java` around lines 144 - 145, The second setAttribute call mistakenly reuses CACHE_FILE_READ_POSITION and overwrites the position value; change that second call to use CACHE_FILE_READ_SIZE so the trace records read size correctly (in the attribute setting chain where positionInFile(pageId, position) is set and then the length is set, replace CACHE_FILE_READ_POSITION with CACHE_FILE_READ_SIZE to record (long) length).
202-210: ⚠️ Potential issue | 🔴 Critical

Missing pageOffset argument in delegate call.

The 5-arg get() method at lines 202–209 receives pageOffset as a parameter and includes it in the span attributes, but the delegate call at line 209 omits it: delegate.get(pageId, bytesToRead, buffer, offsetInBuffer) passes only 4 arguments, matching the 4-arg overload instead of the 5-arg variant. This silently causes the cache read to start at the wrong offset within the page.

Fix

```diff
-        return withTracing(span, () -> delegate.get(pageId, bytesToRead, buffer, offsetInBuffer));
+        return withTracing(span, () -> delegate.get(pageId, pageOffset, bytesToRead, buffer, offsetInBuffer));
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/TracingCacheManager.java` around lines 202 - 210, The delegate call in TracingCacheManager.get is calling the 4-arg overload instead of the intended 5-arg variant, so include the missing pageOffset when invoking delegate.get to match the method signature and ensure the read starts at the correct offset; update the call in method get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer, int offsetInBuffer) from delegate.get(pageId, bytesToRead, buffer, offsetInBuffer) to delegate.get(pageId, pageOffset, bytesToRead, buffer, offsetInBuffer) so the span attributes and actual delegate invocation remain consistent.

core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java (1)
308-326: ⚠️ Potential issue | 🟠 Major

Potential race and unnecessary work in cacheManagerRegistry.drop.

cacheManagerRegistry.drop(catalogName) is invoked outside catalogsUpdateLock and unconditionally, including when removed == false (i.e., nothing was actually dropped because the catalog did not exist and exists == true). Two concerns:

- If another thread re-creates a catalog with the same catalogName between the synchronized block exit and the drop(...) call, the cache state for the newly created catalog can be invalidated/destroyed.
- When the catalog was not present, calling drop is wasteful and may emit misleading logs/metrics.

Consider invoking drop inside the synchronized block (right after activeCatalogs.remove) and guarding it on removed:

Proposed fix

```diff
         boolean removed;
         synchronized (catalogsUpdateLock) {
             checkState(state != State.STOPPED, "ConnectorManager is stopped");
             catalogStore.removeCatalog(catalogName);
             removed = activeCatalogs.remove(catalogName) != null;
+            if (removed) {
+                cacheManagerRegistry.drop(catalogName);
+            }
         }
         if (!removed && !exists) {
             throw new TrinoException(CATALOG_NOT_FOUND, format("Catalog '%s' not found", catalogName));
         }
-        cacheManagerRegistry.drop(catalogName);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java` around lines 308 - 326, The cacheManagerRegistry.drop call is performed unconditionally outside the synchronized block, which can cause races and unnecessary work; within dropCatalog synchronize on catalogsUpdateLock and, immediately after calling catalogStore.removeCatalog and computing removed = activeCatalogs.remove(catalogName) != null, call cacheManagerRegistry.drop(catalogName) only when removed is true (i.e., the catalog was actually removed); keep the existing check that throws CATALOG_NOT_FOUND when !removed && !exists and ensure cacheManagerRegistry.drop is not invoked for non-removed catalogs to avoid invalidating newly created catalogs or emitting spurious logs/metrics.

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java (1)
1592-1614: ⚠️ Potential issue | 🔴 Critical

Two QueryRunner instances will share the same cache directory when blob cache is enabled; add isolation per runner.

When TestDeltaLakeAlluxioCacheMinioAndHmsConnectorSmokeTest (or any subclass overriding getBlobCacheProperties()) runs testDeltaLakeTableLocationChanged, the test creates a second independent QueryRunner via createDeltaLakeQueryRunner() at line 1593. Both runners call createDeltaLakeQueryRunner(), which invokes getBlobCacheProperties() (line 240), causing them to share the same cache directory. AlluxioConfigurationFactory.canWrite() validates only directory write permissions (lines 73-82), not concurrent ownership, so the conflict goes undetected. Alluxio's local cache cannot safely handle multiple instances managing the same on-disk cache directories. Either parameterize cache directories per runner instance or create isolated temp directories for the independent QueryRunner.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java` around lines 1592 - 1614, The independent QueryRunner shares the same Alluxio/blob cache directory because createDeltaLakeQueryRunner() calls getBlobCacheProperties() which returns identical paths; update the test or runner to give each QueryRunner an isolated cache directory. Specifically, when creating the independent QueryRunner in testDeltaLakeTableLocationChanged (and in createDeltaLakeQueryRunner()/getBlobCacheProperties()), append a unique suffix (UUID or randomNameSuffix()) to the blob cache directory property (the property returned by getBlobCacheProperties()) so each createDeltaLakeQueryRunner() invocation uses its own temp cache dir; alternatively, make createDeltaLakeQueryRunner() accept per-runner blob cache properties and pass a distinct temp directory for the independentQueryRunner to avoid multiple instances managing the same on-disk cache.

lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java (1)
72-86: ⚠️ Potential issue | 🟠 Major

cache.get(...).length() in InMemoryBlobCache forces a full blob load for small files.

For InMemoryBlobCache, calling cache.get(key, source()).length() triggers load(source), which:

- Calls source.length() (metadata-only)
- If length ≤ maxContentLength, calls source.readFully(0, buffer, 0, length) to load the entire blob into memory
- Only if length exceeds maxContentLength does it return the original source for lazy loading

This is a regression when only length() is needed (e.g., before an abort or short-circuit read). For AlluxioBlobCache, only metadata is queried; lazy loading happens on actual read.

Consider adding a dedicated metadata-only path to BlobCache (e.g., BlobCache.length(key, delegate)) or ensure implementations can provide length metadata without forcing full blob materialization.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java` around lines 72 - 86, The length() implementation in CacheInputFile calls cache.get(key, source()).length(), which for InMemoryBlobCache forces full blob load; change this to use a metadata-only path: add a new method on BlobCache (e.g., length(CacheKey key, InputFile delegate) or length(CacheKey key, SourceProvider source)) and call that from CacheInputFile.length() after keyProvider.getCacheKey(delegate) so caches can return just the remote length without materializing content (update InMemoryBlobCache to return source.length() without readFully when size ≤ maxContentLength, and keep AlluxioBlobCache behavior intact).
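One possible shape for the metadata-only path suggested in the comment above, sketched with simplified stand-in types (the String key, the default method, and the `@FunctionalInterface` shapes are illustrative, not the actual SPI):

```java
// Simplified stand-in for the SPI's BlobSource: only the metadata call.
@FunctionalInterface
interface BlobSource
{
    long length();
}

interface BlobCache
{
    // Returns a (possibly cached) source for the key; may populate the cache.
    BlobSource get(String key, BlobSource source);

    // Metadata-only path: answer length() from the source without
    // materializing or caching the blob. Implementations that already
    // hold cached metadata can override this with a cheaper lookup.
    default long length(String key, BlobSource source)
    {
        return source.length();
    }
}
```

Callers such as CacheInputFile.length() could then ask for the length without triggering the eager readFully population that the comment describes.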
🧹 Nitpick comments (16)
lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/BlobTrinoInputStream.java (1)
62-73: Avoid per-call allocation in single-byte read().

Allocating a new byte[1] on every read() invocation will be noticeable when callers iterate byte-by-byte. Consider a reusable single-byte buffer field, or leave a comment if callers are expected to always use the bulk read(byte[], int, int) overload.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/BlobTrinoInputStream.java` around lines 62 - 73, The read() method in BlobTrinoInputStream currently allocates a new byte[1] on every call; change it to reuse a single-byte buffer field or delegate to the bulk read(byte[], int, int) to avoid per-call allocation. Add a private final byte[] singleByte = new byte[1] to the class (or call read(singleByte, 0, 1) from read()), update BlobTrinoInputStream.read() to use that buffer and adjust position handling accordingly so semantics remain the same.

core/trino-server-core/src/main/provisio/trino-core.xml (1)
42-52: Consider alphabetical ordering of plugin entries.

Existing plugin artifactSet entries below are alphabetical (exchange-filesystem, functions-python, geospatial, ...). The new blob-cache-alluxio/blob-cache-memory entries would fit more naturally before exchange-filesystem rather than after it.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-server-core/src/main/provisio/trino-core.xml` around lines 42 - 52, Move the two new artifactSet entries (artifactSet to="plugin/blob-cache-alluxio" and artifactSet to="plugin/blob-cache-memory") so they appear in alphabetical order among plugin entries — specifically place them before the existing artifactSet for "plugin/exchange-filesystem" (i.e., insert the blob-cache-alluxio and blob-cache-memory blocks so they come alphabetically ahead of exchange-filesystem), preserving the current artifact id/unpack structure exactly.

core/trino-spi/src/main/java/io/trino/spi/cache/ConnectorCacheFactory.java (1)
16-19: Consider adding Javadoc to this public SPI.

As a newly exported SPI entry point, ConnectorCacheFactory and createBlobCache(CacheTier) would benefit from Javadoc documenting the contract: who calls it, lifecycle/ownership of the returned BlobCache, thread-safety expectations, whether tier may be null, and behavior when no cache manager is configured. Plugin authors implementing or consuming this interface rely on this information.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-spi/src/main/java/io/trino/spi/cache/ConnectorCacheFactory.java` around lines 16 - 19, Add Javadoc to the public SPI interface ConnectorCacheFactory and its method createBlobCache(CacheTier) describing the contract: who calls this SPI (Trino core/plugin loader), when it is invoked, the lifecycle and ownership of the returned BlobCache (who must close/cleanup and how long it is expected to live), thread-safety expectations of the implementation and returned BlobCache, whether the tier parameter may be null (or must be non-null) and allowed values, and what should happen if no cache manager is configured (e.g., return a no-op cache or throw). Place this documentation on the ConnectorCacheFactory type and the createBlobCache method so implementors and callers (BlobCache, CacheTier) have clear guidance.

core/trino-spi/src/main/java/io/trino/spi/cache/BlobCacheManagerFactory.java (1)
18-25: Inconsistent accessor naming on public SPI.

getName() uses the JavaBean-style get prefix while cacheTier() uses the record/fluent style. Since this is a new public SPI that plugin authors will implement, it's worth aligning these before release — e.g., getName() + getCacheTier(), or name() + cacheTier().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-spi/src/main/java/io/trino/spi/cache/BlobCacheManagerFactory.java` around lines 18 - 25, The public SPI BlobCacheManagerFactory has inconsistent accessor naming (getName() vs cacheTier()); choose and apply a consistent style across the interface and its usages (either rename getName() -> name() or cacheTier() -> getCacheTier()), update the interface method signature in BlobCacheManagerFactory, then update all implementing classes and any callers/tests to match the chosen name (including create(Map<String,String>, CacheManagerContext) usages and any javadocs or docs that reference these methods) to ensure compilation and consistent API for plugin authors.

.github/workflows/ci.yml (1)
363-370: Exclusion list ordering.

The surrounding entries are alphabetized; the two new trino-blob-cache-* entries are inserted between trino-filesystem-alluxio and trino-filesystem-azure, breaking that ordering. Consider moving them up so they sit after trino-base-jdbc and before trino-cassandra (i.e., keep the list alphabetical).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.github/workflows/ci.yml around lines 363 - 370, The exclusion list entries are out of alphabetical order: move the two entries `!:trino-blob-cache-alluxio` and `!:trino-blob-cache-memory` so the list stays alphabetized; specifically place them before `!:trino-cassandra` (i.e., earlier than `!:trino-base-jdbc`), ensuring the sequence around `!:trino-filesystem-alluxio`, `!:trino-filesystem-azure`, `!:trino-blob-cache-alluxio`, `!:trino-blob-cache-memory`, `!:trino-base-jdbc`, `!:trino-cassandra` (or correct alphabetical order) is preserved.

core/trino-spi/src/main/java/io/trino/spi/cache/BlobCache.java (1)
25-33: Consider documenting miss/population and invalidation semantics on the SPI.

Since this is a new public SPI in io.trino.spi.cache, Javadoc on each method would help implementers. In particular, worth clarifying for get: whether the returned BlobSource is guaranteed to reflect the supplied source on a miss, whether the source may be read lazily vs. eagerly, and thread-safety expectations for concurrent get/invalidate on the same CacheKey (ordering guarantees between an in-flight population and a concurrent invalidate). For invalidate(Collection), whether it is required to be atomic across keys or merely best-effort per key. Implementers like TracingBlobCache/AlluxioBlobCache and the in-memory variant may otherwise diverge on these contracts.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-spi/src/main/java/io/trino/spi/cache/BlobCache.java` around lines 25 - 33, Add Javadoc comments to the BlobCache SPI describing miss/population and invalidation semantics: document on BlobCache#get(CacheKey, BlobSource) whether a miss must return data reflecting the supplied source and whether implementations may populate lazily or must read eagerly, and specify thread-safety and ordering guarantees when concurrent get and invalidate for the same CacheKey occur (e.g., whether invalidate must cancel/abort an in-flight population or only affect subsequent gets); document BlobCache#invalidate(CacheKey) behavior and expectations; and document BlobCache#invalidate(Collection<CacheKey>) whether batch invalidation must be atomic across the keys or can be best-effort per key so implementers like TracingBlobCache, AlluxioBlobCache, and the in-memory cache implement consistent semantics.

core/trino-main/src/main/java/io/trino/cache/CacheManagerConfig.java (1)
35-44: Minor: setter will NPE on null input.

setCacheManagerConfigFiles(null) throws an NPE at the .split(",") call. Airlift's config binding typically won't pass null, but programmatic callers (tests, etc.) might. Consider defending with requireNonNull or treating null as an empty list for robustness.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/main/java/io/trino/cache/CacheManagerConfig.java` around lines 35 - 44, The setter CacheManagerConfig.setCacheManagerConfigFiles currently NPEs if passed null; update the method to defensively handle null by either calling requireNonNull(cacheManagerConfigFiles, "cacheManagerConfigFiles is null") or treating null as empty (e.g., guard before split and produce an empty immutable list). Ensure the method still trims, filters empties, converts to Files and assigns cacheManagerConfigFiles, and return this; reference the setCacheManagerConfigFiles method and the cacheManagerConfigFiles field when making the change.

plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/MemoryBlobSource.java (1)
46-53: Optional: align error types for invalid offsets.

checkFromIndexSize throws IndexOutOfBoundsException (unchecked) while the rest of the method reports invalid offsets via IOException/EOFException. Also, toIntExact(position) will throw ArithmeticException if position > Integer.MAX_VALUE (possible to reach when length == 0, since the EOF check position + length > data.length() would pass). Consider rejecting out-of-range position with EOFException before the toIntExact call for consistent exception semantics.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/MemoryBlobSource.java` around lines 46 - 53, The code in MemoryBlobSource mixes unchecked IndexOutOfBoundsException (from checkFromIndexSize) and checked IO exceptions and calls toIntExact(position) which may throw ArithmeticException; update the read logic to validate offsets and lengths consistently: perform explicit checks for negative position and for position + length > data.length() and throw IOException/EOFException as appropriate (use EOFException for out-of-range reads) before calling toIntExact, and only call toIntExact(position) after these bounds checks so no ArithmeticException escapes; reference the existing checkFromIndexSize, the position/length validations, toIntExact(position), and data.getBytes(...) when making the change.

core/trino-main/src/main/java/io/trino/testing/PlanTester.java (1)
674-677: Call cacheManagerRegistry.shutdown() in close().

CacheManagerRegistry has a shutdown() method that cleanly closes all loaded blob cache managers. Currently, PlanTester.close() (line 589) shuts down executors, stops catalogManager, and destroys the finalizer service, but does not stop the cache managers loaded through loadCacheManager(...). Adding cacheManagerRegistry.shutdown() here would prevent resource leaks across test lifecycles.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/main/java/io/trino/testing/PlanTester.java` around lines 674 - 677, PlanTester currently exposes loadCacheManager(String, Map) but its close() method does not call cacheManagerRegistry.shutdown(), so blob cache managers remain running; update PlanTester.close() to invoke cacheManagerRegistry.shutdown() (after or alongside existing shutdowns like executor shutdowns and catalogManager stop/destroyFinalizerService) to ensure cache managers are cleanly closed and prevent resource leaks; reference cacheManagerRegistry.shutdown(), PlanTester.close(), and loadCacheManager(...) when making the change.

lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java (1)
79-85: invalidateDirectoryEntries does a full recursive listing on every directory mutation — consider scoping.

delegate.listFiles(...) is recursive per the TrinoFileSystem contract. For renameDirectory(source, target), invalidating entries under target is typically a no-op listing (target usually doesn't exist yet), but for deleteDirectory/renameDirectory over large directories on object stores this issues a full recursive LIST (paginated) solely to compute per-entry cache keys — potentially thousands of getCacheKey(newInputFile(...)) metadata calls just to invalidate, amplifying the cost of the underlying mutation substantially.

If the BlobCache/CacheKey scheme permits it, a prefix-based invalidation primitive (BlobCache.invalidateByPrefix(location) or similar) would be much cheaper and also safer against races where entries are repopulated between list and invalidate. If prefix invalidation isn't feasible, at least drop the target-side listing in renameDirectory, since nothing can be cached at a not-yet-existing target.

Also applies to: 118-124
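A toy illustration of the proposed prefix primitive; invalidateByPrefix and the String-keyed store below are hypothetical, and the approach assumes cache keys preserve path prefixes:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: invalidate everything under a directory in one
// local pass over the cache, instead of issuing a recursive remote LIST
// to rebuild per-file cache keys.
final class PrefixInvalidatingCache
{
    private final Map<String, byte[]> entries = new ConcurrentHashMap<>();

    void put(String path, byte[] data)
    {
        entries.put(path, data);
    }

    byte[] get(String path)
    {
        return entries.get(path);
    }

    // Drops all entries whose key starts with the prefix; no remote calls.
    void invalidateByPrefix(String prefix)
    {
        entries.keySet().removeIf(path -> path.startsWith(prefix));
    }
}
```

Besides avoiding the remote listing, a single local sweep also narrows the window in which concurrently repopulated entries could be missed.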
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java` around lines 79 - 85, The current invalidateDirectoryEntries(location) triggers a full recursive delegate.listFiles(...) on every directory mutation (used by deleteDirectory and renameDirectory) causing expensive paginated listings; change the implementation to use a prefix-based invalidation if the cache supports it (e.g., expose and call BlobCache.invalidateByPrefix(location) or equivalent) and fall back to per-entry invalidation only when prefix invalidation is unavailable; additionally, in renameDirectory(source, target) avoid listing/invalidation under the target path (target usually doesn't exist) — only invalidate source entries (or use prefix invalidation for source) to prevent unnecessary recursive LISTs and races.

core/trino-main/src/main/java/io/trino/testing/TestingConnectorCacheFactory.java (1)
37-64: Refactor TestingBlobCache to a transparent pass-through rather than throwing UnsupportedOperationException.

If any test wires TestingConnectorContext through FileSystemModule and enables caching (e.g., via context.getCacheFactory().createBlobCache(...)), the current UOE-throwing stub will break those tests even if caching is not the focus. While current test patterns don't exercise this path, the factory should be defensive: return source directly from get() and make both invalidate() methods no-ops. This allows tests to safely ignore caching without special handling.

Proposed pass-through behavior

```diff
     @Override
     public BlobSource get(CacheKey key, BlobSource source)
     {
-        return new BlobSource() {
-            @Override
-            public long length()
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public void readFully(long position, byte[] buffer, int offset, int length)
-            {
-                throw new UnsupportedOperationException();
-            }
-        };
+        return source;
     }

     @Override
     public void invalidate(CacheKey key)
     {
-        throw new UnsupportedOperationException();
+        // no-op
     }

     @Override
     public void invalidate(Collection<CacheKey> keys)
     {
-        throw new UnsupportedOperationException();
+        // no-op
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/main/java/io/trino/testing/TestingConnectorCacheFactory.java` around lines 37 - 64, The TestingConnectorCacheFactory's TestingBlobCache currently returns a stub BlobSource that throws UnsupportedOperationException in get(CacheKey key, BlobSource source) and the two invalidate methods throw UOE; change get(...) to simply return the provided source (pass-through) and make invalidate(CacheKey) and invalidate(Collection<CacheKey>) no-ops so tests that wire TestingConnectorContext and enable caching won't fail; update the implementations inside the factory where get, invalidate(CacheKey) and invalidate(Collection<CacheKey>) are defined to use the real source return and empty method bodies respectively.

lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/TrinoInputFileBlobSource.java (1)
41-57: Consider reusing a single TrinoInput instance across reads, or document the intentional design choice.

Each readFully/readTail call opens (and closes) a fresh TrinoInput from delegate.newInput(). For object-store backends this typically triggers openRange/HEAD-style calls per invocation; on cache-miss population paths that issue multiple reads per blob this can meaningfully amplify request counts and latency. If feasible, caching a lazily-opened TrinoInput across the source's lifetime (and making the adapter AutoCloseable) would reduce connection overhead. If this short-lived approach is intentional, consider documenting why reuse is not viable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/TrinoInputFileBlobSource.java` around lines 41 - 57, Either cache a single lazily-initialized TrinoInput returned by delegate.newInput() and reuse it across readFully/readTail calls (store as a volatile/private field, synchronize initialization, and close it in a new close() method after making TrinoInputBlobSource implement AutoCloseable) to avoid repeated openRange/HEAD calls, or if reuse is not possible, add a clear Javadoc on TrinoInputBlobSource explaining the intentional short-lived-per-read behavior and the reason reuse is not viable; locate changes around the existing readFully/readTail methods and the delegate.newInput() usage and update class signature to implement AutoCloseable when choosing the reuse path.

lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java (1)
111-114: Minor: source() allocates a fresh TrinoInputFileBlobSource on every call.

newInput(), newStream(), and length() all call source() independently. For short-lived input files this is negligible, but if any of these are invoked in hot paths it's cheap to cache the instance in a field (it depends only on delegate).

Proposed refactor

```diff
-    private final TrinoInputFile delegate;
+    private final TrinoInputFile delegate;
+    private BlobSource source;
@@
-    private BlobSource source()
-    {
-        return new TrinoInputFileBlobSource(delegate);
-    }
+    private BlobSource source()
+    {
+        if (source == null) {
+            source = new TrinoInputFileBlobSource(delegate);
+        }
+        return source;
+    }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java` around lines 111 - 114, The method source() currently allocates a new TrinoInputFileBlobSource(delegate) on each call; change it to lazily create and cache a single BlobSource instance in a private final or volatile field (e.g., blobSource or cachedSource) so newInput(), newStream(), and length() reuse the same TrinoInputFileBlobSource tied to delegate; ensure the field is initialized once (either at construction or on first use) and returned by source(), preserving thread-safety if this class can be used concurrently.

core/trino-main/src/main/java/io/trino/cache/CacheManagerRegistry.java (1)
113-125: Logging style inconsistency and potential noise in createConnectorCacheFactory.

Two small things:

- The lambda logs log.info("Created new blob cache on tier %s for catalog %s", tier, catalog) (line 122) every time createBlobCache is invoked. If ConnectorCacheFactory.createBlobCache(tier) is called more than once per catalog (e.g. per connector subsystem), this can become noisy; consider demoting to debug or logging once per (catalog, tier).
- Line 119 uses String.formatted(...) inside log.warn, while the rest of the class uses the Logger var-args formatting (e.g. lines 122, 130). Prefer the var-args form for consistency and to avoid formatting the string when the log level is disabled.

♻️ Suggested refactor

```diff
         return tier -> {
             BlobCacheManager manager = blobCacheManagers.get(tier);
             if (manager == null) {
-                log.warn("Catalog %s requested blob cache manager tier %s but none registered, using noop".formatted(catalog, tier));
+                log.warn("Catalog %s requested blob cache manager tier %s but none registered, using noop", catalog, tier);
                 return new NoopBlobCache();
             }
-            log.info("Created new blob cache on tier %s for catalog %s", tier, catalog);
+            log.debug("Created new blob cache on tier %s for catalog %s", tier, catalog);
             return manager.createBlobCache(catalog);
         };
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/main/java/io/trino/cache/CacheManagerRegistry.java` around lines 113 - 125, The createConnectorCacheFactory lambda currently formats the warning string with String.formatted and always logs an info message for each createBlobCache invocation; change the log.warn call to use SLF4J var-args (e.g., log.warn("Catalog {} requested blob cache manager tier {} but none registered, using noop", catalog, tier)) and demote the log.info("Created new blob cache...") to log.debug (or otherwise ensure it only logs once per (catalog, tier) if you prefer) in the createConnectorCacheFactory implementation that references blobCacheManagers, NoopBlobCache and manager.createBlobCache(catalog).

plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManager.java (1)
82-92:shutdown()wraps only the delegate shutdown inRuntimeException.
sharedCache.shutdown()declaresthrows Exception(fromAlluxioBlobCachein file 8), and any failure here will abort the rest of cleanup even if earlierdropcalls succeeded. Consider logging and swallowing (consistent withdrop()’s warning approach) so a single misbehaving component doesn't mask other shutdown errors.♻️ Suggested refactor
@Override
public void shutdown()
{
    Set.copyOf(catalogs.keySet()).forEach(this::drop);
    try {
        sharedCache.shutdown();
    }
-    catch (Exception e) {
-        throw new RuntimeException(e);
+    catch (Exception e) {
+        log.error(e, "Failed to shut down shared AlluxioBlobCache");
    }
}
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManager.java` around lines 82 - 92, The shutdown() currently wraps sharedCache.shutdown() in a RuntimeException which aborts cleanup; instead, follow drop()’s pattern: call Set.copyOf(catalogs.keySet()).forEach(this::drop) then invoke sharedCache.shutdown() inside a try/catch that logs a warning (including the exception) and swallows it so other shutdown steps aren’t masked; do not rethrow the exception from AlluxioBlobCacheManager.shutdown(), and reference the AlluxioBlobCacheManager.shutdown method, sharedCache.shutdown(), drop(), and catalogs in your changes.
lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingBlobCache.java (1)
64-71: Add a size/count attribute to the batch-invalidate span.
The batch `invalidate(Collection<CacheKey>)` span carries no identifying attributes, which makes tracing of bulk invalidations less useful than the single-key variant. Consider attaching at least the number of keys (and optionally a bounded sample) so operators can correlate spans with the work actually performed.
♻️ Suggested refactor
@Override
public void invalidate(Collection<CacheKey> keys)
{
    Span span = tracer.spanBuilder("BlobCache.invalidate")
+            .setAttribute("cache.key_count", keys.size())
            .startSpan();
    withTracing(span, () -> delegate.invalidate(keys));
}
Verify each finding against the current code and only fix it if needed. In `@lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingBlobCache.java` around lines 64 - 71, The batch invalidate span in TracingBlobCache.invalidate(Collection<CacheKey>) lacks attributes; update the method to record the number of keys (and optionally a bounded sample) on the Span before calling withTracing: obtain the size safely (e.g., keys == null ? 0 : keys.size()) and call span.setAttribute("blobcache.invalidate.count", count), and if desired build a short, length-bounded representation of a few CacheKey identifiers and add via span.setAttribute("blobcache.invalidate.sample", sampleString); keep using the existing tracer.spanBuilder("BlobCache.invalidate"), the local Span variable 'span', and the withTracing(span, () -> delegate.invalidate(keys)) call to execute the work.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java`:
- Around line 126-135: The current blobCache selection silently prefers DISK
when both config.isCacheEnabled() and coordinatorFileCaching are true on a
coordinator; change this to fail fast with an explicit IllegalStateException
when both config.isCacheEnabled() and coordinatorFileCaching && isCoordinator
are true so misconfiguration is visible, and replace the non-idiomatic
binder.bind(new TypeLiteral<Optional<BlobCache>>() {}).toInstance(blobCache)
with the idiomatic Guice pattern using newOptionalBinder(binder,
BlobCache.class) and bind the resolved BlobCache (or absent) into that optional
binder; reference config.isCacheEnabled(), coordinatorFileCaching,
isCoordinator, CacheTier.DISK/MEMORY, BlobCache, and newOptionalBinder in the
update.
In
`@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java`:
- Around line 148-170: The invalidate helpers currently swallow IOExceptions
silently; update invalidate(Location) and invalidateDirectoryEntries(Location)
to log a WARN including the Location and the caught exception (use the class
logger) instead of ignoring the exception, and for mutating callers (deleteFile,
deleteDirectory, renameFile, renameDirectory, deleteFiles) propagate the
IOException up where the caller already declares throws IOException so failures
to compute/invalidate keys are visible; for newOutputFile (which cannot throw)
log WARN and continue. Also avoid unnecessary metadata RPCs by preferring a
cheaper location-only invalidation path when available: consult keyProvider for
a location-based key method or fall back to deriving a stable key from Location
rather than calling delegate.newInputFile(location) per entry.
In
`@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCache.java`:
- Around line 63-65: The two AlluxioBlobCache.invalidate(CacheKey) and
invalidate(Collection<CacheKey>) methods are no-ops and prevent centralized
invalidation; implement them to translate the incoming CacheKey(s) to the
Alluxio PageId(s) used when caching and call the Alluxio
CacheManager.delete(PageId) or CacheManager.invalidate(Predicate<PageInfo>)
accordingly, or if mapping is not feasible add a clear TODO comment explaining
intentional deferral to TTL/size eviction. Locate the methods in class
AlluxioBlobCache, use the same key-to-page mapping logic used when storing pages
(the code that constructs PageId on write), and call
TracingCacheManager.delete(PageId) for single keys or build a
Predicate<PageInfo> for batch invalidation via CacheManager.invalidate; if
mapping cannot be determined, document this in the invalidate implementations
with a TODO and rationale.
In
`@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManager.java`:
- Around line 60-66: The current computeIfAbsent in AlluxioBlobCacheManager can
throw from exporter.export and leak a partially-initialized stats object;
instead, build the CatalogEntry first (new AlluxioCacheStats, ObjectName via
statsObjectName, new CatalogScopedBlobCache), then publish it into the catalogs
map using putIfAbsent (or compute with an explicit check) to handle races (if an
existing entry is returned, discard the new one and use the existing entry), and
only after the entry is installed call exporter.export(name.getCanonicalName(),
stats) inside a try/catch that logs a warning (similar to drop()) on failure so
MBean export failures do not abort catalog creation; finally, return the chosen
entry.blobCache(). Include references to CatalogEntry, AlluxioCacheStats,
statsObjectName, CatalogScopedBlobCache, exporter.export, and drop() when making
the change.
- Around line 94-103: The ObjectName construction in statsObjectName duplicates
the catalog into both "catalog" and "name" and doesn't quote the catalog value;
update statsObjectName to use a single descriptive property (e.g., name=stats or
type=AlluxioCacheStats) instead of duplicating the catalog, and ensure the
catalog value is quoted via ObjectName.quote(...) when building the ObjectName
string; locate the statsObjectName method and replace the concatenation with a
properly quoted catalog value and a single descriptive name/type (reference:
statsObjectName and AlluxioCacheStats).
In
`@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManagerFactory.java`:
- Around line 68-80: The factory currently initializes an Injector and starts
Alluxio metrics sinks but discards the Injector; instead, obtain
LifeCycleManager from the Injector returned by Bootstrap.initialize() in
AlluxioBlobCacheManagerFactory and pass that LifeCycleManager into the
AlluxioBlobCacheManager constructor (add a constructor parameter accepting
LifeCycleManager), then in AlluxioBlobCacheManager.shutdown() call
lifeCycleManager.stop() to ensure the injector is cleaned up and `@PreDestroy`
handlers (including metrics sink shutdown) run; update the factory to call
injector.getInstance(LifeCycleManager.class) and supply it when returning
injector.getInstance(AlluxioBlobCacheManager.class) (or construct the manager
with the LifeCycleManager) so the lifecycle is managed correctly.
In
`@plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/InMemoryBlobCache.java`:
- Around line 72-84: The metric largeFileSkippedCount is being incremented in
get(...) which double-counts across requests because getOrLoad may return a
cached Optional.empty(); instead, increment largeFileSkippedCount inside
load(...) at the exact point you detect content > maxContentLength and before
returning Optional.empty() so the metric measures per-blob oversize events. Also
change caching behavior so you do not store Optional.empty() results: when
load(...) decides to skip (returns empty) have getOrLoad/get(...) avoid
inserting that empty into the cache (i.e., return the original BlobSource
directly and do not call the cache.put / do not cache the empty Optional),
ensuring retries/re-evaluation can occur before expireAfterWrite elapses;
reference methods: get(CacheKey), getOrLoad(CacheKey, BlobSource), load(...),
field largeFileSkippedCount, the CacheKey, and maxContentLength.
- Around line 93-98: The invalidate(Collection<CacheKey>) method is passing
Strings (via CacheKey::key) to cache.invalidateAll, so nothing is removed
because the cache is keyed by CacheKey; change invalidate to call
cache.invalidateAll with the Collection<CacheKey> itself (or map to the CacheKey
objects) instead of mapping to their String key; update the method referencing
invalidate(Collection<CacheKey>), CacheKey::key and the cache field
(Cache<CacheKey, Optional<Slice>>) to pass the proper key type so entries are
actually invalidated.
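The type mismatch this prompt describes can be reproduced with plain JDK collections (the CacheKey record below is illustrative, not Trino's): like Guava's Cache.invalidateAll(Iterable<?>), Map.remove(Object) accepts any object and silently misses when handed the String form of a key.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InvalidateKeyTypeDemo
{
    record CacheKey(String key) {}

    static int sizeAfterInvalidate(boolean useStringKeys)
    {
        Map<CacheKey, String> cache = new ConcurrentHashMap<>();
        cache.put(new CacheKey("s3://bucket/a"), "blob-a");
        if (useStringKeys) {
            // Bug pattern: mapping CacheKey::key hands Strings to the cache, which never match
            List.of("s3://bucket/a").forEach(cache::remove);
        }
        else {
            // Fix: pass the CacheKey objects themselves
            List.of(new CacheKey("s3://bucket/a")).forEach(cache::remove);
        }
        return cache.size();
    }

    public static void main(String[] args)
    {
        System.out.println("after String-keyed invalidate: " + sizeAfterInvalidate(true)); // 1, nothing removed
        System.out.println("after CacheKey invalidate: " + sizeAfterInvalidate(false)); // 0
    }
}
```

Because the erased `remove(Object)` signature compiles either way, the compiler gives no warning; only the cache size betrays the bug.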
In
`@plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/InMemoryBlobCacheManager.java`:
- Around line 40-57: Add a defensive check to prevent NUL bytes in catalog names
before building prefix strings: in InMemoryBlobCacheManager, validate
catalog.toString() does not contain '\0' (e.g., via
checkArgument(!catalog.toString().contains("\0"), "catalogName contains NUL
byte")) before using it in createBlobCache (where CatalogScopedBlobCache is
constructed with catalog.toString() + "\0") and before calling
sharedCache.invalidatePrefix in drop; this ensures the prefix separator '\0'
cannot be present inside the catalog name.
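A minimal sketch of why the guard matters, assuming the `catalog + "\0" + key` scoping described above: a catalog name that itself contains a NUL produces entries that a prefix drop of the shorter catalog name would also match and wipe.

```java
public class NulPrefixDemo
{
    // Assumed key-scoping scheme from the prompt above: catalog + '\0' + key
    static String scopedKey(String catalog, String key)
    {
        return catalog + "\0" + key;
    }

    static boolean wipedByForeignDrop()
    {
        // Entry belonging to the malformed catalog name "hive\0evil"
        String entry = scopedKey("hive\0evil", "orders/part-0");
        // Prefix used when dropping the unrelated catalog "hive"
        String dropPrefix = "hive" + "\0";
        return entry.startsWith(dropPrefix); // true: the foreign entry matches
    }

    public static void main(String[] args)
    {
        System.out.println("foreign drop matches entry: " + wipedByForeignDrop()); // true
    }
}
```

Rejecting NUL in the catalog name up front keeps the separator unambiguous, so prefix invalidation can never cross catalog boundaries.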
In
`@plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMinioAndHmsConnectorSmokeTest.java`:
- Around line 62-75: getBlobCacheProperties() currently creates a new temp
directory on every call and assigns it to the field cacheDirectory, leaking
prior directories; change the logic so you only create and assign a temp
directory once (e.g., check if cacheDirectory == null before calling
Files.createTempDirectory) or track all created Paths in a collection, and
update the cleanup logic in deleteDirectory() to remove every tracked path
instead of only the last cacheDirectory; reference the getBlobCacheProperties()
method, the cacheDirectory field (or the new collection), and deleteDirectory()
when applying the fix.
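The "create once" variant of the fix can be sketched as follows; the class and names are illustrative, loosely mirroring the test under review rather than copying it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class LazyCacheDirDemo
{
    private static Path cacheDirectory;

    // Guarding creation behind a null check means repeated property lookups
    // reuse one directory instead of leaking a fresh one per call
    static synchronized Path getCacheDirectory()
    {
        if (cacheDirectory == null) {
            try {
                cacheDirectory = Files.createTempDirectory("blob-cache");
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return cacheDirectory;
    }

    public static void main(String[] args)
    {
        System.out.println("same directory reused: "
                + getCacheDirectory().equals(getCacheDirectory())); // true
    }
}
```

The alternative in the prompt — tracking every created path in a collection and deleting them all in cleanup — trades the single shared directory for per-call isolation; either way no directory is orphaned.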
In
`@plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java`:
- Around line 107-127: The code registers a hardcoded AlluxioBlobCachePlugin
while exposing an overridable getBlobCacheType(), causing subclasses that change
the type to mismatch; fix by adding a protected hook getBlobCachePlugin() (e.g.,
returning new AlluxioBlobCachePlugin() by default) and replace the hardcoded new
AlluxioBlobCachePlugin() call in the builder block with
builder.withPlugin(getBlobCachePlugin()), or alternatively remove/finalize
getBlobCacheType() and inline "alluxio" if you intend no variability; update
references in the builder code that call withPlugin(...) to use the new
getBlobCachePlugin() hook and keep getBlobCacheProperties() usage unchanged.
In
`@testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/StandardMultinode.java`:
- Around line 85-104: The coordinator setup duplicates copying
cache-manager-alluxio.properties and cache-manager-memory.properties:
withFileSystemCaching(container) already copies these files, so remove the
redundant .withCopyFileToContainer(...) calls in the
builder.configureContainer(...) chain (the ones that copy
cache-manager-alluxio.properties and cache-manager-memory.properties) and rely
on withFileSystemCaching(DockerContainer) to add those files; keep the separate
copy of multinode-master-config.properties as-is and ensure createTrinoWorker()
continues to use withFileSystemCaching(...) for the worker.
---
Outside diff comments:
In
`@core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java`:
- Around line 308-326: The cacheManagerRegistry.drop call is performed
unconditionally outside the synchronized block, which can cause races and
unnecessary work; within dropCatalog synchronize on catalogsUpdateLock and,
immediately after calling catalogStore.removeCatalog and computing removed =
activeCatalogs.remove(catalogName) != null, call
cacheManagerRegistry.drop(catalogName) only when removed is true (i.e., the
catalog was actually removed); keep the existing check that throws
CATALOG_NOT_FOUND when !removed && !exists and ensure cacheManagerRegistry.drop
is not invoked for non-removed catalogs to avoid invalidating newly created
catalogs or emitting spurious logs/metrics.
In
`@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java`:
- Around line 72-86: The length() implementation in CacheInputFile calls
cache.get(key, source()).length(), which for InMemoryBlobCache forces full blob
load; change this to use a metadata-only path: add a new method on BlobCache
(e.g., length(CacheKey key, InputFile delegate) or length(CacheKey key,
SourceProvider source)) and call that from CacheInputFile.length() after
keyProvider.getCacheKey(delegate) so caches can return just the remote length
without materializing content (update InMemoryBlobCache to return
source.length() without readFully when size ≤ maxContentLength, and keep
AlluxioBlobCache behavior intact).
In
`@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/TracingCacheManager.java`:
- Around line 144-145: The second setAttribute call mistakenly reuses
CACHE_FILE_READ_POSITION and overwrites the position value; change that second
call to use CACHE_FILE_READ_SIZE so the trace records read size correctly (in
the attribute setting chain where positionInFile(pageId, position) is set and
then the length is set, replace CACHE_FILE_READ_POSITION with
CACHE_FILE_READ_SIZE to record (long) length).
- Around line 202-210: The delegate call in TracingCacheManager.get is calling
the 4-arg overload instead of the intended 5-arg variant, so include the missing
pageOffset when invoking delegate.get to match the method signature and ensure
the read starts at the correct offset; update the call in method get(PageId
pageId, int pageOffset, int bytesToRead, byte[] buffer, int offsetInBuffer) from
delegate.get(pageId, bytesToRead, buffer, offsetInBuffer) to
delegate.get(pageId, pageOffset, bytesToRead, buffer, offsetInBuffer) so the
span attributes and actual delegate invocation remain consistent.
In
`@plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java`:
- Around line 1592-1614: The independent QueryRunner shares the same
Alluxio/blob cache directory because createDeltaLakeQueryRunner() calls
getBlobCacheProperties() which returns identical paths; update the test or
runner to give each QueryRunner an isolated cache directory. Specifically, when
creating the independent QueryRunner in testDeltaLakeTableLocationChanged (and
in createDeltaLakeQueryRunner()/getBlobCacheProperties()), append a unique
suffix (UUID or randomNameSuffix()) to the blob cache directory property (the
property returned by getBlobCacheProperties()) so each
createDeltaLakeQueryRunner() invocation uses its own temp cache dir;
alternatively, make createDeltaLakeQueryRunner() accept per-runner blob cache
properties and pass a distinct temp directory for the independentQueryRunner to
avoid multiple instances managing the same on-disk cache.
---
Nitpick comments:
In @.github/workflows/ci.yml:
- Around line 363-370: The exclusion list entries are out of alphabetical order:
move the two entries `!:trino-blob-cache-alluxio` and
`!:trino-blob-cache-memory` so the list stays alphabetized; specifically place
them before `!:trino-cassandra` (i.e., earlier than `!:trino-base-jdbc`),
ensuring the sequence around `!:trino-filesystem-alluxio`,
`!:trino-filesystem-azure`, `!:trino-blob-cache-alluxio`,
`!:trino-blob-cache-memory`, `!:trino-base-jdbc`, `!:trino-cassandra` (or
correct alphabetical order) is preserved.
In `@core/trino-main/src/main/java/io/trino/cache/CacheManagerConfig.java`:
- Around line 35-44: The setter CacheManagerConfig.setCacheManagerConfigFiles
currently NPEs if passed null; update the method to defensively handle null by
either calling requireNonNull(cacheManagerConfigFiles, "cacheManagerConfigFiles
is null") or treating null as empty (e.g., guard before split and produce an
empty immutable list). Ensure the method still trims, filters empties, converts
to Files and assigns cacheManagerConfigFiles, and return this; reference the
setCacheManagerConfigFiles method and the cacheManagerConfigFiles field when
making the change.
In `@core/trino-main/src/main/java/io/trino/cache/CacheManagerRegistry.java`:
- Around line 113-125: The createConnectorCacheFactory lambda currently formats
the warning string with String.formatted and always logs an info message for
each createBlobCache invocation; change the log.warn call to use the Logger's
printf-style var-args (e.g., log.warn("Catalog %s requested blob cache manager
tier %s but none registered, using noop", catalog, tier)) and demote the
log.info("Created new
blob cache...") to log.debug (or otherwise ensure it only logs once per
(catalog, tier) if you prefer) in the createConnectorCacheFactory implementation
that references blobCacheManagers, NoopBlobCache and
manager.createBlobCache(catalog).
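The deferral that the var-args recommendation relies on can be sketched with a hypothetical MiniLogger standing in for the project's Logger: with var-args the format string is only expanded when the level is enabled, whereas String.formatted always builds the message up front.

```java
public class LoggerVarargsDemo
{
    static final class MiniLogger
    {
        private final boolean enabled;
        int messagesFormatted;

        MiniLogger(boolean enabled)
        {
            this.enabled = enabled;
        }

        void warn(String format, Object... args)
        {
            if (!enabled) {
                return; // var-args are dropped without ever formatting
            }
            messagesFormatted++;
            System.out.println(format.formatted(args));
        }
    }

    static int formattedCountAtDisabledLevel()
    {
        MiniLogger logger = new MiniLogger(false);
        logger.warn("Catalog %s requested blob cache manager tier %s but none registered, using noop",
                "hive", "MEMORY");
        return logger.messagesFormatted;
    }

    public static void main(String[] args)
    {
        System.out.println("messages formatted at disabled level: " + formattedCountAtDisabledLevel()); // 0
    }
}
```

Calling `.formatted(...)` at the call site would instead pay the formatting cost before the level check can short-circuit it.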
In `@core/trino-main/src/main/java/io/trino/testing/PlanTester.java`:
- Around line 674-677: PlanTester currently exposes loadCacheManager(String,
Map) but its close() method does not call cacheManagerRegistry.shutdown(), so
blob cache managers remain running; update PlanTester.close() to invoke
cacheManagerRegistry.shutdown() (after or alongside existing shutdowns like
executor shutdowns and catalogManager stop/destroyFinalizerService) to ensure
cache managers are cleanly closed and prevent resource leaks; reference
cacheManagerRegistry.shutdown(), PlanTester.close(), and loadCacheManager(...)
when making the change.
In
`@core/trino-main/src/main/java/io/trino/testing/TestingConnectorCacheFactory.java`:
- Around line 37-64: The TestingConnectorCacheFactory's TestingBlobCache
currently returns a stub BlobSource that throws UnsupportedOperationException in
get(CacheKey key, BlobSource source) and the two invalidate methods throw UOE;
change get(...) to simply return the provided source (pass-through) and make
invalidate(CacheKey) and invalidate(Collection<CacheKey>) no-ops so tests that
wire TestingConnectorContext and enable caching won't fail; update the
implementations inside the factory where get, invalidate(CacheKey) and
invalidate(Collection<CacheKey>) are defined to use the real source return and
empty method bodies respectively.
In `@core/trino-server-core/src/main/provisio/trino-core.xml`:
- Around line 42-52: Move the two new artifactSet entries (artifactSet
to="plugin/blob-cache-alluxio" and artifactSet to="plugin/blob-cache-memory") so
they appear in alphabetical order among plugin entries — specifically place them
before the existing artifactSet for "plugin/exchange-filesystem" (i.e., insert
the blob-cache-alluxio and blob-cache-memory blocks so they come alphabetically
ahead of exchange-filesystem), preserving the current artifact id/unpack
structure exactly.
In `@core/trino-spi/src/main/java/io/trino/spi/cache/BlobCache.java`:
- Around line 25-33: Add Javadoc comments to the BlobCache SPI describing
miss/population and invalidation semantics: document on BlobCache#get(CacheKey,
BlobSource) whether a miss must return data reflecting the supplied source and
whether implementations may populate lazily or must read eagerly, and specify
thread-safety and ordering guarantees when concurrent get and invalidate for the
same CacheKey occur (e.g., whether invalidate must cancel/abort an in-flight
population or only affect subsequent gets); document
BlobCache#invalidate(CacheKey) behavior and expectations; and document
BlobCache#invalidate(Collection<CacheKey>) whether batch invalidation must be
atomic across the keys or can be best-effort per key so implementers like
TracingBlobCache, AlluxioBlobCache, and the in-memory cache implement consistent
semantics.
In
`@core/trino-spi/src/main/java/io/trino/spi/cache/BlobCacheManagerFactory.java`:
- Around line 18-25: The public SPI BlobCacheManagerFactory has inconsistent
accessor naming (getName() vs cacheTier()); choose and apply a consistent style
across the interface and its usages (either rename getName() -> name() or
cacheTier() -> getCacheTier()), update the interface method signature in
BlobCacheManagerFactory, then update all implementing classes and any
callers/tests to match the chosen name (including create(Map<String,String>,
CacheManagerContext) usages and any javadocs or docs that reference these
methods) to ensure compilation and consistent API for plugin authors.
In `@core/trino-spi/src/main/java/io/trino/spi/cache/ConnectorCacheFactory.java`:
- Around line 16-19: Add Javadoc to the public SPI interface
ConnectorCacheFactory and its method createBlobCache(CacheTier) describing the
contract: who calls this SPI (Trino core/plugin loader), when it is invoked, the
lifecycle and ownership of the returned BlobCache (who must close/cleanup and
how long it is expected to live), thread-safety expectations of the
implementation and returned BlobCache, whether the tier parameter may be null
(or must be non-null) and allowed values, and what should happen if no cache
manager is configured (e.g., return a no-op cache or throw). Place this
documentation on the ConnectorCacheFactory type and the createBlobCache method
so implementors and callers (BlobCache, CacheTier) have clear guidance.
In
`@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/BlobTrinoInputStream.java`:
- Around line 62-73: The read() method in BlobTrinoInputStream currently
allocates a new byte[1] on every call; change it to reuse a single-byte buffer
field or delegate to the bulk read(byte[], int, int) to avoid per-call
allocation. Add a private final byte[] singleByte = new byte[1] to the class (or
call read(singleByte, 0, 1) from read()), update BlobTrinoInputStream.read() to
use that buffer and adjust position handling accordingly so semantics remain the
same.
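The reuse pattern suggested above can be sketched as a self-contained stream; the in-memory backing array is illustrative, standing in for BlobTrinoInputStream's cached blob.

```java
import java.io.InputStream;

public class SingleByteReadDemo
        extends InputStream
{
    private final byte[] data;
    private int position;
    private final byte[] singleByte = new byte[1]; // reused across read() calls

    SingleByteReadDemo(byte[] data)
    {
        this.data = data;
    }

    @Override
    public int read()
    {
        // Delegate to the bulk read instead of allocating byte[1] per call
        int n = read(singleByte, 0, 1);
        return (n == -1) ? -1 : (singleByte[0] & 0xFF); // mask to honor the 0..255 contract
    }

    @Override
    public int read(byte[] buffer, int offset, int length)
    {
        if (position >= data.length) {
            return -1;
        }
        int n = Math.min(length, data.length - position);
        System.arraycopy(data, position, buffer, offset, n);
        position += n;
        return n;
    }

    public static void main(String[] args)
    {
        SingleByteReadDemo in = new SingleByteReadDemo(new byte[] {(byte) 0xFF, 7});
        System.out.println(in.read()); // 255, not -1: the & 0xFF mask matters
        System.out.println(in.read()); // 7
        System.out.println(in.read()); // -1 at end of stream
    }
}
```

Note the `& 0xFF` mask: InputStream.read() must return 0..255 or -1, so returning the raw (possibly negative) byte would corrupt the end-of-stream signal.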
In
`@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java`:
- Around line 79-85: The current invalidateDirectoryEntries(location) triggers a
full recursive delegate.listFiles(...) on every directory mutation (used by
deleteDirectory and renameDirectory) causing expensive paginated listings;
change the implementation to use a prefix-based invalidation if the cache
supports it (e.g., expose and call BlobCache.invalidateByPrefix(location) or
equivalent) and fall back to per-entry invalidation only when prefix
invalidation is unavailable; additionally, in renameDirectory(source, target)
avoid listing/invalidation under the target path (target usually doesn't exist)
— only invalidate source entries (or use prefix invalidation for source) to
prevent unnecessary recursive LISTs and races.
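A prefix-based invalidation like the one proposed can be sketched over a plain map keyed by full location strings; the `invalidateByPrefix` method here is hypothetical, not an existing BlobCache API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PrefixInvalidateDemo
{
    private final Map<String, byte[]> cache = new ConcurrentHashMap<>();

    void put(String key, byte[] value)
    {
        cache.put(key, value);
    }

    // Single pass over locally cached keys; no recursive remote listFiles needed
    void invalidateByPrefix(String prefix)
    {
        cache.keySet().removeIf(key -> key.startsWith(prefix));
    }

    int size()
    {
        return cache.size();
    }

    public static void main(String[] args)
    {
        PrefixInvalidateDemo cache = new PrefixInvalidateDemo();
        cache.put("s3://bucket/dir/a.parquet", new byte[] {1});
        cache.put("s3://bucket/dir/b.parquet", new byte[] {2});
        cache.put("s3://bucket/other/c.parquet", new byte[] {3});
        cache.invalidateByPrefix("s3://bucket/dir/");
        System.out.println("entries left: " + cache.size()); // 1
    }
}
```

The cost is proportional to the number of locally cached entries rather than to a paginated LIST against the object store, which is the saving the comment is after.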
In
`@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java`:
- Around line 111-114: The method source() currently allocates a new
TrinoInputFileBlobSource(delegate) on each call; change it to lazily create and
cache a single BlobSource instance in a private final or volatile field (e.g.,
blobSource or cachedSource) so newInput(), newStream(), and length() reuse the
same TrinoInputFileBlobSource tied to delegate; ensure the field is initialized
once (either at construction or on first use) and returned by source(),
preserving thread-safety if this class can be used concurrently.
In
`@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/TrinoInputFileBlobSource.java`:
- Around line 41-57: Either cache a single lazily-initialized TrinoInput
returned by delegate.newInput() and reuse it across readFully/readTail calls
(store as a volatile/private field, synchronize initialization, and close it in
a new close() method after making TrinoInputBlobSource implement AutoCloseable)
to avoid repeated openRange/HEAD calls, or if reuse is not possible, add a clear
Javadoc on TrinoInputBlobSource explaining the intentional short-lived-per-read
behavior and the reason reuse is not viable; locate changes around the existing
readFully/readTail methods and the delegate.newInput() usage and update class
signature to implement AutoCloseable when choosing the reuse path.
In
`@lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingBlobCache.java`:
- Around line 64-71: The batch invalidate span in
TracingBlobCache.invalidate(Collection<CacheKey>) lacks attributes; update the
method to record the number of keys (and optionally a bounded sample) on the
Span before calling withTracing: obtain the size safely (e.g., keys == null ? 0
: keys.size()) and call span.setAttribute("blobcache.invalidate.count", count),
and if desired build a short, length-bounded representation of a few CacheKey
identifiers and add via span.setAttribute("blobcache.invalidate.sample",
sampleString); keep using the existing
tracer.spanBuilder("BlobCache.invalidate"), the local Span variable 'span', and
the withTracing(span, () -> delegate.invalidate(keys)) call to execute the work.
In
`@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManager.java`:
- Around line 82-92: The shutdown() currently wraps sharedCache.shutdown() in a
RuntimeException which aborts cleanup; instead, follow drop()’s pattern: call
Set.copyOf(catalogs.keySet()).forEach(this::drop) then invoke
sharedCache.shutdown() inside a try/catch that logs a warning (including the
exception) and swallows it so other shutdown steps aren’t masked; do not rethrow
the exception from AlluxioBlobCacheManager.shutdown(), and reference the
AlluxioBlobCacheManager.shutdown method, sharedCache.shutdown(), drop(), and
catalogs in your changes.
In
`@plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/MemoryBlobSource.java`:
- Around line 46-53: The code in MemoryBlobSource mixes unchecked
IndexOutOfBoundsException (from checkFromIndexSize) and checked IO exceptions
and calls toIntExact(position) which may throw ArithmeticException; update the
read logic to validate offsets and lengths consistently: perform explicit checks
for negative position and for position + length > data.length() and throw
IOException/EOFException as appropriate (use EOFException for out-of-range
reads) before calling toIntExact, and only call toIntExact(position) after these
bounds checks so no ArithmeticException escapes; reference the existing
checkFromIndexSize, the position/length validations, toIntExact(position), and
data.getBytes(...) when making the change.
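The suggested validation order can be sketched as follows; the byte[] stands in for the Slice-backed data in MemoryBlobSource, and the overflow-safe range check ensures toIntExact runs only on values already known to fit in an int.

```java
import java.io.EOFException;
import java.io.IOException;

import static java.lang.Math.toIntExact;

public class BoundsCheckDemo
{
    private final byte[] data;

    BoundsCheckDemo(byte[] data)
    {
        this.data = data;
    }

    void readFully(long position, byte[] buffer, int offset, int length)
            throws IOException
    {
        if (position < 0 || length < 0 || offset < 0 || offset + length > buffer.length) {
            throw new IOException("invalid read arguments");
        }
        // Overflow-safe: never computes position + length; out-of-range reads get EOFException
        if (position > data.length - length) {
            throw new EOFException("read past end of blob");
        }
        // Safe: 0 <= position <= data.length here, so toIntExact cannot throw
        System.arraycopy(data, toIntExact(position), buffer, offset, length);
    }

    static boolean oversizedReadThrowsEof()
    {
        try {
            new BoundsCheckDemo(new byte[] {1, 2, 3}).readFully(Long.MAX_VALUE, new byte[1], 0, 1);
            return false;
        }
        catch (EOFException e) {
            return true; // EOFException, not ArithmeticException from toIntExact
        }
        catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args)
            throws IOException
    {
        byte[] buffer = new byte[2];
        new BoundsCheckDemo(new byte[] {1, 2, 3}).readFully(1, buffer, 0, 2);
        System.out.println(buffer[0] + "," + buffer[1]); // 2,3
        System.out.println("EOF on oversized read: " + oversizedReadThrowsEof()); // true
    }
}
```

Writing the bound as `position > data.length - length` also avoids the `position + length` overflow trap, where a huge position would wrap negative and slip past a naive comparison.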
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/TracingCacheManager.java (2)
142-146: ⚠️ Potential issue | 🟡 Minor
Record read size with the correct trace attribute.
Line 145 overwrites `CACHE_FILE_READ_POSITION` instead of setting `CACHE_FILE_READ_SIZE`, so `getDataFileChannel` spans lose the requested read size.
🐛 Proposed fix
Span span = tracer.spanBuilder("AlluxioCacheManager.getDataFileChannel")
        .setAttribute(CACHE_KEY, cacheKey)
        .setAttribute(CACHE_FILE_READ_POSITION, positionInFile(pageId, position))
-        .setAttribute(CACHE_FILE_READ_POSITION, (long) length)
+        .setAttribute(CACHE_FILE_READ_SIZE, (long) length)
        .startSpan();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/TracingCacheManager.java` around lines 142 - 146, In TracingCacheManager.getDataFileChannel the span builder sets CACHE_FILE_READ_POSITION twice, overwriting the position attribute and losing the read size; change the second setAttribute call that currently uses CACHE_FILE_READ_POSITION to use CACHE_FILE_READ_SIZE instead so the span created by tracer.spanBuilder("AlluxioCacheManager.getDataFileChannel") records both positionInFile(pageId, position) and the (long) length under distinct attributes (CACHE_FILE_READ_POSITION and CACHE_FILE_READ_SIZE) alongside CACHE_KEY.
202-210: ⚠️ Potential issue | 🔴 Critical
Preserve `pageOffset` when delegating cache reads.
Line 209 calls the delegate without the `pageOffset` argument despite accepting it in the method signature. This causes any traced read with a non-zero page offset to return bytes from the wrong position.
🐛 Proposed fix
public int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer, int offsetInBuffer)
{
    Span span = tracer.spanBuilder("AlluxioCacheManager.get")
            .setAttribute(CACHE_KEY, cacheKey)
            .setAttribute(CACHE_FILE_READ_POSITION, positionInFile(pageId, pageOffset))
            .setAttribute(CACHE_FILE_READ_SIZE, (long) bytesToRead)
            .startSpan();
-    return withTracing(span, () -> delegate.get(pageId, bytesToRead, buffer, offsetInBuffer));
+    return withTracing(span, () -> delegate.get(pageId, pageOffset, bytesToRead, buffer, offsetInBuffer));
}
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/TracingCacheManager.java` around lines 202 - 210, The get method in TracingCacheManager incorrectly omits the pageOffset when delegating to delegate.get, causing reads to use the wrong position; update the delegation in TracingCacheManager.get (the call inside withTracing and the lambda) to pass pageOffset in the correct argument order (matching delegate.get's signature that accepts pageId, pageOffset, bytesToRead, buffer, offsetInBuffer) so traced reads use the intended offset while keeping the existing span creation and withTracing wrapper intact.
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java (1)
45-63: ⚠️ Potential issue | 🟠 Major
Use the full location for the blob cache key.
Line 63 keys cached data only by `inputFile.location().path()`. With the new shared blob cache layer, two objects in different buckets/authorities with the same path can collide and return the wrong data. Keep `path` for the Delta-log exclusion checks, but build the `CacheKey` from the full location.
🐛 Proposed fix
- return Optional.of(new CacheKey(path));
+ return Optional.of(new CacheKey(inputFile.location().toString()));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java` around lines 45 - 63, The getCacheKey method currently builds the CacheKey from inputFile.location().path(), which can collide across different authorities/buckets; keep using path for the exclusion checks but change the final CacheKey instantiation to use the full location (the complete inputFile.location() / URI string) instead of path so the shared blob cache is uniquely keyed by authority+path; update the return from Optional.of(new CacheKey(path)) to use the full location representation (e.g., the inputFile.location() as a string/URI) while leaving the earlier checks (deltaLogCacheDisabled, .trinoSchema, _last_checkpoint, STATISTICS_META_DIR, STARBURST_META_DIR) unchanged.
core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java (1)
312-323: ⚠️ Potential issue | 🟠 Major

Move cache cleanup into the catalog update critical section.

`cacheManagerRegistry.drop(catalogName)` runs after `catalogsUpdateLock` is released. A concurrent `CREATE CATALOG` for the same name can acquire the lock in between and create new cache state, which this stale drop can then remove. Also, `DROP CATALOG IF EXISTS` for a missing catalog should not drop cache state.

Proposed fix

```diff
         boolean removed;
         synchronized (catalogsUpdateLock) {
             checkState(state != State.STOPPED, "ConnectorManager is stopped");
             catalogStore.removeCatalog(catalogName);
             removed = activeCatalogs.remove(catalogName) != null;
+            if (removed) {
+                cacheManagerRegistry.drop(catalogName);
+            }
         }
         if (!removed && !exists) {
             throw new TrinoException(CATALOG_NOT_FOUND, format("Catalog '%s' not found", catalogName));
         }
-        cacheManagerRegistry.drop(catalogName);
         // Do not shut down the catalog, because there may still be running queries using this catalog.
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java` around lines 312 - 323, Move the cache cleanup into the catalog update critical section and only drop cache state when the catalog was actually removed: inside the synchronized (catalogsUpdateLock) block (which contains checkState(State.STOPPED), catalogStore.removeCatalog(catalogName) and the removed = activeCatalogs.remove(catalogName) != null assignment), invoke cacheManagerRegistry.drop(catalogName) only when removed is true; do not call drop when !removed (this prevents DROP ... IF EXISTS from removing unrelated cache state created by a concurrent CREATE CATALOG).
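The locking pattern asked for above can be sketched without Trino's actual classes (the class and map names below are illustrative, not the real `CoordinatorDynamicCatalogManager` API): the cache drop happens in the same critical section as the catalog removal, and only when a catalog was actually removed.

```java
import java.util.HashMap;
import java.util.Map;

public class CatalogDropSketch
{
    private final Object lock = new Object();
    private final Map<String, String> activeCatalogs = new HashMap<>();
    private final Map<String, String> cacheState = new HashMap<>();

    public void create(String name)
    {
        synchronized (lock) {
            activeCatalogs.put(name, "connector");
            cacheState.put(name, "cache");
        }
    }

    // Drop cache state inside the same critical section as the catalog removal,
    // and only when the catalog actually existed, so a concurrent CREATE CATALOG
    // cannot have its fresh cache state removed by a stale drop
    public boolean drop(String name)
    {
        synchronized (lock) {
            boolean removed = activeCatalogs.remove(name) != null;
            if (removed) {
                cacheState.remove(name);
            }
            return removed;
        }
    }

    public boolean hasCacheState(String name)
    {
        synchronized (lock) {
            return cacheState.containsKey(name);
        }
    }
}
```

With this shape, dropping a missing name returns false and leaves any concurrently created cache state untouched, which is the `DROP CATALOG IF EXISTS` behavior the comment asks for.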
♻️ Duplicate comments (6)
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java (1)
107-127: ⚠️ Potential issue | 🟡 Minor

Keep the cache type hook and installed plugin aligned.

`getBlobCacheType()` is overridable, but the plugin registration is still hardcoded to `AlluxioBlobCachePlugin`. A subclass returning another cache type would register mismatched cache SPI wiring. This was already flagged in an earlier review; the current code still appears to need the same fix.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java` around lines 107 - 127, The code hardcodes installation of AlluxioBlobCachePlugin while getBlobCacheType() is overridable, causing mismatches; to fix, add an overrideable hook (e.g., protected BlobCachePlugin getBlobCachePlugin() or protected Class<? extends BlobCachePlugin> getBlobCachePluginClass()) and replace the hardcoded new AlluxioBlobCachePlugin() in the builder.withPlugin(...) call with that hook, ensuring subclasses that override getBlobCacheType() also override getBlobCachePlugin()/getBlobCachePluginClass() to provide a matching plugin implementation.

plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/InMemoryBlobCache.java (2)
93-98: ⚠️ Potential issue | 🔴 Critical

Invalidate with `CacheKey` objects, not raw key strings.

Line 97 passes `String` values into a `Cache<CacheKey, Optional<Slice>>`, so `invalidateAll(...)` won’t match existing entries and bulk invalidation silently fails.

Proposed fix

```diff
 public void invalidate(Collection<CacheKey> keys)
 {
     requireNonNull(keys, "keys is null");
-    cache.invalidateAll(keys.stream().map(CacheKey::key).collect(toImmutableList()));
+    cache.invalidateAll(keys);
 }
```

To verify the mismatch in the current branch:

```bash
#!/bin/bash
# Expect to see the cache keyed by CacheKey and bulk invalidation passing String keys.
rg -n -C3 'Cache<CacheKey, Optional<Slice>>|invalidateAll\(keys\.stream\(\)\.map\(CacheKey::key\)' --type=java
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/InMemoryBlobCache.java` around lines 93 - 98, In InMemoryBlobCache.invalidate, bulk invalidation is currently passing raw String keys (CacheKey::key) into cache.invalidateAll which expects CacheKey objects; update the method (invalidate(Collection<CacheKey> keys)) to pass the CacheKey instances themselves to cache.invalidateAll (e.g., remove the map to key conversion and collect the original keys) so the Cache<CacheKey, Optional<Slice>> will match entries correctly; keep requireNonNull(keys, ...) and use the existing cache.invalidateAll call with the collection of CacheKey objects.
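The silent no-op behind this finding can be reproduced with a plain map keyed by a small record standing in for `CacheKey` (a sketch, not the Guava cache `InMemoryBlobCache` actually uses): removal with unwrapped string keys matches nothing.

```java
import java.util.List;
import java.util.Map;

public class InvalidateSketch
{
    // Stand-in for io.trino.spi.cache.CacheKey
    public record CacheKey(String key) {}

    // Bug pattern: unwrapping to raw String keys matches no CacheKey entries,
    // so the bulk removal silently does nothing
    public static int wrongInvalidate(Map<CacheKey, String> cache, List<CacheKey> keys)
    {
        cache.keySet().removeAll(keys.stream().map(CacheKey::key).toList());
        return cache.size();
    }

    // Fix: pass the CacheKey objects themselves
    public static int rightInvalidate(Map<CacheKey, String> cache, List<CacheKey> keys)
    {
        cache.keySet().removeAll(keys);
        return cache.size();
    }
}
```

`removeAll(Collection<?>)` happily accepts the mistyped collection, which is why the bug compiles and only fails at runtime by leaving entries behind.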
72-83: ⚠️ Potential issue | 🟡 Minor

Avoid caching oversized-skip sentinels.

`Optional.empty()` is cached for oversized blobs, so the skip decision is sticky until TTL and `largeFileSkippedCount` increments on every request for that cached empty entry. Count the skip when the size is detected and return the original source without inserting an empty cache value.

One possible refactor

```diff
-    private final Cache<CacheKey, Optional<Slice>> cache;
+    private final Cache<CacheKey, Slice> cache;
@@
-            .weigher((Weigher<CacheKey, Optional<Slice>>) (key, value) -> toIntExact(estimatedSizeOf(key.key()) + sizeOf(value, Slice::getRetainedSize)))
+            .weigher((Weigher<CacheKey, Slice>) (key, value) -> toIntExact(estimatedSizeOf(key.key()) + value.getRetainedSize()))
@@
-        Optional<Slice> cachedEntry = getOrLoad(key, source);
-        if (cachedEntry.isEmpty()) {
+        if (source.length() > maxContentLengthBytes) {
             largeFileSkippedCount.incrementAndGet();
             return source;
         }
-        return new MemoryBlobSource(cachedEntry.get());
+        return new MemoryBlobSource(getOrLoad(key, source));
@@
-        Optional<Slice> cachedEntry = cache.getIfPresent(key);
-        return cachedEntry != null && cachedEntry.isPresent();
+        return cache.getIfPresent(key) != null;
@@
-    private Optional<Slice> getOrLoad(CacheKey key, BlobSource source)
+    private Slice getOrLoad(CacheKey key, BlobSource source)
@@
-    private Optional<Slice> load(BlobSource source)
+    private Slice load(BlobSource source)
@@
-        if (length > maxContentLengthBytes) {
-            return Optional.empty();
-        }
         byte[] buffer = new byte[toIntExact(length)];
         source.readFully(0, buffer, 0, buffer.length);
-        return Optional.of(Slices.wrappedBuffer(buffer));
+        return Slices.wrappedBuffer(buffer);
```

To verify the current sticky-empty behavior:

```bash
#!/bin/bash
# Expect load() to return Optional.empty() for oversized blobs and get() to count empties after getOrLoad().
rg -n -C4 'largeFileSkippedCount|Optional\.empty\(\)|cache\.get\(key, \(\) -> load\(source\)\)' --type=java
```

Also applies to: 155-183
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/InMemoryBlobCache.java` around lines 72 - 83, In InMemoryBlobCache, the current get(CacheKey key, BlobSource source) path caches Optional.empty() for oversized blobs making the skip sticky and causing largeFileSkippedCount to increment on every cached miss; change getOrLoad/load so that when load detects an oversized blob it increments largeFileSkippedCount and returns WITHOUT inserting an Optional.empty() into the cache, and have get() return the original source (not a cached empty) in that case; ensure references to getOrLoad, load, largeFileSkippedCount, MemoryBlobSource and get(CacheKey, BlobSource) are used to locate and update the logic (also apply the same change to the corresponding code around the 155-183 region).

plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManager.java (2)
94-98: ⚠️ Potential issue | 🟡 Minor

Quote the catalog value and avoid duplicate JMX properties.

Line 98 sets both `catalog` and `name` to the same value and interpolates `catalog` unquoted. Use a single descriptive property set and quote the catalog value for valid `ObjectName` property syntax.

Proposed adjustment

```diff
-        return ObjectName.getInstance(
-                "io.trino.blob.cache.alluxio:catalog=" + catalog + ",name=" + catalog + ",type=" + AlluxioCacheStats.class.getSimpleName());
+        return ObjectName.getInstance(
+                "io.trino.blob.cache.alluxio:type=" + AlluxioCacheStats.class.getSimpleName() +
+                        ",catalog=" + ObjectName.quote(catalog.toString()));
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManager.java` around lines 94 - 98, The ObjectName being built in statsObjectName duplicates properties and leaves the catalog value unquoted; update the construction in AlluxioBlobCacheManager.statsObjectName to use a single descriptive property (either "catalog" or "name", not both) and quote the catalog string using ObjectName.quote(catalog) so the resulting JMX property is valid, e.g. produce "catalog=" + ObjectName.quote(catalog) + ",type=" + AlluxioCacheStats.class.getSimpleName().
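The `ObjectName` property rules behind this comment can be checked with the JDK's own `javax.management` API; a minimal sketch (the domain string mirrors the proposed diff, and the checked exception is wrapped for brevity):

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class JmxNameSketch
{
    // One descriptive property set; ObjectName.quote keeps characters
    // like ',' ':' or '=' in the catalog name legal in the property value
    public static ObjectName statsObjectName(String catalog)
    {
        try {
            return ObjectName.getInstance(
                    "io.trino.blob.cache.alluxio:type=AlluxioCacheStats,catalog=" + ObjectName.quote(catalog));
        }
        catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

Without `ObjectName.quote`, a catalog name containing a comma or equals sign would make `getInstance` throw, because those characters delimit properties in the key-property list.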
60-65: ⚠️ Potential issue | 🟡 Minor

Don’t let MBean export failures abort catalog cache creation.

Line 63 runs inside `computeIfAbsent`; if `exporter.export(...)` throws, the catalog cache is not installed and catalog initialization fails. Since `drop()` already treats JMX unexport failures as non-fatal, creation should install the cache entry first and only warn if export fails.

Proposed adjustment

```diff
-        return catalogs.computeIfAbsent(catalog, c -> {
-            AlluxioCacheStats stats = new AlluxioCacheStats();
-            ObjectName name = statsObjectName(catalog);
-            exporter.export(name.getCanonicalName(), stats);
-            return new CatalogEntry(stats, name, new CatalogScopedBlobCache(sharedCache, catalog, tracer, stats));
-        }).blobCache();
+        AlluxioCacheStats stats = new AlluxioCacheStats();
+        ObjectName name = statsObjectName(catalog);
+        CatalogEntry newEntry = new CatalogEntry(stats, name, new CatalogScopedBlobCache(sharedCache, catalog, tracer, stats));
+        CatalogEntry entry = catalogs.putIfAbsent(catalog, newEntry);
+        if (entry == null) {
+            entry = newEntry;
+            try {
+                exporter.export(name.getCanonicalName(), stats);
+            }
+            catch (Exception e) {
+                log.warn(e, "Failed to register AlluxioCacheStats MBean for catalog %s", catalog);
+            }
+        }
+        return entry.blobCache();
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManager.java` around lines 60 - 65, computeIfAbsent's lambda currently calls exporter.export(...) which can throw and prevent installing the CatalogEntry; change the lambda inside AlluxioBlobCacheManager so it first constructs the CatalogEntry (new CatalogEntry(stats, name, new CatalogScopedBlobCache(...))) and returns that entry, but perform exporter.export(name.getCanonicalName(), stats) inside a try/catch around the export call (after creating the entry) and on failure log a warning (matching drop() behavior) instead of letting the exception propagate; reference computeIfAbsent, AlluxioBlobCacheManager, CatalogEntry, statsObjectName, and exporter.export in the change.

plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCache.java (1)
63-65: ⚠️ Potential issue | 🟠 Major

Implement Alluxio invalidation or explicitly document TTL-only behavior.

Both `invalidate` overloads are no-ops, so engine-driven invalidation reaches the Alluxio tier but leaves cached pages intact. This undermines centralized invalidation for catalog drops and targeted cache expiry.

To verify the available Alluxio invalidation hooks and current no-op path:

```bash
#!/bin/bash
# Inspect the no-op BlobCache invalidation path and nearby Alluxio cache-manager APIs/wrappers.
rg -n -C4 'void invalidate\(CacheKey|void invalidate\(Collection<CacheKey>|CacheManager\.delete|CacheManager\.invalidate|class TracingCacheManager|new PageId' --type=java
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCache.java` around lines 63 - 65, The two no-op methods AlluxioBlobCache.invalidate(CacheKey) and invalidate(Collection<CacheKey>) must either perform real Alluxio eviction or be explicitly documented as TTL-only; implement the former by mapping CacheKey -> Alluxio page id and invoking the Alluxio cache manager's delete/invalidate API (e.g., call CacheManager.delete(pageId) or CacheManager.invalidate(pageId) via the same wrapper used elsewhere such as TracingCacheManager/PageId creation), handling exceptions and batching for the Collection variant, and add logging for success/failure; if you choose TTL-only behavior instead, update AlluxioBlobCache class javadoc and method-level comments to state invalidation is unsupported and consider throwing UnsupportedOperationException to make the behavior explicit.
🧹 Nitpick comments (2)
core/trino-spi/src/main/java/io/trino/spi/cache/BlobCache.java (1)
30-32: Consider whether `invalidate(...)` should declare `IOException`.

`get` throws `IOException`, but `invalidate(CacheKey)` and `invalidate(Collection<CacheKey>)` do not. Disk-backed implementations (e.g., the Alluxio blob cache) may need to perform IO to remove entries; if errors are swallowed or wrapped, they become harder for callers to surface consistently. Please confirm that implementations are expected to handle IO errors internally (and the contract should say so in the Javadoc), or make invalidation throw `IOException` for symmetry with `get`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/trino-spi/src/main/java/io/trino/spi/cache/BlobCache.java` around lines 30 - 32, The BlobCache API is asymmetric: BlobCache.get throws IOException but invalidate(CacheKey) and invalidate(Collection<CacheKey>) do not; update the contract to either declare IOException on both invalidate methods or add explicit Javadoc saying implementations must handle IO errors internally and not throw; modify the BlobCache interface to add "throws IOException" to invalidate(CacheKey) and invalidate(Collection<CacheKey>) (and update all implementing classes such as the Alluxio-backed implementation and any callers), or alternatively add clear Javadoc to the existing invalidate methods stating that IO errors will be handled internally and must not be propagated — choose the throws IOException approach for symmetry and consistency unless there's a deliberate design reason to swallow IOExceptions.

lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingBlobCache.java (1)
42-71: Tracing attribute gaps on bulk `invalidate` and reliance on `BlobSource.toString()`.

Two small observability issues:

- Lines 67-68: the bulk `invalidate(Collection<CacheKey>)` span carries no attributes — not even a count. Consider setting at least the batch size so operators can correlate spans meaningfully.
- Line 47: `CACHE_FILE_LOCATION` is derived from `source.toString()`. This depends on an undocumented `toString` contract of `BlobSource` implementations (currently `TrinoInputFileBlobSource`); future `BlobSource` types may produce non-location strings, silently degrading tracing. Consider exposing a `location()`/`describe()` method on `BlobSource` or an explicit attribute supplier.

🪶 Proposed minor improvement for bulk invalidate

```diff
 @Override
 public void invalidate(Collection<CacheKey> keys)
 {
     Span span = tracer.spanBuilder("BlobCache.invalidate")
+            .setAttribute("cache.invalidate.count", (long) keys.size())
             .startSpan();
     withTracing(span, () -> delegate.invalidate(keys));
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingBlobCache.java` around lines 42 - 71, The span created in get(CacheKey, BlobSource) should not rely on BlobSource.toString() for CACHE_FILE_LOCATION: update the code to call an explicit location/describe method on BlobSource (add a new method like location() or describe() to the BlobSource interface and implement it in TrinoInputFileBlobSource), falling back to toString() only if the new method returns null; and enhance invalidate(Collection<CacheKey>) to set a tracing attribute (e.g., BATCH_SIZE) with keys.size() on the span before calling withTracing; keep existing attributes (CACHE_KEY) on single-key invalidate unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/DefaultCacheKeyProvider.java`:
- Around line 26-29: The cache key currently uses only
inputFile.location().path(), which drops scheme/authority and can cause
collisions; update DefaultCacheKeyProvider.getCacheKey to use the full location
identity (e.g., the location object's full URI/string representation from
TrinoInputFile.location()) when building the CacheKey so the scheme/authority
are included alongside lastModified() and length() to avoid
cross-bucket/container collisions for CacheKey.
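The collision mode this prompt describes can be demonstrated with plain `java.net.URI` values (a sketch; `keyFromPath` and `keyFromFullLocation` are illustrative helpers, not Trino APIs):

```java
import java.net.URI;

public class CacheKeySketch
{
    // Path-only key: drops scheme and authority, so two buckets collide
    public static String keyFromPath(URI location)
    {
        return location.getPath();
    }

    // Full-location key: scheme + authority + path stay part of the identity
    public static String keyFromFullLocation(URI location)
    {
        return location.toString();
    }
}
```

Two objects with the same path in different buckets produce identical path-only keys but distinct full-location keys, which is exactly the fix the prompt asks for.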
In
`@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManagerFactory.java`:
- Around line 74-77: The call to Alluxio's global MetricsSystem
(MetricsSystem.startSinksFromConfig(new MetricsConfig(metricProps))) inside
AlluxioBlobCacheManagerFactory.create() can run multiple times; add a guard so
the sinks are registered only once: introduce a private static atomic flag
(e.g., AtomicBoolean metricsInitialized) on AlluxioBlobCacheManagerFactory and
wrap the Properties metricProps / MetricsConfig /
MetricsSystem.startSinksFromConfig(...) call in an if
(metricsInitialized.compareAndSet(false, true)) block, or alternatively move
that sink-registration into a singleton-initialized component (e.g.,
`@PostConstruct`) so the registration happens exactly once regardless of how
CacheManagerRegistry invokes create(); reference the create(), MetricsSystem,
MetricsConfig, and AlluxioBlobCacheManagerFactory symbols when making the
change.
In
`@plugin/trino-blob-cache-alluxio/src/test/java/io/trino/blob/cache/alluxio/TestFuzzAlluxioCacheFileSystem.java`:
- Around line 181-190: The temp Alluxio cache directory created by create()
(which always creates a "cache" child) is not removed because close() currently
calls tempDirectory.toFile().delete() which fails on non-empty dirs; change the
cleanup in the test class so close() removes the tempDirectory tree recursively
(e.g., use Files.walkFileTree or a utility like FileUtils.deleteDirectory) to
delete tempDirectory and its "cache" child, ensuring no leftover temp cache
directories after fuzz tests; modify the cleanup logic referenced in the test
class that currently uses tempDirectory.toFile().delete() accordingly.
In
`@plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java`:
- Around line 240-250: The code installs AlluxioBlobCachePlugin unconditionally
inside createDeltaLakeQueryRunner() while getBlobCacheType() is a protected
hook, causing subclasses overriding getBlobCacheType() to get a mismatched
plugin; fix by making plugin installation pluggable too: replace the hardcoded
new AlluxioBlobCachePlugin() with a call to a new protected method (e.g.,
getBlobCachePlugin()) that returns the Plugin instance (defaulting to new
AlluxioBlobCachePlugin()), and use that in createDeltaLakeQueryRunner();
alternatively remove the getBlobCacheType() hook if only Alluxio is intended.
---
Outside diff comments:
In
`@core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java`:
- Around line 312-323: Move the cache cleanup into the catalog update critical
section and only drop cache state when the catalog was actually removed: inside
the synchronized (catalogsUpdateLock) block (which contains
checkState(State.STOPPED), catalogStore.removeCatalog(catalogName) and the
removed = activeCatalogs.remove(catalogName) != null assignment), invoke
cacheManagerRegistry.drop(catalogName) only when removed is true; do not call
drop when !removed (this prevents DROP ... IF EXISTS from removing unrelated
cache state created by a concurrent CREATE CATALOG).
In
`@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/TracingCacheManager.java`:
- Around line 142-146: In TracingCacheManager.getDataFileChannel the span
builder sets CACHE_FILE_READ_POSITION twice, overwriting the position attribute
and losing the read size; change the second setAttribute call that currently
uses CACHE_FILE_READ_POSITION to use CACHE_FILE_READ_SIZE instead so the span
created by tracer.spanBuilder("AlluxioCacheManager.getDataFileChannel") records
both positionInFile(pageId, position) and the (long) length under distinct
attributes (CACHE_FILE_READ_POSITION and CACHE_FILE_READ_SIZE) alongside
CACHE_KEY.
- Around line 202-210: The get method in TracingCacheManager incorrectly omits
the pageOffset when delegating to delegate.get, causing reads to use the wrong
position; update the delegation in TracingCacheManager.get (the call inside
withTracing and the lambda) to pass pageOffset in the correct argument order
(matching delegate.get's signature that accepts pageId, pageOffset, bytesToRead,
buffer, offsetInBuffer) so traced reads use the intended offset while keeping
the existing span creation and withTracing wrapper intact.
In
`@plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java`:
- Around line 45-63: The getCacheKey method currently builds the CacheKey from
inputFile.location().path(), which can collide across different
authorities/buckets; keep using path for the exclusion checks but change the
final CacheKey instantiation to use the full location (the complete
inputFile.location() / URI string) instead of path so the shared blob cache is
uniquely keyed by authority+path; update the return from Optional.of(new
CacheKey(path)) to use the full location representation (e.g., the
inputFile.location() as a string/URI) while leaving the earlier checks
(deltaLogCacheDisabled, .trinoSchema, _last_checkpoint, STATISTICS_META_DIR,
STARBURST_META_DIR) unchanged.
---
Duplicate comments:
In
`@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCache.java`:
- Around line 63-65: The two no-op methods AlluxioBlobCache.invalidate(CacheKey)
and invalidate(Collection<CacheKey>) must either perform real Alluxio eviction
or be explicitly documented as TTL-only; implement the former by mapping
CacheKey -> Alluxio page id and invoking the Alluxio cache manager's
delete/invalidate API (e.g., call CacheManager.delete(pageId) or
CacheManager.invalidate(pageId) via the same wrapper used elsewhere such as
TracingCacheManager/PageId creation), handling exceptions and batching for the
Collection variant, and add logging for success/failure; if you choose TTL-only
behavior instead, update AlluxioBlobCache class javadoc and method-level
comments to state invalidation is unsupported and consider throwing
UnsupportedOperationException to make the behavior explicit.
In
`@plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManager.java`:
- Around line 94-98: The ObjectName being built in statsObjectName duplicates
properties and leaves the catalog value unquoted; update the construction in
AlluxioBlobCacheManager.statsObjectName to use a single descriptive property
(either "catalog" or "name", not both) and quote the catalog string using
ObjectName.quote(catalog) so the resulting JMX property is valid, e.g. produce
"catalog=" + ObjectName.quote(catalog) + ",type=" +
AlluxioCacheStats.class.getSimpleName().
- Around line 60-65: computeIfAbsent's lambda currently calls
exporter.export(...) which can throw and prevent installing the CatalogEntry;
change the lambda inside AlluxioBlobCacheManager so it first constructs the
CatalogEntry (new CatalogEntry(stats, name, new CatalogScopedBlobCache(...)))
and returns that entry, but perform exporter.export(name.getCanonicalName(),
stats) inside a try/catch around the export call (after creating the entry) and
on failure log a warning (matching drop() behavior) instead of letting the
exception propagate; reference computeIfAbsent, AlluxioBlobCacheManager,
CatalogEntry, statsObjectName, and exporter.export in the change.
In
`@plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/InMemoryBlobCache.java`:
- Around line 93-98: In InMemoryBlobCache.invalidate, bulk invalidation is
currently passing raw String keys (CacheKey::key) into cache.invalidateAll which
expects CacheKey objects; update the method (invalidate(Collection<CacheKey>
keys)) to pass the CacheKey instances themselves to cache.invalidateAll (e.g.,
remove the map to key conversion and collect the original keys) so the
Cache<CacheKey, Optional<Slice>> will match entries correctly; keep
requireNonNull(keys, ...) and use the existing cache.invalidateAll call with the
collection of CacheKey objects.
- Around line 72-83: In InMemoryBlobCache, the current get(CacheKey key,
BlobSource source) path caches Optional.empty() for oversized blobs making the
skip sticky and causing largeFileSkippedCount to increment on every cached miss;
change getOrLoad/load so that when load detects an oversized blob it increments
largeFileSkippedCount and returns WITHOUT inserting an Optional.empty() into the
cache, and have get() return the original source (not a cached empty) in that
case; ensure references to getOrLoad, load, largeFileSkippedCount,
MemoryBlobSource and get(CacheKey, BlobSource) are used to locate and update the
logic (also apply the same change to the corresponding code around the 155-183
region).
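The non-sticky skip described in this prompt can be sketched with a plain map and a counter (illustrative names; byte arrays stand in for `Slice`, and the size check happens at detection rather than via a cached sentinel):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class SkipSketch
{
    private final long maxContentLengthBytes;
    private final Map<String, byte[]> cache = new HashMap<>();
    private final AtomicLong largeFileSkipped = new AtomicLong();

    public SkipSketch(long maxContentLengthBytes)
    {
        this.maxContentLengthBytes = maxContentLengthBytes;
    }

    // Count the skip at size detection and serve the source directly,
    // without inserting any empty sentinel into the cache
    public byte[] get(String key, byte[] source)
    {
        if (source.length > maxContentLengthBytes) {
            largeFileSkipped.incrementAndGet();
            return source;
        }
        return cache.computeIfAbsent(key, k -> source.clone());
    }

    public long skippedCount()
    {
        return largeFileSkipped.get();
    }

    public boolean isCached(String key)
    {
        return cache.containsKey(key);
    }
}
```

Oversized reads are served straight from the source and counted per request, and nothing for them ever occupies cache capacity or TTL.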
In
`@plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java`:
- Around line 107-127: The code hardcodes installation of AlluxioBlobCachePlugin
while getBlobCacheType() is overridable, causing mismatches; to fix, add an
overrideable hook (e.g., protected BlobCachePlugin getBlobCachePlugin() or
protected Class<? extends BlobCachePlugin> getBlobCachePluginClass()) and
replace the hardcoded new AlluxioBlobCachePlugin() in the
builder.withPlugin(...) call with that hook, ensuring subclasses that override
getBlobCacheType() also override getBlobCachePlugin()/getBlobCachePluginClass()
to provide a matching plugin implementation.
---
Nitpick comments:
In `@core/trino-spi/src/main/java/io/trino/spi/cache/BlobCache.java`:
- Around line 30-32: The BlobCache API is asymmetric: BlobCache.get throws
IOException but invalidate(CacheKey) and invalidate(Collection<CacheKey>) do
not; update the contract to either declare IOException on both invalidate
methods or add explicit Javadoc saying implementations must handle IO errors
internally and not throw; modify the BlobCache interface to add "throws
IOException" to invalidate(CacheKey) and invalidate(Collection<CacheKey>) (and
update all implementing classes such as the Alluxio-backed implementation and
any callers), or alternatively add clear Javadoc to the existing invalidate
methods stating that IO errors will be handled internally and must not be
propagated — choose the throws IOException approach for symmetry and consistency
unless there's a deliberate design reason to swallow IOExceptions.
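The symmetric contract the nitpick proposes would look like this minimal sketch (the interface mirrors only the methods under discussion, not the full `BlobCache` SPI):

```java
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class BlobCacheSymmetrySketch
{
    public interface BlobCacheSketch<K, V>
    {
        V get(K key) throws IOException;

        // Symmetric with get: disk-backed implementations may need IO to remove entries
        void invalidate(K key) throws IOException;

        void invalidate(Collection<K> keys) throws IOException;
    }

    // Trivial in-memory implementation: it never throws, and Java lets
    // overrides narrow the throws clause, so callers of this concrete
    // type need no try/catch while IO-backed implementations can propagate
    public static class MapCache implements BlobCacheSketch<String, String>
    {
        private final Map<String, String> entries = new HashMap<>();

        public void put(String key, String value)
        {
            entries.put(key, value);
        }

        @Override
        public String get(String key)
        {
            return entries.get(key);
        }

        @Override
        public void invalidate(String key)
        {
            entries.remove(key);
        }

        @Override
        public void invalidate(Collection<String> keys)
        {
            entries.keySet().removeAll(keys);
        }
    }
}
```

Implementations that never do IO simply omit the `throws` clause on their overrides, so declaring `IOException` on the interface costs in-memory callers nothing.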
In
`@lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingBlobCache.java`:
- Around line 42-71: The span created in get(CacheKey, BlobSource) should not
rely on BlobSource.toString() for CACHE_FILE_LOCATION: update the code to call
an explicit location/describe method on BlobSource (add a new method like
location() or describe() to the BlobSource interface and implement it in
TrinoInputFileBlobSource), falling back to toString() only if the new method
returns null; and enhance invalidate(Collection<CacheKey>) to set a tracing
attribute (e.g., BATCH_SIZE) with keys.size() on the span before calling
withTracing; keep existing attributes (CACHE_KEY) on single-key invalidate
unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4cc4d3df-6a00-4128-966e-4b8171fcfea9
📒 Files selected for processing (119)
.github/workflows/ci.yml
core/docker/default/etc/cache-manager-memory.properties
core/docker/default/etc/config.properties
core/trino-main/src/main/java/io/trino/cache/CacheManagerConfig.java
core/trino-main/src/main/java/io/trino/cache/CacheManagerContextInstance.java
core/trino-main/src/main/java/io/trino/cache/CacheManagerModule.java
core/trino-main/src/main/java/io/trino/cache/CacheManagerRegistry.java
core/trino-main/src/main/java/io/trino/connector/ConnectorContextInstance.java
core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java
core/trino-main/src/main/java/io/trino/connector/DefaultCatalogFactory.java
core/trino-main/src/main/java/io/trino/server/PluginManager.java
core/trino-main/src/main/java/io/trino/server/Server.java
core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java
core/trino-main/src/main/java/io/trino/testing/PlanTester.java
core/trino-main/src/main/java/io/trino/testing/QueryRunner.java
core/trino-main/src/main/java/io/trino/testing/StandaloneQueryRunner.java
core/trino-main/src/main/java/io/trino/testing/TestingConnectorCacheFactory.java
core/trino-main/src/main/java/io/trino/testing/TestingConnectorContext.java
core/trino-server-core/src/main/provisio/trino-core.xml
core/trino-spi/src/main/java/io/trino/spi/Plugin.java
core/trino-spi/src/main/java/io/trino/spi/cache/BlobCache.java
core/trino-spi/src/main/java/io/trino/spi/cache/BlobCacheManager.java
core/trino-spi/src/main/java/io/trino/spi/cache/BlobCacheManagerFactory.java
core/trino-spi/src/main/java/io/trino/spi/cache/BlobSource.java
core/trino-spi/src/main/java/io/trino/spi/cache/CacheKey.java
core/trino-spi/src/main/java/io/trino/spi/cache/CacheManagerContext.java
core/trino-spi/src/main/java/io/trino/spi/cache/CacheTier.java
core/trino-spi/src/main/java/io/trino/spi/cache/ConnectorCacheFactory.java
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorContext.java
core/trino-spi/src/main/java/module-info.java
lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCacheModule.java
lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInput.java
lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInputStream.java
lib/trino-filesystem-manager/pom.xml
lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java
lib/trino-filesystem/pom.xml
lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/BlobTrinoInput.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/BlobTrinoInputStream.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystemFactory.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheKeyProvider.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/DefaultCacheKeyProvider.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/TrinoFileSystemCache.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/TrinoInputFileBlobSource.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCache.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCacheModule.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingBlobCache.java
lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystemCache.java
plugin/trino-blob-cache-alluxio/pom.xml
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCache.java
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheConfig.java
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManager.java
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCacheManagerFactory.java
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobCachePlugin.java
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioBlobSource.java
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioCacheStats.java
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioConfigurationFactory.java
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/AlluxioInputHelper.java
plugin/trino-blob-cache-alluxio/src/main/java/io/trino/blob/cache/alluxio/TracingCacheManager.java
plugin/trino-blob-cache-alluxio/src/test/java/io/trino/blob/cache/alluxio/IncompleteStreamMemoryFileSystem.java
plugin/trino-blob-cache-alluxio/src/test/java/io/trino/blob/cache/alluxio/TestAlluxioBlobCacheConfig.java
plugin/trino-blob-cache-alluxio/src/test/java/io/trino/blob/cache/alluxio/TestAlluxioCacheFileSystem.java
plugin/trino-blob-cache-alluxio/src/test/java/io/trino/blob/cache/alluxio/TestAlluxioCacheFileSystemAccessOperations.java
plugin/trino-blob-cache-alluxio/src/test/java/io/trino/blob/cache/alluxio/TestFuzzAlluxioCacheFileSystem.java
plugin/trino-blob-cache-alluxio/src/test/java/io/trino/blob/cache/alluxio/TestTracingCacheManager.java
plugin/trino-blob-cache-alluxio/src/test/java/io/trino/blob/cache/alluxio/TestingBlobCache.java
plugin/trino-blob-cache-alluxio/src/test/java/io/trino/blob/cache/alluxio/TestingCacheKeyProvider.java
plugin/trino-blob-cache-memory/pom.xml
plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/InMemoryBlobCache.java
plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/InMemoryBlobCacheConfig.java
plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/InMemoryBlobCacheManager.java
plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/MemoryBlobCacheManagerFactory.java
plugin/trino-blob-cache-memory/src/main/java/io/trino/blob/cache/memory/MemoryBlobCachePlugin.java
plugin/trino-blob-cache-memory/src/ma
in/java/io/trino/blob/cache/memory/MemoryBlobSource.javaplugin/trino-blob-cache-memory/src/test/java/io/trino/blob/cache/memory/TestCacheFileSystemAccessOperations.javaplugin/trino-blob-cache-memory/src/test/java/io/trino/blob/cache/memory/TestInMemoryBlobCache.javaplugin/trino-blob-cache-memory/src/test/java/io/trino/blob/cache/memory/TestInMemoryBlobCacheConfig.javaplugin/trino-delta-lake/pom.xmlplugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.javaplugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.javaplugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.javaplugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.javaplugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMinioAndHmsConnectorSmokeTest.javaplugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.javaplugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.javaplugin/trino-hive/pom.xmlplugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveAlluxioCacheFileOperations.javaplugin/trino-iceberg/pom.xmlplugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/cache/IcebergCacheKeyProvider.javaplugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.javaplugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.javaplugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.javaplugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.javaplugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMemoryCacheFileOperations.javaplugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetCachingConnectorSmokeTest.javaplugin/trino-iceberg/src/test/resources/iceberg/materialized_v
iew_expired_snapshots/materialized_view/metadata/snap-1648307738255131376-1-ed1a5aaf-924c-4956-ad74-a5942bc863ab.avroplugin/trino-iceberg/src/test/resources/iceberg/materialized_view_expired_snapshots/materialized_view/metadata/snap-1783632012146030949-1-0bdd8872-a5d1-4a11-8afa-a1d427282c29.avroplugin/trino-iceberg/src/test/resources/iceberg/materialized_view_expired_snapshots/materialized_view/metadata/snap-1908038061182547556-1-17582660-010e-4fe2-89d0-469798adc68a.avroplugin/trino-iceberg/src/test/resources/iceberg/materialized_view_expired_snapshots/materialized_view/metadata/snap-2085985182010370827-1-e68ef163-0672-4599-991f-136bca8cd14e.avroplugin/trino-iceberg/src/test/resources/iceberg/materialized_view_expired_snapshots/materialized_view/metadata/snap-2326703192737831382-1-9b2367ea-07a5-4b25-a9e2-4c6c666c1321.avroplugin/trino-iceberg/src/test/resources/iceberg/materialized_view_expired_snapshots/materialized_view/metadata/snap-2658918943052245018-1-b7882118-6324-473f-bdf3-3fd9229294ad.avroplugin/trino-iceberg/src/test/resources/iceberg/materialized_view_expired_snapshots/materialized_view/metadata/snap-3369490910652462497-1-5e4945f2-cf2d-447f-b748-8cd03c59d987.avroplugin/trino-iceberg/src/test/resources/iceberg/materialized_view_expired_snapshots/materialized_view/metadata/snap-3591807473350904875-1-fd84797c-e0cd-4adf-b378-7f2917da1168.avroplugin/trino-iceberg/src/test/resources/iceberg/materialized_view_expired_snapshots/materialized_view/metadata/snap-5202011232056132019-1-7181f314-2ee6-4bda-9bdd-88232eaed102.avroplugin/trino-iceberg/src/test/resources/iceberg/materialized_view_expired_snapshots/materialized_view/metadata/snap-8143898955657678113-1-d402bb82-527b-4556-a3e7-f3c32a6835a8.avroplugin/trino-iceberg/src/test/resources/iceberg/materialized_view_expired_snapshots/source_table/metadata/snap-3545425111120995268-1-18a67dd0-3e07-4a65-aae0-432e3ab85180.avropom.xmltesting/trino-faulttolerant-tests/pom.xmltesting/trino-product-tests-launcher/src/main/java
/io/trino/tests/product/launcher/env/common/StandardMultinode.javatesting/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/common/standard-multinode/cache-manager-alluxio.propertiestesting/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/common/standard-multinode/cache-manager-memory.propertiestesting/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/common/standard-multinode/multinode-master-config.propertiestesting/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/common/standard-multinode/multinode-worker-config.propertiestesting/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-hive-cached/hive.propertiestesting/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-iceberg-minio-cached/iceberg.propertiestesting/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-minio-data-lake-cached/delta.propertiestesting/trino-product-tests/src/main/java/io/trino/tests/product/utils/CachingTestUtils.javatesting/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java
💤 Files with no reviewable changes (12)
- lib/trino-filesystem-manager/pom.xml
- testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-hive-cached/hive.properties
- testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-iceberg-minio-cached/iceberg.properties
- testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-minio-data-lake-cached/delta.properties
- lib/trino-filesystem/pom.xml
- lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCacheModule.java
- lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCacheModule.java
- lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInputStream.java
- lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/TrinoFileSystemCache.java
- lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystemCache.java
- lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInput.java
- lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCache.java
b994536 to
bc62492
Compare
bc62492 to
e60317c
Compare
    void readFully(long position, byte[] buffer, int offset, int length)
            throws IOException;

    default int readTail(byte[] buffer, int offset, int length)
What's the benefit of building this into BlobSource vs having a utility that can do this?
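The suggestion above — a utility rather than a default method on `BlobSource` — could look roughly like the sketch below. The `BlobSource` interface here is a minimal stand-in with assumed signatures (`length()`, `readFully(...)`); the `BlobSources.readTail` helper name is hypothetical, not part of the actual SPI.

```java
import java.io.IOException;

// Minimal stand-in for the SPI type under discussion; signatures are assumptions.
interface BlobSource
{
    long length()
            throws IOException;

    void readFully(long position, byte[] buffer, int offset, int length)
            throws IOException;
}

// Hypothetical utility class: readTail as a static helper instead of a default method.
final class BlobSources
{
    private BlobSources() {}

    // Reads the last `length` bytes of the blob into `buffer`, clamping to the blob size.
    // Returns the number of bytes actually read.
    static int readTail(BlobSource source, byte[] buffer, int offset, int length)
            throws IOException
    {
        long size = source.length();
        int toRead = (int) Math.min(length, size);
        source.readFully(size - toRead, buffer, offset, toRead);
        return toRead;
    }
}
```

Either placement works mechanically; the trade-off is whether tail reads are part of the source contract (so caches can intercept and optimize them) or a convenience layered on top.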
 */
public interface BlobCache
{
    BlobSource get(CacheKey key, BlobSource source)
BlobSource for input should probably be separate from the output of this method. We may need to add more capability to the source (e.g., read ranges, prefetch, async reads) without polluting what the cache users see as the interface.
Also, the Blob that's returned from the cache may need to keep state, be closeable so that callers can manage lifecycle, etc. We may also want different declared exceptions for the input BlobSource vs the output one.
There is no use case for that yet, so adding a separate interface with unused methods is not reasonable for now. We can revisit this API in the future.
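The read-through contract being debated can be sketched with a toy in-memory implementation. Everything below is illustrative, not the actual SPI: the type names mirror the PR (`BlobCache`, `BlobSource`, `CacheKey`) but the signatures, the whole-blob loading strategy, and the `CacheKey` fields are assumptions. The point is that `get` accepts a backing `BlobSource` and returns a `BlobSource`, so cached and uncached reads look identical to the caller — which is the single-interface design being defended here.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Assumed stand-ins for the SPI types under discussion.
interface BlobSource
{
    long length()
            throws IOException;

    void readFully(long position, byte[] buffer, int offset, int length)
            throws IOException;
}

record CacheKey(String catalog, String path) {}

// Toy read-through cache: on miss, load the whole blob from the backing source.
class InMemoryBlobCache
{
    private final Map<CacheKey, byte[]> cache = new ConcurrentHashMap<>();

    BlobSource get(CacheKey key, BlobSource source)
            throws IOException
    {
        byte[] blob = cache.get(key);
        if (blob == null) {
            byte[] loaded = new byte[(int) source.length()];
            source.readFully(0, loaded, 0, loaded.length);
            cache.putIfAbsent(key, loaded);
            blob = loaded;
        }
        byte[] data = blob;
        // The returned BlobSource serves reads from the cached copy,
        // so callers cannot tell a hit from a miss.
        return new BlobSource()
        {
            @Override
            public long length()
            {
                return data.length;
            }

            @Override
            public void readFully(long position, byte[] buffer, int offset, int length)
            {
                System.arraycopy(data, (int) position, buffer, offset, length);
            }
        };
    }
}
```

If range reads, prefetch, or async reads are added later, they would widen `BlobSource` itself, which is where the concern about splitting input and output interfaces comes from.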
    MEMORY,
    DISK,
This taxonomy is also very limited. What about "distributed memory", "distributed disk", "flash/ssd", etc.? In the end, it depends on how they are supposed to be used. Which part of the system cares about which tier is being used, and why does the tier matter?
The connector cares, as tier == latency.
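The "tier == latency" argument can be made concrete with a sketch. The enum values match the quoted diff, but the latency figures and the `preferPrefetch` heuristic are invented here purely to illustrate how a connector might branch on tier; nothing like this is claimed to exist in the PR.

```java
// Illustrative only: a connector could use the tier as a coarse latency hint,
// e.g. to decide between eager reads and asynchronous prefetch.
enum CacheTier
{
    MEMORY(1),      // ~microsecond access (assumed figure); read eagerly
    DISK(5_000);    // ~millisecond access (assumed figure); may be worth prefetching

    private final long approxLatencyMicros;

    CacheTier(long approxLatencyMicros)
    {
        this.approxLatencyMicros = approxLatencyMicros;
    }

    // Hypothetical heuristic: prefetch once the tier is slow enough to hide latency behind.
    boolean preferPrefetch()
    {
        return approxLatencyMicros >= 1_000;
    }
}
```

This is also why finer-grained tiers ("distributed memory", "flash/ssd") only matter if some consumer actually changes behavior based on them.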
The purpose of the unified cache is to move cache control to the engine, including cache invalidation, sizing, etc. Right now every catalog creates and manages its own cache, which is suboptimal and hard to manage when dynamic catalogs are used. Instead of having multiple caches, create a single, engine-level cache that is leased to connectors.
Description
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: