Skip to content

Commit 6cb2e47

Browse files
committed
Make the blocking client a separate API
1 parent 4ae45ab commit 6cb2e47

File tree

8 files changed

+215
-100
lines changed

8 files changed

+215
-100
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package examples.grpc;
2+
3+
import io.vertx.core.Future;
4+
import io.vertx.core.Completable;
5+
import io.vertx.core.Handler;
6+
import io.vertx.core.net.SocketAddress;
7+
import io.vertx.grpc.client.GrpcClient;
8+
import io.vertx.grpc.client.GrpcClientRequest;
9+
import io.vertx.core.streams.ReadStream;
10+
import io.vertx.core.streams.WriteStream;
11+
import io.vertx.grpc.common.GrpcStatus;
12+
import io.vertx.grpc.common.ServiceName;
13+
import io.vertx.grpc.common.ServiceMethod;
14+
import io.vertx.grpc.common.GrpcMessageDecoder;
15+
import io.vertx.grpc.common.GrpcMessageEncoder;
16+
import java.util.stream.Stream;
17+
18+
/**
19+
* <p>A client for invoking the Greeter gRPC service.</p>
20+
*/
21+
public interface GreeterGrpcBlockingClient {
22+
23+
static GreeterGrpcBlockingClient create(GreeterGrpcClient client) {
24+
return new GreeterGrpcBlockingClientImpl(client);
25+
}
26+
27+
28+
@io.vertx.codegen.annotations.GenIgnore
29+
examples.grpc.HelloReply sayHello(examples.grpc.HelloRequest request);
30+
}
31+
32+
/**
33+
* The proxy implementation.
34+
*/
35+
class GreeterGrpcBlockingClientImpl implements GreeterGrpcBlockingClient {
36+
37+
private final GreeterGrpcClientImpl client;
38+
39+
GreeterGrpcBlockingClientImpl(Greeter client) {
40+
this.client = (GreeterGrpcClientImpl)java.util.Objects.requireNonNull(client);
41+
}
42+
43+
public examples.grpc.HelloReply sayHello(examples.grpc.HelloRequest request) {
44+
return client.sayHello(request).await();
45+
}
46+
}

vertx-grpc-docs/src/main/java/examples/grpc/GreeterGrpcClient.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,6 @@ static GreeterGrpcClient create(GrpcClient client, SocketAddress host, io.vertx.
7878
*/
7979
@io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
8080
Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request);
81-
82-
@io.vertx.codegen.annotations.GenIgnore
83-
examples.grpc.HelloReply sayHello_sync(examples.grpc.HelloRequest request);
8481
}
8582

