diff --git a/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs b/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs index c920ac4f50d..7bb385dc0d1 100644 --- a/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs +++ b/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs @@ -4,6 +4,7 @@ using System.Linq; using Microsoft.Extensions.Logging; using Orleans.Runtime; +using Orleans.Streaming; using Orleans.Streams; namespace Orleans.Providers.Streams.Common @@ -185,6 +186,16 @@ private void SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) cursor.SequenceToken = newestBlock.Value.GetNewestSequenceToken(cacheDataAdapter); return; } + if (sequenceToken is OldestInStreamToken) + { + // Start from the oldest message in cache + var block = messageBlocks.Last; + cursor.State = CursorStates.Set; + cursor.CurrentBlock = block; + cursor.Index = block.Value.OldestMessageIndex; + cursor.SequenceToken = block.Value.GetOldestSequenceToken(cacheDataAdapter); + return; + } // If sequenceToken is too new to be in cache, unset token, and wait for more data. CachedMessage newestMessage = newestBlock.Value.NewestMessage; diff --git a/src/Orleans.Streaming/Core/OldestInStreamToken.cs b/src/Orleans.Streaming/Core/OldestInStreamToken.cs new file mode 100644 index 00000000000..5243b3ece0a --- /dev/null +++ b/src/Orleans.Streaming/Core/OldestInStreamToken.cs @@ -0,0 +1,42 @@ +using System; +using Orleans.Streams; + +namespace Orleans.Streaming; + +[Serializable] +[GenerateSerializer] +public sealed class OldestInStreamToken : StreamSequenceToken +{ + /// + /// Always -1, which is less than any other valid sequence number. + /// The setter is protected and does nothing. + /// + public override long SequenceNumber { get => -1; protected set { } } + + /// + /// Always 0, as this is a conceptual token representing the oldest event in the stream. + /// The setter is protected and does nothing. + /// + public override int EventIndex { get => 0; protected set { } } + + /// + /// An instance of the class. + /// + public static OldestInStreamToken Instance { get; } = new OldestInStreamToken(); + + /// + public override bool Equals(StreamSequenceToken other) + { + return other is OldestInStreamToken; + } + + /// + /// Always less than any other token, except another . + /// + public override int CompareTo(StreamSequenceToken other) + { + if (other is OldestInStreamToken) return 0; + return -1; + } +} + diff --git a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs index db93df6a100..950d5deca72 100644 --- a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs +++ b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Diagnostics.Metrics; -using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -10,6 +9,7 @@ using Orleans.Internal; using Orleans.Runtime; using Orleans.Runtime.Internal; +using Orleans.Streaming; using Orleans.Streams.Filtering; namespace Orleans.Streams @@ -543,7 +543,7 @@ private async Task RunConsumerCursor(StreamConsumerData consumerData) { exceptionOccured = exc; consumerData.SafeDisposeCursor(logger); - consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, null); + consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, OldestInStreamToken.Instance); } if (batch != null) diff --git a/test/TesterInternal/OrleansRuntime/Streams/PooledQueueCacheTests.cs b/test/TesterInternal/OrleansRuntime/Streams/PooledQueueCacheTests.cs index 75eb1cb7433..ea1a2bd2220 100644 --- a/test/TesterInternal/OrleansRuntime/Streams/PooledQueueCacheTests.cs +++ b/test/TesterInternal/OrleansRuntime/Streams/PooledQueueCacheTests.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Logging.Abstractions; using Orleans.Providers.Streams.Common; using Orleans.Runtime; +using Orleans.Streaming; using Orleans.Streams; using Xunit; @@ -348,6 +349,48 @@ long EnqueueMessage(Guid streamId) } } + [Fact, TestCategory("BVT"), TestCategory("Streaming")] + public void GetCursorAtOldestEntry() + { + var bufferPool = new ObjectPool(() => new FixedSizeBuffer(PooledBufferSize)); + var dataAdapter = new TestCacheDataAdapter(); + var cache = new PooledQueueCache(dataAdapter, NullLogger.Instance, null, null, TimeSpan.FromSeconds(10)); + var evictionStrategy = new ChronologicalEvictionStrategy(NullLogger.Instance, new TimePurgePredicate(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)), null, null); + evictionStrategy.PurgeObservable = cache; + var converter = new CachedMessageConverter(bufferPool, evictionStrategy); + + var seqNumber = 123; + var streamKey = Guid.NewGuid(); + var stream = StreamId.Create(TestStreamNamespace, streamKey); + + // Start by enqueuing messages for stream + EnqueueMessage(streamKey); + EnqueueMessage(streamKey); + + // Get a cursor at the oldest entry + var cursor = cache.GetCursor(stream, OldestInStreamToken.Instance); + + // Should have a cursor able to walk the two messages + Assert.NotNull(cursor); + + Assert.True(cache.TryGetNextMessage(cursor, out var _)); + Assert.True(cache.TryGetNextMessage(cursor, out var _)); + Assert.False(cache.TryGetNextMessage(cursor, out var _)); + + long EnqueueMessage(Guid streamId) + { + var now = DateTime.UtcNow; + var msg = new TestQueueMessage + { + StreamId = StreamId.Create(TestStreamNamespace, streamId), + SequenceNumber = seqNumber, + }; + cache.Add(new List() { converter.ToCachedMessage(msg, now) }, now); + seqNumber++; + return msg.SequenceNumber; + } + } + private int RunGoldenPath(PooledQueueCache cache, CachedMessageConverter converter, int startOfCache) { int sequenceNumber = startOfCache;