当前位置: 首页 > web >正文

RabbitMQ—事务与消息分发

上篇文章:

RabbitMQ—TTL、死信队列、延迟队列https://blog.csdn.net/sniper_fandc/article/details/149311921?fromshare=blogdetail&sharetype=blogdetail&sharerId=149311921&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link

目录

1 事务

2 消息分发

2.1 介绍

2.2 限流

2.3 负载均衡(非公平分发)


1 事务

        AMQP协议实现了事务机制,而RabbitMQ基于AMQP协议,因此也支持事务。RabbitMQ的事务机制要求消息的发送和接收是原子性的,要么同时成功,要么同时失败(失败后已发送的消息或接收的消息会回退)。

        这里实现了连续发送两条消息保证这两条消息同时发送成功或失败的事务机制:

        声明队列:

public class RabbitMQConnection {public static final String TRANS_QUEUE = "trans.queue";}
@Configurationpublic class RabbitMQConfig {@Bean("transQueue")public Queue transQueue(){return QueueBuilder.durable(RabbitMQConnection.TRANS_QUEUE).build();}}

        配置事务管理器和RabbitTemplate:

@Configurationpublic class RabbitTemplateConfig {//RabbitTemplate开启事务和下面的事务管理器都必须存在@Bean("transRabbitTemplate")public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);//开启事务return rabbitTemplate;}//事务管理器@Beanpublic RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);}}

        生产者代码:

@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "transRabbitTemplate")private RabbitTemplate transRabbitTemplate;//@Transactional也是开启事务必须存在的注解@Transactional@RequestMapping("trans")public String trans() {//两条消息要么同时发送成功,要么同时失败transRabbitTemplate.convertAndSend("", RabbitMQConnection.TRANS_QUEUE, "Hello SpringBoot RabbitMQ");int a = 1/0;transRabbitTemplate.convertAndSend("", RabbitMQConnection.TRANS_QUEUE, "Hello SpringBoot RabbitMQ");return "发送成功";}}

        未开启事务,发生异常,第一条消息发送成功。

        开始事务后,发生异常,两条消息均不成功。

2 消息分发

2.1 介绍

        当某个队列被多个消费者订阅,队列向消费者推送消息(消息分发对应推模式,对拉模式无效),每条消息只会发送给一个消费者。默认情况下,RabbitMQ不管消费者是否已经消费完消息并返回确认,而是采用轮询方式推送消息

        在这种情况下,假设有10条消息,两个消费者,每个消费者会被推送5条消息,消费者1消费速度快(容易空闲),消费者2消费速度慢,那么消费者2就会消息积压,系统实际吞吐量并不高。

        可以采用channel.basicQos(int prefetchCount)方式来进行消费分发的控制,prefetchCount表示通道上消费者能同时保持未确认消息的最大数量。

        队列每向消费者推送一条消息,prefetchCount+1。消费者每返回一个确认,prefetchCount-1。当prefetchCount达到设置的上限,队列就不再向消费者推送消息,直到有新的确认到来。这种方式就类似滑动窗口,很好地保证消费者负载压力不会过大。

2.2 限流

        在秒杀场景下,假设订单系统每秒能处理的订单数是10000,但是秒杀场景下可能某一瞬间会有50000订单数,这就会导致订单系统处理不过来而压垮。可以利用basicQos()来进行限流:

        1.SpringBoot配置文件用prefetch控制限流数,对应channel.basicQos(int prefetchCount)的prefetchCount。

        2.开启消息确认机制的手动确认模式manual。未手动确认的消息都视为未消费完的消费,prefetchCount并不会-1。

        配置文件:

spring:rabbitmq:addresses: amqp://admin:admin@192.168.217.150:5672/testVirtuallistener:simple:acknowledge-mode: manual #消息接收确认(MQ-消费者):none(自动确认)、auto(正常自动确认,异常不确认)、manual(手动确认)prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量

        声明队列和交换机:

public class RabbitMQConnection {public static final String QOS_QUEUE = "qos.queue";public static final String QOS_EXCHANGE = "qos.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("qosQueue")public Queue qosQueue(){return QueueBuilder.durable(RabbitMQConnection.QOS_QUEUE).build();}@Bean("qosExchange")public DirectExchange qosExchange(){return ExchangeBuilder.directExchange(RabbitMQConnection.QOS_EXCHANGE).durable(true).build();}@Bean("qosQueueBinding")public Binding qosQueueBinding(@Qualifier("qosExchange") DirectExchange directExchange, @Qualifier("qosQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("qos");}}

        生产者代码:

