目录
1.2、一元 RPC(代理方式一:阻塞式 BlockingStub)
1.4、服务端流式 RPC(代理方式一:阻塞式 BlockingStub)
1.8、一元 RPC 扩展(代理方式三:FutureStub 异步/同步 式)
gRPC 项目结构主要分成三个 Module:
可以看出,由于 api 模块既提供了 service 的接口,有提供了 client 的 stub,因此创建完三个?module 之后,client 和 service 中都需要引入 api 模块.
a)api 模块可以通过 Maven 插件,编译 protobuf 文件,生成 Java 代码,并把他放在我们配置的位置.? 那么首先要去配置 pom.xml 文件.
以下配置来自官网:GitHub - grpc/grpc-java: The Java gRPC implementation. HTTP/2 based RPC
依赖如下:
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.60.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.60.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.60.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
</dependencies>
构建插件如下:
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.24.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.60.0:exe:${os.detected.classifier}</pluginArtifact>
<!-- 输出目录 -->
<outputDirectory>${basedir}/src/main/java</outputDirectory>
<!-- 每次执行命令时不清空之前生成的代码(追加的方式) -->
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
</plugins>
</build>
Ps:上述代码中注释涉及到的内容需要自己配置(官网没有配置)。
complie 命令就是通过 protoc 命令将 message 转化成实体数据.
complie-custom 命令就是用来生成服务接口 service 的.
a)为了简便开发,建议大家下载以下插件,可以自定义命令,也就是说可以把上述多个命令打包成一个命令.
生成目录对应关系如下:
b)如果不满意配置也可以从这里删除
当 client 发起调用以后,提交数据,机会阻塞等待服务端响应。
Ps:实际的开发中,95% 的应用场景都是一元 RPC 这种通信方式.
syntax = "proto3";
option java_multiple_files = false;
option java_package = "com.cyk";
option java_outer_classname = "HelloProto";
message HelloRequest {
string name = 1;
}
message HelloResponse {
string result = 1;
}
service HelloService {
rpc hello(HelloRequest) returns(HelloResponse) {};
}
Ps:不要忘记再次通过 maven 插件生成代码!
a)继承 HelloServiceGrpc,实现自定义的 hello 方法.
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
public void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {
//1.接收 client 的请求参数
String name = request.getName();
//2.业务处理
System.out.println("name: " + name);
//3.封装响应
HelloProto.HelloResponse response = HelloProto.HelloResponse
.newBuilder()
.setResult("ok!") //填充数据
.build();
//通过这个方法,把响应消息回传给 client
responseObserver.onNext(response);
//通知 client,整个服务结束(底层返回一个标记,client 就能监听到)
responseObserver.onCompleted();
}
}
b)服务端绑定端口、发布服务、创建服务对象,启动服务器
public class GrpcServer1 {
public static void main(String[] args) throws IOException, InterruptedException {
//1.绑定端口
ServerBuilder serverBuilder = ServerBuilder.forPort(9000);
//2.发布服务
serverBuilder.addService(new HelloServiceImpl());
//3.创建服务对象
Server server = serverBuilder.build();
server.start();
server.awaitTermination();
}
}
public class Client1 {
public static void main(String[] args) {
//1.创建通信管道
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
//2.创建代理对象 stub
try {
HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);
//3.完成 RPC 调用
//3.1 准备参数
HelloProto.HelloRequest request = HelloProto.HelloRequest
.newBuilder()
.setName("cyk")
.build();
//3.2 进行 rpc 调用
HelloProto.HelloResponse response = helloService.hello(request);
System.out.println("response: " + response);
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdown();
}
}
}
syntax = "proto3";
option java_multiple_files = false;
option java_package = "com.cyk";
option java_outer_classname = "HelloProto";
message HelloRequest {
string name = 1;
}
message HelloResponse {
string result = 1;
}
message HelloListRequest {
repeated string name = 1;
}
service HelloService {
rpc hello(HelloRequest) returns(HelloResponse) {};
rpc helloList(HelloListRequest) returns(HelloResponse) {};
}
Ps:不要忘记再次通过 maven 插件生成代码!
@Override
public void helloList(HelloProto.HelloListRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {
//1.获取 client 的请求参数
ProtocolStringList nameList = request.getNameList();
//2.业务处理
for(String name : nameList) {
System.out.println("name: " + name);
}
//3.封装响应
HelloProto.HelloResponse response = HelloProto.HelloResponse
.newBuilder()
.setResult("ok!")
.build();
//通过这个方法,把响应消息回传给 client
responseObserver.onNext(response);
//通知 client,整个服务结束(底层返回一个标记,client 就能监听到)
responseObserver.onCompleted();
}
public class Client2 {
public static void main(String[] args) {
//1.创建通信管道
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
//2.创建代理对象 stub
try {
HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);
//3.完成 RPC 调用
//3.1 准备参数
HelloProto.HelloListRequest request = HelloProto.HelloListRequest
.newBuilder()
.addName("cyk1")
.addName("cyk2")
.addName("cyk3")
.addName("cyk4")
.build();
//3.2 进行 rpc 调用
HelloProto.HelloResponse response = helloService.helloList(request);
System.out.println("response: " + response);
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdown();
}
}
}
客户端发送一个请求对象,服务端可以在未来多个不同的时刻返回不同的响应对象.
例如,你去投一个股票,一旦股票有变化,就会给你返回结果.
service HelloService {
//一元 RPC
rpc hello1(HelloRequest) returns(HelloResponse) {};
//服务端流式 RPC
rpc hello2(HelloRequest) returns(stream HelloResponse) {};
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string result = 1;
}
Ps:不要忘记再次通过 maven 插件生成代码!
服务端通过 sleep 模拟在接受到请求之后,每秒返回一个响应(实际的开发中,一般不会是固定的间隔的时间).
@Override
public void hello2(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {
//1.获取请求参数
String name = request.getName();
//2.进行业务处理
System.out.println("name: " + name);
//3.封装响应
for(int i = 1; i <= 10; i++) {
HelloProto.HelloResponse response = HelloProto.HelloResponse
.newBuilder()
.setResult("ok~ - " + i)
.build();
//返回响应
responseObserver.onNext(response);
//模拟每秒发送一个数据
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//结束
responseObserver.onCompleted();
}
客户端远程调用后,会返回一个迭代器(收到服务端 onCompleted 标志),这个迭代器中就包含了服务端发送 onCompleted 标志前,不同时刻返回的响应.
public class Client2 {
public static void main(String[] args) {
//1.创建通信通道
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
//2.获取代理对象
HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);
//3.准备参数
HelloProto.HelloRequest request = HelloProto.HelloRequest
.newBuilder()
.setName("cyk")
.build();
//4.rpc调用
//此时获取到的是一个迭代器
Iterator<HelloProto.HelloResponse> helloResponseIterator = helloService.hello2(request);
while(helloResponseIterator.hasNext()) {
String result = helloResponseIterator.next().getResult();
System.out.println("result: " + result);
}
System.out.println("end!");
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdownNow();
}
}
}
由于这里采用的是 阻塞式 服务端流RPC ,因此在服务端返回 omCompleted 标志之前,客户端会阻塞在 hasNext() 这里.? ?客户端运行结果如下:
api 和 server 都不用变,只有 client 需要修改,如下:
可以看到,在获取 gRPC 代理对象时,有三种方式,其中 newStub 就是异步方式,newBlockingStub 就是同步(阻塞) 的方式,newFutrueStub 即可以同步,也可以异步(几乎不用最后这种方式).
因此这里就是使用 newStub 的方式创建代理对象.
public class Client2 {
public static void main(String[] args) {
//1.创建通信通道
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
//2.获取代理对象(异步式)
HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(managedChannel);
//3.准备参数
HelloProto.HelloRequest request = HelloProto.HelloRequest
.newBuilder()
.setName("cyk")
.build();
//4.rpc调用(不会阻塞在这里,会继续执行后面的逻辑)
helloServiceStub.hello2(request, new StreamObserver<HelloProto.HelloResponse>() {
/**
* 服务端每调用一次 onNext,都会触发该方法(实现异步的本质)
* @param helloResponse
*/
@Override
public void onNext(HelloProto.HelloResponse helloResponse) {
System.out.println("收到服务端响应: " + helloResponse.getResult());
}
/**
* 服务端抛出异常时,触发该方法.
* @param throwable
*/
@Override
public void onError(Throwable throwable) {
System.out.println("服务端执行出错!msg: " + throwable.getMessage());
}
/**
* 服务端调用 onCompleted 方法,就会触发该方法.
*/
@Override
public void onCompleted() {
System.out.println("服务端所有信息发送完毕!");
}
});
System.out.println("end!"); //因为不会在前面阻塞住,因此就会直接执行到这里(异步)
//不设置等待时间,会导致服务端还没来得及反应就结束了
managedChannel.awaitTermination(12, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdownNow();
}
}
}
客户端执行结果如下:?
客户端在不同时间发送多个请求,服务端只返回一个结果.
service HelloService {
//一元 RPC
rpc hello1(HelloRequest) returns(HelloResponse) {};
//服务端流式 RPC
rpc hello2(HelloRequest) returns(stream HelloResponse) {};
//客户端流式 RPC
rpc hello3(stream HelloRequest) returns(HelloResponse) {};
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string result = 1;
}
public StreamObserver<HelloProto.HelloRequest> hello3(StreamObserver<HelloProto.HelloResponse> responseObserver) {
return new StreamObserver<HelloProto.HelloRequest>() {
@Override
public void onNext(HelloProto.HelloRequest helloRequest) {
System.out.println("收到 client 请求: " + helloRequest.getName());
}
@Override
public void onError(Throwable throwable) {
System.out.println("客户端异常: " + throwable.getMessage());
}
@Override
public void onCompleted() {
//1.构造响应
HelloProto.HelloResponse response = HelloProto.HelloResponse
.newBuilder()
.setResult("ok!")
.build();
//2.返回响应
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
}
public class Client3 {
public static void main(String[] args) {
//1.创建通信通道
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
//2.获取代理对象(异步式)
HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(managedChannel);
//3.rpc调用(不会阻塞在这里,会继续执行后面的逻辑)
StreamObserver<HelloProto.HelloRequest> helloRequestStreamObserver = helloServiceStub.hello3(new StreamObserver<HelloProto.HelloResponse>() {
@Override
public void onNext(HelloProto.HelloResponse helloResponse) {
System.out.println("收到服务端响应: " + helloResponse.getResult());
}
@Override
public void onError(Throwable throwable) {
System.out.println("服务端响应异常! msg:" + throwable.getMessage());
}
@Override
public void onCompleted() {
System.out.println("服务端响应结束!");
}
});
//4.客户端发送数据到服务端
for(int i = 1; i <= 10; i++) {
//4.1 准备参数
HelloProto.HelloRequest request = HelloProto.HelloRequest
.newBuilder()
.setName("cyk" + i)
.build();
//4.2 发送数据
helloRequestStreamObserver.onNext(request);
//4.3 不同时刻发送数据
Thread.sleep(1000);
}
System.out.println("end!"); //因为不会在前面阻塞住,因此就会直接执行到这里(异步)
//5.结束响应
helloRequestStreamObserver.onCompleted();
managedChannel.awaitTermination(12, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdownNow();
}
}
}
客户端在不同时刻可以发送多个请求,服务端也可以在接受到不同时刻的请求时进行响应.
最典型的例子就是,QQ 聊天、微信聊天这种.
syntax = "proto3";
option java_multiple_files = false;
option java_package = "org.cyk";
option java_outer_classname = "HelloProto";
service HelloService {
//一元 RPC
rpc hello1(HelloRequest) returns(HelloResponse) {};
//服务端流式 RPC
rpc hello2(HelloRequest) returns(stream HelloResponse) {};
//客户端流式 RPC
rpc hello3(stream HelloRequest) returns(HelloResponse) {};
//双向流式 RPC
rpc hello4(stream HelloRequest) returns(stream HelloResponse) {};
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string result = 1;
}
@Override
public StreamObserver<HelloProto.HelloRequest> hello4(StreamObserver<HelloProto.HelloResponse> responseObserver) {
return new StreamObserver<HelloProto.HelloRequest>() {
@Override
public void onNext(HelloProto.HelloRequest helloRequest) {
//处理客户端请求
System.out.println("收到客户端请求: " + helloRequest.getName());
//返回响应
responseObserver.onNext(
HelloProto.HelloResponse
.newBuilder()
.setResult("ok~")
.build()
);
}
@Override
public void onError(Throwable throwable) {
System.out.println("客户端出错! msg:" + throwable.getMessage());
}
@Override
public void onCompleted() {
//处理客户端结束
System.out.println("客户端请求结束!");
//服务端返回结束标志
responseObserver.onCompleted();
}
};
}
public class Client4 {
public static void main(String[] args) {
//1.创建通信通道
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
//2.获取代理对象(异步式)
HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(managedChannel);
//3.rpc调用(不会阻塞在这里,会继续执行后面的逻辑)
StreamObserver<HelloProto.HelloRequest> helloRequestStreamObserver = helloServiceStub.hello4(new StreamObserver<HelloProto.HelloResponse>() {
@Override
public void onNext(HelloProto.HelloResponse helloResponse) {
System.out.println("收到服务端响应: " + helloResponse.getResult());
}
@Override
public void onError(Throwable throwable) {
System.out.println("服务端响应异常! msg:" + throwable.getMessage());
}
@Override
public void onCompleted() {
System.out.println("服务端响应结束!");
}
});
//4.客户端发送数据到服务端
for(int i = 1; i <= 10; i++) {
//4.1 准备参数
HelloProto.HelloRequest request = HelloProto.HelloRequest
.newBuilder()
.setName("cyk" + i)
.build();
//4.2 发送数据
helloRequestStreamObserver.onNext(request);
//4.3 不同时刻发送数据
Thread.sleep(1000);
}
System.out.println("end!"); //因为不会在前面阻塞住,因此就会直接执行到这里(异步)
//5.结束响应
helloRequestStreamObserver.onCompleted();
managedChannel.awaitTermination(12, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdownNow();
}
}
}
FutureStub 只能用于一元 RPC,既可以实现同步式,也可以实现异步式.
syntax = "proto3";
option java_multiple_files = false;
option java_package = "org.cyk";
option java_outer_classname = "FutureProto";
service FutureService {
rpc future(FutureRequest) returns(FutureResponse) {};
}
message FutureRequest {
string name = 1;
}
message FutureResponse {
string data = 1;
}
public class FutureServiceImpl extends FutureServiceGrpc.FutureServiceImplBase {
@Override
public void future(FutureProto.FutureRequest request, StreamObserver<FutureProto.FutureResponse> responseObserver) {
//1.接受客户端请求
String name = request.getName();
//2.业务处理
System.out.println("name: " + name);
//3.构造响应
FutureProto.FutureResponse response = FutureProto.FutureResponse
.newBuilder()
.setData("ok!")
.build();
//4.返回响应和标记
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
这里另起了一个服务Impl,别忘了发布服务.
public class GrpcServer1 {
public static void main(String[] args) throws IOException, InterruptedException {
//1.绑定端口号
ServerBuilder serverBuilder = ServerBuilder.forPort(9000);
//2.发布服务
serverBuilder.addService(new HelloServiceImpl());
serverBuilder.addService(new FutureServiceImpl());
//3.创建服务对象
Server server = serverBuilder.build();
//4.启动服务
server.start();
server.awaitTermination();
}
}
public class Client5 {
public static void main(String[] args) {
//1.创建通信通道
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
//2.获取代理对象
FutureServiceGrpc.FutureServiceFutureStub futureServiceFutureStub = FutureServiceGrpc.newFutureStub(managedChannel);
//3.准备参数
FutureProto.FutureRequest request = FutureProto.FutureRequest
.newBuilder()
.setName("cyk")
.build();
//4.rpc调用
ListenableFuture<FutureProto.FutureResponse> response = futureServiceFutureStub.future(request);
System.out.println("result: " + response.get().getData());
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdownNow();
}
}
}
public class Client5 {
public static void main(String[] args) {
//1.创建通信通道
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
try {
//2.获取代理对象
FutureServiceGrpc.FutureServiceFutureStub futureServiceFutureStub = FutureServiceGrpc.newFutureStub(managedChannel);
ListenableFuture<FutureProto.FutureResponse> response = futureServiceFutureStub.future(
FutureProto.FutureRequest.newBuilder().setName("cyk").build()
);
//3.rpc调用
Futures.addCallback(response, new FutureCallback<FutureProto.FutureResponse>() {
@Override
public void onSuccess(FutureProto.FutureResponse result) {
System.out.println("收到服务器异步响应:" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println(t.getMessage());
}
}, Executors.newCachedThreadPool());
System.out.println("前面的操作不会阻塞,会直接执行到这里~");
managedChannel.awaitTermination(12, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
managedChannel.shutdownNow();
}
}
}