当前位置: 首页 > backend >正文

【橘子分布式】gRPC(编程篇-下)

一、简介

我们之前完成了一个基本的客户端和服务端通过grpc来调用的代码功能。我们使用的其实是一种最简单的通信方式,貌似和以前的spring mvc没啥区别,那么grpc作为一个基于http的双工的设计,肯定不是简单的只支持这样的能力。
下面我们就来看看grpc所支持的这四种传输模式。

二、grpc的通信模式

grpc一种支持四种通信模式。1. 简单rpc也叫一元rpc (Unary RPC)
2. 服务端流式RPC   (Server Streaming RPC)
3. 客户端流式RPC   (Client Streaming RPC)
4. 双向流RPC (Bi-directional Stream RPC)

下面我们就一一来介绍一下。
我们先说明一下,区分这几种通信模式的源头就在于proto的语法定义上。

1、简单RPC(一元RPC)

之前我们写的那种方式就叫做简单rpc。
他的特点就是客户端发起请求,然后服务端响应,在响应之前客户端是阻塞等待的。一般我们用的就是这种。当然你可以说有异步的模式,但是要想获取结果,也是要阻塞等待的。
在这里插入图片描述
他的语法定义就是我们之前用的那种。

/* 简单rpc,参数为HelloRequest类型,返回类型为HelloResponse */
rpc hello(HelloRequest) returns (HelloResponse){}/* 多值rpc,参数为ManyHelloRequest类型,返回类型为ManyHelloResponse */
rpc manyHello(ManyHelloRequest) returns (ManyHelloResponse){}

2、服务端流式RPC

他的特点是客户端发起一个请求对象,服务端可以回传多个结果对象。
注意,这个返回多个结果对象指的不是返回多个内容封装在一个集合里面返回(这种还是一个返回值),而是在不同的时刻各自返回该时刻的不同的内容,他是一个流式的一个形式。所以他一定是在这个交互的过程中保持这个连接不断的,换言之他是一个长连接。
你可以用来做那种类似sse的服务端推流的场景,股票一类的。
在这里插入图片描述

他的语法定义为

// 服务端流式rpc,参数为HelloRequest类型,返回类型为HelloResponse,但是返回值这里要加一个stream关键字,标识服务端返回的格式为流
rpc c2ss(HelloRequest) returns (stream HelloResponse){}

我们来按照这个proto定义来生成一下结果,然后来编写客户端和服务端。

2.1、服务端代码

