当前位置: 首页 > backend >正文

【RocketMQ】一分钟了解RocketMQ

MQ是什么

MQ全称为Message Queue,即消息队列 ,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生 产、存储、消费全过程的软件系统,遵循FIFO原则。

MQ的好处有哪些

  • 异步解耦
    最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法有以下两种:
    在这里插入图片描述
    数据流动如下所述:
    1.注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
    2.注册信息写入注册系统成功后,再发送请求至邮件通知系统。邮件通知系统收到请求后向用户发送邮件通知。
    3.邮件通知系统接收注册系统请求后再向下游的短信通知系统发送请求。短信通知系统收到请求后向用户发送短信通知。

以上三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。假设每个任务耗时分别为 50ms,则用户需要在注册页面等待总共 150ms 才能登录。

并行形式:

对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,后续的注册短信和邮件不是即时需要关注的步骤。

对于注册系统而言,发送注册成功的短信和邮件通知并不一定要绑定在一起同步完成,所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的 RocketMQ 中然后马上返回用户结果,由 RocketMQ 异步地进行这些操作。

在这里插入图片描述

  • 削峰填谷
    简单来说就是当遇到秒杀等业务时,用户访问量大增,这时候可以使用MQ,将消息存入MQ当中这样就可以减少秒杀等高访问量场景下的造成的影响了
  • 分布式定时/延时调度
    RocketMQ 提供精确度到秒级的分布式定时消息能力(5.0架构后),可广泛应用于订单超时中心处理、分布式延时调度系统等场景。

使用 RocketMQ 定时消息有如下优势:

  • 定时精度高、开发门槛低:消息定时时间不存在阶梯间隔,可以轻松实现任意精度事件触发,无需业务去重。

  • 高性能、可扩展:传统的定时实现方案较为复杂,需要进行数据库扫描,容易遇到性能瓶颈的问题,RocketMQ 可以基于定时消息特性完成事件驱动,实现百万级消息 TPS 能力。

什么是RocketMQ

RocketMQ 是一个开源的分布式消息中间件,由阿里巴巴开发并贡献给 Apache 软件基金会。它主要用于高吞吐量、低延迟的消息传递需求。

RocketMQ 的优点和功能是比较多的,以下是 一些主要特点和功能:

  • 高吞吐量和低延迟:RocketMQ 设计用于处理大量的消息,并提供低延迟的消息传递服务,适合需要高性能的场景。

  • 分布式架构:RocketMQ 使用分布式架构来支持大规模的消息传递。它可以水平扩展,以处理更大的数据量和更高的并发需求。

  • 消息可靠性:RocketMQ 支持消息持久化和多副本机制,确保在系统故障时不会丢失消息。这使得消息的可靠性和一致性得到了保障。

  • 高可用性和容错:RocketMQ 提供了高可用性的解决方案,包括多主多从等架构方案,确保系统的稳定性和连续性。

官网写的很详细,架构、基本概念(主题、队列、生产者、消费者、NameServer、Beroker 等)、工作原理等。推荐大家学习一波:https://rocketmq.apache.org/zh/docs

RocketMQ架构

RocketMQ架构上主要分为四部分

1.1.Producer

消息发布的角色,支持分布式集群方式部署。Producer通过nameserver的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

1.2.Consumer

消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时 也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

1.3.Broker

Broker主要负责消息的存储、投递和查询以及服务高可用保证。

1.4.NameServer

NameServer是一个Broker与Topic路由的注册中心支持Broker的动态注册与发现主要包括两个功能

  • Broker管理
    NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活。

  • 路由信息管理
    每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费

Springboot整合

生产者

@Service
public class RocketMQProducer{@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.send-message-timeout}")private Integer messageTimeOut;/*** 发送普通消息* @return*/public SendResult sendMsg(String msgBody){SendResult result = rocketMQTemplate.syncSend("queue_test_topic", MessageBuilder.withPayload(msgBody).build());return result;}/*** 发送异步消息 在SendCallback中可处理相关成功失败时的逻辑*/public void sendAsyncMsg(String msgBody){rocketMQTemplate.asyncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理消息发送成功逻辑}@Overridepublic void onException(Throwable e) {// 处理消息发送异常逻辑}});}/*** 发送延时消息<br/>* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h<br/>*/public void sendDelayMsg(String msgBody, Integer delayLevel){rocketMQTemplate.syncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(),messageTimeOut,delayLevel);}/*** 发送带tag的消息,直接在topic后面加上":tag"*/public void sendTagMsg(String msgBody){rocketMQTemplate.syncSend("queue_test_topic:tag1",MessageBuilder.withPayload(msgBody).build());}}

消费者

/*** rocketmq 消息监听,@RocketMQMessageListener中的selectorExpression为tag,默认为**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "queue_test_topic",selectorExpression="*",consumerGroup = "queue_group_test")
public class RocketMQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String msg = new String(body, CharsetUtil.UTF_8);log.info("接收到消息:{}", msg);}}

测试

@Controller
public class ProducerController {@Autowiredprivate RocketMQProducer rocketMQProducer;@RequestMapping("/send")@ResponseBodypublic SendResult send(String msg)  {//formats: `topicName:tags`return rocketMQProducer.sendMsg(msg);}}
http://www.xdnf.cn/news/16336.html

相关文章:

  • Earth靶机攻略
  • linux线程概念和控制
  • 字符串缓冲区和正则表达式
  • Mingw 与MSYS2 与Cygwin区别
  • Linux如何执行系统调用及高效执行系统调用:深入浅出的解析
  • 基于深度学习的胸部 X 光图像肺炎分类系统(七)
  • 凝思系统6.0.80安装chorme,亲测可用
  • 如何创建或查看具有 repo 权限的 GitHub 个人访问令牌(PAT)
  • mount: /mnt/sd: wrong fs type, bad option, bad superblock on /dev/mmcblk1
  • FitCoach AI:基于React+CloudBase的智能健身教练应用开发全解析
  • 缓存一致性:从单核到异构多核的演进之路
  • Android Jetpack 组件库 ->WorkManager
  • Linux系统架构核心全景详解
  • Unity 实现帧率(FPS)显示功能
  • 11Linux文件压缩与链接实战技巧
  • 深入解析YARN中的FairScheduler与CapacityScheduler:资源分配策略的核心区别
  • Python 数据分析(二):Matplotlib 绘图
  • 小白成长之路-部署Zabbix7(二)
  • 【GoLang#3】:数据结构(切片 | map 映射)
  • Linux726 raid0,raid1,raid5;raid 创建、保存、停止、删除
  • KubeKey安装KubeSphere、部署应用实践问题总结
  • 零基础学习性能测试第四章:从0到1学会编写性能测试报告
  • 【Spring AI】SiliconFlow-硅基流动
  • C# 位运算及应用
  • GStreamer中Element(元素)
  • 面试150 回文数
  • python---字典(dict)
  • 从压缩到加水印,如何实现一站式图片处理
  • HDMI-IN调试:双MIPI支持4K60方案
  • AMBA - CHI(5) System coherency Interface