diff --git a/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs b/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs
index c920ac4f50d..7bb385dc0d1 100644
--- a/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs
+++ b/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs
@@ -4,6 +4,7 @@
using System.Linq;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
+using Orleans.Streaming;
using Orleans.Streams;
namespace Orleans.Providers.Streams.Common
@@ -185,6 +186,16 @@ private void SetCursor(Cursor cursor, StreamSequenceToken sequenceToken)
cursor.SequenceToken = newestBlock.Value.GetNewestSequenceToken(cacheDataAdapter);
return;
}
+ if (sequenceToken is OldestInStreamToken)
+ {
+ // Start from the oldest message in cache
+ var block = messageBlocks.Last;
+ cursor.State = CursorStates.Set;
+ cursor.CurrentBlock = block;
+ cursor.Index = block.Value.OldestMessageIndex;
+ cursor.SequenceToken = block.Value.GetOldestSequenceToken(cacheDataAdapter);
+ return;
+ }
// If sequenceToken is too new to be in cache, unset token, and wait for more data.
CachedMessage newestMessage = newestBlock.Value.NewestMessage;
diff --git a/src/Orleans.Streaming/Core/OldestInStreamToken.cs b/src/Orleans.Streaming/Core/OldestInStreamToken.cs
new file mode 100644
index 00000000000..5243b3ece0a
--- /dev/null
+++ b/src/Orleans.Streaming/Core/OldestInStreamToken.cs
@@ -0,0 +1,42 @@
+using System;
+using Orleans.Streams;
+
+namespace Orleans.Streaming;
+
+[Serializable]
+[GenerateSerializer]
+public sealed class OldestInStreamToken : StreamSequenceToken
+{
+ ///
+ /// Always -1, which is less than any other valid sequence number.
+ /// The setter is protected and does nothing.
+ ///
+ public override long SequenceNumber { get => -1; protected set { } }
+
+ ///
+ /// Always 0, as this is a conceptual token representing the oldest event in the stream.
+ /// The setter is protected and does nothing.
+ ///
+ public override int EventIndex { get => 0; protected set { } }
+
+ ///
+ /// An instance of the class.
+ ///
+ public static OldestInStreamToken Instance { get; } = new OldestInStreamToken();
+
+ ///
+ public override bool Equals(StreamSequenceToken other)
+ {
+ return other is OldestInStreamToken;
+ }
+
+ ///
+ /// Always less than any other token, except another .
+ ///
+ public override int CompareTo(StreamSequenceToken other)
+ {
+ if (other is OldestInStreamToken) return 0;
+ return -1;
+ }
+}
+
diff --git a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs
index db93df6a100..950d5deca72 100644
--- a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs
+++ b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
-using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -10,6 +9,7 @@
using Orleans.Internal;
using Orleans.Runtime;
using Orleans.Runtime.Internal;
+using Orleans.Streaming;
using Orleans.Streams.Filtering;
namespace Orleans.Streams
@@ -543,7 +543,7 @@ private async Task RunConsumerCursor(StreamConsumerData consumerData)
{
exceptionOccured = exc;
consumerData.SafeDisposeCursor(logger);
- consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, null);
+ consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, OldestInStreamToken.Instance);
}
if (batch != null)
diff --git a/test/TesterInternal/OrleansRuntime/Streams/PooledQueueCacheTests.cs b/test/TesterInternal/OrleansRuntime/Streams/PooledQueueCacheTests.cs
index 75eb1cb7433..ea1a2bd2220 100644
--- a/test/TesterInternal/OrleansRuntime/Streams/PooledQueueCacheTests.cs
+++ b/test/TesterInternal/OrleansRuntime/Streams/PooledQueueCacheTests.cs
@@ -2,6 +2,7 @@
using Microsoft.Extensions.Logging.Abstractions;
using Orleans.Providers.Streams.Common;
using Orleans.Runtime;
+using Orleans.Streaming;
using Orleans.Streams;
using Xunit;
@@ -348,6 +349,48 @@ long EnqueueMessage(Guid streamId)
}
}
+ [Fact, TestCategory("BVT"), TestCategory("Streaming")]
+ public void GetCursorAtOldestEntry()
+ {
+ var bufferPool = new ObjectPool(() => new FixedSizeBuffer(PooledBufferSize));
+ var dataAdapter = new TestCacheDataAdapter();
+ var cache = new PooledQueueCache(dataAdapter, NullLogger.Instance, null, null, TimeSpan.FromSeconds(10));
+ var evictionStrategy = new ChronologicalEvictionStrategy(NullLogger.Instance, new TimePurgePredicate(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)), null, null);
+ evictionStrategy.PurgeObservable = cache;
+ var converter = new CachedMessageConverter(bufferPool, evictionStrategy);
+
+ var seqNumber = 123;
+ var streamKey = Guid.NewGuid();
+ var stream = StreamId.Create(TestStreamNamespace, streamKey);
+
+ // Start by enqueuing messages for stream
+ EnqueueMessage(streamKey);
+ EnqueueMessage(streamKey);
+
+ // Get a cursor at the oldest entry
+ var cursor = cache.GetCursor(stream, OldestInStreamToken.Instance);
+
+ // Should have a cursor able to walk the two messages
+ Assert.NotNull(cursor);
+
+ Assert.True(cache.TryGetNextMessage(cursor, out var _));
+ Assert.True(cache.TryGetNextMessage(cursor, out var _));
+ Assert.False(cache.TryGetNextMessage(cursor, out var _));
+
+ long EnqueueMessage(Guid streamId)
+ {
+ var now = DateTime.UtcNow;
+ var msg = new TestQueueMessage
+ {
+ StreamId = StreamId.Create(TestStreamNamespace, streamId),
+ SequenceNumber = seqNumber,
+ };
+ cache.Add(new List() { converter.ToCachedMessage(msg, now) }, now);
+ seqNumber++;
+ return msg.SequenceNumber;
+ }
+ }
+
private int RunGoldenPath(PooledQueueCache cache, CachedMessageConverter converter, int startOfCache)
{
int sequenceNumber = startOfCache;