Skip to content

sync: add closed method to broadcast::WeakSender#7900

Open
mladedav wants to merge 1 commit intotokio-rs:masterfrom
mladedav:dm/broadcast-weak-sender-closed
Open

sync: add closed method to broadcast::WeakSender#7900
mladedav wants to merge 1 commit intotokio-rs:masterfrom
mladedav:dm/broadcast-weak-sender-closed

Conversation

@mladedav
Copy link

Motivation

We have a broadcast channel that is filled by a background task that is expensive to start. We want to stop the task if there were no active subscribers for a given time.

Right now, we clone the Sender and call closed on it along with watching for a channel that notifies us of new subscribers. However, this means that the strong count is incremented by this new sender instance and if we use it wrong, concurrent calls to Sender::closed and Receiver::recv could deadlock.

This method can also be used to react to dropping the last receiver after dropping the sender, something along the lines of

sender.send(msg);
let weak = sender.downgrade();
// This was the last strong sender
drop(sender);
println!("Waiting for all receivers to process the last message...");
weak.closed().await;
println!("All receivers have finished.");

Solution

Reuse the method already present on broadcast::Sender.

@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label Feb 10, 2026
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels Feb 10, 2026
Copy link
Member

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems ok. Thanks!

@Darksonn
Copy link
Member

Actually looks like there are failing tests. We'll need those fixed first.

@@ -1102,6 +1106,36 @@ impl<T> WeakSender<T> {
}
}

/// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches
/// A future which completes when the number of [Receiver]s subscribed to this channel reaches

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied, thanks.

@@ -1053,6 +1041,22 @@ impl<T> Shared<T> {

wakers.wake_all();
}

