当前位置: 首页 > ds >正文

RabbitMQ工作模式(下)

路由模式

在这里插入图片描述

交换机通过不同的 routingkey 绑定队列,生产者通过 routingkey 来向不同的队列发送消息

生产者代码演示:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();// 开启信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare("DIRECT_EXCHANGE", BuiltinExchangeType.DIRECT, true, false, null);//声明队列channel.queueDeclare("direct1", true, false, false, null);channel.queueDeclare("direct2", true, false, false, null);//绑定队列和交换机channel.queueBind("direct1","DIRECT_EXCHANGE","a");channel.queueBind("direct2","DIRECT_EXCHANGE","a");channel.queueBind("direct2","DIRECT_EXCHANGE","b");channel.queueBind("direct2","DIRECT_EXCHANGE","c");//发送消息for (int i = 0; i < 10; i++) {channel.basicPublish("DIRECT_EXCHANGE", "a", null, ("hello" + i).getBytes());channel.basicPublish("DIRECT_EXCHANGE", "b", null, ("hello" + i).getBytes());channel.basicPublish("DIRECT_EXCHANGE", "c", null, ("hello" + i).getBytes());}//关闭资源channel.close();connection.close();}
}

前面的建立连接的代码大家可以抽离出来,这里不抽离是方便大家了解整个代码的编写过程

在上面我们建立的队列和交换机的绑定关系图如下:
在这里插入图片描述
这里可以看到同样的 routingkey 为 a 绑定了两个队列,如果生产者使用 a 这个 routingkey 发送消息,那么两个队列都会接收到
在这里插入图片描述

消费者代码演示:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare("direct1", true, false, false, null);//消费消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume("direct1", true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare("direct2", true, false, false, null);//进行消费Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume("direct2", true, consumer);}
}

通配符模式

在这里插入图片描述

这里有两种符号需要认识,首先 * 表示匹配一个单词,# 表示匹配 0 - n 个单词,只要符合上面的路由规则交换机就会将消息发送到对应的队列上。

生产者代码演示:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);//声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//绑定队列和交换机channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.orange.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.rabbit");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "lazy.#");//发送消息for (int i = 0; i < 10; i++) {channel.basicPublish(Constants.TOPIC_EXCHANGE, "quick.orange.rabbit", null, ("quick.orange.rabbit hello" + i).getBytes());channel.basicPublish(Constants.TOPIC_EXCHANGE, "lazy", null, ("lazy hello" + i).getBytes());}//关闭资源channel.close();connection.close();}
}

消费者代码演示:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);//进行消费Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列 避免没有队列发生异常channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//从队列中获取信息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);}
}

RPC

在这里插入图片描述

PRC 模式一般很少使用,当我们的生产者需要消费者的响应的时候,我们才会使用这个模式。

这里有两个重要的参数,correlation_id 是消息的标识符,主要用于区分消息,由于Client需要得到 Server 的响应所以这里的 correlation_id 需要区分这是哪条消息

reply_to 用于Client指定对应的队列去路由消息

Client 代码演示:

public class RpcClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);// 设置消息String msg = "hello rpc";//设置请求的唯一标识String correlationId = UUID.randomUUID().toString();AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(Constants.RPC_RESPONSE_QUEUE).build();channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, properties, msg.getBytes());//接收响应//使用阻塞队列存储响应信息final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);//消费者逻辑Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String responseMsg = new String(body);System.out.println("接收到回调信息:" + responseMsg);if (correlationId.equals(properties.getAppId())) {response.offer(responseMsg);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true , consumer);//取出消息进行消费String res = response.take();System.out.println("最终结果:" + res);}
}

代码简单介绍:
String correlationId = UUID.randomUUID().toString(); 用于标识消息

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.correlationId(correlationId)
.replyTo(Constants.RPC_RESPONSE_QUEUE)
.build();
通过设置 属性,将我们的 correlationId 和 replyTo(指定对应的队列接发消息)设置进去

