当前位置: 首页 > java >正文

IM系统群消息推送方案

常见的群消息推送流程

场景:a、b、c 位于同一个群中。a、b 在线,c 离线
常见的消息推送流程如下:

  1. a 发送一条消息
  2. 服务端接收消息,查询在线用户,将消息转发给在线用户 b
  3. 服务端将消息存储到 c 的离线库

上述这种推送模型是“写扩散”,如果群里有 200 个用户,最坏的情况一条消息需要存储199次,消息扩散系数为 n-1

存在的问题

同一份消息存储了多份,极大的增加了数据库的存储压力。

优化1:减少存储量

存储离线消息时,可以只存储消息的 seq_id (唯一 id)。这样消息本体只存储一份,可以大幅度的降低数据库的冗余数据。

假设 a 在群里连续发了 100 条消息,那么 c 的离线消息库里面将会有对应 100 条记录。

有必要吗?

聪明的你肯定想到了我们可以只存储 c 的last_ack_seq_id(最后一次收到的消息的 seq_id)。因为 c 离线后的所有消息都是未收到的。在 c 上线时,拉取群内 last_ack_seq_id 后的消息即可。

现在的流程如下:

  1. a 发送一条消息
  2. 服务器收到消息,持久化消息
  3. 服务端查询在线用户,将消息推送给在线用户 b,并更新 b 的 last_ack_seq_id
  4. c 上线后,拉取群内 last_ack_seq_id后的消息

存在的问题

虽然 websocket 是基于 tcp 实现的,但是仍然需要我们在应用层引入相关机制,确保消息可靠传输。因为 tcp 的可靠传输针对的是传输层,对应用层不负责,例如:客户端成功接收消息,但是消息处理失败。

优化2:可靠传输

参考 tcp 的可靠传输,我们可以引入 ack 应答、超时重传、去重机制。

首先我们需要确保服务端一定收到消息。

  1. 当客户发送消息时为每条消息生成唯一 seq_id,并开启定时任务。
  2. 当服务端收到 a 发送的群消息将其持久化后,回复 ack 给 a。
  3. 当定时任务超时仍为收到 ack 时重新发送消息。

其次我们需要确保客户端一定收到消息。

  1. 服务端向在线用户 b 发送消息,携带消息 seq_id,并开启定时任务
  2. 客户收到消息,返回 ack 给服务端
  3. 服务端修改 last_ack_seq_id
  4. 当定时任务超时仍未收到 ack 时重新发送消息

至于离线用户,不用管他

那我问你,这样真的就没问题了吗?

存在的问题

客户端向服务端发送消息的过程没有问题,问题在服务端修改 last_ack_seq_id
例如:

  1. 在同一个群内,服务端先向 b 发送消息 1
  2. 服务端又向 b 发送消息 2
  3. b 返回了 消息 2 的 ack
  4. 服务端修改 last_ack_seq_id 为消息2 的 seq_id

由于网络的原因,无法保证服务端接收 ack 的顺序,我们可以使用 redis 解决这个问题。我们在 redis 中定义两个类型数据结构:list,hash。list 中维护着服务器在当前群组中推送消息的顺序。hash 中维护着消息的状态,是否 ack。发送消息流程如下:

  1. 服务端在 redis 的 list 中添加消息1,再在 hash 把消息1 的状态设为 pending,发送消息1
  2. 服务端在 redis 的 list 中添加消息2,再在 hash 把消息2 的状态设为 pending,发送消息2
  3. b 返回了消息2 的 ack
  4. 服务端将 hash 中的消息2 状态设为 acknowledged,判断队头元素是否为acknowledged状态,此时队头 member 为消息1,消息1的状态为pending,跳出循环
  5. b 返回了消息1 的 ack
  6. 服务端将 hash 中的消息1 状态设置为 acknowledged,断队头元素是否为acknowledged状态,此时队头 member 为消息1,消息1的状态为acknowledged,弹出消息1,重复操作,直到队头 member 状态为 pending
  7. 服务端更新 MySQL 中群成员关系表中的的 last_act_seq_id 字段

群消息推送流程图

在这里插入图片描述

beautiful~

http://www.xdnf.cn/news/4949.html

相关文章:

  • 多模型协同预测在风机故障预测的应用(demo)
  • 订阅“科技爱好者周刊”,每周五与你相约科技前沿!
  • Docker下Gogs设置Webhook推送Spug,踩坑记录与解决方案
  • Git clone时出现SSL certificate problem unable to get local issuer certificate
  • 安装docker
  • 【网络编程】四、守护进程实现 前后台作业 会话与进程组
  • ChatTempMail - AI驱动的免费临时邮箱服务
  • 线程中常用的方法
  • PX4开始之旅(二)通过自定义 MAVLink 消息与 QGroundControl (QGC) 通信
  • 开源数字人框架 AWESOME - DIGITAL - HUMAN:技术革新与行业标杆价值剖析
  • AWS IoT Core与MSK集成实战:打造高可靠实时IoT数据管道
  • 探索表访问方法功能:顺序扫描分析
  • 复合机器人案例启示:富唯智能如何以模块化创新引领工业自动化新标杆
  • Oracle版本、补丁及升级(12)——版本体系
  • [C#] async和await(腾讯元宝)
  • 从逻辑学视角理解统计学在数据挖掘中的作用
  • 数据结构-堆
  • C++中static关键字详解:不同情况下的使用方式
  • 谷云科技iPaaS发布 MCP Server加速业务系统API 跨入 MCP 时代
  • JAVA将一个同步方法改为异步执行
  • CAN转ModbusTCP网关:破解电池生产线设备协议壁垒,实现全链路智能互联
  • 单调栈所有模版类型(4)
  • 为特定领域微调嵌入模型:打造专属的自然语言处理利器
  • 钉钉打卡教程
  • Go Modules 的基本使用
  • 什么是直播美颜SDK?跨平台安卓、iOS美颜SDK开发实战详解
  • 排序算法-希尔排序
  • 操作系统面试问题(4)
  • 不拆机查看电脑硬盘型号的常用方法
  • JVM之jcmd命令详解