async fn closed(&self) {
loop {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not caused by this PR but watch::Sender::closed() uses cooperative() + async_trace_leaf() around the loop.
Should the same be done here too ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I've added it similarly to what is in watch.rs.

/// }
/// ```
pub async fn closed(&self) {
self.shared.closed().await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that WeakSender#closed() can delegate to Shared::close() because of its usage of tail.closed (line 1052).
tail.closed = true is done at two places:

  1. https://github.com/mladedav/tokio/blob/22dcbdde18347b1c3c14478fc4662f35defcdc0b/tokio/src/sync/broadcast.rs#L1589 - when the last receiver is dropped
  2. https://github.com/mladedav/tokio/blob/22dcbdde18347b1c3c14478fc4662f35defcdc0b/tokio/src/sync/broadcast.rs#L895 - at close_channel() which is called when the last strong Sender is dropped (https://github.com/mladedav/tokio/blob/22dcbdde18347b1c3c14478fc4662f35defcdc0b/tokio/src/sync/broadcast.rs#L1074)

This means Shared::closed() will return immediately whenever all strong senders have been dropped, even if receivers are still alive.
IMO the WeakSender::closed() should depend on tail.rx_cnt == 0

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, that's also what the failed test was about.

I've split Sahred::closed into two parts - closed for senders and closed for receivers. When I read through all the usages it seems like it's these two concepts together. Basically, closed meant either that there were no senders and then only receivers could observe the value or that there were no receivers and then only senders could observe the value.

Furthermore, after this split, I think we could also just not have closed_for_senders at all and use rx_cnt == 0 in all places instead. I did not do that yet in case you think the boolean should be kept for whatever reason, but I don't see any.

@mladedav mladedav force-pushed the dm/broadcast-weak-sender-closed branch 2 times, most recently from 2d7152c to dce1c97 Compare February 11, 2026 10:03
Comment on lines +1600 to +1601
self.shared.notify_last_rx_drop.notify_waiters();
tail.closed = true;
tail.closed_for_senders = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it will be more correct to set the new state before notifying the waiters.
Currently it works because of the acquired tail lock.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I even though so too but after noticing the lock decided not to change things that were not necessary or could just cause confusion about why I'm changing them.

///
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// drop(rx1);
/// assert!(tx.closed().now_or_never().is_none());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// assert!(tx.closed().now_or_never().is_none());
/// assert!(weak.closed().now_or_never().is_none());

///
/// assert_eq!(rx2.recv().await.unwrap(), 10);
/// drop(rx2);
/// assert!(tx.closed().now_or_never().is_some());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// assert!(tx.closed().now_or_never().is_some());
/// assert!(weak.closed().now_or_never().is_some());

@mladedav mladedav force-pushed the dm/broadcast-weak-sender-closed branch from dce1c97 to e4804ae Compare February 11, 2026 12:03
@mladedav
Copy link
Author

I don't think the netlify CI failures are caused by the changes here, when I tried to look for more information it gave me an internal server error so maybe they have some problems.

Comment on lines 893 to +894
pub async fn closed(&self) {
loop {
let notified = self.shared.notify_last_rx_drop.notified();

{
// Ensure the lock drops if the channel isn't closed
let tail = self.shared.tail.lock();
if tail.closed {
return;
}
}

notified.await;
}
self.shared.closed_for_senders().await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was initially going to say this changes behavior of the existing function, but I guess it doesn't because if you can call this method then there is a sender, so the closed for receivers condition does not matter here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the closed boolean was a bit overloaded - if there were no receivers, senders could observe that there are no receivers; if there were no senders, receivers could observe that there are no senders.

I'll just add that has_receivers is now the same as rx_cnt != 0 and has_senders is the same as num_tx.load(Ordering::Relaxed) != 0. The first one can be safely deleted because they are both in a locked Tail. I'm not entirely sure about the other one since num_tx is in Shared and not in Tail so it's not behind the same lock as the boolean currently is. I'm just mentioning this if you'd want me to delete has_receivers so that it cannot diverge and if I should look closer into deleting has_senders if it's safe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can delete the fields entirely, then that's great. But don't do it if it's too complex.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I've deleted just has_receivers. I'm pretty sure that's safe. I'm not so sure about the other one so I'll leave it alone.

Comment on lines +369 to +373
/// True if there are no receivers.
closed_for_senders: bool,

/// True if the last sender has been dropped.
closed_for_receivers: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to split this into two booleans, then please use better names. For example, what about has_senders and has_receivers?

Copy link
Author

@mladedav mladedav Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, renamed, thanks for the suggestion. I didn't think of flipping the logic hence the awkward names I came up with.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept just the closed_for_senders name for the (private) method. We could do something like wait_for_no_receivers instead, but at that level this feels better to me.

@mladedav mladedav force-pushed the dm/broadcast-weak-sender-closed branch 2 times, most recently from e2a17ed to 366cf1e Compare February 12, 2026 11:52
Copy link
Member

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks ok, but I'd like someone else to review too.

Copy link
Member

@martin-g martin-g left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few documentation related nits.

}

/// A future which completes when the number of [Receiver]s subscribed to this channel reaches
/// zero.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// zero.
/// zero, regardless of whether strong senders still exist.

nit: Maybe clarify that strong senders are not taken into account for WeakSender::closed() ?!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to have the same method as Sender but now that I think about it, that might be confusing because Sender can really only be interested in all receivers being done while someone might want to use WeakSender to checking either that there are no receivers or no senders.

So do you prefer this method to be named closed same as on Sender and behave in the same way or do you think it would be better to name it something different (though I can't think of a good name) to better signify that the receivers are gone and have an option to add a method that all strong senders are gone later?

For now applied exactly as suggested.

@mladedav mladedav force-pushed the dm/broadcast-weak-sender-closed branch from 366cf1e to 69b854a Compare February 16, 2026 10:01
@mladedav mladedav force-pushed the dm/broadcast-weak-sender-closed branch from 69b854a to 74bd934 Compare February 16, 2026 10:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-tokio Area: The main tokio crate M-sync Module: tokio/sync R-loom-sync Run loom sync tests on this PR

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants