Skip to content

Commit f40acd3

Browse files
authored
Merge pull request #33 from rsocket/feature/overrideError
Override Error
2 parents 3621f99 + 25c3695 commit f40acd3

File tree

4 files changed

+14
-6
lines changed

4 files changed

+14
-6
lines changed

rsocket-rpc-core/src/main/java/io/rsocket/rpc/AbstractRSocketService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import io.rsocket.AbstractRSocket;
44
import io.rsocket.Payload;
5-
import org.reactivestreams.Publisher;
65
import reactor.core.publisher.Flux;
76

87
public abstract class AbstractRSocketService extends AbstractRSocket implements RSocketRpcService {
@@ -12,7 +11,7 @@ public String getService() {
1211
}
1312

1413
@Override
15-
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> publisher) {
14+
public Flux<Payload> requestChannel(Payload payload, Flux<Payload> publisher) {
1615
return Flux.error(new UnsupportedOperationException("Request-Channel not implemented."));
1716
}
1817

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
11
package io.rsocket.rpc;
22

3+
import io.rsocket.Payload;
34
import io.rsocket.ResponderRSocket;
5+
import org.reactivestreams.Publisher;
6+
import reactor.core.publisher.Flux;
47

58
public interface RSocketRpcService extends ResponderRSocket {
69
String getService();
10+
11+
Flux<Payload> requestChannel(Payload payload, Flux<Payload> publisher);
12+
13+
default Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
14+
return requestChannel(payload, Flux.from(payloads));
15+
}
716
}

rsocket-rpc-protobuf/src/java_plugin/cpp/blocking_java_generator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -924,7 +924,7 @@ static void PrintServer(const ServiceDescriptor* service,
924924
p->Print(
925925
*vars,
926926
"@$Override$\n"
927-
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Publisher$<$Payload$> publisher) {\n");
927+
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Flux$<$Payload$> publisher) {\n");
928928
p->Indent();
929929
if (request_channel.empty()) {
930930
p->Print(
@@ -956,7 +956,7 @@ static void PrintServer(const ServiceDescriptor* service,
956956
p->Indent();
957957
p->Print(
958958
*vars,
959-
"$Flux$.from(publisher).map(deserializer($input_type$.parser()));\n");
959+
"publisher.map(deserializer($input_type$.parser()));\n");
960960
p->Outdent();
961961
if (method->server_streaming()) {
962962
p->Print(

rsocket-rpc-protobuf/src/java_plugin/cpp/java_generator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,7 +1311,7 @@ static void PrintServer(const ServiceDescriptor* service,
13111311
p->Print(
13121312
*vars,
13131313
"@$Override$\n"
1314-
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Publisher$<$Payload$> publisher) {\n");
1314+
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Flux$<$Payload$> publisher) {\n");
13151315
p->Indent();
13161316
if (request_channel.empty()) {
13171317
p->Print(
@@ -1344,7 +1344,7 @@ static void PrintServer(const ServiceDescriptor* service,
13441344
p->Indent();
13451345
p->Print(
13461346
*vars,
1347-
"$Flux$.from(publisher).map(deserializer($input_type$.parser()));\n");
1347+
"publisher.map(deserializer($input_type$.parser()));\n");
13481348
p->Outdent();
13491349
if (method->server_streaming()) {
13501350
p->Print(

0 commit comments

Comments
 (0)