Skip to content

Commit c3ec1d8

Browse files
authored
Merge pull request #881 from jpdillingham/chaos
Number of bug fixes and improvements around connection writes
2 parents 7273691 + 398f8f3 commit c3ec1d8

File tree

6 files changed

+250
-15
lines changed

6 files changed

+250
-15
lines changed

src/Network/Tcp/Connection.cs

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public Connection(IPEndPoint ipEndPoint, ConnectionOptions options = null, ITcpC
5151
// is pretty much the only option.
5252
Options.ConfigureSocket(TcpClient.Client);
5353

54+
// this should call SetSocketOptions on the socket (Client)
55+
TcpClient.Client.ReceiveTimeout = Options.InactivityTimeout;
56+
TcpClient.Client.SendTimeout = Options.InactivityTimeout;
57+
5458
WriteQueueSemaphore = new SemaphoreSlim(Options.WriteQueueSize);
5559

5660
if (Options.InactivityTimeout > 0)
@@ -89,7 +93,13 @@ public Connection(IPEndPoint ipEndPoint, ConnectionOptions options = null, ITcpC
8993
State = ConnectionState.Connected;
9094
InactivityTimer?.Start();
9195
WatchdogTimer.Start();
96+
9297
Stream = TcpClient.GetStream();
98+
99+
// these should also call SetSocketOptions on the socket. doing it twice to make sure!
100+
// read and write timeouts can cause the client to hang, even if the inactivity timer disconnects
101+
Stream.WriteTimeout = Options.InactivityTimeout;
102+
Stream.ReadTimeout = Options.InactivityTimeout;
93103
}
94104
}
95105

