Skip to content

Commit 18fdcd7

Browse files
authored
Support 8.6 idempotent streams (#3006)
* - XADD [IDMP|IDMPAUTO] - XINFO new fields - fix delta in XTRIM in 8.6 * - XCFGSET * - add [Experimental] - add docs
1 parent e71373b commit 18fdcd7

File tree

19 files changed

+762
-65
lines changed

19 files changed

+762
-65
lines changed

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
1111
<CodeAnalysisRuleset>$(MSBuildThisFileDirectory)Shared.ruleset</CodeAnalysisRuleset>
1212
<MSBuildWarningsAsMessages>NETSDK1069</MSBuildWarningsAsMessages>
13-
<NoWarn>$(NoWarn);NU5105;NU1507;SER001;SER002</NoWarn>
13+
<NoWarn>$(NoWarn);NU5105;NU1507;SER001;SER002;SER003</NoWarn>
1414
<PackageReleaseNotes>https://stackexchange.github.io/StackExchange.Redis/ReleaseNotes</PackageReleaseNotes>
1515
<PackageProjectUrl>https://stackexchange.github.io/StackExchange.Redis/</PackageProjectUrl>
1616
<PackageLicenseExpression>MIT</PackageLicenseExpression>

docs/ReleaseNotes.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ Current package versions:
66
| ------------ | ----------------- | ----- |
77
| [![StackExchange.Redis](https://img.shields.io/nuget/v/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis](https://img.shields.io/nuget/vpre/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis MyGet](https://img.shields.io/myget/stackoverflow/vpre/StackExchange.Redis.svg)](https://www.myget.org/feed/stackoverflow/package/nuget/StackExchange.Redis) |
88

9+
## unreleased
10+
11+
- Implement idempotent stream entry (IDMP) support ([#3006 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3006))
12+
913
## 2.10.14
1014

1115
- Fix bug with connection startup failing in low-memory scenarios ([#3002 by nathan-miller23](https://github.com/StackExchange/StackExchange.Redis/pull/3002))

docs/Streams.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,24 @@ You also have the option to override the auto-generated message ID by passing yo
3737
db.StreamAdd("events_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100);
3838
```
3939

40+
Idempotent write-at-most-once production
41+
===
42+
43+
From Redis 8.6, streams support idempotent write-at-most-once production. This is achieved by passing a `StreamIdempotentId` to the `StreamAdd` method. Using idempotent ids avoids
44+
duplicate entries in the stream, even in the event of a failure and retry.
45+
46+
The `StreamIdempotentId` contains a producer id and an optional idempotent id. The producer id should be unique for a given data generator and should be stable and consistent between runs.
47+
The optional idempotent id should be unique for a given data item. If the idempotent id is not provided, the server will generate it from the content of the data item.
48+
49+
```csharp
50+
// int someUniqueExternalSourceId = ... // optional
51+
var idempotentId = new StreamIdempotentId("ticket_generator");
52+
// optionally, new StreamIdempotentId("ticket_generator", someUniqueExternalSourceId)
53+
var messageId = db.StreamAdd("events_stream", "foo_name", "bar_value", idempotentId);
54+
```
55+
56+
~~~~The `StreamConfigure` method can be used to configure the stream, in particular the IDMP map. The `StreamConfiguration` class has properties for the idempotent producer (IDMP) duration and max-size.
57+
4058
Reading from Streams
4159
===
4260

docs/exp/SER003.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
Redis 8.6 is currently in preview and may be subject to change.
2+
3+
New features in Redis 8.6 include:
4+
5+
- `HOTKEYS` for profiling CPU and network hot-spots by key
6+
- `XADD IDMP[AUTP]` for idempotent (write-at-most-once) stream addition
7+
8+
The corresponding library feature must also be considered subject to change:
9+
10+
1. Existing bindings may cease working correctly if the underlying server API changes.
11+
2. Changes to the server API may require changes to the library API, manifesting in either/both of build-time
12+
or run-time breaks.
13+
14+
While this seems *unlikely*, it must be considered a possibility. If you acknowledge this, you can suppress
15+
this warning by adding the following to your `csproj` file:
16+
17+
```xml
18+
<NoWarn>$(NoWarn);SER003</NoWarn>
19+
```
20+
21+
or more granularly / locally in C#:
22+
23+
``` c#
24+
#pragma warning disable SER003
25+
```

src/StackExchange.Redis/APITypes/StreamInfo.cs

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,31 @@
1-
namespace StackExchange.Redis;
1+
using System.Diagnostics.CodeAnalysis;
2+
3+
namespace StackExchange.Redis;
24

35
/// <summary>
46
/// Describes stream information retrieved using the XINFO STREAM command. <see cref="IDatabase.StreamInfo"/>.
57
/// </summary>
68
public readonly struct StreamInfo
79
{
8-
internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int groups, StreamEntry firstEntry, StreamEntry lastEntry, RedisValue lastGeneratedId)
10+
// OK, I accept that this parameter list / size is getting silly, but: it is too late
11+
// to refactor this as a class.
12+
internal StreamInfo(
13+
int length,
14+
int radixTreeKeys,
15+
int radixTreeNodes,
16+
int groups,
17+
StreamEntry firstEntry,
18+
StreamEntry lastEntry,
19+
RedisValue lastGeneratedId,
20+
RedisValue maxDeletedEntryId,
21+
long entriesAdded,
22+
RedisValue recordedFirstEntryId,
23+
long idmpDuration,
24+
long idmpMaxSize,
25+
long pidsTracked,
26+
long iidsTracked,
27+
long iidsAdded,
28+
long iidsDuplicates)
929
{
1030
Length = length;
1131
RadixTreeKeys = radixTreeKeys;
@@ -14,6 +34,19 @@ internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int group
1434
FirstEntry = firstEntry;
1535
LastEntry = lastEntry;
1636
LastGeneratedId = lastGeneratedId;
37+
38+
// 7.0
39+
MaxDeletedEntryId = maxDeletedEntryId;
40+
EntriesAdded = entriesAdded;
41+
RecordedFirstEntryId = recordedFirstEntryId;
42+
43+
// 8.6
44+
IdmpDuration = idmpDuration;
45+
IdmpMaxSize = idmpMaxSize;
46+
PidsTracked = pidsTracked;
47+
IidsTracked = iidsTracked;
48+
IidsAdded = iidsAdded;
49+
IidsDuplicates = iidsDuplicates;
1750
}
1851

1952
/// <summary>
@@ -50,4 +83,76 @@ internal StreamInfo(int length, int radixTreeKeys, int radixTreeNodes, int group
5083
/// The last generated id.
5184
/// </summary>
5285
public RedisValue LastGeneratedId { get; }
86+
87+
/// <summary>
88+
/// The first id recorded for the stream.
89+
/// </summary>
90+
public RedisValue RecordedFirstEntryId { get; }
91+
92+
/// <summary>
93+
/// The count of all entries added to the stream during its lifetime.
94+
/// </summary>
95+
public long EntriesAdded { get; }
96+
97+
/// <summary>
98+
/// The maximal entry ID that was deleted from the stream.
99+
/// </summary>
100+
public RedisValue MaxDeletedEntryId { get; }
101+
102+
/// <summary>
103+
/// The duration value configured for the stream’s IDMP map (seconds), or <c>-1</c> if unavailable.
104+
/// </summary>
105+
public long IdmpDuration
106+
{
107+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
108+
get;
109+
}
110+
111+
/// <summary>
112+
/// The maxsize value configured for the stream’s IDMP map, or <c>-1</c> if unavailable.
113+
/// </summary>
114+
public long IdmpMaxSize
115+
{
116+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
117+
get;
118+
}
119+
120+
/// <summary>
121+
/// The number of idempotent pids currently tracked in the stream, or <c>-1</c> if unavailable.
122+
/// </summary>
123+
public long PidsTracked
124+
{
125+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
126+
get;
127+
}
128+
129+
/// <summary>
130+
/// The number of idempotent ids currently tracked in the stream, or <c>-1</c> if unavailable.
131+
/// This count reflects active iids that haven't expired or been evicted yet.
132+
/// </summary>
133+
public long IidsTracked
134+
{
135+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
136+
get;
137+
}
138+
139+
/// <summary>
140+
/// The count of all entries with an idempotent iid added to the stream during its lifetime, or <c>-1</c> if unavailable.
141+
/// This is a cumulative counter that increases with each idempotent entry added.
142+
/// </summary>
143+
public long IidsAdded
144+
{
145+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
146+
get;
147+
}
148+
149+
/// <summary>
150+
/// The count of all duplicate iids (for all pids) detected during the stream's lifetime, or <c>-1</c> if unavailable.
151+
/// This is a cumulative counter that increases with each duplicate iid.
152+
/// </summary>
153+
public long IidsDuplicates
154+
{
155+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
156+
get;
157+
}
53158
}

src/StackExchange.Redis/Enums/RedisCommand.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ internal enum RedisCommand
229229
XADD,
230230
XAUTOCLAIM,
231231
XCLAIM,
232+
XCFGSET,
232233
XDEL,
233234
XDELEX,
234235
XGROUP,
@@ -375,6 +376,7 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
375376
case RedisCommand.VREM:
376377
case RedisCommand.VSETATTR:
377378
case RedisCommand.XAUTOCLAIM:
379+
case RedisCommand.XCFGSET:
378380
case RedisCommand.ZADD:
379381
case RedisCommand.ZDIFFSTORE:
380382
case RedisCommand.ZINTERSTORE:

src/StackExchange.Redis/Experiments.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ internal static class Experiments
1212
public const string VectorSets = "SER001";
1313
// ReSharper disable once InconsistentNaming
1414
public const string Server_8_4 = "SER002";
15+
// ReSharper disable once InconsistentNaming
16+
public const string Server_8_6 = "SER003";
1517
}
1618
}
1719

src/StackExchange.Redis/Interfaces/IDatabase.cs

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2662,7 +2662,27 @@ IEnumerable<SortedSetEntry> SortedSetScan(
26622662
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
26632663
#pragma warning disable RS0026 // different shape
26642664
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
2665-
#pragma warning restore RS0026
2665+
2666+
/// <summary>
2667+
/// Adds an entry using the specified values to the given stream key.
2668+
/// If key does not exist, a new key holding a stream is created.
2669+
/// The command returns the ID of the newly created stream entry, using
2670+
/// the idempotent id (pid/iid) mechanism to ensure at-most-once production.
2671+
/// See <see cref="StreamIdempotentId"/> for more information of the idempotent API.
2672+
/// </summary>
2673+
/// <param name="key">The key of the stream.</param>
2674+
/// <param name="streamField">The field name for the stream entry.</param>
2675+
/// <param name="streamValue">The value to set in the stream entry.</param>
2676+
/// <param name="idempotentId">The idempotent producer (pid) and optionally id (iid) to use for this entry.</param>
2677+
/// <param name="maxLength">The maximum length of the stream.</param>
2678+
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
2679+
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
2680+
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
2681+
/// <param name="flags">The flags to use for this operation.</param>
2682+
/// <returns>The ID of the newly created message.</returns>
2683+
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
2684+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
2685+
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
26662686

26672687
/// <summary>
26682688
/// Adds an entry using the specified values to the given stream key.
@@ -2679,10 +2699,38 @@ IEnumerable<SortedSetEntry> SortedSetScan(
26792699
/// <param name="flags">The flags to use for this operation.</param>
26802700
/// <returns>The ID of the newly created message.</returns>
26812701
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
2682-
#pragma warning disable RS0026 // different shape
26832702
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
2703+
2704+
/// <summary>
2705+
/// Adds an entry using the specified values to the given stream key.
2706+
/// If key does not exist, a new key holding a stream is created.
2707+
/// The command returns the ID of the newly created stream entry, using
2708+
/// the idempotent id (pid/iid) mechanism to ensure at-most-once production.
2709+
/// See <see cref="StreamIdempotentId"/> for more information of the idempotent API.
2710+
/// </summary>
2711+
/// <param name="key">The key of the stream.</param>
2712+
/// <param name="streamPairs">The fields and their associated values to set in the stream entry.</param>
2713+
/// <param name="idempotentId">The idempotent producer (pid) and optionally id (iid) to use for this entry.</param>
2714+
/// <param name="maxLength">The maximum length of the stream.</param>
2715+
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
2716+
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
2717+
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
2718+
/// <param name="flags">The flags to use for this operation.</param>
2719+
/// <returns>The ID of the newly created message.</returns>
2720+
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
2721+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
2722+
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
26842723
#pragma warning restore RS0026
26852724

2725+
/// <summary>
2726+
/// Configures a stream, in particular the IDMP map.
2727+
/// </summary>
2728+
/// <param name="key">The key of the stream.</param>
2729+
/// <param name="configuration">The configuration to apply.</param>
2730+
/// <param name="flags">The flags to use for this operation.</param>
2731+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
2732+
void StreamConfigure(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None);
2733+
26862734
/// <summary>
26872735
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.
26882736
/// Messages that have been idle for more than <paramref name="minIdleTimeInMs"/> will be claimed.

src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.ComponentModel;
44
using System.Diagnostics.CodeAnalysis;
5+
using System.IO;
56
using System.Net;
67
using System.Threading.Tasks;
78

@@ -655,8 +656,20 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
655656

656657
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, NameValueEntry[], RedisValue?, long?, bool, long?, StreamTrimMode, CommandFlags)"/>
657658
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
659+
660+
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, RedisValue, RedisValue, StreamIdempotentId, long?, bool, long?, StreamTrimMode, CommandFlags)"/>
661+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
662+
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
663+
664+
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, NameValueEntry[], StreamIdempotentId, long?, bool, long?, StreamTrimMode, CommandFlags)"/>
665+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
666+
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
658667
#pragma warning restore RS0026
659668

669+
/// <inheritdoc cref="IDatabase.StreamConfigure(RedisKey, StreamConfiguration, CommandFlags)"/>
670+
[Experimental(Experiments.Server_8_6, UrlFormat = Experiments.UrlFormat)]
671+
Task StreamConfigureAsync(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None);
672+
660673
/// <inheritdoc cref="IDatabase.StreamAutoClaim(RedisKey, RedisValue, RedisValue, long, RedisValue, int?, CommandFlags)"/>
661674
Task<StreamAutoClaimResult> StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None);
662675

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,15 @@ public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, Red
621621
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
622622
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
623623

624+
public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
625+
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags);
626+
627+
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, StreamIdempotentId idempotentId, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
628+
Inner.StreamAddAsync(ToInner(key), streamPairs, idempotentId, maxLength, useApproximateMaxLength, limit, mode, flags);
629+
630+
public Task StreamConfigureAsync(RedisKey key, StreamConfiguration configuration, CommandFlags flags = CommandFlags.None) =>
631+
Inner.StreamConfigureAsync(ToInner(key), configuration, flags);
632+
624633
public Task<StreamAutoClaimResult> StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
625634
Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);
626635

0 commit comments

Comments
 (0)