当前位置: 首页 > java >正文

Redis--黑马点评--消息队列

Redis消息队列

在进行redis秒杀优化时,我们在异步秒杀的实现中发现了两个较为严重的问题:内存限制问题以及数据安全问题。

想要解决这两个难题,就需要使用消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括三个角色:

  • 消息队列:存储和管理消息,也称消息代理(Message Broker)

  • 生产者:发送消息到消息队列

  • 消费者:从消息队列获取消息并处理消息

优点:解除耦合,提高工作效率

这样看来和我们开始用的阻塞队列好像没什么不同,但是实际有三点不同:

  • 消息队列是在JVM以外的一个独立服务,不受JVM内存的限制,这就解决了内存限制问题。

  • 消息队列不仅仅做消息存储,而且还需要确保数据的安全,就是指存入在消息队列的所有数据要去做持久化,这样不管是服务宕机还是重启,数据就不存在丢失的问题。这就解决了数据安全问题

  • 并且在消息投递给消费者以后,还需要要求消费者做消息的确认,如果消息没有得到确认,那么这个消息就会在队列中依然存在,下一次会再次投递给消费者,让它继续处理,直到成功为止,就是确保消息至少被消费一次。

正常市面上的消息队列也有很多,比如RabbitMQ,RocketMQ,等等,但是上述的消息队列都需要去额外搭建一些服务,去学习另外的一些技术,是有一定的成本。

而在Redis中,提供了三种不同的方式来实现消息队列:

  • list结构:基于list结构模拟消息队列

  • PubSub:基本的点对点消息模型

  • Stream:比较完善的消息队列模型

接下来就来详细了解redis的三种消息队列实现方式:

基于List模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列,而redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列:先进先出,出口和入口不一样,因此可以利用redis命令:lpush集合rpop,或者rpush结合lpop来实现。

不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。

因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

进入客户端进行测试:

image-20250627215444671

这就是在模仿jvm中的阻塞队列,相比于阻塞队列的优点就是不依赖于JVM内存,不用担心存储上限的问题,redis支持数据持久化,数据安全也能得到保证。还能保证消息有序性。

数据持久化:List不是消息队列,只是一种存储结构,被当做消息碎队列来使用,因此数据存储结构具有数据持久化能力

但是List机构只是基本满足了消息队列的功能,还有一些欠缺点:

  • 无法避免消息丢失:比如从redis的队列中取到一条消息,但是在还没有处理就出现了异常,这个消息就丢失了。

  • 只支持单消费者:一条消息只能被一个用户拿到,无法实现一条消息被共享的场景

基于PubSub的消息队列

