diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index da6f47073df88..b3af6e7b4d9d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2308,9 +2308,10 @@ int hash() { boolean heap, boolean offheap, AffinityTopologyVersion topVer, - @Nullable IgniteCacheExpiryPolicy expiryPlc) - throws GridCacheEntryRemovedException, IgniteCheckedException { + @Nullable IgniteCacheExpiryPolicy expiryPlc + ) throws GridCacheEntryRemovedException, IgniteCheckedException { assert heap || offheap; + assert cctx.shared().database().checkpointLockIsHeldByThread(); boolean rmv = false; @@ -2377,7 +2378,17 @@ int hash() { AffinityTopologyVersion topVer = tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion(); - return peek(true, false, topVer, null); + assert cctx.shared().database().checkpointLockIsHeldByThread() || !lockedByCurrentThread() : + "Lock order violation, checkpoint lock must be acquired before entry lock"; + + cctx.shared().database().checkpointReadLock(); + + try { + return peek(true, false, topVer, null); + } + finally { + cctx.shared().database().checkpointReadUnlock(); + } } /** @@ -3790,6 +3801,11 @@ private CacheEntryImplEx wrapVersionedWithValue() { public final boolean visitable(CacheEntryPredicate[] filter) { boolean rmv = false; + assert cctx.shared().database().checkpointLockIsHeldByThread() || !lockedByCurrentThread() : + "Lock order violation, checkpoint lock must be acquired before entry lock"; + + cctx.shared().database().checkpointReadLock(); + try { lockEntry(); @@ -3826,6 +3842,8 @@ public final boolean visitable(CacheEntryPredicate[] filter) { return false; } finally { + cctx.shared().database().checkpointReadUnlock(); + if (rmv) { onMarkedObsolete(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java index 2b397a34d960f..13e231d7ebccc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java @@ -64,6 +64,7 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; import org.junit.Test; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; @@ -484,6 +485,54 @@ public void testExpirationNonPersistentRegion() throws Exception { assertFalse("Failure handler should not be triggered.", failureHndTriggered); } + /** */ + @Test + public void testNearCacheExpiredEntriesIteration() throws Exception { + IgniteEx srv = startGrids(2); + srv.cluster().state(ACTIVE); + + IgniteCache cache = srv.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setNearConfiguration(new NearCacheConfiguration<>()) + .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, 1))) + .setEagerTtl(false) + ); + + for (int i = 0; i < 100; i++) + cache.put(i, "val"); + + doSleep(10); // Wait for expiration. + + cache.clear(); + + assertEquals(0, cache.size()); + } + + /** */ + @Test + public void testReplaceExpiredOnheapEntriesUnderTx() throws Exception { + IgniteEx srv = startGrid(0); + srv.cluster().state(ACTIVE); + + IgniteCache cache = srv.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setOnheapCacheEnabled(true) + .setAtomicityMode(TRANSACTIONAL) + .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, 1))) + .setEagerTtl(false) + ); + + cache.put(0, "val"); + + doSleep(10); // Wait for expiration. + + try (Transaction tx = srv.transactions().txStart()) { + cache.replace(0, "val", "val1"); + + tx.commit(); + } + + assertNull(cache.get(0)); // Expired. + } + /** * */