@@ -191,6 +201,7 @@ public Connection(IPEndPoint ipEndPoint, ConnectionOptions options = null, ITcpC
191201
private TaskCompletionSource<string> DisconnectTaskCompletionSource { get; } = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
192202
private SemaphoreSlim WriteSemaphore { get; set; } = new SemaphoreSlim(initialCount: 1, maxCount: 1);
193203
private SemaphoreSlim WriteQueueSemaphore { get; set; }
204+
private bool WriteQueueFull { get; set; }
194205

195206
/// <summary>
196207
/// Asynchronously connects the client to the configured <see cref="IPEndPoint"/>.
@@ -279,8 +290,14 @@ public async Task ConnectAsync(CancellationToken? cancellationToken = null)
279290

280291
InactivityTimer?.Start();
281292
WatchdogTimer.Start();
293+
282294
Stream = TcpClient.GetStream();
283295

296+
// these should also call SetSocketOptions on the socket. doing it twice to make sure!
297+
// read and write timeouts can cause the client to hang, even if the inactivity timer disconnects
298+
Stream.ReadTimeout = Options.InactivityTimeout;
299+
Stream.WriteTimeout = Options.InactivityTimeout;
300+
284301
ChangeState(ConnectionState.Connected, $"Connected to {IPEndPoint}");
285302
}
286303
catch (Exception ex)
@@ -698,24 +715,25 @@ private async Task WriteInternalAsync(byte[] bytes, CancellationToken cancellati
698715

699716
private async Task WriteInternalAsync(long length, Stream inputStream, Func<int, CancellationToken, Task<int>> governor, Action<int, int, int> reporter, CancellationToken cancellationToken)
700717
{
718+
// a failure to allocate memory will throw, so we need to do it within the try/catch
719+
// declare and initialize it here so it's available in the finally block
720+
byte[] buffer = Array.Empty<byte>();
721+
701722
// in the case of a bad (or failing) connection, it is possible for us to continue to write data, particularly
702723
// distributed search requests, to the connection for quite a while before the underlying socket figures out that it
703724
// is in a bad state. when this happens memory usage skyrockets. see https://github.com/slskd/slskd/issues/251 for
704-
// more information
705-
if (WriteQueueSemaphore.CurrentCount == 0)
725+
// more information. note that this isn't for synchronization, it's to maintain a count of waiting writes.
726+
if (WriteQueueFull || !await WriteQueueSemaphore.WaitAsync(0, cancellationToken).ConfigureAwait(false))
706727
{
728+
// note: the semaphore check and this latch are not atomic! it's possible for one thread to fail to get the semaphore,
729+
// and for the next to succeed (if one is released in the finally) before we set this to true. if that happens,
730+
// *something* below will fail and the exception will bubble out (and if it doesn't throw, it probably got sent)
731+
WriteQueueFull = true;
732+
707733
Disconnect("The write buffer is full");
708734
throw new ConnectionWriteDroppedException($"Dropped buffered message to {IPEndPoint}; the write buffer is full");
709735
}
710736

711-
// a failure to allocate memory will throw, so we need to do it within the try/catch
712-
// declare and initialize it here so it's available in the finally block
713-
byte[] buffer = Array.Empty<byte>();
714-
715-
// grab a slot on the queue semaphore. note that this isn't for synchronization, it's to
716-
// maintain a count of waiting writes
717-
await WriteQueueSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
718-
719737
// obtain the write semaphore for this connection. this keeps concurrent writes
720738
// from interleaving, which will mangle the messages on the receiving end
721739
await WriteSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
@@ -732,8 +750,13 @@ private async Task WriteInternalAsync(long length, Stream inputStream, Func<int,
732750

733751
long totalBytesWritten = 0;
734752

735-
while (!Disposed && totalBytesWritten < length)
753+
while (totalBytesWritten < length)
736754
{
755+
if (Disposed || State == ConnectionState.Disconnecting || State == ConnectionState.Disconnected)
756+
{
757+
throw new ConnectionWriteException($"Write aborted after {totalBytesWritten} bytes written; the connection has been or is being {(Disposed ? "disposed" : "disconnected")}");
758+
}
759+
737760
var bytesRemaining = length - totalBytesWritten;
738761
var bytesToRead = bytesRemaining >= buffer.Length ? buffer.Length : (int)bytesRemaining;
739762

@@ -781,12 +804,15 @@ private async Task WriteInternalAsync(long length, Stream inputStream, Func<int,
781804
}
782805
finally
783806
{
784-
WriteQueueSemaphore.Release();
785-
WriteSemaphore.Release();
786-
787807
#if NETSTANDARD2_1_OR_GREATER
788808
System.Buffers.ArrayPool<byte>.Shared.Return(buffer);
789809
#endif
810+
811+
if (!Disposed)
812+
{
813+
WriteQueueSemaphore.Release();
814+
WriteSemaphore.Release();
815+
}
790816
}
791817
}
792818
}

src/Network/Tcp/INetworkStream.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,16 @@ namespace Soulseek.Network.Tcp
2828
/// </summary>
2929
internal interface INetworkStream : IDisposable
3030
{
31+
/// <summary>
32+
/// Gets or sets the read timeout for the stream.
33+
/// </summary>
34+
int ReadTimeout { get; set; }
35+
36+
/// <summary>
37+
/// Gets or sets the write timeout for the stream.
38+
/// </summary>
39+
int WriteTimeout { get; set; }
40+
3141
/// <summary>
3242
/// Closes the <see cref="NetworkStream"/>.
3343
/// </summary>

src/Network/Tcp/NetworkStreamAdapter.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,42 @@ public NetworkStreamAdapter(NetworkStream networkStream)
4343
NetworkStream = networkStream;
4444
}
4545

46+
/// <summary>
47+
/// Gets or sets the read timeout for the <see cref="NetworkStream"/>.
48+
/// </summary>
49+
/// <remarks>
50+
/// Uses SetSocketOption under the hood: https://github.com/microsoft/referencesource/blob/main/System/net/System/Net/Sockets/NetworkStream.cs.
51+
/// </remarks>
52+
public int ReadTimeout
53+
{
54+
get
55+
{
56+
return NetworkStream.ReadTimeout;
57+
}
58+
set
59+
{
60+
NetworkStream.ReadTimeout = value;
61+
}
62+
}
63+
64+
/// <summary>
65+
/// Gets or sets the write timeout for the <see cref="NetworkStream"/>.
66+
/// </summary>
67+
/// <remarks>
68+
/// Uses SetSocketOption under the hood: https://github.com/microsoft/referencesource/blob/main/System/net/System/Net/Sockets/NetworkStream.cs.
69+
/// </remarks>
70+
public int WriteTimeout
71+
{
72+
get
73+
{
74+
return NetworkStream.WriteTimeout;
75+
}
76+
set
77+
{
78+
NetworkStream.WriteTimeout = value;
79+
}
80+
}
81+
4682
private bool Disposed { get; set; }
4783
private NetworkStream NetworkStream { get; set; }
4884

src/Soulseek.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333
<PropertyGroup>
3434
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
35-
<Version>8.2.1</Version>
35+
<Version>8.3.0</Version>
3636
<Authors>JP Dillingham</Authors>
3737
<Product>Soulseek.NET</Product>
3838
<PackageProjectUrl>https://github.com/jpdillingham/Soulseek.NET</PackageProjectUrl>

src/SoulseekClient.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3185,6 +3185,7 @@ void UpdateProgress(long bytesDownloaded)
31853185

31863186
var endpoint = await GetUserEndPointAsync(username, cancellationToken).ConfigureAwait(false);
31873187
var peerConnection = await PeerConnectionManager.GetOrAddMessageConnectionAsync(username, endpoint, cancellationToken).ConfigureAwait(false);
3188+
Diagnostic.Debug($"Fetched peer connection for download of {Path.GetFileName(download.Filename)} from {username} (id: {peerConnection.Id}, state: {peerConnection.State})");
31883189

31893190
// prepare two waits; one for the transfer response to confirm that our request is acknowledged and another for
31903191
// the eventual transfer request sent when the peer is ready to send the file. the response message should be
@@ -3195,9 +3196,12 @@ void UpdateProgress(long bytesDownloaded)
31953196

31963197
// request the file
31973198
await peerConnection.WriteAsync(new TransferRequest(TransferDirection.Download, token, remoteFilename), cancellationToken).ConfigureAwait(false);
3199+
Diagnostic.Debug($"Wrote transfer request for download of {Path.GetFileName(download.Filename)} from {username} (id: {peerConnection.Id}, state: {peerConnection.State})");
3200+
31983201
UpdateState(TransferStates.Requested);
31993202

32003203
var transferRequestAcknowledgement = await transferRequestAcknowledged.ConfigureAwait(false);
3204+
Diagnostic.Debug($"Received transfer request ACK for download of {Path.GetFileName(download.Filename)} from {username}: allowed: {transferRequestAcknowledgement.IsAllowed}, message: {transferRequestAcknowledgement.Message} (token: {token})");
32013205

32023206
if (transferRequestAcknowledgement.IsAllowed)
32033207
{
@@ -3221,6 +3225,7 @@ void UpdateProgress(long bytesDownloaded)
32213225
download.Connection = await PeerConnectionManager
32223226
.GetTransferConnectionAsync(username, endpoint, transferRequestAcknowledgement.Token, cancellationToken)
32233227
.ConfigureAwait(false);
3228+
Diagnostic.Debug($"Fetched transfer connection for download of {Path.GetFileName(download.Filename)} from {username} (id: {download.Connection.Id}, state: {download.Connection.State})");
32243229
}
32253230
else if (!string.Equals(transferRequestAcknowledgement.Message.TrimEnd('.'), "Queued", StringComparison.OrdinalIgnoreCase))
32263231
{
@@ -3252,6 +3257,7 @@ void UpdateProgress(long bytesDownloaded)
32523257
peerConnection = await PeerConnectionManager
32533258
.GetOrAddMessageConnectionAsync(username, endpoint, cancellationToken)
32543259
.ConfigureAwait(false);
3260+
Diagnostic.Debug($"Fetched peer connection for download of {Path.GetFileName(download.Filename)} from {username} (id: {peerConnection.Id}, state: {peerConnection.State})");
32553261

32563262
// prepare a wait for the eventual transfer connection
32573263
var connectionTask = PeerConnectionManager
@@ -3263,6 +3269,7 @@ void UpdateProgress(long bytesDownloaded)
32633269
try
32643270
{
32653271
download.Connection = await connectionTask.ConfigureAwait(false);
3272+
Diagnostic.Debug($"Fetched transfer connection for download of {Path.GetFileName(download.Filename)} from {username} (id: {download.Connection.Id}, state: {download.Connection.State})");
32663273
}
32673274
catch (ConnectionException)
32683275
{
@@ -4256,17 +4263,22 @@ void UpdateProgress(long bytesUploaded)
42564263
var messageConnection = await PeerConnectionManager
42574264
.GetOrAddMessageConnectionAsync(username, endpoint, cancellationToken)
42584265
.ConfigureAwait(false);
4266+
Diagnostic.Debug($"Fetched peer connection for upload of {Path.GetFileName(upload.Filename)} to {username} (id: {messageConnection.Id}, state: {messageConnection.State})");
42594267

42604268
// prepare a wait for the transfer response
42614269
var transferRequestAcknowledged = Waiter.Wait<TransferResponse>(
42624270
new WaitKey(MessageCode.Peer.TransferResponse, upload.Username, upload.Token), Options.PeerConnectionOptions.InactivityTimeout, cancellationToken);
42634271

42644272
// request to start the upload
42654273
var transferRequest = new TransferRequest(TransferDirection.Upload, upload.Token, upload.Filename, size);
4274+
42664275
await messageConnection.WriteAsync(transferRequest, cancellationToken).ConfigureAwait(false);
4276+
Diagnostic.Debug($"Wrote transfer request for upload of {Path.GetFileName(upload.Filename)} to {username} (id: {messageConnection.Id}, state: {messageConnection.State})");
4277+
42674278
UpdateState(TransferStates.Requested);
42684279

42694280
var transferRequestAcknowledgement = await transferRequestAcknowledged.ConfigureAwait(false);
4281+
Diagnostic.Debug($"Received transfer request ACK for upload of {Path.GetFileName(upload.Filename)} to {username}: allowed: {transferRequestAcknowledgement.IsAllowed}, message: {transferRequestAcknowledgement.Message} (token: {token})");
42704282

42714283
if (!transferRequestAcknowledgement.IsAllowed)
42724284
{
@@ -4278,6 +4290,7 @@ void UpdateProgress(long bytesUploaded)
42784290
upload.Connection = await PeerConnectionManager
42794291
.GetTransferConnectionAsync(upload.Username, endpoint, upload.Token, cancellationToken)
42804292
.ConfigureAwait(false);
4293+
Diagnostic.Debug($"Fetched transfer connection for upload of {Path.GetFileName(upload.Filename)} to {username} (id: {upload.Connection.Id}, state: {upload.Connection.State})");
42814294

42824295
// create a task completion source that represents the disconnect of the transfer connection. this is one of two tasks that will 'race'
42834296
// to determine the outcome of the upload.

0 commit comments

Comments
 (0)