Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/Orleans.Core/Configuration/Options/ClusterMembershipOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,18 @@ public class ClusterMembershipOptions
/// </summary>
public bool EnableIndirectProbes { get; set; } = true;

/// <summary>
/// Gets or sets a value indicating whether to consider connection-level message activity when evaluating silo liveness.
/// </summary>
/// <remarks>
/// When enabled, if an active connection to a silo has recently received messages within the monitoring window
/// (<see cref="ProbeTimeout"/> × <see cref="NumMissedProbesLimit"/>), votes to suspect that silo will be suppressed
/// since the connection activity demonstrates the silo is alive. This helps prevent false death declarations
/// when probes fail due to local issues such as GC pauses or thread pool saturation.
/// </remarks>
/// <value>Connection liveness checks are enabled by default.</value>
public bool EnableConnectionLivenessCheck { get; set; } = true;

/// <summary>
/// Gets or sets a value indicating whether to enable membership eviction of silos when in a state of `Joining` or `Created` for longer than MaxJoinAttemptTime
/// </summary>
Expand Down
24 changes: 23 additions & 1 deletion src/Orleans.Core/Networking/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ internal abstract partial class Connection
private Task _processIncomingTask;
private Task _processOutgoingTask;
private Task _closeTask;
private CoarseStopwatch _lastMessageReceivedTimestamp;

protected Connection(
ConnectionContext connection,
Expand Down Expand Up @@ -74,6 +75,19 @@ protected Connection(

public Task Initialized => _initializationTcs.Task;

/// <summary>
/// Gets the time elapsed since the last message was received on this connection,
/// or <see langword="null"/> if no message has been received yet.
/// </summary>
public TimeSpan? ElapsedSinceLastMessageReceived
{
get
{
if (!_lastMessageReceivedTimestamp.IsRunning) return null;
return _lastMessageReceivedTimestamp.Elapsed;
}
}

public static void ConfigureBuilder(ConnectionBuilder builder) => builder.Run(OnConnectedDelegate);

/// <summary>
Expand Down Expand Up @@ -275,6 +289,7 @@ private async Task ProcessIncoming()

Exception error = default;
var serializer = this.shared.ServiceProvider.GetRequiredService<MessageSerializer>();
var prevBufferLength = 0L;
try
{
var input = this._transport.Input;
Expand All @@ -284,8 +299,15 @@ private async Task ProcessIncoming()
var readResult = await input.ReadAsync();

var buffer = readResult.Buffer;
if (buffer.Length > prevBufferLength)
{
prevBufferLength = buffer.Length;
_lastMessageReceivedTimestamp.Restart();
}

if (buffer.Length >= requiredBytes)
{
prevBufferLength = 0;
do
{
Message message = default;
Expand All @@ -307,7 +329,7 @@ private async Task ProcessIncoming()
if (!HandleReceiveMessageFailure(message, exception))
{
throw;
}
}
}
} while (requiredBytes == 0);
}
Expand Down
26 changes: 26 additions & 0 deletions src/Orleans.Core/Networking/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,32 @@ public bool TryGetConnection(SiloAddress endpoint, out Connection connection)
return false;
}

/// <summary>
/// Gets the minimum elapsed time since any connection to the specified silo last received a message,
/// or <see langword="null"/> if no connections exist or no messages have been received.
/// </summary>
/// <param name="endpoint">The silo address to check.</param>
/// <returns>The elapsed time since the most recently received message across all connections, or <see langword="null"/>.</returns>
public TimeSpan? GetElapsedSinceLastMessageReceived(SiloAddress endpoint)
{
if (!this.connections.TryGetValue(endpoint, out var entry))
{
return null;
}

TimeSpan? minElapsed = null;
foreach (var connection in entry.Connections)
{
if (connection.ElapsedSinceLastMessageReceived is { } elapsed
&& (minElapsed is null || elapsed < minElapsed))
{
minElapsed = elapsed;
}
}

return minElapsed;
}

