Skip to content

Commit 4fcff47

Browse files
authored
fix: drain pending stream output after interrupt in ClaudeCodeSdk (#11)
- keep reading ReceiveResponseAsync during cancellation to flush buffered messages - trigger InterruptAsync from cancellation callback instead of aborting the reader token - stop disposing the client inside ClaudeSdkClient.InterruptAsync so buffered output is not lost to the next query (cherry picked from commit 2f1d4ec9a0cc27ce5741a10d6ff6afd3ae277485) Signed-off-by: zxyao145 <[email protected]>
1 parent 7dee948 commit 4fcff47

2 files changed

Lines changed: 28 additions & 16 deletions

File tree

src/ClaudeCodeSdk.MAF/ClaudeCodeAIAgent.cs

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,6 @@ protected override async Task<AgentResponse> RunCoreAsync(
100100

101101
await foreach (var claudeMessage in asyncEnumMsgs)
102102
{
103-
if (client != null && cancellationToken.IsCancellationRequested)
104-
{
105-
await InterruptAsync(client);
106-
}
107-
108103
if (claudeMessage is ResultMessage resultMessage)
109104
{
110105
usageDetails = resultMessage.ToUsageDetails();
@@ -150,7 +145,7 @@ protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingA
150145
if (!string.IsNullOrWhiteSpace(content))
151146
{
152147
var (asyncEnumMsgs, client) = await SendUserInput(claudeThread, content, cancellationToken);
153-
148+
154149
if (client != null && cancellationToken.IsCancellationRequested)
155150
{
156151
await InterruptAsync(client);
@@ -160,11 +155,6 @@ protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingA
160155
// Receive and yield responses
161156
await foreach (var claudeMessage in asyncEnumMsgs)
162157
{
163-
if (client != null && cancellationToken.IsCancellationRequested)
164-
{
165-
await InterruptAsync(client);
166-
}
167-
168158
var update = claudeMessage.ToAgentRunResponseUpdate();
169159
if (update != null)
170160
{
@@ -203,15 +193,37 @@ private async Task InterruptAsync(ClaudeSdkClient client)
203193
}
204194
else
205195
{
206-
client = await _clientManager.GetClientAsync(claudeThread, cancellationToken);
196+
client = await _clientManager.GetClientAsync(claudeThread, CancellationToken.None);
207197

208198
await client.QueryAsync(content,
209199
sessionId: claudeThread.SessionId.ToString(),
210-
cancellationToken: cancellationToken);
200+
cancellationToken: CancellationToken.None);
211201

212-
asyncEnumMsgs = client.ReceiveResponseAsync(cancellationToken);
202+
asyncEnumMsgs = client.ReceiveResponseAsync(CancellationToken.None);
213203
}
214204

205+
//
206+
var interruptRequested = 0;
207+
using var cancellationRegistration = cancellationToken.Register(() =>
208+
{
209+
if (client == null) return;
210+
if (Interlocked.Exchange(ref interruptRequested, 1) != 0)
211+
return;
212+
213+
_ = Task.Run(async () =>
214+
{
215+
try
216+
{
217+
await InterruptAsync(client);
218+
}
219+
catch (Exception ex)
220+
{
221+
_logger?.LogDebug(ex, "Failed to interrupt Claude SDK client during streaming cancellation");
222+
}
223+
});
224+
});
225+
226+
215227
return (asyncEnumMsgs, client);
216228
}
217229

src/ClaudeCodeSdk/ClaudeSDKClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public async Task InterruptAsync(CancellationToken cancellationToken = default)
120120

121121
ConnectStatus = ConnectStatus.DisConnecting;
122122
await _process.InterruptAsync();
123-
await this.DisposeAsync();
123+
ConnectStatus = ConnectStatus.DisConnected;
124124
}
125125

126126
/// <summary>
@@ -171,4 +171,4 @@ public async ValueTask DisposeAsync()
171171
}
172172
GC.SuppressFinalize(this);
173173
}
174-
}
174+
}

0 commit comments

Comments
 (0)