当前位置: 首页 > backend >正文

【Fifty Project - D31】

结束了一个超级消耗周末,满安排之健身+梅溪湖游泳+做饭喝酒+羽毛球赛
完全力竭了,久久不能恢复过来,暂停健身安排了 端午后再继续

今日完成记录

TimePlan完成情况
7:30 - 8:10有氧爬坡
9:00 - 11:00RabbitMQ学习
12:00 - 14:30继续RabbitMQ学习
14:30 - 17:30小论文2

RabbitMQ

整体架构以及核心概念

rabbitmq由消息生产者publisher、消息消费者consumer以及消息代理server broker构成,生产者生产消息,消息代理根据规则将消息放在消息队列中,消费者从消息队列消费消息。

什么是rabbitmq server broker?

rabbitmq server broker是一种消息代理(message broker),它由exchange交换机、queue消息队列和binding路由规则组成,规定了消息如何传递。

为什么需要broker?(broker的好处)

  1. 解耦生产者和消费者,实现异步通信!
  2. 提供灵活的路由策略(一对一【直连】,一对多【主题、扇出】)
  3. 支持消息持久化、重试等高级功能

什么是virtual host?

(简称vhost)
vhost是RabbitMQ的逻辑隔离机制,类似于文件系统中的目录或者数据库中的schema。它允许在同一个rabbitmq server上创建多个独立环境,每个vhost有自己的exchange、queue、binding和权限,彼此互不干扰。

vhost和集群的关系?

  1. vhost是逻辑隔离,集群式物理部署
  2. 集群中所有节点都会同步vhost定义,但队列数据默认只存储在创建它的节点上(除非配置镜像队列)

vhost能跨服务器共享吗?

  • 不能。vhost是单服务器内的逻辑概念,不同mq server之间vhost完全独立。

Rabbit MQ架构

快速实战(Java)

这里直接使用spring提供的基于RabbitMQ的消息收发模板:SpringAMQP

maven依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

环境配置

基本配置如下,部署rabbitmq的主机ip、端口、业务对应虚拟主机、用户名、密码

spring:rabbitmq:host: 192.168.4.144port: 5672virtual-host: /hmallusername: hmallpassword: 123321

发送信息

@Autowired
RabbitMQTemplate rt;String queueName = "hello.queue";
String msg = "hello mq";
// 使用
rt.convertAndSend(queueName, msg);

消费消息

@RabbitListener(queues = "hello.queue")
public void ListenSimpleQueue(String msg){System.out.println(msg);
}

上面两部分中的消息,并非一定是String类型,任意类型都可以,SpringAMQP会自动进行类型转换。

WorkQueues模型

任务模型,就是让多个消费者绑定到同一个队列,共同消费队列中的消息。
(1)接下来代码实现两个消费者绑定在同一个队列上以每秒50条消息的速度消费消息,并且由一个生产者以每秒钟50条消息的速度生产消息。

生产者

@SpringBootTest
public class MQTest {@Autowiredprivate RabbitTemplate rt;@Testvoid testSendMQHello() throws InterruptedException {for(int i = 0; i < 50; i ++ ) {Thread.sleep(20);rt.convertAndSend("hello.queue", "hello, dame mq + " + i + " 号消息");}}
}

消费者

@Slf4j
@Component
public class MyMQListener {@RabbitListener(queues = "hello.queue")public void listenSimpleQueue1(String msg){System.out.println("消费者1收到了simple.queue的消息                :【" + msg +"】");try {Thread.sleep(20);}   catch (InterruptedException e) {}}@RabbitListener(queues = "hello.queue")public void listenSimpleQueue2(String msg){System.out.println("消费者2收到了simple.queue的消息:【" + msg +"】");try {Thread.sleep(20);}   catch (InterruptedException e) {}}
}

结果如下:两个消费者依次消费生产者产生的消息
在这里插入图片描述

(2)实际上部署在不同设备上的代码执行速度会因为设备性能而表现有所差异,因此接下来修改消费者代码,将一个消费者消费速度改为5个消息一秒钟,观察一下结果