PubSub(publish subscribe发布 订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能够收到相关消息。

相关Redis命令

  • SUBSCRIBE channel[channel]:订阅一个或多个频道

  • publish channel msg:向一个频道发送消息

  • psubscribe pattern[pattern]:订阅和pattern格式匹配的所有频道(pattern:通配符)

相应的通配符可以去Redis官网查询:PSUBSCRIBE | Docs

image-20250627221644654

流程图:

image-20250627221935252

与基于list模拟消息队列的区别就在于允许多个消费者去进行订阅,在订阅时指定自己的频道名称。

进行演示:

image-20250627222939860

基于PubSub的消息队列有哪些优缺点?

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化:PubSub只是用来做消息对列的,没有数据持久化能力

  • 无法避免消息丢失:PubSub就是用来做消息发送的,因此如果发送消息的频道没有被任意进程订阅,那么数据就直接丢失。

  • 消息堆积有上限,超出时数据丢失。当发送消息时,如果有消费者进程监听频道,就会在消费者那里有一个缓存区域,将消息缓存,消费者去处理,如果消费者处理消息较慢,消息来得又多,消费者缓存空间是有上限的,如果超出就会数据丢失。

如果对可靠性要求较高,不建议使用PubSub消息队列。

基于Stream的消息队列--单消费模式

Stream是Redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

先从官网查看:Commands | Docs

较为重要的命令:发送消息与接收消息

image-20250627224946845

举例说明:

image-20250627225226995

xlen 命令:用于查看指定队列中元素的数量

image-20250627225452244

读取消息的方式之一:XREAD

image-20250627230113934

进行演示:

image-20250627230528482

我们发现,在不同消费者可以读取一条消息,并且相同的消费者可以重复读取一条消息,即在stream这种数据类型中,一个消息读取完后不会删除,是永久存在的。

不仅如此,stream消息队列也可以实现阻塞效果:

演示如下:

image-20250627231130207

那么在业务开发中,我们可以循环的调用xread阻塞方式来查询最新消息,从而实现持续监听队列的效果。伪代码如下:

while(true){//尝试读取队列中的消息,最多阻塞两秒Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");if(msg == null){continue;}//处理消息handleMessage(msg);
}

注意事项:当指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过一条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

演示:

image-20250627232211252

stream类型消息队列的xread命令特点:

  • 消息可回溯(消息永久存在)

  • 一个消息可以被多个消费者读取

  • 可以阻塞读取

  • 消息漏读的风险

接下来就是消费者组。

基于stream的消息队列--消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。

具备以下特点:

  • 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度,从一定程度上避免了消息堆积的问题。

  • 消息标识:消费者组会维护一个标识,记录最后一个被处理的消息。就类似于书签,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费。解决了单消费时的消息漏读风险

  • 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。档处理完成后需要通过XACK来确认消息。标记消息为已处理,才会从pending-list移除。

较为重要的命令

  • 创建消费者组:

 XGROUP CREATE key groupName ID [MKSTREAM]
  • key:队列名称

  • groupName:消费者组名称

  • ID:起始id标识,$代表队列中最后一个消息,0则代表列表中第一个消息

  • MKSTREAM:队列不存在时自动创建队列

其他常见命令:

 # 删除指定的消费者组XGROUP destory key groupName# 给指定的消费者组添加消费者xgroup createconsumer key groupName consumername#删除消费者组中的指定消费者xgroup delconsumer key groupName consumername

注意事项:一般情况下不需要去添加消费者,因为当从这个消费组中指定一个消费者并且监听消息时,如果stream发现消费者不存在,就会自动创建。

从消费者组读取消息:

image-20250629175000819

  • group:消费者组名称

  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者

  • count:本次查询的最大数量

  • block milliseconds:当没有消息时最长等待时间

  • NOACK:无需手动ACK,获取到消息后自动确认

  • streams key:指定队列名称

  • ID:获取消息的起始ID:

    • ">":从下一个未消费的消息开始

    • 其他:根据指定id从pending-list中获取已消费但未确认的消息。例如0,是从pending-list中的第一个消息开始

命令演示:

image-20250629213711915

确认消息:

image-20250629213829716

  • key:队列名称

  • group:组名

  • ID:需要确认的消息ID

命令演示:

image-20250629214418033

那如果此时,一个消费者读到了消息之后,未确认消息就挂了,此时,该消息就会在pending-list中,那么如何查看pending-list呢?

xpending key group [IDLE min-idle-time] start end count [consumer]

  • key:队列名称

  • group:组名称

  • IDLE:空闲时间,获取消息以后确认之前的这段时间,比如5000毫秒,空闲时间在5000毫秒以上的这些消息,才会接收到pending-list,低于空闲时间的消息不接收。

  • start end:起始范围,在pending-list中最小的ID和最大的ID是什么,- +是指最小到最大的所有消息

  • count:查看的消息数量

  • consumer:想要获取哪个消费者的pending-list。

命令演示:

image-20250629220036360

那么在讲解读取消费者组消息的时候,我们说过,将最后一个ID参数换为数字,就是根据指定id从pending-list中获取已消费但未确认的消息。例如0,是从pending-list中的第一个消息开始。

命令演示:

image-20250629220459141

处理消息流程:

在正常情况下先利用获取未消费信息,拿到后就处理,处理后就确认,确认的过程中如果出现了异常,这个消息就会进入pending-list,在Java代码中体现出来的就是我们出现了异常,catch到之后,再去查询pending-list的消息(出现异常的消息),处理异常消息后,再次确认pending-list是否清空,最后再去获取未消费消息,回归正常。

以上介绍了在命令行界面中处理消息的流程,较为抽象,那么,在Java代码中处理消息流程应该是怎么样的呢?

消费者监听消息的基本思路:

 while(true){//尝试今天队列,使用阻塞模式,最长等待2000毫秒Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 >");if(msg == null){//null说明没有消息,继续下一次continue;}try{//处理消息,完成后需要确认消息(ACK)handleMessage(msg);}catch(Exception e){while(true){Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 0");if(msg == null){//null说明没有异常消息,所有消息已确认,结束循环break;}try{//说明有异常消息,再次处理handleMessage(msg);}catch(Exception e){//再次出现异常,记录日志,继续循环continue;}}}}

总结:stream类型消息队列的xreadgroup命令特点:

  • 消息可回溯

  • 可以同组多消费者争抢消息,加快消费速度

  • 可以阻塞读取

  • 没有消息漏读的风险

  • 有消息确认机制,保证消息至少被消费一次

目前看来。可以满足大部分简单需求,缺点在于持久化时数据容易丢失,积压的消息会占用大量内存。

与前几种消息队列的比较

ListPubSubStream
消息持久化支持不支持支持
阻塞读取支持支持支持
消息堆积处理受限于内存空间,可以利用多消费者加快处理受限于消费者缓冲区受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制不支持不支持支持
消息回溯不支持不支持支持

List:利用list的数据结构去模拟组合队列从而实现消息队列效果,因为他本身就是一种数据存储的模式,所以支持持久化,brpop 或者blpop可以实现阻塞读取,list内存上限就是redis对于list这种数据结构的内存上限,但是因为list可以有多个消费者同时去取,因此处理速度较快。而list本身不是消息队列,因此没有消息确认机制以及消息回溯。

PubSub其实还不如list结构,首先不支持持久化,发布订阅模式使其支持阻塞读取,而PubSub本身是消息队列,没有存储能力,直接是一个通道发送消息,最多就是在消费者中做一个缓冲,但是缓存空间有限,更别说消息确认机制,只要没人接收,直接消失,消息回溯能力也没有。

相比之下,Stream仿佛是完美的,消息持久化没问题,消息阻塞也OK,详细堆积处理也没问题,最多是受限于队列长度,并且可以利用多消费者消费,提高消息处理速度,减少堆积问题。消息确认机制:通过ACK与pending-list确保消息一定被消费一次。

因此如果要在redis的三种消息队列中选择的话,肯定是用stream模式,但是如果业务较为庞大,对于消息队列的要求更加严格,还是选要使用更加专业的消息队列比如RabbitMQ等,因为stream虽然支持消息的持久化,但这种持久化是依赖于Redis本身持久化的,Redis的持久化也可能会出现问题,而且消息确认机制只支持消费者的确认机制,不支持生产者确认机制,另外消息的事务机制,再多消费者下的消息有序性等等,还是有很多问题的,还是需要更加强大的消息队列去支持,但如果对消息队列要求不高,还是可以使用的。

希望对大家有所帮助!

http://www.xdnf.cn/news/14703.html

相关文章:

  • 基于 SpringBoot 实现一个 JAVA 代理 HTTP / WS
  • 电压跟随器输入电压正常、输出电压等于0V?
  • WebRTC(十三):信令服务器
  • python动漫周边电商网站系统
  • 视频序列中的帧间匹配技术 FrameMatcher 详解
  • 领域驱动设计(DDD)【23】之泛化:从概念到实践
  • SQL 子查询全位置解析:可编写子查询的 7 大子句
  • Web基础关键_004_CSS(二)
  • 2023国赛linux的应急响应-wp
  • JSON简介及其应用
  • 【LLIE专题】EnlightenGAN 无监督低照度图像增强
  • 实现一个AI大模型当前都无法正确实现的基础二叉树读取算法
  • 商业秘密中经营信息的法律保护探析——以客户名册为例
  • 数字孪生技术引领UI前端设计新革命:实时交互与模拟预测
  • 【Bluedroid】蓝牙启动之BTM_reset_complete源码解析
  • yolov13+bytetrack的目标跟踪实现
  • pytorch中的几个概念
  • 港澳地区,海外服务器ping通可能是地区运营商问题
  • c# sugersql 获取子表数据排序
  • MySQL彻底卸载教程
  • 桌面小屏幕实战课程:DesktopScreen 16 HTTP
  • Java锁机制知识点
  • 《Go语言高级编程》RPC 入门
  • python -日期与天数的转换
  • 量化面试绿皮书:56. 多项式求和
  • web3 docs
  • Linux进程关系
  • Flutter 网络请求指南, 从 iOS 到 Flutter 的 Dio + Retrofit 组合
  • 飞算科技依托 JavaAI 核心技术,打造企业级智能开发全场景方案
  • 数据应该如何组织,才能让Excel“读懂”?