private async Task<Connection> GetConnectionAsync(SiloAddress endpoint)
{
await Task.Yield();
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Core/Timers/CoarseStopwatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private CoarseStopwatch(long timestamp)
/// <summary>
/// Returns the elapsed time.
/// </summary>
public TimeSpan Elapsed => TimeSpan.FromMilliseconds(ElapsedMilliseconds);
public readonly TimeSpan Elapsed => TimeSpan.FromMilliseconds(ElapsedMilliseconds);

/// <summary>
/// Returns a value indicating whether this instance has the default value.
Expand Down
46 changes: 45 additions & 1 deletion src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Internal;
using Orleans.Runtime.Messaging;
using static Orleans.Runtime.MembershipService.SiloHealthMonitor;

#nullable disable
Expand All @@ -26,6 +27,7 @@ internal partial class ClusterHealthMonitor : IClusterHealthMonitor, ClusterHeal
private readonly ILocalSiloDetails localSiloDetails;
private readonly IServiceProvider serviceProvider;
private readonly IMembershipManager membershipManager;
private readonly ConnectionManager connectionManager;
private readonly ILogger<ClusterHealthMonitor> log;
private readonly IFatalErrorHandler fatalErrorHandler;
private readonly IOptionsMonitor<ClusterMembershipOptions> clusterMembershipOptions;
Expand All @@ -51,11 +53,13 @@ public ClusterHealthMonitor(
ILogger<ClusterHealthMonitor> log,
IOptionsMonitor<ClusterMembershipOptions> clusterMembershipOptions,
IFatalErrorHandler fatalErrorHandler,
IServiceProvider serviceProvider)
IServiceProvider serviceProvider,
ConnectionManager connectionManager)
{
this.localSiloDetails = localSiloDetails;
this.serviceProvider = serviceProvider;
this.membershipManager = membershipManager;
this.connectionManager = connectionManager;
this.log = log;
this.fatalErrorHandler = fatalErrorHandler;
this.clusterMembershipOptions = clusterMembershipOptions;
Expand Down Expand Up @@ -317,15 +321,49 @@ private async Task OnProbeResultInternal(SiloHealthMonitor monitor, ProbeResult
{
if (probeResult.Status == ProbeResultStatus.Failed && probeResult.FailedProbeCount >= this.clusterMembershipOptions.CurrentValue.NumMissedProbesLimit)
{
if (IsConnectionActiveWithinMonitoringWindow(monitor.TargetSiloAddress))
{
return;
}

await this.membershipManager.TrySuspectSilo(monitor.TargetSiloAddress, null, this.shutdownCancellation.Token).ConfigureAwait(false);
}
}
else if (probeResult.Status == ProbeResultStatus.Failed)
{
if (IsConnectionActiveWithinMonitoringWindow(monitor.TargetSiloAddress))
{
return;
}

await this.membershipManager.TrySuspectSilo(monitor.TargetSiloAddress, probeResult.Intermediary, this.shutdownCancellation.Token).ConfigureAwait(false);
}
}

/// <summary>
/// Checks whether a connection to the specified silo has received a message within the monitoring window
/// (<see cref="ClusterMembershipOptions.ProbeTimeout"/> × <see cref="ClusterMembershipOptions.NumMissedProbesLimit"/>).
/// If so, the silo is demonstrably alive and the vote should be suppressed.
/// </summary>
private bool IsConnectionActiveWithinMonitoringWindow(SiloAddress targetSilo)
{
var options = this.clusterMembershipOptions.CurrentValue;
if (!options.EnableConnectionLivenessCheck)
{
return false;
}

var monitoringWindow = options.ProbeTimeout.Multiply(options.NumMissedProbesLimit);

if (this.connectionManager.GetElapsedSinceLastMessageReceived(targetSilo) is { } elapsed && elapsed <= monitoringWindow)
{
LogInformationSuppressingVoteDueToActiveConnection(log, targetSilo, elapsed, monitoringWindow);
return true;
}

return false;
}

bool IHealthCheckable.CheckHealth(DateTime lastCheckTime, out string reason)
{
var ok = true;
Expand Down Expand Up @@ -457,5 +495,11 @@ private readonly struct ProbedSilosLogRecord(IEnumerable<SiloAddress> probedSilo
Message = "Error disposing monitor for {SiloAddress}."
)]
private static partial void LogErrorDisposingMonitorForSilo(ILogger logger, Exception exception, SiloAddress siloAddress);

[LoggerMessage(
Level = LogLevel.Information,
Message = "Suppressing vote to suspect silo {SiloAddress}: connection received a message {Elapsed} ago, within the {MonitoringWindow} monitoring window. The silo is demonstrably alive."
)]
private static partial void LogInformationSuppressingVoteDueToActiveConnection(ILogger logger, SiloAddress siloAddress, TimeSpan elapsed, TimeSpan monitoringWindow);
}
}
Loading
Loading