// 服务端实现类
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {private static final String RES_PREFIX = "server#";@Overridepublic void c2ss(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {//1 接受client的请求参数String requestName = request.getName();//2 做业务处理System.out.println("name: " + requestName);//3 根据业务处理的结果,提供响应,推送多个给客户端for (int i = 0; i < 9; i++) {HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();builder.setResult("服务端处理的结果:" + i);HelloProto.HelloResponse helloResponse = builder.build();responseObserver.onNext(helloResponse);try {// 暂停一秒推送数据给客户端TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}}responseObserver.onCompleted();}@Overridepublic void manyHello(HelloProto.ManyHelloRequest request, StreamObserver<HelloProto.ManyHelloResponse> responseObserver) {//1.接受client的请求参数,我们看到此时就是一个nameList的集合了,因为它被repeated修饰了,当然他的类型是ProtocolStringList,是grpc自己的类型ProtocolStringList requestNamesList = request.getNamesList();//2.业务处理System.out.println("请求参数为:" + requestNamesList);// 给返回值的name都加一个前缀List<String> responseNamesList = new ArrayList<>();for (String requestName : requestNamesList) {responseNamesList.add(RES_PREFIX + requestName);}//3.封装响应//3.1 创建相应对象的构造者HelloProto.ManyHelloResponse.Builder builder = HelloProto.ManyHelloResponse.newBuilder();//3.2 填充数据,多个值要通过addAllResult,或者是下标的方式添加builder.addAllResult(responseNamesList);
//        for (int i = 0; i < requestNamesList.size(); i++) {
//            builder.setResult(i, requestNamesList.get(i));
//        }//3.3 封装响应HelloProto.ManyHelloResponse helloResponse = builder.build();// 4. 响应clientresponseObserver.onNext(helloResponse);// 5. 响应完成responseObserver.onCompleted();}/*1. 接受client提交的参数  request.getParameter()2. 业务处理 service+dao 调用对应的业务功能。3. 提供返回值*/@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {//1.接受client的请求参数String name = request.getName();//2.业务处理System.out.println("name parameter "+name);//3.封装响应//3.1 创建相应对象的构造者HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();//3.2 填充数据builder.setResult("hello method invoke ok");//3.3 封装响应HelloProto.HelloResponse helloResponse = builder.build();// 4. 响应clientresponseObserver.onNext(helloResponse);// 5. 响应完成responseObserver.onCompleted();}
}

服务端发布依然是:

public static void main(String[] args) throws IOException, InterruptedException {//1. 绑定端口ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);//2. 发布服务serverBuilder.addService(new HelloServiceImpl());//serverBuilder.addService(new UserServiceImpl());//3. 创建服务对象Server server = serverBuilder.build();// 启动服务server.start();// 阻塞等待server.awaitTermination();;
}

2.2、客户端代码

2.2.1、同步阻塞调用
package com.levi.serverStream;import com.levi.HelloProto;
import com.levi.HelloServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;import java.util.Iterator;/*** 服务端流模式*/
public class ServerStreamClient {public static void main(String[] args) {//1.创建通信的管道ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();//2.获得代理对象 stub进行调用try {// 我们这里以阻塞的形式调用,也就是一直等返回值回来才往下走HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);//3. 完成RPC调用//3.1 准备参数HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();builder.setName("hello");HelloProto.HelloRequest helloRequest = builder.build();//3.1 进行功能rpc调用,获取相应的内容,像本地调用那样调用远程服务,流式的返回是一个迭代器Iterator<HelloProto.HelloResponse> helloResponseIterator = helloService.c2ss(helloRequest);// 流式获取while (helloResponseIterator.hasNext()) {HelloProto.HelloResponse helloResponse = helloResponseIterator.next();System.out.println("客户端接收到服务端的响应:" + helloResponse.getResult());}} catch (Exception e) {throw new RuntimeException(e);}finally {// 4. 关闭通道managedChannel.shutdown();}}
}

我们启动代码发现他就是流的推送模式,没毛病。
在这里插入图片描述
以上代码我们可以看到他是一种同步阻塞调用的,虽然服务端是流的形式推送,但是客户端要一起等待服务端的推送做完了拿到了才能继续往下走。
而grpc其实是支持我们的异步调用的。也就是我们调用之后可以继续往下走执行我们的业务,服务端的数据是通过回调监听来获取的。

2.2.2、异步非阻塞调用
package com.levi.serverStream;import com.levi.HelloProto;
import com.levi.HelloServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;import java.util.Iterator;
import java.util.concurrent.TimeUnit;/*** 服务端流模式*/
public class ServerStreamAsyncClient {public static void main(String[] args) {//1.创建通信的管道ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();//2.获得代理对象 stub进行调用try {// 我们这里以异步的形式发起服务端流模式的rpc调用HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(managedChannel);//3. 完成RPC调用//3.1 准备参数HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();builder.setName("hello");HelloProto.HelloRequest helloRequest = builder.build();//3.1 进行功能rpc调用,获取相应的内容,像本地调用那样调用远程服务,异步回调的方式来获取服务端内容helloServiceStub.c2ss(helloRequest, new StreamObserver<HelloProto.HelloResponse>() {@Overridepublic void onNext(HelloProto.HelloResponse value) {//服务端 响应了 一个消息后,需要立即处理的话。把代码写在这个方法中。响应一个监听获取一个System.out.println("服务端每一次响应的信息 " + value.getResult());}@Overridepublic void onError(Throwable t) {// 发生异常的时候回调事件// 暂时不实现}@Overridepublic void onCompleted() {//需要把服务端 响应的所有数据 拿到后,在进行业务处理。这里就是发送完最后一个数据触发,我们可以在这里整体处理,看你业务System.out.println("服务端响应结束 后续可以根据需要 在这里统一处理服务端响应的所有内容");}});// 正常业务执行doTask();// 等待服务端响应结束,不然main线程会结束,导致进程结束managedChannel.awaitTermination(12, TimeUnit.SECONDS);} catch (Exception e) {throw new RuntimeException(e);}finally {// 4. 关闭通道managedChannel.shutdown();}}private static void doTask(){System.out.println("处理我们自己的业务......start");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("处理我们自己的业务......end");}
}

在这里插入图片描述

3、客户端流式RPC

和服务端流式对应,他是客户端发送多个请求对象,服务端只返回一个结果。这里的客户端发送的多个同样也不是一个集合里面多个,而是不同时刻的不同发送对象。所以他一定是在这个交互的过程中保持这个连接不断的,换言之他是一个长连接。
比较适合在IOT(物联网 【传感器】) 客户端的数据向服务端发送数据,持续发送多个,那种气温数据的不断上报。
在这里插入图片描述

他的语法定义为

// 客户端流式rpc,参数为HelloRequest类型,返回类型为HelloResponse,但是请求参数这里要加一个stream关键字,标识客户端请求的格式为流
rpc cs2s(stream HelloRequest) returns (HelloResponse){}

我们来按照这个proto定义来生成一下结果,然后来编写客户端和服务端。

3.1、服务端实现

/*** 客户端 流模式 我们看到客户端的流式实现,他的返回值不再是void,而是StreamObserver<HelloProto.HelloRequest>* 因为他的请求是流式的,所以要获取为流式接收。因为不知道客户端啥时候请求,请求多久,所以就需要监控请求。监控* HelloProto.HelloRequest这个流请求。* @param responseObserver* @return*/
public StreamObserver<HelloProto.HelloRequest> cs2s(StreamObserver<HelloProto.HelloResponse> responseObserver) {return new StreamObserver<HelloProto.HelloRequest>() {@Overridepublic void onNext(HelloProto.HelloRequest value) {System.out.println("接受到了client发送一条消息 " + value.getName());}@Overridepublic void onError(Throwable t) {}@Overridepublic void onCompleted() {System.out.println("client的所有消息 都发送到了 服务端 ....");//提供响应:响应的目的:当接受了全部client提交的信息,并处理后,提供相应HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();builder.setResult("this is result");HelloProto.HelloResponse helloResponse = builder.build();// responseObserver服务端 响应 客户端,你可以在onCompleted一起返回,也可以在onNext一个一个返回,看你自己业务responseObserver.onNext(helloResponse);responseObserver.onCompleted();}};
}

3.2、客户端实现

我们这里就直接实现为异步非阻塞的模式了。

package com.levi.clientStream;import com.levi.HelloProto;
import com.levi.HelloServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;import java.util.Iterator;
import java.util.concurrent.TimeUnit;/*** 客户端流模式*/
public class ClientStreamAsyncClient {public static void main(String[] args) {ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();try {// 使用异步HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(managedChannel);// rpc调用,构建回调获取服务端数据的回调函数StreamObserver<HelloProto.HelloRequest> helloRequestStreamObserver = helloService.cs2s(new StreamObserver<HelloProto.HelloResponse>() {@Overridepublic void onNext(HelloProto.HelloResponse value) {// 监控响应System.out.println("服务端 响应 数据内容为 " + value.getResult());}@Overridepublic void onError(Throwable t) {}@Overridepublic void onCompleted() {System.out.println("服务端响应结束 ... ");}});//客户端 发送数据 到服务端  多条数据 ,不定时...for (int i = 0; i < 10; i++) {HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();builder.setName("hello " + i);HelloProto.HelloRequest helloRequest = builder.build();// 发送数据,服务端的响应就是基于上面的回调获取的helloRequestStreamObserver.onNext(helloRequest);Thread.sleep(1000);}helloRequestStreamObserver.onCompleted();managedChannel.awaitTermination(12, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();} finally {managedChannel.shutdown();}}
}

我们可以看到客户端流的模式下,编码格式略有不同,客户端的实现为异步非阻塞的时候,他是先构建一个回调。这个回调后面会监听服务端的响应。而不是直接调用就完了。

4、双向流式RPC

和客户端可以发送多个请求消息,服务端响应多个响应消息,所以他也是一个长连接。
比较适合在那种聊天,群聊,聊天室的业务使用。
在这里插入图片描述
他的语法定义为

// 双向流式rpc,参数为HelloRequest类型,返回类型为HelloResponse,请求参数和响应参数都是stream表示请求和响应两端是双向流
rpc b2s(stream HelloRequest) returns (stream HelloResponse){}

我们来按照这个proto定义来生成一下结果,然后来编写客户端和服务端。

4.2、服务端实现

/*** 双向流 流模式* @param responseObserver* @return*/
@Override
public StreamObserver<HelloProto.HelloRequest> b2s(StreamObserver<HelloProto.HelloResponse> responseObserver) {return new StreamObserver<HelloProto.HelloRequest>() {@Overridepublic void onNext(HelloProto.HelloRequest value) {// 监听客户端的消息,并且通过responseObserver.onNext回写给客户端System.out.println("接受到client 提交的消息 "+value.getName());responseObserver.onNext(HelloProto.HelloResponse.newBuilder().setResult("response "+value.getName()+" result ").build());}@Overridepublic void onError(Throwable t) {}@Overridepublic void onCompleted() {// 客户端消息接收完的时候可以onCompleted,告知客户端我发完了。客户端就会触发他那边的onCompleted。System.out.println("接受到了所有的请求消息 ... ");responseObserver.onCompleted();}};
}

4.3、客户端实现

public class ClientServerStreamAsyncClient {public static void main(String[] args) {ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();try {HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(managedChannel);// 构建回调监听,处理响应,请求是在下面的onNextStreamObserver<HelloProto.HelloRequest> helloRequestStreamObserver = helloService.cs2s(new StreamObserver<HelloProto.HelloResponse>() {@Overridepublic void onNext(HelloProto.HelloResponse value) {System.out.println("接收到服务端响应的结果 "+value.getResult());}@Overridepublic void onError(Throwable t) {}@Overridepublic void onCompleted() {System.out.println("响应全部结束...");}});// 客户端发送数据以流的形式发送多个for (int i = 0; i < 10; i++) {helloRequestStreamObserver.onNext(HelloProto.HelloRequest.newBuilder().setName("sunshuai " + i).build());}// 发送完毕,触发onCompleted,服务端就会触发他那边的onCompletedhelloRequestStreamObserver.onCompleted();managedChannel.awaitTermination(12, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();} finally {managedChannel.shutdown();}}
}

我们可以看到不管哪种模式,他的一个模式就是对方如果是流,那我们就要以一个StreamObserver来回调监听处理响应,给对方发送就是要通过onNext来发送。

三、grpc的代理方式(stub)

我们知道rpc的本质就是代理类访问远程,我们在grpc中的这个代理就是你通过proto生成的各种stub,他为你生成了多种stub。
在这里插入图片描述
这多种stub分别代表不同的调用方式。

grpc有三种代理:
1. BlockingStub阻塞 通信方式 
2. Stub异步 通过监听处理的
3. FutureStub,支持同步和异步的,类似netty那种,不过注意,他只能用于一元RPC

BlockingStub和Stub(流监听)我们上面都用过了,下面我们来使用一下FutureStub这种调用方式。
既然是基于一元的调用,我们就使用我们之前的HelloServiceImpl里面的hello。因为主要是客户端调用的代理,所以我们只需要实现客户端即可,我们提供三种future的调用,分别是同步和异步。

package com.levi;import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;public class GrpcFutureClient {public static void main(String[] args) {//1.创建通信的管道ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();//2.获得代理对象 stub进行调用try {// 我们这里以阻塞的形式调用,也就是一直等返回值回来才往下走HelloServiceGrpc.HelloServiceFutureStub helloServiceFutureStub = HelloServiceGrpc.newFutureStub(managedChannel);//3. 完成RPC调用,我们演示三种,具体选择哪个看你要求//3.1 准备参数HelloProto.HelloRequest helloRequest = HelloProto.HelloRequest.newBuilder().setName("hello").build();//3.1 进行功能rpc调用,获取相应的内容,像本地调用那样调用远程服务// 同步阻塞方式调用,阻塞等待future.get()返回结果ListenableFuture<HelloProto.HelloResponse> helloResponseListenableFuture = helloServiceFutureStub.hello(helloRequest);// syncCall(helloResponseListenableFuture);// 异步回调函数方式调用(推荐)asyncCall(helloResponseListenableFuture);// 异步监听方式调用// asyncListener(helloResponseListenableFuture);} catch (Exception e) {throw new RuntimeException(e);}finally {// 4. 关闭通道managedChannel.shutdown();}}/*** 同步阻塞方式调用,阻塞等待future.get()返回结果* * @param helloResponseListenableFuture* @throws ExecutionException* @throws InterruptedException*/private static void syncCall(ListenableFuture<HelloProto.HelloResponse> helloResponseListenableFuture) throws ExecutionException, InterruptedException {HelloProto.HelloResponse helloResponse = helloResponseListenableFuture.get();System.out.println("result = " + helloResponse.getResult());}/*** 异步回调函数方式调用(推荐)** @param helloResponseListenableFuture*/private static void asyncCall(ListenableFuture<HelloProto.HelloResponse> helloResponseListenableFuture) {Futures.addCallback(helloResponseListenableFuture, new FutureCallback<HelloProto.HelloResponse>() {@Overridepublic void onSuccess(HelloProto.HelloResponse result) {System.out.println("result.getResult() = " + result.getResult());}@Overridepublic void onFailure(Throwable t) {}}, Executors.newCachedThreadPool());}/*** 异步监听方式调用** @param helloResponseListenableFuture*/private static void asyncListener(ListenableFuture<HelloProto.HelloResponse> helloResponseListenableFuture) {helloResponseListenableFuture.addListener(() -> {System.out.println("helloResponseListenableFuture 完成");}, Executors.newCachedThreadPool());}
}
http://www.xdnf.cn/news/15845.html

相关文章:

  • 嵌入式硬件篇---机械臂运动学解算(3自由度)
  • 机器学习-数据预处理
  • 【机器学习【9】】评估算法:数据集划分与算法泛化能力评估
  • 物联网安装调试-继电器
  • Java设计模式之行为型模式(备忘录模式)实现方式与测试用例
  • 【Unity3D实例-功能-移动】角色移动-通过WSAD(CharacterController方式)
  • 第四次作业
  • haproxy七层代理
  • 嵌入式硬件篇---继电器
  • C#.NET EFCore.BulkExtensions 扩展详解
  • 物联网安装调试-温湿度传感器
  • 分布式文件系统04-DataNode海量数据分布式高可靠存储
  • python中读取 Excel 表格数据
  • 【数据结构】揭秘二叉树与堆--用C语言实现堆
  • 【MySQL】索引中的页以及索引的分类
  • LLVM中AST节点类型
  • string【下】- 补充
  • MySQL中的排序和分页
  • 集群与高可用
  • Facebook 开源多季节性时间序列数据预测工具:Prophet 饱和预测 Saturating Forecasts
  • Go并发聊天室:从零构建实战
  • Shell脚本-tee工具
  • 小程序和H5数据mock配置过程
  • 前端环境搭建---基于SpringBoot+MySQL+Vue+ElementUI+Mybatis前后端分离面向小白管理系统搭建
  • LLM 的Top-P参数 是在LLM中的每一层发挥作用,还是最后一层?
  • SpringBoot五分钟快速入门指南
  • NW993NX584美光固态闪存NX559NX561
  • [故障诊断方向]基于二维时频图像和数据增强技术的轴承故障诊断模型
  • 数据分析综合应用 30分钟精通计划
  • 动态规划——数位DP经典题目