Skip to content

Commit 59ce319

Browse files
vazois and Copilot committed
Fix dispose hang in network handler and buffer pool cleanup
Two bugs caused LimitedFixedBufferPool.Dispose() to hang indefinitely during server teardown (blocking ClusterResetHardDuringDisklessReplicationAttach): 1. TcpNetworkHandlerBase.Dispose() never called DisposeImpl(), so when a handler thread was blocked synchronously (e.g. in TryBeginDisklessSync), the CTS was never cancelled and activeHandlerCount was never decremented. DisposeActiveHandlers() would spin forever waiting for it to reach 0. 2. GarnetTcpNetworkSender.DisposeNetworkSender() disposed the saeaStack but not the current responseObject, leaking a PoolEntry that was never returned to the pool. LimitedFixedBufferPool.Dispose() then spun forever waiting for totalReferences to reach 0. Also adds PoolEntry source tracking infrastructure (PoolEntryBufferType and PoolOwnerType enums) with DEBUG-only diagnostics that log unreturned buffer details after a 5-second timeout during pool disposal. Co-authored-by: Copilot <[email protected]>
1 parent 16d1a6d commit 59ce319

File tree

12 files changed

+152
-27
lines changed

12 files changed

+152
-27
lines changed

libs/client/ClientSession/GarnetClientSession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public GarnetClientSession(
127127

128128
this.usingManagedNetworkPool = networkPool != null;
129129
this.networkBufferSettings = networkBufferSettings;
130-
this.networkPool = networkPool ?? networkBufferSettings.CreateBufferPool();
130+
this.networkPool = networkPool ?? networkBufferSettings.CreateBufferPool(ownerType: PoolOwnerType.ClientSession);
131131
this.bufferSizeDigits = NumUtils.CountDigits(this.networkBufferSettings.sendBufferSize);
132132

133133
this.logger = logger;

libs/cluster/Server/Replication/ReplicationManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null
102102
this.pageSizeBits = storeWrapper.appendOnlyFile == null ? 0 : storeWrapper.appendOnlyFile.UnsafeGetLogPageSizeBits();
103103

104104
networkBufferSettings.Log(logger, nameof(ReplicationManager));
105-
this.networkPool = networkBufferSettings.CreateBufferPool(logger: logger);
105+
this.networkPool = networkBufferSettings.CreateBufferPool(logger: logger, ownerType: PoolOwnerType.Replication);
106106
ValidateNetworkBufferSettings();
107107

108108
aofProcessor = new AofProcessor(storeWrapper, recordToAof: false, clusterProvider: clusterProvider, logger: logger);

libs/common/Memory/LimitedFixedBufferPool.cs

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT license.
33

44
using System;
5+
using System.Collections.Concurrent;
56
using System.Diagnostics;
67
using System.Numerics;
78
using System.Runtime.CompilerServices;
@@ -26,6 +27,11 @@ public sealed class LimitedFixedBufferPool : IDisposable
2627
readonly int maxAllocationSize;
2728
readonly ILogger logger;
2829

30+
/// <summary>
31+
/// Pool owner type, packed into byte 1 of each <see cref="PoolEntry.source"/>.
32+
/// </summary>
33+
readonly int ownerByte;
34+
2935
/// <summary>
3036
/// Min allocation size
3137
/// </summary>
@@ -41,16 +47,29 @@ public sealed class LimitedFixedBufferPool : IDisposable
4147
/// </summary>
4248
int totalOutOfBoundAllocations;
4349

50+
#if DEBUG
51+
/// <summary>
52+
/// Tracks all outstanding (checked-out) pool entries for leak diagnosis.
53+
/// </summary>
54+
readonly ConcurrentDictionary<PoolEntry, byte> outstandingEntries = new();
55+
56+
/// <summary>
57+
/// Timeout in milliseconds for Dispose to wait before logging outstanding entries.
58+
/// </summary>
59+
const int DisposeWaitDiagnosticMs = 5_000;
60+
#endif
61+
4462
/// <summary>
4563
/// Constructor
4664
/// </summary>
47-
public LimitedFixedBufferPool(int minAllocationSize, int maxEntriesPerLevel = 16, int numLevels = 4, ILogger logger = null)
65+
public LimitedFixedBufferPool(int minAllocationSize, int maxEntriesPerLevel = 16, int numLevels = 4, PoolOwnerType ownerType = PoolOwnerType.Unknown, ILogger logger = null)
4866
{
4967
this.minAllocationSize = minAllocationSize;
5068
this.maxAllocationSize = minAllocationSize << (numLevels - 1);
5169
this.maxEntriesPerLevel = maxEntriesPerLevel;
5270
this.numLevels = numLevels;
5371
this.logger = logger;
72+
this.ownerByte = (int)ownerType << 8;
5473
pool = new PoolLevel[numLevels];
5574
}
5675

@@ -85,6 +104,9 @@ public bool Validate(NetworkBufferSettings settings)
85104
[MethodImpl(MethodImplOptions.AggressiveInlining)]
86105
public void Return(PoolEntry buffer)
87106
{
107+
#if DEBUG
108+
outstandingEntries.TryRemove(buffer, out _);
109+
#endif
88110
var level = Position(buffer.entry.Length);
89111
if (level >= 0)
90112
{
@@ -107,9 +129,10 @@ public void Return(PoolEntry buffer)
107129
/// Get buffer
108130
/// </summary>
109131
/// <param name="size"></param>
132+
/// <param name="bufferType">Identifies the caller for leak diagnosis.</param>
110133
/// <returns></returns>
111134
[MethodImpl(MethodImplOptions.AggressiveInlining)]
112-
public unsafe PoolEntry Get(int size)
135+
public unsafe PoolEntry Get(int size, PoolEntryBufferType bufferType = PoolEntryBufferType.Unknown)
113136
{
114137
if (Interlocked.Increment(ref totalReferences) < 0)
115138
{
@@ -118,6 +141,8 @@ public unsafe PoolEntry Get(int size)
118141
return null;
119142
}
120143

144+
var source = ownerByte | (int)bufferType;
145+
121146
var level = Position(size);
122147
if (level == -1) Interlocked.Increment(ref totalOutOfBoundAllocations);
123148

@@ -132,10 +157,19 @@ public unsafe PoolEntry Get(int size)
132157
{
133158
Interlocked.Decrement(ref pool[level].size);
134159
page.Reuse();
160+
page.source = source;
161+
#if DEBUG
162+
outstandingEntries[page] = 0;
163+
#endif
135164
return page;
136165
}
137166
}
138-
return new PoolEntry(size, this);
167+
var entry = new PoolEntry(size, this);
168+
entry.source = source;
169+
#if DEBUG
170+
outstandingEntries[entry] = 0;
171+
#endif
172+
return entry;
139173
}
140174

141175
/// <summary>
@@ -157,23 +191,37 @@ public void Purge()
157191
}
158192

159193
/// <summary>
160-
/// Dipose pool entries from all levels
194+
/// Dispose pool entries from all levels
161195
/// NOTE:
162196
/// This is used to destroy the instance and reclaim all allocated buffer pool entries.
163197
/// As a consequence it spin waits until totalReferences goes back down to 0 and blocks any future allocations.
198+
/// In DEBUG builds, logs outstanding unreturned entries after a timeout for leak diagnosis.
164199
/// </summary>
165200
[MethodImpl(MethodImplOptions.AggressiveInlining)]
166201
public void Dispose()
167202
{
168-
#if HANGDETECT
169-
int count = 0;
203+
#if DEBUG
204+
var sw = Stopwatch.StartNew();
205+
var diagnosed = false;
170206
#endif
171207
while (totalReferences > int.MinValue &&
172208
Interlocked.CompareExchange(ref totalReferences, int.MinValue, 0) != 0)
173209
{
174-
#if HANGDETECT
175-
if (++count % 10000 == 0)
176-
logger?.LogTrace("Dispose iteration {count}, {activeHandlerCount}", count, activeHandlerCount);
210+
#if DEBUG
211+
if (!diagnosed && sw.ElapsedMilliseconds > DisposeWaitDiagnosticMs)
212+
{
213+
diagnosed = true;
214+
var remaining = totalReferences;
215+
var ownerType = (PoolOwnerType)(ownerByte >> 8);
216+
logger?.LogError("LimitedFixedBufferPool.Dispose blocked with {remaining} unreturned references (poolOwner={ownerType}). Outstanding entries:", remaining, ownerType);
217+
foreach (var kvp in outstandingEntries)
218+
{
219+
var entryBufferType = (PoolEntryBufferType)(kvp.Key.source & 0xFF);
220+
var entryOwnerType = (PoolOwnerType)((kvp.Key.source >> 8) & 0xFF);
221+
logger?.LogError(" Unreturned buffer: ownerType={ownerType}, bufferType={bufferType}, size={size}",
222+
entryOwnerType, entryBufferType, kvp.Key.entry.Length);
223+
}
224+
}
177225
#endif
178226
Thread.Yield();
179227
}

libs/common/Memory/PoolEntry.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ public unsafe class PoolEntry : IDisposable
2525
readonly LimitedFixedBufferPool pool;
2626
bool disposed;
2727

28+
/// <summary>
29+
/// Packed source identifier: low byte = <see cref="PoolEntryBufferType"/>, byte 1 = <see cref="PoolOwnerType"/>.
30+
/// Set when the entry is acquired via <see cref="LimitedFixedBufferPool.Get"/>.
31+
/// </summary>
32+
internal int source;
33+
2834
/// <summary>
2935
/// Constructor
3036
/// </summary>
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
namespace Garnet.common
5+
{
6+
/// <summary>
7+
/// Identifies the buffer role when a <see cref="PoolEntry"/> is acquired from <see cref="LimitedFixedBufferPool"/>.
8+
/// </summary>
9+
public enum PoolEntryBufferType : int
10+
{
11+
/// <summary>Default/unknown buffer type.</summary>
12+
Unknown = 0,
13+
14+
/// <summary>Initial network receive buffer (TcpNetworkHandlerBase).</summary>
15+
NetworkReceiveBuffer = 1,
16+
17+
/// <summary>Transport receive buffer for TLS (NetworkHandler).</summary>
18+
TransportReceiveBuffer = 2,
19+
20+
/// <summary>Transport send buffer for TLS (NetworkHandler).</summary>
21+
TransportSendBuffer = 3,
22+
23+
/// <summary>Doubled network receive buffer (NetworkHandler).</summary>
24+
DoubleNetworkReceiveBuffer = 4,
25+
26+
/// <summary>Shrunk network receive buffer (NetworkHandler).</summary>
27+
ShrinkNetworkReceiveBuffer = 5,
28+
29+
/// <summary>Doubled transport receive buffer for TLS (NetworkHandler).</summary>
30+
DoubleTransportReceiveBuffer = 6,
31+
32+
/// <summary>Send buffer for async socket operations (GarnetSaeaBuffer).</summary>
33+
SaeaSendBuffer = 7,
34+
}
35+
36+
/// <summary>
37+
/// Identifies the owner of a <see cref="LimitedFixedBufferPool"/> instance.
38+
/// Set at pool construction time to indicate which subsystem created the pool.
39+
/// </summary>
40+
public enum PoolOwnerType : int
41+
{
42+
/// <summary>Default/unknown owner.</summary>
43+
Unknown = 0,
44+
45+
/// <summary>Server-side network pool (GarnetServerTcp).</summary>
46+
ServerNetwork = 1,
47+
48+
/// <summary>Replication network pool (ReplicationManager).</summary>
49+
Replication = 2,
50+
51+
/// <summary>Client-side network pool (GarnetClientSession, self-managed).</summary>
52+
ClientSession = 3,
53+
}
54+
}

libs/common/NetworkBufferSettings.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,18 @@ public static NetworkBufferSettings GetInclusive(NetworkBufferSettings[] setting
7373
/// Allocate network buffer pool
7474
/// </summary>
7575
/// <param name="maxEntriesPerLevel"></param>
76+
/// <param name="ownerType"></param>
7677
/// <param name="logger"></param>
7778
/// <returns></returns>
78-
public LimitedFixedBufferPool CreateBufferPool(int maxEntriesPerLevel = 16, ILogger logger = null)
79+
public LimitedFixedBufferPool CreateBufferPool(int maxEntriesPerLevel = 16, PoolOwnerType ownerType = PoolOwnerType.Unknown, ILogger logger = null)
7980
{
8081
var minSize = Math.Min(Math.Min(sendBufferSize, initialReceiveBufferSize), maxReceiveBufferSize);
8182
var maxSize = Math.Max(Math.Max(sendBufferSize, initialReceiveBufferSize), maxReceiveBufferSize);
8283

8384
var levels = LimitedFixedBufferPool.GetLevel(minSize, maxSize) + 1;
8485
Debug.Assert(levels >= 0);
8586
levels = Math.Max(4, levels);
86-
return new LimitedFixedBufferPool(minSize, maxEntriesPerLevel: maxEntriesPerLevel, numLevels: levels, logger: logger);
87+
return new LimitedFixedBufferPool(minSize, maxEntriesPerLevel: maxEntriesPerLevel, numLevels: levels, logger: logger, ownerType: ownerType);
8788
}
8889

8990
public void Log(ILogger logger, string category)

libs/common/Networking/GarnetSaeaBuffer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public GarnetSaeaBuffer(EventHandler<SocketAsyncEventArgs> eventHandler, Network
3030
{
3131
socketEventAsyncArgs = new SocketAsyncEventArgs();
3232

33-
buffer = networkPool.Get(networkBufferSettings.sendBufferSize);
33+
buffer = networkPool.Get(networkBufferSettings.sendBufferSize, PoolEntryBufferType.SaeaSendBuffer);
3434
socketEventAsyncArgs.SetBuffer(buffer.entry, 0, buffer.entry.Length);
3535
socketEventAsyncArgs.Completed += eventHandler;
3636
}

libs/common/Networking/GarnetTcpNetworkSender.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,10 @@ public override void DisposeNetworkSender(bool waitForSendCompletion)
252252
// Wait for ongoing sends to complete
253253
while (throttleCount >= 0 && Interlocked.CompareExchange(ref throttleCount, int.MinValue, 0) != 0) Thread.Yield();
254254

255+
// Dispose the current response object if one is held
256+
responseObject?.Dispose();
257+
responseObject = null;
258+
255259
// Empty and dispose the stack
256260
saeaStack.Dispose();
257261

libs/common/Networking/NetworkHandler.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,11 @@ public unsafe NetworkHandler(TServerHook serverHook, TNetworkSender networkSende
129129
expectingData = new SemaphoreSlim(0);
130130
cancellationTokenSource = new();
131131

132-
transportReceiveBufferEntry = this.networkPool.Get(this.networkBufferSettings.initialReceiveBufferSize);
132+
transportReceiveBufferEntry = this.networkPool.Get(this.networkBufferSettings.initialReceiveBufferSize, PoolEntryBufferType.TransportReceiveBuffer);
133133
transportReceiveBuffer = transportReceiveBufferEntry.entry;
134134
transportReceiveBufferPtr = transportReceiveBufferEntry.entryPtr;
135135

136-
transportSendBufferEntry = this.networkPool.Get(this.networkBufferSettings.sendBufferSize);
136+
transportSendBufferEntry = this.networkPool.Get(this.networkBufferSettings.sendBufferSize, PoolEntryBufferType.TransportSendBuffer);
137137
transportSendBuffer = transportSendBufferEntry.entry;
138138
transportSendBufferPtr = transportSendBufferEntry.entryPtr;
139139
}
@@ -501,7 +501,7 @@ unsafe bool TryProcessRequest()
501501

502502
unsafe void DoubleNetworkReceiveBuffer()
503503
{
504-
var tmp = networkPool.Get(networkReceiveBuffer.Length * 2);
504+
var tmp = networkPool.Get(networkReceiveBuffer.Length * 2, PoolEntryBufferType.DoubleNetworkReceiveBuffer);
505505
Array.Copy(networkReceiveBuffer, tmp.entry, networkReceiveBuffer.Length);
506506
networkReceiveBufferEntry.Dispose();
507507
networkReceiveBufferEntry = tmp;
@@ -515,7 +515,7 @@ unsafe void ShrinkNetworkReceiveBuffer()
515515
{
516516
Debug.Assert(networkReadHead == 0, "Shouldn't call if remaining data not already moved to head of receive buffer");
517517

518-
var tmp = networkPool.Get(networkBufferSettings.maxReceiveBufferSize);
518+
var tmp = networkPool.Get(networkBufferSettings.maxReceiveBufferSize, PoolEntryBufferType.ShrinkNetworkReceiveBuffer);
519519
if (networkBytesRead > 0)
520520
{
521521
Array.Copy(networkReceiveBuffer, tmp.entry, networkBytesRead);
@@ -543,7 +543,7 @@ unsafe void DoubleTransportReceiveBuffer()
543543
{
544544
if (sslStream != null)
545545
{
546-
var tmp = networkPool.Get(transportReceiveBuffer.Length * 2);
546+
var tmp = networkPool.Get(transportReceiveBuffer.Length * 2, PoolEntryBufferType.DoubleTransportReceiveBuffer);
547547
Array.Copy(transportReceiveBuffer, tmp.entry, transportReceiveBuffer.Length);
548548
transportReceiveBufferEntry.Dispose();
549549
transportReceiveBufferEntry = tmp;

libs/common/Networking/TcpNetworkHandlerBase.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,11 @@ public override void Dispose()
170170
// Dispose of the socket to free up unmanaged resources
171171
socket.Dispose();
172172
}
173+
174+
// Ensure the handler is fully cleaned up: cancel CTS, remove from activeHandlers,
175+
// and return pool buffers. DisposeImpl is guarded by disposeCount so it is safe to
176+
// call even when the SAEA callback path has already invoked it.
177+
DisposeImpl();
173178
}
174179

175180
/// <summary>
@@ -252,7 +257,7 @@ void HandleReceiveFailure(Exception ex, SocketAsyncEventArgs e)
252257

253258
unsafe void AllocateNetworkReceiveBuffer()
254259
{
255-
networkReceiveBufferEntry = networkPool.Get(networkBufferSettings.initialReceiveBufferSize);
260+
networkReceiveBufferEntry = networkPool.Get(networkBufferSettings.initialReceiveBufferSize, PoolEntryBufferType.NetworkReceiveBuffer);
256261
networkReceiveBuffer = networkReceiveBufferEntry.entry;
257262
networkReceiveBufferPtr = networkReceiveBufferEntry.entryPtr;
258263
}

0 commit comments

Comments
 (0)