当前位置: 首页 > news >正文

(02)Redis 的订阅发布Pub/Sub

我们为了自己实现一个MQ功能,就要深入底层挖掘现有开源产品的实现过程。

Redis 发布订阅底层结构解析

Redis 不存储消息,仅作为“实时中转”;只有订阅者在线时才能收到消息;消息是广播给所有订阅此频道的客户端。

1. 核心数据结构:哈希表(dict)+ 链表(链式客户端列表)

1.1 pubsub_channels:频道订阅表(dict)

Redis 使用一个全局字典(哈希表)结构来维护频道与订阅者之间的映射关系:

dict *pubsub_channels; // key: channel name (sds),value: list of clients
  • Key 是频道名称(channel),类型为 Redis 字符串(SDS)
  • Value 是一个客户端链表(list *),保存所有订阅该频道的客户端连接指针
1.2 客户端链表(client list)

对于每个频道,对应一个链表或列表结构(底层用的是 Redis 自身的 list 类型):

typedef struct client {...int fd;                // 客户端 socket fdlist *subscribed_channels; // 客户端自身也记录了订阅了哪些频道...
} client;

这个链表中每个节点是一个 client * 指针,表示一个订阅该频道的活跃客户端连接。

2. 消息发布流程

当执行 PUBLISH channel message 时,Redis 会:

  1. 查找 pubsub_channels[channel],得到订阅该频道的客户端链表
  2. 遍历该链表,对每个客户端执行 addReply(client, message)
  3. 消息会直接写入客户端输出缓冲区,等待发送至客户端 socket

如果客户端网络异常导致写失败,可能会断开连接并清理其订阅记录。

3. 订阅流程

订阅(SUBSCRIBE):
  • Redis 检查频道是否存在于 pubsub_channels
    • 如果存在,将当前客户端加入该频道对应的客户端链表
    • 如果不存在,新建一个键值对:channel -> list,然后加入客户端
      在这里插入图片描述

4、取消订阅流程(UNSUBSCRIBE):

  • Redis 从 pubsub_channels[channel] 的链表中移除该客户端
  • 如果链表为空,则从字典中移除该频道
  • 同时更新客户端的 subscribed_channels 属性

5、 客户端断开连接时的清理机制

当客户端断开连接,Redis 会调用清理逻辑:

pubsubUnsubscribeAllChannels(client *c)

该函数会遍历客户端 subscribed_channels,从所有频道对应的客户端链表中移除该客户端,并删除空频道。

在这里插入图片描述

  • SUBSCRIBE:注册订阅关系。
  • PUBLISH:消息发布,立即广播给在线订阅者。
  • 掉线(连接断开):Redis 会检测到客户端断连,并自动移除其订阅关系。
  • UNSUBSCRIBE:客户端主动取消订阅,Redis 也会移除其订阅记录。

实践过程:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

http://www.xdnf.cn/news/219565.html

相关文章:

  • Ubuntu上搭建python环境并安装第三方库
  • C语言教程(二十四):C 语言中递归的详解
  • cuda学习3: 全局线程id计算
  • 大语言模型能否替代心理治疗师的深度拓展研究:fou
  • 两数之和II-输入有序数组(中等)
  • 洛谷题解 | CF1979C Earning on Bets
  • DNA复制过程3D动画教学工具
  • 稳定性 复杂度
  • 浅析localhost、127.0.0.1 和 0.0.0.0的区别
  • 【RocketMq延迟消息操作流程】
  • 鸟笼效应——AI与思维模型【84】
  • Canvas基础篇:概述
  • DeepSeek 本地化部署与 WebUI 配置的方法
  • Fiddler抓取APP端,HTTPS报错全解析及解决方案(一篇解决常见问题)
  • 在Ubuntu中安装python
  • 02_高并发系统问题及解决方案
  • 大模型高效化三大核心技术:量化、蒸馏与剪枝详解
  • 【AI论文】BitNet v2:针对1位LLM的原生4位激活和哈达玛变换
  • 物流新速度:数字孪生让仓库“聪明”起来
  • 民锋视角下的价格波动管理思路
  • 健康养生:拥抱活力生活
  • 【AI提示词】机会成本决策分析师
  • 理解 EKS CloudWatch Pod CPU Utilization 指标:与 `kubectl top` 及节点 CPU 的关系
  • 企业架构之旅(3):TOGAF ADM架构愿景的核心价值
  • C#学习——类型、变量
  • SpringSecurity+JWT
  • linux安装部署配置docker环境
  • 基于STM32的虚线绘制函数改造
  • linux下创建c++项目的docker镜像和容器
  • try catch + throw