当前位置: 首页 > web >正文

【RabbitMQ】实现RPC通信的完整指南

文章目录

    • RPC 通信
    • 创建相关队列
    • 客户端代码
      • 声明队列
      • 发送请求
      • 接收响应
      • 完整代码
    • 服务端代码
      • 设置同时只能获取一个消息
      • 接收消息
      • 完整代码
    • 运行程序
      • 启动客户端
      • 启动服务端

RPC 通信

RPC (Remote Procedure Call), 即远过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术

  • 类似 Http 远程调用

RabbitMQ 实现 RPC 通信的过程,大概是通过两个队列实现一个可回调的过程
image.png

  • 注意
    • 没有生产者和消费者,取而代之的是客户端和服务器
    • reply_to:回调队列的名称
    • correlation_id:不能重复,用来确保请求和响应是一对
  • 大概流程:
    1. 客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段制定了一个回调队列,服务端处理之后,会把响应结果发送到这个队列
    2. 服务端收到请求后,处理请求并发送响应到 replyTo 指定的回调队列
    3. 客户端再回调队列上等待响应消息,一旦收到响应,客户端会检查消息的 correlationID 属性,以确保它是所期望的响应
      • 等待响应消息,是通过一个阻塞队列来实现
      • 如果没有响应进来,就会一直阻塞。通过一个阻塞队列,来让其等待响应完成
      • 如果阻塞队列里面没有消息,就会一直等待,等到有消息为止

大致流程

  • 客户端:
    1. 发送请求(携带 replyToCorrelationID
    2. 接收响应(校验 correlationID
  • 服务端:
    1. 接收请求,进行响应
    2. 发送响应(按照客户端指定的 replyTo,设置 correlationID

创建相关队列

public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";  public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";  
  • 涉及到两个队列
    • 请求队列
    • 响应队列

客户端代码

客户端代码主要流程如下:

  1. 声明两个队列,包含回调队列 RPC_REQUEST_QUEUE,声明本次请求的唯一标志 correlationID
  2. RPC_REQUEST_QUEUEcorrelationID 配置到要发送的消息队列中
  3. 使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中
  4. 使用阻塞队列有消息后,主线程被唤醒,打印返回内容

声明队列

channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);  
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);

发送请求

//发送请求(使用内置交换机)  
String msg = "hello rpc...";  
//设置请求的唯一标识  
String correlationID = UUID.randomUUID().toString();  
//设置请求的相关属性  
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()  .correlationId(correlationID)  .replyTo(Constants.RPC_RESPONSE_QUEUE)  .build();  
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

接收响应

使用阻塞队列,来存储回调结果

// 接收响应  
// 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制  
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);  
DefaultConsumer consumer = new DefaultConsumer(channel){  //逻辑是比对 correlationID 是否一致  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String respMsg = new String(body);  System.out.println("接收到回调信息:" + respMsg);  if (correlationID.equals(properties.getCorrelationId())) {  // 如果 correlationID 校验一致,说明就是我们要的响应  response.offer(respMsg);  }  }  
};  
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);  // 获取回调的结果
String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了  
System.out.println("[RPC Client 响应结果]: " + result);

完整代码

package rabbitmq.rpc;  import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.UUID;  
import java.util.concurrent.*;  /**  * RPC Client * 1. 发送请求  * 2. 接收响应  */  
public class RpcClient {  public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  //1. 建立连接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 开启信道  Channel channel = connection.createChannel();  //3. 声明队列  channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);  channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);  //4. 发送请求(使用内置交换机)  String msg = "hello rpc...";  //设置请求的唯一标识  String correlationID = UUID.randomUUID().toString();  //设置请求的相关属性  AMQP.BasicProperties props = new AMQP.BasicProperties().builder()  .correlationId(correlationID)  .replyTo(Constants.RPC_RESPONSE_QUEUE)  .build();  channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());  //5. 接收响应  // 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制  final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);  DefaultConsumer consumer = new DefaultConsumer(channel){  //逻辑是比对 correlationID 是否一致  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String respMsg = new String(body);  System.out.println("接收到回调信息:" + respMsg);  if (correlationID.equals(properties.getCorrelationId())) {  // 如果 correlationID 校验一致,说明就是我们要的响应  response.offer(respMsg);  }  }  };  channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);  String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了  System.out.println("[RPC Client 响应结果]: " + result);  }  
}