8683
/**
@@ -119,8 +116,4 @@ public Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest requ
119116
return req.response().compose(resp -> resp.last());
120117
});
121118
}
122-
123-
public examples.grpc.HelloReply sayHello_sync(examples.grpc.HelloRequest request) {
124-
return sayHello(request).await();
125-
}
126119
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package examples.grpc;
2+
3+
import io.vertx.core.Future;
4+
import io.vertx.core.Completable;
5+
import io.vertx.core.Handler;
6+
import io.vertx.core.net.SocketAddress;
7+
import io.vertx.grpc.client.GrpcClient;
8+
import io.vertx.grpc.client.GrpcClientRequest;
9+
import io.vertx.core.streams.ReadStream;
10+
import io.vertx.core.streams.WriteStream;
11+
import io.vertx.grpc.common.GrpcStatus;
12+
import io.vertx.grpc.common.ServiceName;
13+
import io.vertx.grpc.common.ServiceMethod;
14+
import io.vertx.grpc.common.GrpcMessageDecoder;
15+
import io.vertx.grpc.common.GrpcMessageEncoder;
16+
import java.util.stream.Stream;
17+
18+
/**
19+
* <p>A client for invoking the Streaming gRPC service.</p>
20+
*/
21+
public interface StreamingGrpcBlockingClient {
22+
23+
static StreamingGrpcBlockingClient create(StreamingGrpcClient client) {
24+
return new StreamingGrpcBlockingClientImpl(client);
25+
}
26+
27+
28+
@io.vertx.codegen.annotations.GenIgnore
29+
Stream<examples.grpc.Item> source(examples.grpc.Empty request);
30+
31+
@io.vertx.codegen.annotations.GenIgnore
32+
default examples.grpc.Empty sink(java.util.List<examples.grpc.Item> streamOfMessages) {
33+
return sink(streamOfMessages.iterator());
34+
}
35+
36+
@io.vertx.codegen.annotations.GenIgnore
37+
examples.grpc.Empty sink(java.util.Iterator<examples.grpc.Item> streamOfMessages);
38+
}
39+
40+
/**
41+
* The proxy implementation.
42+
*/
43+
class StreamingGrpcBlockingClientImpl implements StreamingGrpcBlockingClient {
44+
45+
private final StreamingGrpcClientImpl client;
46+
47+
StreamingGrpcBlockingClientImpl(Streaming client) {
48+
this.client = (StreamingGrpcClientImpl)java.util.Objects.requireNonNull(client);
49+
}
50+
51+
public Stream<examples.grpc.Item> source(examples.grpc.Empty request) {
52+
Stream<examples.grpc.Item> iterator = client.source_(request)
53+
.compose(req -> {
54+
req.end(request);
55+
return req.response().compose(resp -> {
56+
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
57+
return Future.failedFuture("Invalid gRPC status " + resp.status());
58+
} else {
59+
return Future.succeededFuture(resp.blockingStream());
60+
}
61+
});
62+
}).await();
63+
return iterator;
64+
}
65+
66+
public examples.grpc.Empty sink(java.util.Iterator<examples.grpc.Item> request) {
67+
throw new UnsupportedOperationException();
68+
}
69+
}

