Skip to content

Commit 21382e6

Browse files
committed
Fix CompressionDictionary being closed while still in use
Patch by Yifan Cai; Reviewed by Stefan Miklosovic for CASSANDRA-21047
1 parent c086665 commit 21382e6

File tree

10 files changed

+480
-68
lines changed

10 files changed

+480
-68
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Fix CompressionDictionary being closed while still in use (CASSANDRA-21047)
23
* When updating a multi cell collection element, if the update is rejected then the shared Row.Builder is not freed causing all future mutations to be rejected (CASSANDRA-21055)
34
* Schema annotations escape validation on CREATE and ALTER DDL statements (CASSANDRA-21046)
45
* Calculate once and cache the result of ModificationStatement#requiresRead as a perf optimization (CASSANDRA-21040)

src/java/org/apache/cassandra/db/compression/CompressionDictionary.java

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,49 @@
2525
import java.util.Objects;
2626
import javax.annotation.Nullable;
2727

28+
import com.google.common.annotations.VisibleForTesting;
2829
import com.google.common.base.Preconditions;
2930
import com.google.common.hash.Hasher;
3031
import com.google.common.hash.Hashing;
3132

3233
import org.apache.cassandra.cql3.UntypedResultSet;
3334
import org.apache.cassandra.io.compress.ICompressor;
3435
import org.apache.cassandra.io.compress.ZstdDictionaryCompressor;
36+
import org.apache.cassandra.utils.concurrent.Ref;
3537

