【橘子分布式】gRPC(编程篇-中)
一、简介
我们之前已经完成了对于api模块的开发,也就是已经生成了基础的类和对应的接口,现在我们需要完成的是client和server端的开发。其实如同thrift一样,现在要做的就是实现我们之前定义的service里面的hello方法,里面写我们的业务逻辑,然后通过grpc的server发布暴露出去给客户端使用。
// 定义服务
service HelloService{/* 简单rpc,参数为HelloRequest类型,返回类型为HelloResponse */rpc hello(HelloRequest) returns (HelloResponse){}
}
ok,我们就先来实现serve模块,然后再实现client模块。
二、server模块
我们创建一个新的模块叫做rpc-grpc-service,并且在其pom依赖中引入api这个公共模块。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.levi</groupId><artifactId>rpc</artifactId><version>1.0-SNAPSHOT</version></parent><artifactId>rpc-grpc-service</artifactId><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-api</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies></project>
1、业务实现
ok,我们前面说过了,我们其实要实现的业务接口或者说重写的接口实现方法是HelloServiceGrpc.HelloServiceImplBase这个Base的内部类,在这个里面我们覆盖我们的业务方法。
我们重新来回顾一下我们的这个接口和方法。
// 定义请求接口参数
message HelloRequest{string name = 1;
}// 定义接口响应参数
message HelloResponse{string result = 1;
}// 定义服务
service HelloService{/* 简单rpc,参数为HelloRequest类型,返回类型为HelloResponse */rpc hello(HelloRequest) returns (HelloResponse){}
}
我们看到我们的请求类里面是一个参数name,响应类里面是一个参数result,并且接口的方法叫做hello。ok,我们就来实现覆盖这个方法。
/ 服务端实现类
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {/*1. 接受client提交的参数 request.getParameter()2. 业务处理 service+dao 调用对应的业务功能。3. 提供返回值*/@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {}
}
因为HelloServiceGrpc.HelloServiceImplBase不是一个接口,所以我们要继承然后覆盖方法。并且我们看到这个方法和我们当初定义的略有不同
第一个参数没毛病就是HelloProto.HelloRequest,但是这个方法没有返回值,他是个void。这就是grpc的规范,他的返回是通过第二个参数
StreamObserver<HelloProto.HelloResponse> responseObserver来给客户端返回的,因为grpc有流式的返回,所以它是通过这个返回的,
如果弄成返回值就不方便以流的形式不断的推给客户端了。而且responseObserver的泛型就是我们定义的返回类型HelloProto.HelloResponse。
于是我们就来实现这个方法。
/ 服务端实现类
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {/*1. 接受client提交的参数 request.getParameter()2. 业务处理 service+dao 调用对应的业务功能。3. 提供返回值*/@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {//1.接受client的请求参数,获取我们定义的那个属性nameString name = request.getName();//2.业务处理System.out.println("name parameter "+name);//3.封装响应//3.1 创建相应对象的构造者HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();//3.2 填充数据,填充返回值内容builder.setResult("hello method invoke ok");//3.3 封装响应HelloProto.HelloResponse helloResponse = builder.build();// 4. 响应clientresponseObserver.onNext(helloResponse);// 5. 响应完成responseObserver.onCompleted();}
}
2、服务发布
现在我们实现了我们的业务,我们就要把这个服务接口发布出去给客户端做rpc调用。
我们定义一个服务类,然后实现server,并且暴露端口。
package com.levi;import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer1 {public static void main(String[] args) throws IOException, InterruptedException {//1. 绑定端口ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);//2. 发布服务,这里可能会发布很多业务,我们这里就是一个HelloServiceImpl,实际可能还会有别的业务serverBuilder.addService(new HelloServiceImpl());//serverBuilder.addService(new UserServiceImpl());//3. 创建服务对象Server server = serverBuilder.build();// 启动服务server.start();// 阻塞等待客户端的连接访问,底层其实就是nettyserver.awaitTermination();;}
}
此时我们就暴露出去我们的一个helo的业务实现了。
三、client模块
我们创建一个名为rpc-grpc-client的模块,并且引入api公共模块。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.levi</groupId><artifactId>rpc</artifactId><version>1.0-SNAPSHOT</version></parent><artifactId>rpc-grpc-client</artifactId><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-api</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies></project>
我们现在已经把服务端的东西暴露在了9000这个端口,现在就可以在客户端通过grpc的stub代理来访问了。
package com.levi;import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;// client通过代理对象完成远端对象的调用
public class GrpcClient1 {public static void main(String[] args) {//1.创建通信的管道,usePlaintext以普通文本进行访问ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();//2.获得代理对象 stub进行调用try {// 我们这里以阻塞的形式调用,也就是一直等返回值回来才往下走,其实这里就是获取的rpc调用的代理类,grpc给我们提供的就是stub这个,本质是一个东西HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);//3. 完成RPC调用//3.1 准备参数HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();builder.setName("hello");HelloProto.HelloRequest helloRequest = builder.build();//3.1 进行功能rpc调用,获取相应的内容,像本地调用那样调用远程服务HelloProto.HelloResponse helloResponse = helloService.hello(helloRequest);String result = helloResponse.getResult();System.out.println("result = " + result);} catch (Exception e) {throw new RuntimeException(e);}finally {// 4. 关闭通道managedChannel.shutdown();}}
}
成功返回。
四、多值传递
我们之前在学习proto语法的时候提到过一个关键字repeated关键字,我们当时说被这个关键字修饰的属性是一个集合类型的字段,grpc会为它生成集合类型的get set方法,我们来做一个测试。
1、proto编写
我们重新定义一个proto文件的message和方法。
// 定义proto文件版本号
syntax = "proto3";// 生成一个java类即可
option java_multiple_files = false;
// 生成的java类的包名
option java_package = "com.levi";
// 外部类,这里就是HelloProto,实际开发你可以有多个proto管理不同业务类,然后各自的外部类都可以。比如OrderService就是Order.proto 外部类就是OrderProto
option java_outer_classname = "HelloProto";// 定义请求接口参数
message HelloRequest{string name = 1;
}// 定义接口响应参数
message HelloResponse{string result = 1;
}message ManyHelloRequest{repeated string names = 1;
}message ManyHelloResponse{repeated string result = 1;
}// 定义服务
service HelloService{/* 简单rpc,参数为HelloRequest类型,返回类型为HelloResponse */rpc hello(HelloRequest) returns (HelloResponse){}/* 服务端流式rpc,参数为ManyHelloRequest类型,返回类型为ManyHelloResponse */rpc manyHello(ManyHelloRequest) returns (ManyHelloResponse){}
}
我们在原来的基础上添加
message ManyHelloRequest{repeated string names = 1;
}message ManyHelloResponse{repeated string result = 1;
}
和一个
rpc manyHello(ManyHelloRequest) returns (ManyHelloResponse){}
旨在请求多个名字,返回也是多个。我们来编译结果。
我们看到message里面没问题,生成了我们要的。当然之前的那个也保留了。
相应的service里面的也没问题。我们就来修改一下我们的实现。当然为了保险最好刷新一下server和client的pom,重新引入一下api模块,idea有时候会抽风。
2、server端改写
package com.levi.service;import com.google.protobuf.ProtocolStringList;
import com.levi.HelloProto;
import com.levi.HelloServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.List;// 服务端实现类
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {private static final String RES_PREFIX = "server#";@Overridepublic void manyHello(HelloProto.ManyHelloRequest request, StreamObserver<HelloProto.ManyHelloResponse> responseObserver) {//1.接受client的请求参数,我们看到此时就是一个nameList的集合了,因为它被repeated修饰了,当然他的类型是ProtocolStringList,是grpc自己的类型ProtocolStringList requestNamesList = request.getNamesList();//2.业务处理System.out.println("请求参数为:" + requestNamesList);// 给返回值的name都加一个前缀List<String> responseNamesList = new ArrayList<>();for (String requestName : requestNamesList) {responseNamesList.add(RES_PREFIX + requestName);}//3.封装响应//3.1 创建相应对象的构造者HelloProto.ManyHelloResponse.Builder builder = HelloProto.ManyHelloResponse.newBuilder();//3.2 填充数据,多个值要通过addAllResult,或者是下标的方式添加builder.addAllResult(responseNamesList);
// for (int i = 0; i < requestNamesList.size(); i++) {
// builder.setResult(i, requestNamesList.get(i));
// }//3.3 封装响应HelloProto.ManyHelloResponse helloResponse = builder.build();// 4. 响应clientresponseObserver.onNext(helloResponse);// 5. 响应完成responseObserver.onCompleted();}/*1. 接受client提交的参数 request.getParameter()2. 业务处理 service+dao 调用对应的业务功能。3. 提供返回值*/@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {//1.接受client的请求参数String name = request.getName();//2.业务处理System.out.println("name parameter "+name);//3.封装响应//3.1 创建相应对象的构造者HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();//3.2 填充数据builder.setResult("hello method invoke ok");//3.3 封装响应HelloProto.HelloResponse helloResponse = builder.build();// 4. 响应clientresponseObserver.onNext(helloResponse);// 5. 响应完成responseObserver.onCompleted();}
}
然后服务端不用改,还是暴露注册出去HelloServiceImpl。
package com.levi;import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer1 {public static void main(String[] args) throws IOException, InterruptedException {//1. 绑定端口ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);//2. 发布服务serverBuilder.addService(new HelloServiceImpl());//serverBuilder.addService(new UserServiceImpl());//3. 创建服务对象Server server = serverBuilder.build();// 启动服务server.start();// 阻塞等待server.awaitTermination();;}
}
此时我们需要来修改客户端代码。
3、cilent端改写
package com.levi;import com.google.protobuf.ProtocolStringList;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;import java.util.List;// client通过代理对象完成远端对象的调用
public class GrpcClient2 {public static void main(String[] args) {//1.创建通信的管道ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();//2.获得代理对象 stub进行调用try {// 我们这里以阻塞的形式调用,也就是一直等返回值回来才往下走HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);//3. 完成RPC调用//3.1 准备参数HelloProto.ManyHelloRequest.Builder builder = HelloProto.ManyHelloRequest.newBuilder();// 多值参数要这样添加或者以下标形式builder.addAllNames(List.of("levi","tom","jerry"));HelloProto.ManyHelloRequest helloRequest = builder.build();//3.1 进行功能rpc调用,获取相应的内容,像本地调用那样调用远程服务HelloProto.ManyHelloResponse helloResponse = helloService.manyHello(helloRequest);ProtocolStringList resultList = helloResponse.getResultList();System.out.println("resultList = " + resultList);} catch (Exception e) {throw new RuntimeException(e);}finally {// 4. 关闭通道managedChannel.shutdown();}}
}
我们启动服务端,然后客户端去请求。
没问题,这就是多值repeated关键字的使用方式。
五、关于服务端响应
我们在服务端响应客户端的时候用的是一个StreamObserver<HelloProto.ManyHelloResponse> responseObserver这个类给客户端响应的。我们简单解释一下这个操作。
public void manyHello(HelloProto.ManyHelloRequest request, StreamObserver<HelloProto.ManyHelloResponse> responseObserver) {...... 省略无关代码// 4. 响应client,这里其实就是把数据返回给了客户端responseObserver.onNext(helloResponse);// 5. 响应完成,这个操作其实是给这个通道设置一个标识,告诉客户端服务端这边传完了,客户端就会拿到数据开始继续往下走// 如果没有这个通知,客户端会一直缓存服务端的数据不会做解析返回。客户端也一直阻塞着。客户端会监听这个通知事件。responseObserver.onCompleted();}
相应的其实客户端给服务端也会有类似的操作,因为grpc是双向流,势必涉及客户端给服务端的操作。这个等我们后面再说。