vertx-grpc-docs/src/main/java/examples/grpc/StreamingGrpcClient.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ static StreamingGrpcClient create(GrpcClient client, SocketAddress host, io.vert
117117
@io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
118118
Future<ReadStream<examples.grpc.Item>> source(examples.grpc.Empty request);
119119

120-
@io.vertx.codegen.annotations.GenIgnore
121-
Stream<examples.grpc.Item> source_sync(examples.grpc.Empty request);
122-
123120
/**
124121
* Calls the Sink RPC service method.
125122
*
@@ -138,26 +135,6 @@ static StreamingGrpcClient create(GrpcClient client, SocketAddress host, io.vert
138135
@io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
139136
Future<examples.grpc.Empty> sink(ReadStream<examples.grpc.Item> streamOfMessages);
140137

141-
/**
142-
* Calls the Sink RPC service method.
143-
*
144-
* @param streamOfMessages a stream of messages to be sent to the service
145-
* @return a future of the examples.grpc.Empty response message
146-
*/
147-
@io.vertx.codegen.annotations.GenIgnore
148-
default examples.grpc.Empty sink_sync(java.util.List<examples.grpc.Item> streamOfMessages) {
149-
return sink_sync(streamOfMessages.iterator());
150-
}
151-
152-
/**
153-
* Calls the Sink RPC service method.
154-
*
155-
* @param streamOfMessages a stream of messages to be sent to the service
156-
* @return a future of the examples.grpc.Empty response message
157-
*/
158-
@io.vertx.codegen.annotations.GenIgnore
159-
examples.grpc.Empty sink_sync(java.util.Iterator<examples.grpc.Item> streamOfMessages);
160-
161138
/**
162139
* Calls the Pipe RPC service method.
163140
*
@@ -209,21 +186,6 @@ public Future<ReadStream<examples.grpc.Item>> source(examples.grpc.Empty request
209186
});
210187
}
211188

212-
public Stream<examples.grpc.Item> source_sync(examples.grpc.Empty request) {
213-
Stream<examples.grpc.Item> iterator = source_(request)
214-
.compose(req -> {
215-
req.end(request);
216-
return req.response().compose(resp -> {
217-
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
218-
return Future.failedFuture("Invalid gRPC status " + resp.status());
219-
} else {
220-
return Future.succeededFuture(resp.blockingStream());
221-
}
222-
});
223-
}).await();
224-
return iterator;
225-
}
226-
227189
public Future<GrpcClientRequest<examples.grpc.Empty, examples.grpc.Item>> source_(examples.grpc.Empty request) {
228190
ServiceMethod<examples.grpc.Item, examples.grpc.Empty> serviceMethod;
229191
switch (wireFormat) {
@@ -269,10 +231,6 @@ public Future<examples.grpc.Empty> sink(ReadStream<examples.grpc.Item> request)
269231
});
270232
}
271233

272-
public examples.grpc.Empty sink_sync(java.util.Iterator<examples.grpc.Item> request) {
273-
throw new UnsupportedOperationException();
274-
}
275-
276234
public Future<ReadStream<examples.grpc.Item>> pipe(Completable<WriteStream<examples.grpc.Item>> completable) {
277235
ServiceMethod<examples.grpc.Item, examples.grpc.Item> serviceMethod;
278236
switch (wireFormat) {

vertx-grpc-it/src/test/java/io/vertx/grpc/it/ProtocPluginTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.grpc.examples.helloworld.HelloReply;
1717
import io.grpc.examples.helloworld.HelloRequest;
1818
import io.grpc.testing.integration.Messages;
19+
import io.grpc.testing.integration.TestServiceGrpcBlockingClient;
1920
import io.grpc.testing.integration.TestServiceGrpcClient;
2021
import io.grpc.testing.integration.TestServiceGrpcService;
2122
import io.vertx.core.Future;
@@ -30,7 +31,6 @@
3031
import io.vertx.grpc.common.GrpcStatus;
3132
import io.vertx.grpc.server.GrpcServer;
3233
import io.vertx.grpc.client.GrpcClient;
33-
import io.vertx.test.fakestream.FakeStream;
3434
import org.junit.Test;
3535

3636
import java.nio.charset.StandardCharsets;
@@ -395,11 +395,12 @@ protected void streamingOutputCall(Messages.StreamingOutputCallRequest request,
395395
// Create gRPC Client
396396
GrpcClient grpcClient = GrpcClient.client(vertx);
397397
TestServiceGrpcClient client = TestServiceGrpcClient.create(grpcClient, SocketAddress.inetSocketAddress(port, "localhost"));
398+
TestServiceGrpcBlockingClient blockingClient = TestServiceGrpcBlockingClient.create(client);
398399

399400
Messages.StreamingOutputCallRequest request = Messages.StreamingOutputCallRequest.newBuilder()
400401
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputRequest", StandardCharsets.UTF_8)).build())
401402
.build();
402-
Stream<Messages.StreamingOutputCallResponse> res = client.streamingOutputCall_sync(request);
403+
Stream<Messages.StreamingOutputCallResponse> res = blockingClient.streamingOutputCall(request);
403404
Thread.sleep(100);
404405
List<Messages.StreamingOutputCallResponse> list = new ArrayList<>();
405406
res.forEach(msg -> list.add(msg));

vertx-grpc-protoc-plugin2/src/main/java/io/vertx/grpc/plugin/VertxGrpcGeneratorImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ private List<PluginProtos.CodeGeneratorResponse.File> buildFiles(ServiceContext
328328
files.add(buildBaseFile(context));
329329
if (generateClient) {
330330
files.add(buildClientFile(context));
331+
files.add(buildBlockingClientFile(context));
331332
}
332333
if (generateServer) {
333334
files.add(buildServerFile(context));
@@ -347,6 +348,12 @@ private PluginProtos.CodeGeneratorResponse.File buildClientFile(ServiceContext c
347348
return buildFile(context, applyTemplate("client.mustache", context));
348349
}
349350

351+
private PluginProtos.CodeGeneratorResponse.File buildBlockingClientFile(ServiceContext context) {
352+
context.fileName = context.serviceName + "GrpcBlockingClient.java";
353+
context.className = context.serviceName + "GrpcBlockingClient";
354+
return buildFile(context, applyTemplate("blocking-client.mustache", context));
355+
}
356+
350357
private PluginProtos.CodeGeneratorResponse.File buildServerFile(ServiceContext context) {
351358
context.fileName = context.serviceName + "GrpcService.java";
352359
context.className = context.serviceName + "GrpcService";
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
{{#vertxPackageName}}
2+
package {{vertxPackageName}};
3+
{{/vertxPackageName}}
4+
5+
import io.vertx.core.Future;
6+
import io.vertx.core.Completable;
7+
import io.vertx.core.Handler;
8+
import io.vertx.core.net.SocketAddress;
9+
import io.vertx.grpc.client.GrpcClient;
10+
import io.vertx.grpc.client.GrpcClientRequest;
11+
import io.vertx.core.streams.ReadStream;
12+
import io.vertx.core.streams.WriteStream;
13+
import io.vertx.grpc.common.GrpcStatus;
14+
import io.vertx.grpc.common.ServiceName;
15+
import io.vertx.grpc.common.ServiceMethod;
16+
import io.vertx.grpc.common.GrpcMessageDecoder;
17+
import io.vertx.grpc.common.GrpcMessageEncoder;
18+
import java.util.stream.Stream;
19+
20+
/**
21+
* <p>A client for invoking the {{serviceName}} gRPC service.</p>
22+
*/
23+
public interface {{className}} {
24+
25+
static {{className}} create({{serviceName}}GrpcClient client) {
26+
return new {{className}}Impl(client);
27+
}
28+
29+
{{#unaryMethods}}
30+
31+
@io.vertx.codegen.annotations.GenIgnore
32+
{{outputType}} {{vertxMethodName}}({{inputType}} request);
33+
{{/unaryMethods}}
34+
{{#unaryManyMethods}}
35+
36+
@io.vertx.codegen.annotations.GenIgnore
37+
Stream<{{outputType}}> {{vertxMethodName}}({{inputType}} request);
38+
{{/unaryManyMethods}}
39+
{{#manyUnaryMethods}}
40+
41+
@io.vertx.codegen.annotations.GenIgnore
42+
default {{outputType}} {{vertxMethodName}}(java.util.List<{{inputType}}> streamOfMessages) {
43+
return {{vertxMethodName}}(streamOfMessages.iterator());
44+
}
45+
46+
@io.vertx.codegen.annotations.GenIgnore
47+
{{outputType}} {{vertxMethodName}}(java.util.Iterator<{{inputType}}> streamOfMessages);
48+
{{/manyUnaryMethods}}
49+
}
50+
51+
/**
52+
* The proxy implementation.
53+
*/
54+
class {{className}}Impl implements {{className}} {
55+
56+
private final {{serviceName}}GrpcClientImpl client;
57+
58+
{{className}}Impl({{serviceName}} client) {
59+
this.client = ({{serviceName}}GrpcClientImpl)java.util.Objects.requireNonNull(client);
60+
}
61+
{{#unaryMethods}}
62+
63+
public {{outputType}} {{vertxMethodName}}({{inputType}} request) {
64+
return client.{{vertxMethodName}}(request).await();
65+
}
66+
{{/unaryMethods}}
67+
{{#unaryManyMethods}}
68+
69+
public Stream<{{outputType}}> {{vertxMethodName}}({{inputType}} request) {
70+
Stream<{{outputType}}> iterator = client.{{vertxMethodName}}_(request)
71+
.compose(req -> {
72+
req.end(request);
73+
return req.response().compose(resp -> {
74+
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
75+
return Future.failedFuture("Invalid gRPC status " + resp.status());
76+
} else {
77+
return Future.succeededFuture(resp.blockingStream());
78+
}
79+
});
80+
}).await();
81+
return iterator;
82+
}
83+
{{/unaryManyMethods}}
84+
{{#manyUnaryMethods}}
85+
86+
public {{outputType}} {{vertxMethodName}}(java.util.Iterator<{{inputType}}> request) {
87+
throw new UnsupportedOperationException();
88+
}
89+
{{/manyUnaryMethods}}
90+
}

0 commit comments

Comments
 (0)