Skip to content

Commit 81f5064

Browse files
committed
Enhance resiliency for producer
1 parent 6026317 commit 81f5064

5 files changed

Lines changed: 89 additions & 33 deletions

File tree

src/DotPulsar.Extensions.Resiliency/Abstractions/IDeadLetterPolicy.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ namespace DotPulsar.Abstractions;
1616

1717
public interface IDeadLetterPolicy
1818
{
19-
ValueTask ReconsumeLater(IMessage message, TimeSpan? delayTime = null, IEnumerable<KeyValuePair<string, string?>>? customProperties = null, bool preventRetry = false, CancellationToken cancellationToken = default);
19+
ValueTask ReconsumeLater(IMessage message, TimeSpan? delayTime = null, IEnumerable<KeyValuePair<string, string?>>? customRetryProperties = null, IEnumerable<KeyValuePair<string, string?>>? customDlqProperties = null, bool preventRetry = false, CancellationToken cancellationToken = default);
2020
}

src/DotPulsar.Extensions.Resiliency/Exceptions/ResilientProducerDisposedException.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
// * limitations under the License.
1313
// */
1414

15-
using System.Runtime.Serialization;
16-
1715
namespace DotPulsar.Exceptions;
1816

1917
public sealed class ResilientProducerDisposedException : ObjectDisposedException

src/DotPulsar.Extensions.Resiliency/Extensions/DeadLetterFailureHandler.cs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,28 @@ public class DeadLetterFailureHandler : IConsumerFailureHandler
2222
private readonly Func<IMessage, Exception, bool>? retryExceptionHandler;
2323
private readonly Func<IMessage, Exception, TimeSpan?>? delayTimeSelector;
2424
private readonly Func<Exception, IEnumerable<KeyValuePair<string, string?>>> exceptionSerializer;
25+
private readonly Func<IMessage, Exception, IEnumerable<KeyValuePair<string, string?>>>? customPropertyProvider;
2526

2627
public DeadLetterFailureHandler(
2728
IDeadLetterPolicy deadLetterPolicy,
2829
Func<IMessage, Exception, bool>? retryExceptionHandler = null,
2930
Func<Exception, IEnumerable<KeyValuePair<string, string?>>>? exceptionSerializer = null,
31+
Func<IMessage, Exception, IEnumerable<KeyValuePair<string, string?>>>? customPropertyProvider = null,
3032
Func<IMessage, Exception, TimeSpan?>? delayTimeSelector = null) {
3133
this.deadLetterPolicy = deadLetterPolicy ?? throw new ArgumentNullException(nameof(deadLetterPolicy));
3234
this.retryExceptionHandler = retryExceptionHandler;
3335
this.delayTimeSelector = delayTimeSelector;
34-
this.exceptionSerializer = exceptionSerializer ?? SerializeException;
35-
36-
static IEnumerable<KeyValuePair<string, string?>> SerializeException(Exception exception) {
37-
yield return new("EXCEPTION_TYPE", exception.GetType().FullName);
38-
yield return new("EXCEPTION_MESSAGE", exception.Message);
39-
yield return new("STACK_TRACE", exception.StackTrace);
40-
}
36+
this.exceptionSerializer = exceptionSerializer ?? DeadLetterPolicy.SerializeException;
37+
this.customPropertyProvider = customPropertyProvider;
4138
}
4239

4340
public ValueTask HandleAsync(IMessage message, Exception exception, CancellationToken cancellationToken) {
44-
var preventRetry = retryExceptionHandler == null || !retryExceptionHandler(message, exception);
45-
var properties = exceptionSerializer(exception);
46-
return deadLetterPolicy.ReconsumeLater(message, delayTime: delayTimeSelector?.Invoke(message, exception), customProperties: properties, preventRetry: preventRetry, cancellationToken: cancellationToken);
41+
return deadLetterPolicy.ReconsumeLater(
42+
message,
43+
delayTime: delayTimeSelector?.Invoke(message, exception),
44+
customRetryProperties: customPropertyProvider?.Invoke(message, exception),
45+
customDlqProperties: exceptionSerializer(exception),
46+
preventRetry: retryExceptionHandler == null || !retryExceptionHandler(message, exception),
47+
cancellationToken: cancellationToken);
4748
}
4849
}

