-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathProcessorBase.java
More file actions
80 lines (63 loc) · 1.95 KB
/
ProcessorBase.java
File metadata and controls
80 lines (63 loc) · 1.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package mutiny.zero.operators;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jspecify.annotations.Nullable;
/**
 * Base class for processors that relay signals between one upstream subscription and one
 * downstream subscriber.
 *
 * <p>The processor hands itself to the downstream as the {@link Flow.Subscription}: downstream
 * {@link #request(long)} and {@link #cancel()} calls are forwarded to the upstream subscription,
 * while upstream {@code onError} / {@code onComplete} signals are forwarded to the downstream
 * unless the processor has been cancelled. {@code onNext} is not implemented here and is left to
 * subclasses.
 *
 * @param <I> type of the items received from upstream
 * @param <O> type of the items emitted to the downstream subscriber
 */
abstract class ProcessorBase<I, O> implements Flow.Processor<I, O>, Flow.Subscription {

    private Flow.@Nullable Subscriber<? super O> downstream;
    private Flow.@Nullable Subscription upstreamSubscription;
    private final AtomicBoolean cancelled = new AtomicBoolean();

    /**
     * Tells whether this processor has been cancelled.
     *
     * @return {@code true} once {@link #cancel()} has been called (directly or through
     *         {@link #onError(Throwable)}), {@code false} otherwise
     */
    protected boolean cancelled() {
        return cancelled.get();
    }

    /**
     * Gives subclasses access to the upstream subscription.
     *
     * <p>The reference is only available between {@link #onSubscribe(Flow.Subscription)} and
     * {@link #cancel()}: the field is cleared on cancellation, so callers are expected to check
     * {@link #cancelled()} first.
     *
     * @return the upstream subscription
     */
    protected Flow.Subscription upstreamSubscription() {
        assert upstreamSubscription != null;
        return upstreamSubscription;
    }

    /**
     * Gives subclasses access to the downstream subscriber.
     *
     * @return the subscriber registered through {@link #subscribe(Flow.Subscriber)}
     */
    protected Flow.Subscriber<? super O> downstream() {
        assert downstream != null;
        return downstream;
    }

    // ---- Publisher

    /**
     * Registers the downstream subscriber. The subscriber is only stored here; it receives its
     * {@code onSubscribe} signal when the upstream calls {@link #onSubscribe(Flow.Subscription)}.
     *
     * @param subscriber the downstream subscriber
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public void subscribe(Flow.Subscriber<? super O> subscriber) {
        // Flow.Publisher#subscribe is specified to reject null; fail here instead of later
        // when the first signal is forwarded.
        if (subscriber == null) {
            throw new NullPointerException("subscriber");
        }
        downstream = subscriber;
    }

    // ---- Subscriber

    /**
     * Stores the upstream subscription and passes this processor to the downstream as its
     * subscription.
     *
     * <p>If the processor was cancelled before the upstream subscription arrived, the
     * subscription is cancelled right away and nothing is signalled downstream.
     *
     * @param subscription the upstream subscription
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (cancelled()) {
            // cancel() ran while there was nothing to cancel yet: apply it now so the
            // upstream is not left subscribed.
            subscription.cancel();
            return;
        }
        this.upstreamSubscription = subscription;
        assert downstream != null;
        downstream.onSubscribe(this);
    }

    /**
     * Cancels the upstream subscription and forwards the failure downstream, unless the
     * processor is already cancelled, in which case the failure is dropped.
     *
     * @param throwable the failure signalled by upstream
     */
    @Override
    public void onError(Throwable throwable) {
        if (!cancelled()) {
            cancel();
            assert downstream != null;
            downstream.onError(throwable);
        }
    }

    /**
     * Forwards the completion signal downstream, unless the processor is cancelled.
     */
    @Override
    public void onComplete() {
        if (!cancelled()) {
            assert downstream != null;
            downstream.onComplete();
        }
    }

    // ---- Subscription

    /**
     * Forwards the demand to the upstream subscription, unless the processor is cancelled.
     *
     * @param n the number of items requested by the downstream
     */
    @Override
    public void request(long n) {
        if (!cancelled()) {
            // Read the field once: cancel() clears it, so checking the flag and then
            // re-reading the field could dereference null.
            Flow.Subscription upstream = upstreamSubscription;
            if (upstream == null) {
                // Either cancel() won the race (the demand is moot) or request() was called
                // before onSubscribe(), which is a usage error worth flagging under -ea.
                assert cancelled() : "request() called before onSubscribe()";
                return;
            }
            upstream.request(n);
        }
    }

    /**
     * Marks the processor as cancelled and cancels the upstream subscription, if there is one.
     * Only the first call has an effect.
     */
    @Override
    public void cancel() {
        if (cancelled.compareAndSet(false, true)) {
            Flow.Subscription upstream = upstreamSubscription;
            // No upstream yet means cancel() came before onSubscribe(); the flag is enough,
            // onSubscribe() cancels the subscription when it arrives.
            if (upstream != null) {
                upstream.cancel();
                upstreamSubscription = null;
            }
        }
    }
}