Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Streaming;
using Orleans.Streams;

namespace Orleans.Providers.Streams.Common
Expand Down Expand Up @@ -185,6 +186,16 @@ private void SetCursor(Cursor cursor, StreamSequenceToken sequenceToken)
cursor.SequenceToken = newestBlock.Value.GetNewestSequenceToken(cacheDataAdapter);
return;
}
if (sequenceToken is OldestInStreamToken)
{
// Start from the oldest message in cache
var block = messageBlocks.Last;
cursor.State = CursorStates.Set;
cursor.CurrentBlock = block;
cursor.Index = block.Value.OldestMessageIndex;
cursor.SequenceToken = block.Value.GetOldestSequenceToken(cacheDataAdapter);
return;
}

// If sequenceToken is too new to be in cache, unset token, and wait for more data.
CachedMessage newestMessage = newestBlock.Value.NewestMessage;
Expand Down
42 changes: 42 additions & 0 deletions src/Orleans.Streaming/Core/OldestInStreamToken.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using Orleans.Streams;

namespace Orleans.Streaming;

[Serializable]
[GenerateSerializer]
public sealed class OldestInStreamToken : StreamSequenceToken
{
/// <summary>
/// Always -1, which is less than any other valid sequence number.
/// The setter is protected and does nothing.
/// </summary>
public override long SequenceNumber { get => -1; protected set { } }

/// <summary>
/// Always 0, as this is a conceptual token representing the oldest event in the stream.
/// The setter is protected and does nothing.
/// </summary>
public override int EventIndex { get => 0; protected set { } }

/// <summary>
/// An instance of the <see cref="OldestInStreamToken"/> class.
/// </summary>
public static OldestInStreamToken Instance { get; } = new OldestInStreamToken();

/// <inheritdoc/>
public override bool Equals(StreamSequenceToken other)
{
return other is OldestInStreamToken;
}

/// <summary>
/// Always less than any other token, except another <see cref="OldestInStreamToken"/>.
/// </summary>
public override int CompareTo(StreamSequenceToken other)
{
if (other is OldestInStreamToken) return 0;
return -1;
}
}

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -10,6 +9,7 @@
using Orleans.Internal;
using Orleans.Runtime;
using Orleans.Runtime.Internal;
using Orleans.Streaming;
using Orleans.Streams.Filtering;

namespace Orleans.Streams
Expand Down Expand Up @@ -543,7 +543,7 @@ private async Task RunConsumerCursor(StreamConsumerData consumerData)
{
exceptionOccured = exc;
consumerData.SafeDisposeCursor(logger);
consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, null);
consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, OldestInStreamToken.Instance);
}

if (batch != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Microsoft.Extensions.Logging.Abstractions;
using Orleans.Providers.Streams.Common;
using Orleans.Runtime;
using Orleans.Streaming;
using Orleans.Streams;
using Xunit;

Expand Down Expand Up @@ -348,6 +349,48 @@ long EnqueueMessage(Guid streamId)
}
}

[Fact, TestCategory("BVT"), TestCategory("Streaming")]
public void GetCursorAtOldestEntry()
{
var bufferPool = new ObjectPool<FixedSizeBuffer>(() => new FixedSizeBuffer(PooledBufferSize));
var dataAdapter = new TestCacheDataAdapter();
var cache = new PooledQueueCache(dataAdapter, NullLogger.Instance, null, null, TimeSpan.FromSeconds(10));
var evictionStrategy = new ChronologicalEvictionStrategy(NullLogger.Instance, new TimePurgePredicate(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)), null, null);
evictionStrategy.PurgeObservable = cache;
var converter = new CachedMessageConverter(bufferPool, evictionStrategy);

var seqNumber = 123;
var streamKey = Guid.NewGuid();
var stream = StreamId.Create(TestStreamNamespace, streamKey);

// Start by enqueuing messages for stream
EnqueueMessage(streamKey);
EnqueueMessage(streamKey);

// Get a cursor at the oldest entry
var cursor = cache.GetCursor(stream, OldestInStreamToken.Instance);

// Should have a cursor able to walk the two messages
Assert.NotNull(cursor);

Assert.True(cache.TryGetNextMessage(cursor, out var _));
Assert.True(cache.TryGetNextMessage(cursor, out var _));
Assert.False(cache.TryGetNextMessage(cursor, out var _));

long EnqueueMessage(Guid streamId)
{
var now = DateTime.UtcNow;
var msg = new TestQueueMessage
{
StreamId = StreamId.Create(TestStreamNamespace, streamId),
SequenceNumber = seqNumber,
};
cache.Add(new List<CachedMessage>() { converter.ToCachedMessage(msg, now) }, now);
seqNumber++;
return msg.SequenceNumber;
}
}

private int RunGoldenPath(PooledQueueCache cache, CachedMessageConverter converter, int startOfCache)
{
int sequenceNumber = startOfCache;
Expand Down
Loading