src/DotPulsar.Extensions.Resiliency/Extensions/DeadLetterPolicy.cs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ public class DeadLetterPolicy : IDeadLetterPolicy, IAsyncDisposable
2727
public const string RealTopicMetadataKey = "REAL_TOPIC";
2828
public const string RealSubscriptionMetadataKey = "REAL_SUBSCRIPTION";
2929
public const string OriginMessageIdMetadataKey = "ORIGIN_MESSAGE_ID";
30+
public const string ExceptionTypeMetadataKey = "EXCEPTION_TYPE";
31+
public const string ExceptionMessageMetadataKey = "EXCEPTION_MESSAGE";
32+
public const string StackTraceMetadataKey = "STACK_TRACE";
3033
public const int DefaultMaxReconsumeTimes = 16;
3134
public const string RetryTopicSuffix = "-RETRY";
3235
public const string DeadLetterTopicSuffix = "-DLQ";
@@ -102,7 +105,7 @@ public async ValueTask DisposeAsync() {
102105
GC.SuppressFinalize(this);
103106
}
104107

105-
public async ValueTask ReconsumeLater(IMessage message, TimeSpan? delayTime = null, IEnumerable<KeyValuePair<string, string?>>? customProperties = null, bool preventRetry = false, CancellationToken cancellationToken = default) {
108+
public async ValueTask ReconsumeLater(IMessage message, TimeSpan? delayTime = null, IEnumerable<KeyValuePair<string, string?>>? customRetryProperties = null, IEnumerable<KeyValuePair<string, string?>>? customDlqProperties = null, bool preventRetry = false, CancellationToken cancellationToken = default) {
106109
#if NET6_0_OR_GREATER
107110
ArgumentNullException.ThrowIfNull(message);
108111
#else
@@ -111,7 +114,7 @@ public async ValueTask ReconsumeLater(IMessage message, TimeSpan? delayTime = nu
111114
}
112115
#endif
113116

114-
var metadata = PrepareMetadata(message, delayTime ?? RetryDelay, customProperties);
117+
var metadata = PrepareMetadata(message, delayTime ?? RetryDelay, customRetryProperties);
115118
if (retryProducer != null && !preventRetry) {
116119
var reconsumeTimes = GetReconsumeAndUpdate(metadata);
117120
if (reconsumeTimes <= MaxRedeliveryCount) {
@@ -128,6 +131,7 @@ public async ValueTask ReconsumeLater(IMessage message, TimeSpan? delayTime = nu
128131
}
129132

130133
if (deadLetterProducer != null) {
134+
AddProperties(metadata, customDlqProperties);
131135
await deadLetterProducer(metadata, message.Data, cancellationToken).ConfigureAwait(false);
132136
}
133137

@@ -166,15 +170,18 @@ static MessageMetadata PrepareMetadata(IMessage message, TimeSpan? delayTime, IE
166170
metadata[RealTopicMetadataKey] = originTopicNameStr;
167171
metadata[OriginMessageIdMetadataKey] = message.MessageId.ToString();
168172

173+
AddProperties(metadata, customProperties);
174+
return metadata;
175+
}
176+
177+
static void AddProperties(MessageMetadata metadata, IEnumerable<KeyValuePair<string, string?>>? customProperties) {
169178
if (customProperties != null) {
170179
foreach (var property in customProperties) {
171180
if (property.Value != null) {
172181
metadata[property.Key] = property.Value;
173182
}
174183
}
175184
}
176-
177-
return metadata;
178185
}
179186

180187
static int GetReconsumeAndUpdate(MessageMetadata metadata) {
@@ -187,4 +194,17 @@ static int GetReconsumeAndUpdate(MessageMetadata metadata) {
187194
return reconsumeTimes;
188195
}
189196
}
197+
198+
public static IEnumerable<KeyValuePair<string, string?>> SerializeException(Exception exception) {
199+
#if NET6_0_OR_GREATER
200+
ArgumentNullException.ThrowIfNull(exception);
201+
#else
202+
if (exception == null) {
203+
throw new ArgumentNullException(nameof(exception));
204+
}
205+
#endif
206+
yield return new(ExceptionTypeMetadataKey, exception.GetType().FullName);
207+
yield return new(ExceptionMessageMetadataKey, exception.Message);
208+
yield return new(StackTraceMetadataKey, exception.StackTrace);
209+
}
190210
}

src/DotPulsar.Extensions.Resiliency/Internal/ResilientProducer.cs

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
using DotPulsar.Abstractions;
1616
using DotPulsar.Exceptions;
17+
using DotPulsar.Internal.Exceptions;
1718
using Polly;
1819

1920
namespace DotPulsar.Internal;
@@ -24,15 +25,15 @@ public sealed class ResilientProducer<TMessage> : IProducer<TMessage>
2425
private readonly ResiliencePipeline resiliencePipeline;
2526
private bool disposed;
2627
#pragma warning disable CA2213
27-
private IProducer<TMessage>? producer;
28+
private IProducer<TMessage>? currentProducer;
2829
#pragma warning restore CA2213
2930

3031
public ResilientProducer(IProducerBuilder<TMessage> producerBuilder, ResiliencePipeline? resiliencePipeline = null) {
3132
this.producerBuilder = producerBuilder ?? throw new ArgumentNullException(nameof(producerBuilder));
3233
this.resiliencePipeline = resiliencePipeline ?? ResiliencePipeline.Empty;
33-
producer = GetOrCreateProducer();
34-
ServiceUrl = producer.ServiceUrl;
35-
Topic = producer.Topic;
34+
currentProducer = GetOrCreateProducer();
35+
ServiceUrl = currentProducer.ServiceUrl;
36+
Topic = currentProducer.Topic;
3637
State = new ResilienceState(this);
3738
SendChannel = new ResilienceSendChannel(this);
3839
}
@@ -54,43 +55,61 @@ public ISendChannel<TMessage> SendChannel {
5455
}
5556

5657
public ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = new CancellationToken()) {
58+
ThrowIfDisposed();
5759
return resiliencePipeline.ExecuteAsync(static (state, ct) => {
5860
var (topicProducer, message, metadata) = state;
5961
return topicProducer.GetOrCreateProducer().Send(metadata, message, ct);
6062
}, (this, message, metadata), cancellationToken);
6163
}
6264

6365
private IProducer<TMessage> GetOrCreateProducer() {
64-
if (disposed) {
65-
throw new ResilientProducerDisposedException(GetType().FullName);
66-
}
66+
ThrowIfDisposed();
6767

68-
var current = producer;
68+
var current = currentProducer;
6969
if (current != null) {
7070
return current;
7171
}
7272

7373
var created = producerBuilder.Create();
74-
var result = Interlocked.CompareExchange(ref producer, created, null);
74+
var result = Interlocked.CompareExchange(ref currentProducer, created, null);
7575
if (result == null) {
7676
_ = StateMonitor.MonitorProducer(created, new StateChangedHandler(this));
7777
return created;
7878
}
7979

8080
if (!ReferenceEquals(result, created)) {
81-
created.DisposeAsync().AsTask().Wait();
81+
DisposeProducer(created);
8282
}
8383

8484
return result;
85+
86+
static void DisposeProducer(IProducer producer) {
87+
var task = DisposeAndIgnoreException(producer);
88+
if (!task.IsCompleted) {
89+
try {
90+
task.AsTask().Wait();
91+
#pragma warning disable CA1031
92+
} catch (Exception) {
93+
#pragma warning restore CA1031
94+
// Ignore
95+
}
96+
}
97+
}
98+
}
99+
100+
private void ThrowIfDisposed() {
101+
if (disposed) {
102+
throw new ResilientProducerDisposedException("DotPulsar.Internal.ResilientProducer");
103+
}
85104
}
86105

87106
#pragma warning disable CA1816
88107
public ValueTask DisposeAsync() {
89108
if (!disposed) {
90109
disposed = true;
91-
var last = Interlocked.Exchange(ref producer, null);
110+
var last = Interlocked.Exchange(ref currentProducer, null);
92111
if (last != null) {
93-
return last.DisposeAsync();
112+
return DisposeAndIgnoreException(last);
94113
}
95114
}
96115
#if NET6_0_OR_GREATER
@@ -101,6 +120,16 @@ public ValueTask DisposeAsync() {
101120
}
102121
#pragma warning restore CA1816
103122

123+
private static async ValueTask DisposeAndIgnoreException(IProducer producer) {
124+
try {
125+
await producer.DisposeAsync().ConfigureAwait(false);
126+
#pragma warning disable CA1031
127+
} catch (Exception) {
128+
#pragma warning restore CA1031
129+
// Ignore
130+
}
131+
}
132+
104133
private sealed class StateChangedHandler : IHandleStateChanged<ProducerStateChanged>
105134
{
106135
private readonly ResilientProducer<TMessage> instance;
@@ -111,9 +140,9 @@ public StateChangedHandler(ResilientProducer<TMessage> instance) {
111140

112141
public ValueTask OnStateChanged(ProducerStateChanged stateChanged, CancellationToken cancellationToken = new CancellationToken()) {
113142
if (stateChanged.Producer.State.IsFinalState(stateChanged.ProducerState)) {
114-
var toDispose = Interlocked.CompareExchange(ref instance.producer, null, (IProducer<TMessage>)stateChanged.Producer);
143+
var toDispose = Interlocked.CompareExchange(ref instance.currentProducer, null, (IProducer<TMessage>)stateChanged.Producer);
115144
if (toDispose != null) {
116-
return toDispose.DisposeAsync();
145+
return DisposeAndIgnoreException(toDispose);
117146
}
118147
}
119148

@@ -140,13 +169,15 @@ public ResilienceState(ResilientProducer<TMessage> producer) {
140169
public bool IsFinalState(ProducerState state) => producer.GetOrCreateProducer().State.IsFinalState(state);
141170

142171
public ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken) {
172+
producer.ThrowIfDisposed();
143173
return producer.resiliencePipeline.ExecuteAsync(async static (args, ct) => {
144174
var (instance, state) = args;
145175
return await instance.GetOrCreateProducer().State.OnStateChangeTo(state, ct).ConfigureAwait(false);
146176
}, (producer, state), cancellationToken);
147177
}
148178

149179
public ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken = new CancellationToken()) {
180+
producer.ThrowIfDisposed();
150181
return producer.resiliencePipeline.ExecuteAsync(async static (args, ct) => {
151182
var (instance, state) = args;
152183
return await instance.GetOrCreateProducer().State.OnStateChangeFrom(state, ct).ConfigureAwait(false);
@@ -157,12 +188,17 @@ public ValueTask<ProducerState> OnStateChangeTo(ProducerState state, Cancellatio
157188
private sealed class ResilienceSendChannel : ISendChannel<TMessage>
158189
{
159190
private readonly ResilientProducer<TMessage> producer;
191+
private int isCompleted;
160192

161193
public ResilienceSendChannel(ResilientProducer<TMessage> producer) {
162194
this.producer = producer;
163195
}
164196

165197
public ValueTask Send(MessageMetadata metadata, TMessage message, Func<MessageId, ValueTask>? onMessageSent, CancellationToken cancellationToken) {
198+
if (isCompleted != 0) {
199+
throw new SendChannelCompletedException();
200+
}
201+
producer.ThrowIfDisposed();
166202
return producer.resiliencePipeline.ExecuteAsync(static (state, ct) => {
167203
var (sendChannel, message, metadata, onMessageSent) = state;
168204
var producer = sendChannel.producer.GetOrCreateProducer();
@@ -171,11 +207,12 @@ public ValueTask Send(MessageMetadata metadata, TMessage message, Func<MessageId
171207
}
172208

173209
public void Complete() {
174-
producer.producer?.SendChannel.Complete();
210+
isCompleted = 1;
211+
producer.currentProducer?.SendChannel.Complete();
175212
}
176213

177214
public ValueTask Completion(CancellationToken cancellationToken) {
178-
return producer.producer?.SendChannel.Completion(cancellationToken) ?? default;
215+
return producer.currentProducer?.SendChannel.Completion(cancellationToken) ?? default;
179216
}
180217
}
181218
}

0 commit comments

Comments
 (0)