@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("qos")public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(RabbitMQConnection.QOS_EXCHANGE, "qos", "Hello SpringBoot RabbitMQ");}return "发送成功";}}

        消费者代码:

@Componentpublic class QosListener {@RabbitListener(queues = RabbitMQConnection.QOS_QUEUE)public void queueListener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("listener ["+RabbitMQConnection.QOS_QUEUE+"]收到消息:%s, deliveryTag:%d \n",new String(message.getBody(),"UTF-8"),deliveryTag);System.out.println("消息处理完成");channel.basicAck(deliveryTag,false);} catch (Exception e) {channel.basicNack(deliveryTag,false,true);}}}

        当进行限流时,如果不进行确认,则消费者最多只有5条消息:

        如果进行确认,则消息很快就被消费完了:

2.3 负载均衡(非公平分发)

        负载均衡就是指在分布式环境下,让每个服务接收其能同时处理上限的任务数,这样每个服务都不会空闲下来(最大化利用硬件资源),也不会因为负载过大导致崩溃。

        对应prefetchCount就应该配置为机器所能处理的最大上限数。假设消费者只能一次处理一个消息,此时prefetch就配置为1:

spring:rabbitmq:addresses: amqp://admin:admin@192.168.217.150:5672/testVirtuallistener:simple:acknowledge-mode: manual #消息接收确认(MQ-消费者):none(自动确认)、auto(正常自动确认,异常不确认)、manual(手动确认)prefetch: 1 #控制消费者从队列中预取(prefetch)消息的数量

        其它代码不变,增加一个消费者来模拟不同处理速度的服务:

@Componentpublic class QosListener {@RabbitListener(queues = RabbitMQConnection.QOS_QUEUE)public void queueListener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("listener ["+RabbitMQConnection.QOS_QUEUE+"]收到消息:%s, deliveryTag:%d \n",new String(message.getBody(),"UTF-8"),deliveryTag);Thread.sleep(5);channel.basicAck(deliveryTag,false);} catch (Exception e) {channel.basicNack(deliveryTag,false,true);}}@RabbitListener(queues = RabbitMQConnection.QOS_QUEUE)public void queueListener2(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("listener2 ["+RabbitMQConnection.QOS_QUEUE+"]收到消息:%s, deliveryTag:%d \n",new String(message.getBody(),"UTF-8"),deliveryTag);Thread.sleep(10);channel.basicAck(deliveryTag,false);} catch (Exception e) {channel.basicNack(deliveryTag,false,true);}}}

        最终消费者2处理了8条消息,消费者1处理了12条消息。这样就不会出现某些消费者大量时间空闲,整个系统的吞吐量就会得到很大提升。

        注意:deliveryTag有重复是因为两个消费者占用不同的通道,deliveryTag在同一个通道里保持连续,通道与通道之间相互独立,因此出现这样的现象。

下篇文章:

http://www.xdnf.cn/news/15881.html

相关文章:

  • 软考 系统架构设计师系列知识点之杂项集萃(113)
  • AJAX概述
  • c++ 基本语法易错与技巧总结
  • 零基础学习性能测试-linux服务器监控:内存监控
  • fastjson2 下划线字段转驼峰对象
  • 【RK3576】【Android14】分区划分
  • 石子问题(区间dp)
  • 从Prompt到结构建模:如何以数据驱动重构日本语言学校体系?以国际日本语学院为例
  • Linux:lvs集群技术
  • LVS四种工作模式深度解析
  • 千线万网,电路之行——LVS检查的内核逻辑
  • Python day18
  • 统计EfficientNet-B7的参数个数。
  • 华为擎云L420安装LocalSend
  • 单元测试学习+AI辅助单测
  • 【图像处理基石】什么是小波变换?
  • 美国VPS服务器Linux内核参数调优的实践与验证
  • iOS 通知机制及底层原理
  • 突破 MySQL 性能瓶颈:死锁分析 + 慢查询诊断 + 海量数据比对实战
  • 【设计模式C#】状态模式(用于解决解耦多种状态之间的交互)
  • 中间件安全攻防全解:从Tomcat到Weblogic反序列化漏洞介绍
  • 使用DataGrip连接安装在Linux上的Redis
  • FreeRTOS—列表和列表项
  • 相机参数的格式与作用
  • Vue3 学习教程,从入门到精通,Vue 3 声明式渲染语法指南(10)
  • 快速上手AI整合包!GPT-SoVITS-v2打包教程,解锁AIStarter应用市场潜力
  • DC-DC降压转换5.5V/3A高效率低静态同步降压转换具有自适应关断功能
  • Bicep入门篇
  • 小谈相机的学习过程
  • Linux_基础指令(一)