sync: add closed method to broadcast::WeakSender#7900
sync: add closed method to broadcast::WeakSender#7900mladedav wants to merge 1 commit intotokio-rs:masterfrom
closed method to broadcast::WeakSender#7900Conversation
|
Actually looks like there are failing tests. We'll need those fixed first. |
tokio/src/sync/broadcast.rs
Outdated
| @@ -1102,6 +1106,36 @@ impl<T> WeakSender<T> { | |||
| } | |||
| } | |||
|
|
|||
| /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches | |||
There was a problem hiding this comment.
| /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches | |
| /// A future which completes when the number of [Receiver]s subscribed to this channel reaches |
tokio/src/sync/broadcast.rs
Outdated
| @@ -1053,6 +1041,22 @@ impl<T> Shared<T> { | |||
|
|
|||
| wakers.wake_all(); | |||
| } | |||
|
|
|||
| async fn closed(&self) { | |||
| loop { | |||
There was a problem hiding this comment.
Not caused by this PR but watch::Sender::closed() uses cooperative() + async_trace_leaf() around the loop.
Should the same be done here too ?
There was a problem hiding this comment.
Sure, I've added it similarly to what is in watch.rs.
tokio/src/sync/broadcast.rs
Outdated
| /// } | ||
| /// ``` | ||
| pub async fn closed(&self) { | ||
| self.shared.closed().await; |
There was a problem hiding this comment.
I am not sure that WeakSender#closed() can delegate to Shared::close() because of its usage of tail.closed (line 1052).
tail.closed = true is done at two places:
- https://github.com/mladedav/tokio/blob/22dcbdde18347b1c3c14478fc4662f35defcdc0b/tokio/src/sync/broadcast.rs#L1589 - when the last receiver is dropped
- https://github.com/mladedav/tokio/blob/22dcbdde18347b1c3c14478fc4662f35defcdc0b/tokio/src/sync/broadcast.rs#L895 - at
close_channel()which is called when the last strong Sender is dropped (https://github.com/mladedav/tokio/blob/22dcbdde18347b1c3c14478fc4662f35defcdc0b/tokio/src/sync/broadcast.rs#L1074)
This means Shared::closed() will return immediately whenever all strong senders have been dropped, even if receivers are still alive.
IMO the WeakSender::closed() should depend on tail.rx_cnt == 0
There was a problem hiding this comment.
You're right, that's also what the failed test was about.
I've split Sahred::closed into two parts - closed for senders and closed for receivers. When I read through all the usages it seems like it's these two concepts together. Basically, closed meant either that there were no senders and then only receivers could observe the value or that there were no receivers and then only senders could observe the value.
Furthermore, after this split, I think we could also just not have closed_for_senders at all and use rx_cnt == 0 in all places instead. I did not do that yet in case you think the boolean should be kept for whatever reason, but I don't see any.
2d7152c to
dce1c97
Compare
tokio/src/sync/broadcast.rs
Outdated
| self.shared.notify_last_rx_drop.notify_waiters(); | ||
| tail.closed = true; | ||
| tail.closed_for_senders = true; |
There was a problem hiding this comment.
IMO it will be more correct to set the new state before notifying the waiters.
Currently it works because of the acquired tail lock.
There was a problem hiding this comment.
Sure. I even though so too but after noticing the lock decided not to change things that were not necessary or could just cause confusion about why I'm changing them.
tokio/src/sync/broadcast.rs
Outdated
| /// | ||
| /// assert_eq!(rx1.recv().await.unwrap(), 10); | ||
| /// drop(rx1); | ||
| /// assert!(tx.closed().now_or_never().is_none()); |
There was a problem hiding this comment.
| /// assert!(tx.closed().now_or_never().is_none()); | |
| /// assert!(weak.closed().now_or_never().is_none()); |
tokio/src/sync/broadcast.rs
Outdated
| /// | ||
| /// assert_eq!(rx2.recv().await.unwrap(), 10); | ||
| /// drop(rx2); | ||
| /// assert!(tx.closed().now_or_never().is_some()); |
There was a problem hiding this comment.
| /// assert!(tx.closed().now_or_never().is_some()); | |
| /// assert!(weak.closed().now_or_never().is_some()); |
dce1c97 to
e4804ae
Compare
|
I don't think the netlify CI failures are caused by the changes here, when I tried to look for more information it gave me an internal server error so maybe they have some problems. |
| pub async fn closed(&self) { | ||
| loop { | ||
| let notified = self.shared.notify_last_rx_drop.notified(); | ||
|
|
||
| { | ||
| // Ensure the lock drops if the channel isn't closed | ||
| let tail = self.shared.tail.lock(); | ||
| if tail.closed { | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| notified.await; | ||
| } | ||
| self.shared.closed_for_senders().await; |
There was a problem hiding this comment.
I was initially going to say this changes behavior of the existing function, but I guess it doesn't because if you can call this method then there is a sender, so the closed for receivers condition does not matter here.
There was a problem hiding this comment.
Yes, the closed boolean was a bit overloaded - if there were no receivers, senders could observe that there are no receivers; if there were no senders, receivers could observe that there are no senders.
I'll just add that has_receivers is now the same as rx_cnt != 0 and has_senders is the same as num_tx.load(Ordering::Relaxed) != 0. The first one can be safely deleted because they are both in a locked Tail. I'm not entirely sure about the other one since num_tx is in Shared and not in Tail so it's not behind the same lock as the boolean currently is. I'm just mentioning this if you'd want me to delete has_receivers so that it cannot diverge and if I should look closer into deleting has_senders if it's safe.
There was a problem hiding this comment.
If you can delete the fields entirely, then that's great. But don't do it if it's too complex.
There was a problem hiding this comment.
Ok, I've deleted just has_receivers. I'm pretty sure that's safe. I'm not so sure about the other one so I'll leave it alone.
tokio/src/sync/broadcast.rs
Outdated
| /// True if there are no receivers. | ||
| closed_for_senders: bool, | ||
|
|
||
| /// True if the last sender has been dropped. | ||
| closed_for_receivers: bool, |
There was a problem hiding this comment.
If we're going to split this into two booleans, then please use better names. For example, what about has_senders and has_receivers?
There was a problem hiding this comment.
Sure, renamed, thanks for the suggestion. I didn't think of flipping the logic hence the awkward names I came up with.
There was a problem hiding this comment.
I kept just the closed_for_senders name for the (private) method. We could do something like wait_for_no_receivers instead, but at that level this feels better to me.
e2a17ed to
366cf1e
Compare
Darksonn
left a comment
There was a problem hiding this comment.
This looks ok, but I'd like someone else to review too.
martin-g
left a comment
There was a problem hiding this comment.
Just a few documentation related nits.
tokio/src/sync/broadcast.rs
Outdated
| } | ||
|
|
||
| /// A future which completes when the number of [Receiver]s subscribed to this channel reaches | ||
| /// zero. |
There was a problem hiding this comment.
| /// zero. | |
| /// zero, regardless of whether strong senders still exist. |
nit: Maybe clarify that strong senders are not taken into account for WeakSender::closed() ?!
There was a problem hiding this comment.
I tried to have the same method as Sender but now that I think about it, that might be confusing because Sender can really only be interested in all receivers being done while someone might want to use WeakSender to checking either that there are no receivers or no senders.
So do you prefer this method to be named closed same as on Sender and behave in the same way or do you think it would be better to name it something different (though I can't think of a good name) to better signify that the receivers are gone and have an option to add a method that all strong senders are gone later?
For now applied exactly as suggested.
366cf1e to
69b854a
Compare
69b854a to
74bd934
Compare
Motivation
We have a broadcast channel that is filled by a background task that is expensive to start. We want to stop the task if there were no active subscribers for a given time.
Right now, we clone the
Senderand callclosedon it along with watching for a channel that notifies us of new subscribers. However, this means that the strong count is incremented by this new sender instance and if we use it wrong, concurrent calls toSender::closedandReceiver::recvcould deadlock.This method can also be used to react to dropping the last receiver after dropping the sender, something along the lines of
Solution
Reuse the method already present on
broadcast::Sender.