Skip to content

Commit 285eeea

Browse files
committed
fail action async
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent f3b131c commit 285eeea

File tree

5 files changed

+40
-2
lines changed

5 files changed

+40
-2
lines changed

RabbitMQ.Stream.Client/ICrc32.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@
66

77
namespace RabbitMQ.Stream.Client
88
{
9+
public struct ChunkInfo
10+
{
11+
/// <summary>
12+
/// The stream name of the chunk.
13+
/// </summary>
14+
public string StreamName { get; init; }
15+
16+
public ulong Id { get; init; }
17+
18+
public uint ServerHash { get; init; }
19+
20+
public uint LocalHash { get; init; }
21+
}
22+
923
public enum ChunkAction
1024
{
1125
/// <summary>
@@ -37,5 +51,7 @@ public interface ICrc32
3751
/// The code here should be safe
3852
/// </summary>
3953
Func<IConsumer, ChunkAction> FailAction { get; init; }
54+
55+
Func<IConsumer, ChunkInfo, ChunkAction> AsyncFailAction { get; init; }
4056
}
4157
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
3838
RabbitMQ.Stream.Client.ChunkAction
3939
RabbitMQ.Stream.Client.ChunkAction.Skip = 1 -> RabbitMQ.Stream.Client.ChunkAction
4040
RabbitMQ.Stream.Client.ChunkAction.TryToProcess = 0 -> RabbitMQ.Stream.Client.ChunkAction
41+
RabbitMQ.Stream.Client.ChunkInfo
42+
RabbitMQ.Stream.Client.ChunkInfo.ChunkInfo() -> void
43+
RabbitMQ.Stream.Client.ChunkInfo.Id.get -> ulong
44+
RabbitMQ.Stream.Client.ChunkInfo.Id.init -> void
45+
RabbitMQ.Stream.Client.ChunkInfo.LocalHash.get -> uint
46+
RabbitMQ.Stream.Client.ChunkInfo.LocalHash.init -> void
47+
RabbitMQ.Stream.Client.ChunkInfo.ServerHash.get -> uint
48+
RabbitMQ.Stream.Client.ChunkInfo.ServerHash.init -> void
49+
RabbitMQ.Stream.Client.ChunkInfo.StreamName.get -> string
50+
RabbitMQ.Stream.Client.ChunkInfo.StreamName.init -> void
4151
RabbitMQ.Stream.Client.Client.ClientId.get -> string
4252
RabbitMQ.Stream.Client.Client.ClientId.init -> void
4353
RabbitMQ.Stream.Client.Client.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
@@ -186,6 +196,8 @@ RabbitMQ.Stream.Client.IConsumerConfig.FlowControl.set -> void
186196
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
187197
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
188198
RabbitMQ.Stream.Client.ICrc32
199+
RabbitMQ.Stream.Client.ICrc32.AsyncFailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkInfo, RabbitMQ.Stream.Client.ChunkAction>
200+
RabbitMQ.Stream.Client.ICrc32.AsyncFailAction.init -> void
189201
RabbitMQ.Stream.Client.ICrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
190202
RabbitMQ.Stream.Client.ICrc32.FailAction.init -> void
191203
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
@@ -339,6 +351,8 @@ RabbitMQ.Stream.Client.RoutingStrategyType
339351
RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType
340352
RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType
341353
RabbitMQ.Stream.Client.StreamCrc32
354+
RabbitMQ.Stream.Client.StreamCrc32.AsyncFailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkInfo, RabbitMQ.Stream.Client.ChunkAction>
355+
RabbitMQ.Stream.Client.StreamCrc32.AsyncFailAction.init -> void
342356
RabbitMQ.Stream.Client.StreamCrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
343357
RabbitMQ.Stream.Client.StreamCrc32.FailAction.init -> void
344358
RabbitMQ.Stream.Client.StreamCrc32.Hash(byte[] data) -> byte[]

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ internal void Validate()
107107
}
108108

109109
FlowControl ??= new FlowControl();
110-
111110
}
112111

113112
internal bool IsFiltering => ConsumerFilter is { Values.Count: > 0 };
@@ -642,7 +641,14 @@ private async Task Init()
642641
// if the user has set the FailAction, we call it
643642
// to allow the user to handle the chunk action
644643
// if the FailAction is not set, we skip the chunk
645-
chunkAction = _config.Crc32.FailAction?.Invoke(this) ?? ChunkAction.Skip;
644+
chunkAction = _config.Crc32.AsyncFailAction?.Invoke(this,
645+
new ChunkInfo()
646+
{
647+
Id = deliver.Chunk.ChunkId,
648+
ServerHash = deliver.Chunk.Crc,
649+
LocalHash = crcCalculated,
650+
StreamName = _config.Stream
651+
}) ?? ChunkAction.Skip;
646652
}
647653
}
648654

RabbitMQ.Stream.Client/StreamCrc32.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ public byte[] Hash(byte[] data)
2020
}
2121

2222
public Func<IConsumer, ChunkAction> FailAction { get; init; } = null;
23+
public Func<IConsumer, ChunkInfo, ChunkAction> AsyncFailAction { get; init; }
2324
}

Tests/Crc32Tests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public byte[] Hash(byte[] data)
2727
}
2828

2929
public Func<IConsumer, ChunkAction> FailAction { get; init; }
30+
public Func<IConsumer, ChunkInfo, ChunkAction> AsyncFailAction { get; init; }
3031
}
3132

3233
public class Crc32Tests(ITestOutputHelper testOutputHelper)

0 commit comments

Comments
 (0)