Skip to content

Commit 9c055c6

Browse files
authored
Merge pull request #3 from netifi/fix-fuseable-metrics-issue
fix issue with fuseable metrics subscriber
2 parents f77c70a + 9713b31 commit 9c055c6

File tree

13 files changed

+426
-33
lines changed

13 files changed

+426
-33
lines changed

.travis.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ matrix:
55
jdk: oraclejdk8
66
language: generic
77
before_install:
8-
- wget https://github.com/google/protobuf/releases/download/v3.6.0/protobuf-cpp-3.6.0.tar.gz
9-
- tar -xzvf protobuf-cpp-3.6.0.tar.gz
10-
- pushd protobuf-3.6.0 && ./configure --disable-shared && make && sudo make install
8+
- wget https://github.com/google/protobuf/releases/download/v3.6.1/protobuf-cpp-3.6.1.tar.gz
9+
- tar -xzvf protobuf-cpp-3.6.1.tar.gz
10+
- pushd protobuf-3.6.1 && ./configure --disable-shared && make && sudo make install
1111
&& popd
1212
script: "./buildtravis.sh"
1313
- os: osx

docs/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ RSocket RPC Java uses a Protobuf plugin to generate application code. Add the fo
8484
```
8585
protobuf {
8686
protoc {
87-
artifact = 'com.google.protobuf:protoc:3.6.0'
87+
artifact = 'com.google.protobuf:protoc:3.6.1'
8888
}
8989
plugins {
9090
rsocketRpc {

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
group=io.rsocket.rpc
2-
version=0.2.0
2+
version=0.2.1

gradle/java.gradle

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,14 @@ dependencies {
8080
testCompile 'junit:junit:4.12'
8181

8282
testCompile 'javax.inject:javax.inject:1'
83-
testCompile 'io.projectreactor:reactor-test:3.1.7.RELEASE'
84-
testCompile 'com.google.protobuf:protobuf-java:3.6.0'
83+
testCompile 'io.projectreactor:reactor-test:3.1.9.RELEASE'
84+
testCompile 'com.google.protobuf:protobuf-java:3.6.1'
8585
testCompile 'org.hdrhistogram:HdrHistogram:2.1.10'
8686
testCompile 'org.apache.logging.log4j:log4j-api:2.8.2'
8787
testCompile 'org.apache.logging.log4j:log4j-core:2.8.2'
8888
testCompile 'org.apache.logging.log4j:log4j-slf4j-impl:2.8.2'
89-
testCompile 'io.rsocket:rsocket-transport-netty:0.11.5'
90-
testCompile 'io.rsocket:rsocket-transport-local:0.11.5'
89+
testCompile 'io.rsocket:rsocket-transport-netty:0.11.6'
90+
testCompile 'io.rsocket:rsocket-transport-local:0.11.6'
9191
testCompile 'org.mockito:mockito-all:1.10.19'
9292
}
9393

rsocket-rpc-core/build.gradle

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ dependencies {
99
compile project (':rsocket-rpc-protobuf')
1010
compile 'io.opentracing:opentracing-api:0.31.0'
1111
compile 'javax.inject:javax.inject:1'
12-
compile 'com.google.protobuf:protobuf-java:3.6.0'
13-
compile 'io.rsocket:rsocket-core:0.11.5'
14-
compile 'io.rsocket:rsocket-transport-netty:0.11.5'
15-
compile 'io.micrometer:micrometer-core:1.0.3'
12+
compile 'com.google.protobuf:protobuf-java:3.6.1'
13+
compile 'io.rsocket:rsocket-core:0.11.6'
14+
compile 'io.rsocket:rsocket-transport-netty:0.11.6'
15+
compile 'io.micrometer:micrometer-core:1.0.6'
1616

1717
protobuf project(':rsocket-rpc-metrics-idl')
1818

@@ -28,7 +28,7 @@ protobuf {
2828
generatedFilesBaseDir = "${projectDir}/src/generated"
2929

3030
protoc {
31-
artifact = 'com.google.protobuf:protoc:3.6.0'
31+
artifact = 'com.google.protobuf:protoc:3.6.1'
3232
}
3333
plugins {
3434
rsocketRpc {

rsocket-rpc-core/src/main/java/io/rsocket/rpc/metrics/Metrics.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.micrometer.core.instrument.*;
44
import java.util.function.Function;
55
import org.reactivestreams.Publisher;
6+
import reactor.core.Fuseable;
67
import reactor.core.publisher.Operators;
78

89
public class Metrics {
@@ -13,6 +14,7 @@ public class Metrics {
1314
return timed(registry, name, Tags.of(keyValues));
1415
}
1516

17+
@SuppressWarnings("unchecked")
1618
public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> timed(
1719
MeterRegistry registry, String name, Iterable<Tag> tags) {
1820
Counter next =
@@ -32,7 +34,33 @@ public class Metrics {
3234
.tags(tags)
3335
.register(registry);
3436
return Operators.lift(
35-
(scannable, subscriber) ->
36-
new MetricsSubscriber<>(subscriber, next, complete, error, cancelled, timer));
37+
(scannable, subscriber) -> {
38+
if (scannable instanceof Fuseable) {
39+
if (subscriber instanceof Fuseable.ConditionalSubscriber) {
40+
return new MetricsFuseableConditionalSubscriber<>(
41+
(Fuseable.ConditionalSubscriber<? super T>) subscriber,
42+
next,
43+
complete,
44+
error,
45+
cancelled,
46+
timer);
47+
} else {
48+
return new MetricsFuseableSubscriber<>(
49+
subscriber, next, complete, error, cancelled, timer);
50+
}
51+
} else {
52+
if (subscriber instanceof Fuseable.ConditionalSubscriber) {
53+
return new MetricsFuseableConditionalSubscriber<>(
54+
(Fuseable.ConditionalSubscriber<? super T>) subscriber,
55+
next,
56+
complete,
57+
error,
58+
cancelled,
59+
timer);
60+
} else {
61+
return new MetricsSubscriber<>(subscriber, next, complete, error, cancelled, timer);
62+
}
63+
}
64+
});
3765
}
3866
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package io.rsocket.rpc.metrics;
2+
3+
import io.micrometer.core.instrument.Counter;
4+
import io.micrometer.core.instrument.Timer;
5+
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
import org.reactivestreams.Subscription;
8+
import reactor.core.Fuseable.ConditionalSubscriber;
9+
import reactor.core.publisher.Operators;
10+
import reactor.util.context.Context;
11+
12+
public class MetricsConditionalSubscriber<T> extends AtomicBoolean
13+
implements Subscription, ConditionalSubscriber<T> {
14+
private final ConditionalSubscriber<? super T> actual;
15+
private final Counter next, complete, error, cancelled;
16+
private final Timer timer;
17+
18+
private Subscription s;
19+
private long start;
20+
21+
MetricsConditionalSubscriber(
22+
ConditionalSubscriber<? super T> actual,
23+
Counter next,
24+
Counter complete,
25+
Counter error,
26+
Counter cancelled,
27+
Timer timer) {
28+
this.actual = actual;
29+
this.next = next;
30+
this.complete = complete;
31+
this.error = error;
32+
this.cancelled = cancelled;
33+
this.timer = timer;
34+
}
35+
36+
@Override
37+
public void onSubscribe(Subscription s) {
38+
if (Operators.validate(this.s, s)) {
39+
this.s = s;
40+
this.start = System.nanoTime();
41+
42+
actual.onSubscribe(this);
43+
}
44+
}
45+
46+
@Override
47+
public void onNext(T t) {
48+
next.increment();
49+
actual.onNext(t);
50+
}
51+
52+
@Override
53+
public boolean tryOnNext(T t) {
54+
next.increment();
55+
return actual.tryOnNext(t);
56+
}
57+
58+
@Override
59+
public void onError(Throwable t) {
60+
if (compareAndSet(false, true)) {
61+
error.increment();
62+
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
63+
}
64+
actual.onError(t);
65+
}
66+
67+
@Override
68+
public void onComplete() {
69+
if (compareAndSet(false, true)) {
70+
complete.increment();
71+
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
72+
}
73+
actual.onComplete();
74+
}
75+
76+
@Override
77+
public void request(long n) {
78+
s.request(n);
79+
}
80+
81+
@Override
82+
public void cancel() {
83+
if (compareAndSet(false, true)) {
84+
cancelled.increment();
85+
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
86+
}
87+
s.cancel();
88+
}
89+
90+
@Override
91+
public Context currentContext() {
92+
return actual.currentContext();
93+
}
94+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package io.rsocket.rpc.metrics;
2+
3+
import static reactor.core.Fuseable.ASYNC;
4+
5+
import io.micrometer.core.instrument.Counter;
6+
import io.micrometer.core.instrument.Timer;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
import org.reactivestreams.Subscription;
10+
import reactor.core.Fuseable;
11+
import reactor.core.Fuseable.ConditionalSubscriber;
12+
import reactor.core.Fuseable.QueueSubscription;
13+
import reactor.core.publisher.Operators;
14+
import reactor.util.annotation.Nullable;
15+
import reactor.util.context.Context;
16+
17+
public class MetricsFuseableConditionalSubscriber<T> extends AtomicBoolean
18+
implements QueueSubscription<T>, ConditionalSubscriber<T> {
19+
private final ConditionalSubscriber<? super T> actual;
20+
private final Counter next, complete, error, cancelled;
21+
private final Timer timer;
22+
23+
private QueueSubscription<T> s;
24+
private int sourceMode;
25+
26+
private long start;
27+
28+
MetricsFuseableConditionalSubscriber(
29+
ConditionalSubscriber<? super T> actual,
30+
Counter next,
31+
Counter complete,
32+
Counter error,
33+
Counter cancelled,
34+
Timer timer) {
35+
this.actual = actual;
36+
this.next = next;
37+
this.complete = complete;
38+
this.error = error;
39+
this.cancelled = cancelled;
40+
this.timer = timer;
41+
}
42+
43+
@Override
44+
@SuppressWarnings("unchecked")
45+
public void onSubscribe(Subscription s) {
46+
if (Operators.validate(this.s, s)) {
47+
this.s = (QueueSubscription<T>) s;
48+
this.start = System.nanoTime();
49+
50+
actual.onSubscribe(this);
51+
}
52+
}
53+
54+
@Override
55+
public void onNext(T t) {
56+
if (sourceMode == ASYNC) {
57+
actual.onNext(null);
58+
} else {
59+
next.increment();
60+
actual.onNext(t);
61+
}
62+
}
63+
64+
@Override
65+
public boolean tryOnNext(T t) {
66+
next.increment();
67+
return actual.tryOnNext(t);
68+
}
69+
70+
@Override
71+
public void onError(Throwable t) {
72+
if (compareAndSet(false, true)) {
73+
error.increment();
74+
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
75+
}
76+
actual.onError(t);
77+
}
78+
79+
@Override
80+
public void onComplete() {
81+
if (compareAndSet(false, true)) {
82+
complete.increment();
83+
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
84+
}
85+
actual.onComplete();
86+
}
87+
88+
@Override
89+
public void request(long n) {
90+
s.request(n);
91+
}
92+
93+
@Override
94+
public void cancel() {
95+
if (compareAndSet(false, true)) {
96+
cancelled.increment();
97+
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
98+
}
99+
s.cancel();
100+
}
101+
102+
@Override
103+
public Context currentContext() {
104+
return actual.currentContext();
105+
}
106+
107+
@Override
108+
public int requestFusion(int requestedMode) {
109+
int m;
110+
if ((requestedMode & Fuseable.THREAD_BARRIER) != 0) {
111+
return Fuseable.NONE;
112+
} else {
113+
m = s.requestFusion(requestedMode);
114+
}
115+
sourceMode = m;
116+
return m;
117+
}
118+
119+
@Override
120+
@Nullable
121+
public T poll() {
122+
T v = s.poll();
123+
if (v != null) {
124+
next.increment();
125+
return v;
126+
}
127+
return null;
128+
}
129+
130+
@Override
131+
public boolean isEmpty() {
132+
return s.isEmpty();
133+
}
134+
135+
@Override
136+
public void clear() {
137+
s.clear();
138+
}
139+
140+
@Override
141+
public int size() {
142+
return s.size();
143+
}
144+
}

0 commit comments

Comments
 (0)