36-
public interface CompressionDictionary extends AutoCloseable
38+
/**
39+
* Interface for compression dictionaries with reference-counted lifecycle management.
40+
*
41+
* <h2>Reference Counting Model</h2>
42+
* Compression dictionaries hold native resources that must be explicitly managed. This interface
43+
* uses {@link Ref} for safe lifecycle management across multiple concurrent users.
44+
*
45+
* <h3>Ownership and Usage in Cassandra</h3>
46+
* <ul>
47+
* <li><b>CompressionDictionaryManager</b>: Holds the primary reference ({@link #selfRef()}) for cached dictionaries</li>
48+
* <li><b>CompressionMetadata.Writer</b>: Acquires a reference during SSTable write, held for the writer's lifetime</li>
49+
* <li><b>CompressionMetadata</b>: Acquires a reference when created (via {@link #tryRef()}), held for the SSTable reader's lifetime.
50+
* All copies created via sharedCopy() share this single reference through WrappedSharedCloseable</li>
51+
* </ul>
52+
*
53+
* <h3>Correctness Guarantee</h3>
54+
* The reference counting prevents premature cleanup of native resources:
55+
* <ol>
56+
* <li>CompressionMetadata acquires a reference when an SSTable is opened</li>
57+
* <li>Native resources remain valid as long as any reference exists (refcount &gt; 0)</li>
58+
* <li>Even if the cache evicts the dictionary, the SSTable's reference keeps resources alive</li>
59+
* <li>Cleanup runs exactly once when the last reference is released (refcount goes 0 → -1)</li>
60+
* <li>After cleanup, {@link #tryRef()} returns null, preventing new references to released resources</li>
61+
* </ol>
62+
*
63+
* This ensures dictionaries cannot be freed while SSTables are using them for compression/decompression,
64+
* even when the cache evicts the dictionary concurrently.
65+
*
66+
* @see Ref for reference counting implementation
67+
* @see CompressionDictionaryManager for cache management
68+
* @see org.apache.cassandra.io.compress.CompressionMetadata for SSTable usage
69+
*/
70+
public interface CompressionDictionary
3771
{
3872
/**
3973
* Get the dictionary id
@@ -75,6 +109,48 @@ default Kind kind()
75109
return dictId().kind;
76110
}
77111

112+
/**
113+
* Try to acquire a new reference to this dictionary.
114+
* Returns null if the dictionary is already released.
115+
* <p>
116+
* The caller must ensure the returned reference is released when no longer needed,
117+
* either by calling {@code ref.release()} or {@code ref.close()} (they are equivalent).
118+
* Failing to release the reference will prevent cleanup of native resources and cause
119+
* a memory leak.
120+
*
121+
* @return a new reference to this dictionary, or null if already released
122+
*/
123+
Ref<? extends CompressionDictionary> tryRef();
124+
125+
/**
126+
* Get the self-reference of this dictionary.
127+
* This is used to release the primary reference held by the cache.
128+
*
129+
* @return the self-reference
130+
*/
131+
Ref<? extends CompressionDictionary> selfRef();
132+
133+
/**
134+
* Releases the self-reference of this dictionary.
135+
* This is a convenience method equivalent to calling {@code selfRef().close()}.
136+
* <p>
137+
* This method is idempotent - calling it multiple times is safe and will only
138+
* release the self-reference once. Subsequent calls have no effect.
139+
* <p>
140+
* This method is typically used when creating a dictionary outside the cache
141+
* (e.g., in tests or temporary usage) and needing to clean it up. For dictionaries
142+
* managed by the cache, the cache's removal listener handles cleanup via
143+
* {@code selfRef().release()}.
144+
*
145+
* @see #selfRef()
146+
* @see #tryRef()
147+
*/
148+
@VisibleForTesting
149+
default void close()
150+
{
151+
selfRef().close();
152+
}
153+
78154
/**
79155
* Write compression dictionary to file
80156
*
@@ -192,15 +268,15 @@ static CompressionDictionary createFromRow(UntypedResultSet.Row row)
192268
if (dict.length != storedLength)
193269
{
194270
throw new IllegalStateException(String.format("Dictionary length mismatch for %s dict id %d. Expected: %d, actual: %d",
195-
kindStr, dictId, storedLength, dict.length));
271+
kindStr, dictId, storedLength, dict.length));
196272
}
197273

198274
// Validate checksum
199275
int calculatedChecksum = calculateChecksum((byte) kind.ordinal(), dictId, dict);
200276
if (calculatedChecksum != storedChecksum)
201277
{
202278
throw new IllegalStateException(String.format("Dictionary checksum mismatch for %s dict id %d. Expected: %d, actual: %d",
203-
kindStr, dictId, storedChecksum, calculatedChecksum));
279+
kindStr, dictId, storedChecksum, calculatedChecksum));
204280
}
205281

206282
return kind.createDictionary(new DictId(kind, dictId), row.getByteArray("dict"), storedChecksum);

src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,17 @@ public CompressionDictionaryCache()
6262
.removalListener((DictId dictId,
6363
CompressionDictionary dictionary,
6464
RemovalCause cause) -> {
65-
// Close dictionary when evicted from cache to free native resources
66-
// SelfRefCounted ensures dictionary won't be actually closed if still referenced by compressors
65+
// Release the cache's reference to the dictionary when evicted
66+
// The dictionary will only be truly cleaned up when all references are released
6767
if (dictionary != null)
6868
{
6969
try
7070
{
71-
dictionary.close();
71+
dictionary.selfRef().release();
7272
}
7373
catch (Exception e)
7474
{
75-
logger.warn("Failed to close compression dictionary {}", dictId, e);
75+
logger.warn("Failed to release compression dictionary {}", dictId, e);
7676
}
7777
}
7878
})

src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java

Lines changed: 66 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import java.util.Objects;
2222
import java.util.concurrent.ConcurrentHashMap;
23-
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.atomic.AtomicReference;
2424

2525
import com.google.common.annotations.VisibleForTesting;
2626
import org.slf4j.Logger;
@@ -42,8 +42,7 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
4242
private final int checksum;
4343
// One ZstdDictDecompress and multiple ZstdDictCompress (per level) can be derived from the same raw dictionary content
4444
private final ConcurrentHashMap<Integer, ZstdDictCompress> zstdDictCompressPerLevel = new ConcurrentHashMap<>();
45-
private volatile ZstdDictDecompress dictDecompress;
46-
private final AtomicBoolean closed = new AtomicBoolean(false);
45+
private final AtomicReference<ZstdDictDecompress> dictDecompress = new AtomicReference<>();
4746
private final Ref<ZstdCompressionDictionary> selfRef;
4847

4948
@VisibleForTesting
@@ -90,7 +89,7 @@ public int checksum()
9089
public int estimatedOccupiedMemoryBytes()
9190
{
9291
int occupied = rawDictionary.length;
93-
occupied += dictDecompress != null ? rawDictionary.length : 0;
92+
occupied += dictDecompress.get() != null ? rawDictionary.length : 0;
9493
occupied += zstdDictCompressPerLevel.size() * rawDictionary.length;
9594

9695
return occupied;
@@ -114,50 +113,65 @@ public int hashCode()
114113
* Get a pre-processed compression tables that is optimized for compression.
115114
* It is derived/computed from dictionary bytes.
116115
* The internal data structure is different from the tables for decompression.
117-
*
116+
* <br>
117+
* IMPORTANT: Caller MUST hold a valid reference (via tryRef/ref) to this dictionary.
118+
* The reference counting mechanism ensures tidy() cannot run while references exist,
119+
* making synchronization unnecessary. This method is safe to call concurrently as long
120+
* as each caller holds a reference.
121+
* <br>
118122
* @param compressionLevel compression level to create the compression table
119-
* @return ZstdDictCompress
123+
* @return ZstdDictCompress for the specified compression level
124+
* @throws IllegalStateException if called without holding a valid reference
120125
*/
121126
public ZstdDictCompress dictionaryForCompression(int compressionLevel)
122127
{
123-
if (closed.get())
124-
throw new IllegalStateException("Dictionary has been closed. " + dictId);
125-
128+
ensureNotReleased();
126129
ZstdCompressorBase.validateCompressionLevel(compressionLevel);
127130

128-
return zstdDictCompressPerLevel.computeIfAbsent(compressionLevel, level -> {
129-
if (closed.get())
130-
throw new IllegalStateException("Dictionary has been closed");
131-
return new ZstdDictCompress(rawDictionary, level);
132-
});
131+
// Fast path: check if already exists to avoid locking the bin
132+
ZstdDictCompress existing = zstdDictCompressPerLevel.get(compressionLevel);
133+
if (existing != null)
134+
return existing;
135+
136+
// A little slow path: create new dictionary for this compression level
137+
// No additional synchronization needed - reference counting prevents tidy() while in use
138+
return zstdDictCompressPerLevel.computeIfAbsent(compressionLevel, level ->
139+
new ZstdDictCompress(rawDictionary, level));
133140
}
134141

135142
/**
136143
* Get a pre-processed decompression tables that is optimized for decompression.
137144
* It is derived/computed from dictionary bytes.
138145
* The internal data structure is different from the tables for compression.
146+
* <br>
147+
* IMPORTANT: Caller MUST hold a valid reference (via tryRef/ref) to this dictionary.
148+
* The reference counting mechanism ensures tidy() cannot run while references exist,
149+
* making synchronization unnecessary. This method is safe to call concurrently as long
150+
* as each caller holds a reference.
151+
* <br>
152+
* Thread-safe: Multiple threads can safely call this method concurrently.
153+
* The decompression dictionary will be created exactly once on first access.
139154
*
140-
* @return ZstdDictDecompress
155+
* @return ZstdDictDecompress for decompression operations
156+
* @throws IllegalStateException if called without holding a valid reference
141157
*/
142158
public ZstdDictDecompress dictionaryForDecompression()
143159
{
144-
if (closed.get())
145-
throw new IllegalStateException("Dictionary has been closed");
146-
147-
ZstdDictDecompress result = dictDecompress;
160+
ensureNotReleased();
161+
// Fast path: if already initialized, return immediately
162+
ZstdDictDecompress result = dictDecompress.get();
148163
if (result != null)
149164
return result;
150165

166+
// Slow path: need to initialize with proper double-checked locking
167+
// Reference counting guarantees tidy() won't run during this operation
151168
synchronized (this)
152169
{
153-
if (closed.get())
154-
throw new IllegalStateException("Dictionary has been closed");
155-
156-
result = dictDecompress;
170+
result = dictDecompress.get();
157171
if (result == null)
158172
{
159173
result = new ZstdDictDecompress(rawDictionary);
160-
dictDecompress = result;
174+
dictDecompress.set(result);
161175
}
162176
return result;
163177
}
@@ -181,30 +195,48 @@ public Ref<ZstdCompressionDictionary> ref()
181195
return selfRef.ref();
182196
}
183197

184-
@Override
185-
public void close()
198+
private void ensureNotReleased()
186199
{
187-
if (closed.compareAndSet(false, true))
188-
{
189-
selfRef.release();
190-
}
200+
if (selfRef.globalCount() <= 0)
201+
throw new IllegalStateException("Dictionary has been released: " + dictId);
191202
}
192203

204+
/**
205+
* Tidy implementation for cleaning up native Zstd resources.
206+
*
207+
* This class holds direct references to the resources that need cleanup,
208+
* avoiding a circular reference pattern where Tidy would hold a reference
209+
* to the parent dictionary object.
210+
*/
193211
private static class Tidy implements RefCounted.Tidy
194212
{
195213
private final ConcurrentHashMap<Integer, ZstdDictCompress> zstdDictCompressPerLevel;
196-
private volatile ZstdDictDecompress dictDecompress;
214+
private final AtomicReference<ZstdDictDecompress> dictDecompress;
197215

198-
Tidy(ConcurrentHashMap<Integer, ZstdDictCompress> zstdDictCompressPerLevel, ZstdDictDecompress dictDecompress)
216+
Tidy(ConcurrentHashMap<Integer, ZstdDictCompress> zstdDictCompressPerLevel,
217+
AtomicReference<ZstdDictDecompress> dictDecompress)
199218
{
200219
this.zstdDictCompressPerLevel = zstdDictCompressPerLevel;
201220
this.dictDecompress = dictDecompress;
202221
}
203222

223+
/**
224+
* Clean up native resources when reference count reaches zero.
225+
*
226+
* IMPORTANT: This method is called exactly once when the last reference is released.
227+
* Reference counting guarantees that no other thread can be executing
228+
* dictionaryForCompression/Decompression when this runs, because:
229+
* 1. Those methods require holding a valid reference
230+
* 2. This only runs when refcount goes from 0 to -1
231+
* 3. Once refcount is negative, tryRef() returns null, preventing new references
232+
*
233+
* Therefore, no synchronization is needed - we have exclusive access to clean up.
234+
*/
204235
@Override
205236
public void tidy()
206237
{
207238
// Close all compression dictionaries
239+
// No synchronization needed - reference counting ensures exclusive access
208240
for (ZstdDictCompress compressDict : zstdDictCompressPerLevel.values())
209241
{
210242
try
@@ -220,7 +252,7 @@ public void tidy()
220252
zstdDictCompressPerLevel.clear();
221253

222254
// Close decompression dictionary
223-
ZstdDictDecompress decompressDict = dictDecompress;
255+
ZstdDictDecompress decompressDict = dictDecompress.get();
224256
if (decompressDict != null)
225257
{
226258
try
@@ -231,7 +263,7 @@ public void tidy()
231263
{
232264
logger.warn("Failed to close ZstdDictDecompress", e);
233265
}
234-
dictDecompress = null;
266+
dictDecompress.set(null);
235267
}
236268
}
237269

0 commit comments

Comments
 (0)