Server 代码演示:

public class PpcServer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);//由于使用的是默认的交换机,所以绑定队列省略//消费者最多获取到一个未确认的消息channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body, "UTF-8");System.out.println("接收到的请求为:" + request);String response = "针对 resquest 的响应为:" + request + "响应成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);}
}

代码简单介绍
channel.basicQos(1); 这是rabbitmq 的高级特性,用于消费者最多接收多少个未确认的消息,可以调节消费者的吞吐量

在 RPC 中我们也要设置correlationId,这里要求correlationId和生产者的correlationId要保持一致,这样生产者才能识别出来这是哪一条消息

AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish(“”, Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());

Publisher Confirms(发布确认)

这个也可以当作是生产者将消息成功发送到指定的broker的可靠保证的方式

发布确认一共有三种方式:单独确认,批量确认,异步确认

下面是建立连接的方法:

    //建立连接private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);return factory.newConnection();}

Publishing Messages Individually(单独确认)

单独确认就是生成者每发送一条消息,都要等待broker 回复 ack,只有等到ack,才会发送下一则消息,效率不高

开启发布确认模式需要进行下面的设置:

channel.confirmSelect();

等到ack 回复可以使用下面的方法:

channel.waitForConfirmsOrDie(5000);

    /*** 单独确认模式*/public static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {//获取连接Connection connection = createConnection();//开启信道Channel channel = connection.createChannel();//设置为发布确认模式channel.confirmSelect();//声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRM_QUEUE, true, false, false, null);//发布消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "publishingMessagesIndividually:" + i;channel.basicPublish("", Constants.PUBLISHER_CONFIRM_QUEUE, null, msg.getBytes());//最多等待消息确认的时间 5schannel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.println("单独确认消息总耗时:" + (end - start));}

Publishing Messages in Batches(批量确认)

批量确认顾名思义就是等待消息发送到一定数量的时候才进行确认,这里有个问题就是在发生故障时我们无法确切知道哪里出了问题,因此我们可能需要将整个批次保存在内存中,以记录一些有意义的信息或重新发布消息。此外,该解决方案仍然是同步的,因此会阻塞消息的发布。下面的官方的解释:

在这里插入图片描述

对比我们的单独确认,批量确认确实能够提高我们的吞吐量。

    /*** 批量确认模式*/public static void publishingMessagesBatches() throws IOException, TimeoutException, InterruptedException {//获取连接Connection connection = createConnection();//开启信道Channel channel = connection.createChannel();//设置为发布确认模式channel.confirmSelect();//声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRM_QUEUE, true, false, false, null);//发布消息long start = System.currentTimeMillis();//计数int count = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "publishingMessagesBatches:" + i;channel.basicPublish("", Constants.PUBLISHER_CONFIRM_QUEUE, null, msg.getBytes());count++;if(count == 100) {//最大等待时间channel.waitForConfirmsOrDie(5000);count = 0;}}//最后一次确认,确保剩余的不足100条消息全部确认掉if(count > 0) {channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.println("批量确认消息总耗时:" + (end - start));}

Handling Publisher Confirms Asynchronously(异步确认)

异步确认是指发送消息和接收消息的 ack 这两个动作是异步的,也就意味着在高吞吐量的情况下,我们可以更好地应对,比起前两种方式,这种异步确认的方式确实效率更高

    /*** 异步确认模式*/public static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {//获取连接Connection connection = createConnection();//开启信道Channel channel = connection.createChannel();//设置为发布确认模式channel.confirmSelect();//声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRM_QUEUE, true, false, false, null);//发布消息long start = System.currentTimeMillis();//存放消息序号的容器SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());//添加监听器channel.addConfirmListener(new ConfirmListener() {//收到确认信息@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if(multiple) {//将当前消息以及当前消息前面所有的消息删除confirmSet.headSet(deliveryTag + 1).clear();} else {//只删除当前消息confirmSet.remove(deliveryTag);}}//收到nack 的消息@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSet.headSet(deliveryTag+1).clear();}else {confirmSet.remove(deliveryTag);}//业务需要根据实际场景进行处理, 比如重发, 此处代码省略}});//发布消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "handlingPublisherConfirmsAsynchronously: " + i;//将消息的序号放入到容器中//channel.getNextPublishSeqNo() 在确认模式下,返回要发布的下一条消息的序列号。confirmSet.add(channel.getNextPublishSeqNo());channel.basicPublish("", Constants.PUBLISHER_CONFIRM_QUEUE, null, msg.getBytes());}while(!confirmSet.isEmpty()) {Thread.sleep(10);}long end = System.currentTimeMillis();System.out.println("异步确认消息总耗时:" + (end - start));}

代码解读:
异步确认最重要的就是设置监听器:我们要设置两个方法,一个是接收到 ack 的处理,另一个则是接收到 nack 的处理,这里有一个参数 multiple 【表示是否批量确认,如果是批量确认的话,意味着接受到序列号为deliveryTag的消息的时候,小于等于deliveryTag的消息一同都要被确认掉,如果不是批量确认,那就单独确认,需要我们一条一条消息进行确认】

        //添加监听器channel.addConfirmListener(new ConfirmListener() {//收到确认信息@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if(multiple) {//将当前消息以及当前消息前面所有的消息删除confirmSet.headSet(deliveryTag + 1).clear();} else {//只删除当前消息confirmSet.remove(deliveryTag);}}//收到nack 的消息@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSet.headSet(deliveryTag+1).clear();}else {confirmSet.remove(deliveryTag);}//业务需要根据实际场景进行处理, 比如重发, 此处代码省略}});

其次就是要创建一个容器用于保存消息的序列号

        //存放消息序号的容器SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());

channel.getNextPublishSeqNo() 这个方法是在确认模式下,返回要发布的下一条消息的序列号deliveryTag

channel.getNextPublishSeqNo()
http://www.xdnf.cn/news/20457.html

相关文章:

  • 贪心算法应用:蛋白质折叠问题详解
  • Eureka与Nacos的区别-服务注册+配置管理
  • AI模型测评平台工程化实战十二讲(第一讲:从手工测试到系统化的觉醒)
  • 力扣29. 两数相除题解
  • Qt资源系统学习
  • 【继承和派生】
  • 【Flask】测试平台开发,重构提测管理页面-第二十篇
  • 把装配想象成移动物体的问题
  • java基础学习(五):对象中的封装、继承和多态
  • C++经典的数据结构与算法之经典算法思想:排序算法
  • phpMyAdmin文件包含漏洞复现:原理详解+环境搭建+渗透实战(windows CVE-2014-8959)
  • 吴恩达机器学习(七)
  • 综合安防集成系统解决方案,智慧园区,智慧小区安防方案(300页Word方案)
  • 《2025国赛/高教杯》C题 完整实战教程(代码+公式详解)
  • 关于连接池
  • 【PostgreSQL】如何实现主从复制?
  • 网络原理-
  • 在Ubuntu平台搭建RTMP直播服务器使用SRS简要指南
  • Qt 基础教程合集(完)
  • 分布式数据架构
  • 硬件开发_基于物联网的老人跌倒监测报警系统
  • 数据结构——栈(Java)
  • MySQL数据库约束和设计
  • 附050.Kubernetes Karmada Helm部署联邦及使用
  • C++_哈希
  • 基于阿里云ECS搭建Tailscale DERP中继服务器:提升跨网络连接速度
  • 前端登录鉴权详解
  • C++面试10——构造函数、拷贝构造函数和赋值运算符
  • 西门子S7-200 SMART PLC:编写最基础的“起保停”程序
  • [特殊字符] 从零到一:打造你的VSCode圈复杂度分析插件