当前位置: 首页 > news >正文

Redis实战(7)-- 高级特性 Redis Stream数据结构与基础命令

Redis stream的数据结构:

【注意】消息 ID 的形式是timestampInMillis-sequence,例如1527846880572-5;而使用*,进行设置对于的消息ID,则是表示系统默认生成。

每个 Stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。

【重点】每个消费组都有一个Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化last_delivered_id变量。

同时每个消费组状态都是独立的,互不影响,同一个消息,可以被每个组都消费。

命令端操作

生产者端

(1)xadd追加消息:

(2)xrange获取消息队列,会自动过滤已删除信息

其中-表示最小值,+表示最大值

获取指定ID信息

(3)xlen获取消息长度

即代表当前Stream中有多少消息存在且未被消费

(4)del删除Stream

Del streamtest

即表示删除整个Stream流

而xdel streamtest ID 可以删除对应ID消息。

消费端

单消费者【了解】

虽然Stream中有消费者组的概念,但是可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。Redis 设计了一个单独的消费指令xread,可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表 (list)。

【注意】0-0表示头消息,$表示最新消息开始

单消费者操作:

(1)读取指定消息ID后消息:

Xread count 1 streams stream2 1665644057756-0

【注意】当指定位置是0-0时候,则会从第一条消息开始读。$则是读取最尾部消息,如果这个尾部消息已经被读过了,则返回nil (null),通常会配合阻塞block进行使用。

(2)以阻塞方式从尾部一次读取一个信息,并阻塞直至最新消息到来

xread block 0 count 1 streams stream1 $

一般来说客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。

消费群【重点】

【面试考题】为什么会用到群组?

Redis群组消费是为了应对生产者发送过多消息到redis stream中,但是消费者消费不过来的情况,这种情况可以使用多个群组进行消费,从而合理提升消费者组整体消费能力。

结构图展示:

针对消费组进行解析,消费组会有一个值last_delivered_id进行标识偏移值,表明当前消费到哪条消息。而消费者中也有 pending_ids[] 即消费ID记录表,其中记录当前消费者消费过哪些内容,相当于一个ack机制(其实这个就是PEL机制)。

(1)消费组群组创建:

xgroup create stream1 c1 0-0

(2)只接受最新的消息,当前Stream消息会被全部忽略处理:

xgroup create stream1 c2 $

(3)查看对应Stream状态

xinfo stream stream1

(4)查看消费组状态:

Xinfo groups stream1

(5)查看一组中多个消费者状态:

Xinfo consumers stream2 c1

可以看到目前c1这个消费者有 7 条等待ACK的消息,空闲了2086176ms 没有读取消息。

(6)消费组中固定消费者消费

xreadgroup GROUP c1 consumer1 count 1 block 0 streams stream1 >

【解析】

让c1消费组中的consumer1消费者进行消费一条消息,">"表示从当前消费组的 last_delivered_id 后面开始读,每当消费者读取一条消息,last_delivered_id 变量就会前进。Block 0表示进行阻塞等待。

【注意】

由于有偏移下标,群组中任意消费者进行消费消息后,偏移量都会向后移动,所以说除非消费失败导致异常,消费者组中消费者不会消费已经比消费的数据。

(7)消费者确认

Xack stream1 cg1 160219318311-0

确认消费者消费stream1消息队列中160219318311-0的消息。(主要用于PEL机制确定)

http://www.xdnf.cn/news/1238347.html

相关文章:

  • HCIE-Datacom题库_07_设备【道题】
  • kafka与其他消息队列(如 RabbitMQ, ActiveMQ)相比,有什么优缺点?
  • Qt-vs加载exe图标
  • 日常--详细介绍qt Designer常用快捷键(详细图文)
  • 其它IO函数
  • Fay数字人如何使用GPT-SOVITS进行TTS转换以及遇到的一些问题
  • 《基于通道注意力与空洞卷积的胸片肺气肿检测算法》论文解析
  • [硬件电路-138]:模拟电路 - 什么是正电源?什么是负电源?集成运放为什么有VCC+和VCC-
  • Python切片命名技术详解:提升代码可读性与维护性的专业实践
  • 2106. 摘水果
  • 关于assert()函数,eval()函数,include
  • 第N个泰波那契数
  • Spring lookup-method实现原理深度解析
  • e2studio开发RA4M2(6)----GPIO外部中断(IRQ)配置
  • 信创及一次ORACLE到OB的信创迁移
  • 图像、视频、音频多模态大模型中长上下文token压缩方法综述
  • 使用 Vuepress + GitHub Pages 搭建项目文档
  • 【Bluetooth】【Transport层篇】第四章 基于基础UART的蓝牙硬件发送协议 UART H4 Transport详解
  • Docker 国内可用镜像
  • 关于 xrdp远程桌面报错“Error connecting to sesman on 127.0.0.1:3350“的解决方法
  • [自动化Adapt] 录制引擎
  • 计算机视觉CS231n学习(2)
  • 第六章第三节 TIM 输出比较
  • Java 大视界 -- Java 大数据在智能教育学习资源个性化推荐与学习路径动态调整中的深度应用(378)
  • ARPO:让LLM智能体更高效探索
  • 三角洲行动ACE反作弊VT-d报错?CPU虚拟化如何开启!
  • 嵌入式学习-(李宏毅)机器学习(5)-day32
  • 苍穹外卖项目学习——day1(项目概述、环境搭建)
  • 音视频学习(五十):音频无损压缩
  • 力扣-437.路径总和III