服务端代码

服务端代码主要流程如下:

  1. 接收消息
  2. 根据消息内容进行相应处理,把应答结果返回到回调队列中

设置同时只能获取一个消息

//设置一次只能接收一条消息  
channel.basicQos(1);

如果不设置 basicQosRabbitMQ 会使用默认的 Qos 设置,其 prefetchCount 默认值为 0

  • prefetchCount0 时,RabbitMQ 会根据内部实现和当前的网络状况等因素,可能会同时发送多条消息给消费者
  • 这意味着在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有所波动

RPC 模式下,同上期望的是一对一的消息处理,即一个请求对应一个相应。消费者在处理完一个消息并确认之后,才会接收到下一条消息

接收消息

接收消息,并做出相应处理

DefaultConsumer consumer = new DefaultConsumer(channel) {  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String request = new String(body, "UTF-8");  System.out.println("接收到请求:" + request);  String responses = "针对 request:" + request + ",响应成功";  AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()  .correlationId(properties.getCorrelationId())  .build();  channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());  channel.basicAck(envelope.getDeliveryTag(), false);  }  
};  
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);

RabbitMQ 消息确定机制

  • RabbitMQ 中,basicConsume 方法的 autoAck 参数用于指定消费者是否应该自动向消息对类确认消息
    • 自动确认(autoAck=true):消息对类在将消息发送给消费者之后,会立即从内存中删除该消息。这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
    • 手动确认(autoAck=false):消息队列在将消息发送给消费者之后,需要消费者显式地调用 basicAck 方法来确认消息。手动确认提供了更高的可靠性,确保消息不会意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景

完整代码

package rabbitmq.rpc;  import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  /**  * RPC server * 1. 接收请求  * 2. 发送响应  */  
public class RpcServer {  public static void main(String[] args) throws IOException, TimeoutException {  //1. 建立连接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 开启信道  Channel channel = connection.createChannel();  //3. 接收请求  //设置一次只能接收一条消息  channel.basicQos(1);  DefaultConsumer consumer = new DefaultConsumer(channel) {  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String request = new String(body, "UTF-8");  System.out.println("接收到请求:" + request);  String responses = "针对 request:" + request + ",响应成功";  AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()  .correlationId(properties.getCorrelationId())  .build();  channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());  channel.basicAck(envelope.getDeliveryTag(), false);  }  };  channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);  }  
}

运行程序

启动客户端

image.png

启动服务端

运行服务端

接收到请求:hello rpc...
http://www.xdnf.cn/news/6604.html

相关文章:

  • 浅谈算法中的贪心策略:从直觉到策略的思维跨越
  • ios打包ipa获取证书和打包创建经验分享
  • (独家)SAP CO模块中 销售发票对应的Cost Document中的PSG对象是什么东东??
  • leetcode0621. 任务调度器-medium
  • 论QT6多线程技术
  • linux-配置定时任务
  • 一道canvas算法题(看过记录下)
  • js在浏览器执行原理
  • 【Linux】Linux安装并配置mysql
  • vue基本介绍
  • H.264/AVC 变换量化编码核心技术拆解
  • C#语言中 (元,组) 的发展史
  • Unity基础学习(十五)核心系统——音效系统
  • PC:使用WinSCP密钥文件连接sftp服务器
  • c++作业整理2
  • 纯前端实现基于位置的天气和动态背景图片
  • 行为型模式:责任链模式
  • 代码随想录 算法训练 Day2:数组
  • 第七节第三部分:从JDK8开始接口新增的方法、接口的多继承、注意事项
  • 一.android Studio开发系统应用——导入TvSettings源码
  • Medical | 药品追溯码【待续】
  • 2025-5-15Vue3快速上手
  • 散热片为何“失效”?热阻路径建模的常见误区解析
  • 并发控制:确保多线程环境下的数据一致性与完整性
  • SymPy | 使用SymPy求解多元非线性方程组
  • 3DVR制作的工具或平台
  • windows ffmpeg msvc x64编译
  • keil uniFlash烧录出现八字节对齐错误
  • 并发编程(二)
  • ProfibusDP主站转ModbusRTU/TCP与横河AXG电磁流量计通讯案例