Skip to content

Commit 680a686

Browse files
committed
fix issue with fuseable metrics subscriber; update reactor/rsocket version to latest
1 parent f77c70a commit 680a686

File tree

5 files changed

+157
-9
lines changed

5 files changed

+157
-9
lines changed

gradle/java.gradle

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,14 @@ dependencies {
8080
testCompile 'junit:junit:4.12'
8181

8282
testCompile 'javax.inject:javax.inject:1'
83-
testCompile 'io.projectreactor:reactor-test:3.1.7.RELEASE'
83+
testCompile 'io.projectreactor:reactor-test:3.1.9.RELEASE'
8484
testCompile 'com.google.protobuf:protobuf-java:3.6.0'
8585
testCompile 'org.hdrhistogram:HdrHistogram:2.1.10'
8686
testCompile 'org.apache.logging.log4j:log4j-api:2.8.2'
8787
testCompile 'org.apache.logging.log4j:log4j-core:2.8.2'
8888
testCompile 'org.apache.logging.log4j:log4j-slf4j-impl:2.8.2'
89-
testCompile 'io.rsocket:rsocket-transport-netty:0.11.5'
90-
testCompile 'io.rsocket:rsocket-transport-local:0.11.5'
89+
testCompile 'io.rsocket:rsocket-transport-netty:0.11.6'
90+
testCompile 'io.rsocket:rsocket-transport-local:0.11.6'
9191
testCompile 'org.mockito:mockito-all:1.10.19'
9292
}
9393

rsocket-rpc-core/build.gradle

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ dependencies {
1010
compile 'io.opentracing:opentracing-api:0.31.0'
1111
compile 'javax.inject:javax.inject:1'
1212
compile 'com.google.protobuf:protobuf-java:3.6.0'
13-
compile 'io.rsocket:rsocket-core:0.11.5'
14-
compile 'io.rsocket:rsocket-transport-netty:0.11.5'
15-
compile 'io.micrometer:micrometer-core:1.0.3'
13+
compile 'io.rsocket:rsocket-core:0.11.6'
14+
compile 'io.rsocket:rsocket-transport-netty:0.11.6'
15+
compile 'io.micrometer:micrometer-core:1.0.6'
1616

1717
protobuf project(':rsocket-rpc-metrics-idl')
1818

rsocket-rpc-core/src/main/java/io/rsocket/rpc/metrics/Metrics.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.micrometer.core.instrument.*;
44
import java.util.function.Function;
55
import org.reactivestreams.Publisher;
6+
import reactor.core.Fuseable;
67
import reactor.core.publisher.Operators;
78

89
public class Metrics {
@@ -32,7 +33,13 @@ public class Metrics {
3233
.tags(tags)
3334
.register(registry);
3435
return Operators.lift(
35-
(scannable, subscriber) ->
36-
new MetricsSubscriber<>(subscriber, next, complete, error, cancelled, timer));
36+
(scannable, subscriber) -> {
37+
if (scannable instanceof Fuseable) {
38+
return new MetricsFuseableSubscriber<>(subscriber, next, complete, error, cancelled, timer);
39+
}
40+
else {
41+
return new MetricsSubscriber<>(subscriber, next, complete, error, cancelled, timer);
42+
}
43+
});
3744
}
3845
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package io.rsocket.rpc.metrics;
2+
3+
import java.util.concurrent.TimeUnit;
4+
import java.util.concurrent.atomic.AtomicBoolean;
5+
6+
import io.micrometer.core.instrument.Counter;
7+
import io.micrometer.core.instrument.Timer;
8+
import org.reactivestreams.Subscription;
9+
import reactor.core.CoreSubscriber;
10+
import reactor.core.Fuseable.QueueSubscription;
11+
import reactor.core.Fuseable;
12+
import reactor.core.publisher.Operators;
13+
import reactor.util.annotation.Nullable;
14+
import reactor.util.context.Context;
15+
16+
import static reactor.core.Fuseable.ASYNC;
17+
18+
public class MetricsFuseableSubscriber<T> extends AtomicBoolean implements QueueSubscription<T>,
19+
CoreSubscriber<T> {
20+
private final CoreSubscriber<? super T> actual;
21+
private final Counter next, complete, error, cancelled;
22+
private final Timer timer;
23+
24+
private QueueSubscription<T> s;
25+
private int sourceMode;
26+
27+
private long start;
28+
29+
MetricsFuseableSubscriber(
30+
CoreSubscriber<? super T> actual,
31+
Counter next,
32+
Counter complete,
33+
Counter error,
34+
Counter cancelled,
35+
Timer timer) {
36+
this.actual = actual;
37+
this.next = next;
38+
this.complete = complete;
39+
this.error = error;
40+
this.cancelled = cancelled;
41+
this.timer = timer;
42+
}
43+
44+
@Override
45+
@SuppressWarnings("unchecked")
46+
public void onSubscribe(Subscription s) {
47+
if (Operators.validate(this.s, s)) {
48+
this.s = (QueueSubscription<T>) s;
49+
this.start = System.nanoTime();
50+
51+
actual.onSubscribe(this);
52+
}
53+
}
54+
55+
@Override
56+
public void onNext(T t) {
57+
if (sourceMode == ASYNC) {
58+
actual.onNext(null);
59+
}
60+
else {
61+
next.increment();
62+
actual.onNext(t);
63+
}
64+
}
65+
66+
@Override
67+
public void onError(Throwable t) {
68+
if (compareAndSet(false, true)) {
69+
error.increment();
70+
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
71+
}
72+
actual.onError(t);
73+
}
74+
75+
@Override
76+
public void onComplete() {
77+
if (compareAndSet(false, true)) {
78+
complete.increment();
79+
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
80+
}
81+
actual.onComplete();
82+
}
83+
84+
@Override
85+
public void request(long n) {
86+
s.request(n);
87+
}
88+
89+
@Override
90+
public void cancel() {
91+
if (compareAndSet(false, true)) {
92+
cancelled.increment();
93+
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
94+
}
95+
s.cancel();
96+
}
97+
98+
@Override
99+
public Context currentContext() {
100+
return actual.currentContext();
101+
}
102+
103+
@Override
104+
public int requestFusion(int requestedMode) {
105+
int m;
106+
if ((requestedMode & Fuseable.THREAD_BARRIER) != 0) {
107+
return Fuseable.NONE;
108+
}
109+
else {
110+
m = s.requestFusion(requestedMode);
111+
}
112+
sourceMode = m;
113+
return m;
114+
}
115+
116+
@Override
117+
@Nullable
118+
public T poll() {
119+
T v = s.poll();
120+
if (v != null) {
121+
next.increment();
122+
return v;
123+
}
124+
return null;
125+
}
126+
127+
@Override
128+
public boolean isEmpty() {
129+
return s.isEmpty();
130+
}
131+
132+
@Override
133+
public void clear() {
134+
s.clear();
135+
}
136+
137+
@Override
138+
public int size() {
139+
return s.size();
140+
}
141+
}

rsocket-rpc-protobuf/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ sourceCompatibility = 1.8
5454
targetCompatibility = 1.8
5555

5656
dependencies {
57-
compileOnly 'io.projectreactor:reactor-core:3.1.7.RELEASE'
57+
compileOnly 'io.projectreactor:reactor-core:3.1.9.RELEASE'
5858
compileOnly 'com.google.protobuf:protobuf-java:3.6.0'
5959
compileOnly 'javax.inject:javax.inject:1'
6060
}

0 commit comments

Comments
 (0)