@RabbitListener(queues = "hello.queue")public void listenSimpleQueue2(String msg){System.out.println("消费者2收到了simple.queue的消息:【" + msg +"】");try {Thread.sleep(200);}   catch (InterruptedException e) {}}

结果如下,我们发现消费者2消费速度下降了以后,它们消费的编号依旧没有区别,也就是队列分配给不同性能的消费者相同的消息量,不考虑消费者的实际消费能力,这样会导致性能差的机器持续阻塞很多消息,而性能好的机器空闲。
在这里插入图片描述
(3)SpringAMQP中提供了prefetch的设置,可以限制消费者获取消息队列中消息的数量

spring:rabbitmq:listener:simple:prefetch: 1

设置后测试,结果如下:可见消费能力强的消费者1不再是固定地消费编号为02468的消息,而是消费了02345678,也就是每个消费者每次只能取一条消息,消费完成后再取下一个消息。
在这里插入图片描述

交换机类型

交换机没有存储消息的能力,只负责转发消息,因此如果没有任何队列和交换机绑定,或者没有符合路由规则的队列,那么消息会丢失。
交换机一共有四种:

  • Fanout:扇出型,广播给所有绑定到交换机的队列
  • Direct:订阅型,基于RouteKey发送给订阅了消息的队列
  • Topic:主题订阅,根据通配符匹配RoutingKey进行转发
  • Headers:头匹配

Fanout交换机

Fanout交换机
如图,fanout交换机接收到生产者的消息会将其路由给所有绑定的消息队列,消息会在每个消息队列中被消费一次

Direct交换机

direct exchange
如图,direct exchange会根据消息的routing key 匹配相同的key发送到消息队列中

Topic交换机

Topic Exchange
如图,Topic Exchange也是使用routing key匹配binding key,但是允许bindingkey使用通配符。

http://www.xdnf.cn/news/8974.html

相关文章:

  • 【ArcGIS】ArcGIS AI 助手----复现
  • Java设计模式之观察者模式:从基础到高级的全面解析
  • MySql(二)
  • 高效的接口自动化测试工具:Apifox
  • 学习threejs,使用three-spritetext实现黑客帝国数字雨效果
  • Kafka Kraft模式集群 + ssl
  • 14.测速小车(测速模块)
  • Linux连接服务器全攻略:从基础到进阶
  • AI时代新词-生成对抗网络(GAN)
  • 最新Spring Security实战教程(十六)微服务间安全通信 - JWT令牌传递与校验机制
  • CMake指令:set()
  • 行为型:策略模式
  • Flink流处理基础概论
  • 20250526惠普HP锐14 AMD锐龙 14英寸轻薄笔记本电脑(八核R7-7730U)的显卡驱动下载
  • 记录 | Android TextView 中的滚动方向
  • 基于Python flask 的豆瓣电影top250数据评分可视化
  • 数据结构与算法学习笔记(Acwing 提高课)----动态规划·区间DP
  • 【C++指南】string(四):编码
  • 单细胞数据分析(五):三种整合单细胞数据(Harmony、fastMNN、SCTransform)的完整流程
  • 学员投稿:华为,ov等手机主流大厂桌面未读计数角标更新接口汇总
  • 解析Java String.getBytes()编码与new String()解码的字符集转换机制
  • 深入解析Kafka JVM堆内存:优化策略与监控实践
  • 深入理解JavaScript设计模式之原型模式
  • SpringBoot(四)--- Mybatis、PageHelper、事务
  • 【LLM】LLM源码阅读与分析工具DeepWiki项目
  • C++ 中的引用参数(Reference Parameter)‌
  • 数据结构第2章绪论 (竟成)
  • JavaWeb:SpringBoot Bean管理
  • 豆瓣电视剧数据工程实践:从爬虫到智能存储的技术演进(含完整代码)
  • 墨水屏 函数Paint_SetScale的详解