当前位置: 首页 > web >正文

项目模拟实现消息队列第二天

消息应答的模式

1.自动应答:

消费者把这个消息取走了,就算是应答了(相当于没有应答)

2.手动应答:

basicAck方法属于手动应答(消费者需要主动调用这个api进行应答)

小结

1.需要实现生产者,broker server,消费者这三个部分的

2.针对生产者和消费者来说,

主要编写的是 客户端和服务器的网络通信部分,给客户端提供一组api,让客户端的业务代码来调用,从而通过网络通信的方式远程调用broker server的方法

至于生产者的数据从哪里来,消费者取到数据之后干嘛,生产者和消费者的具体业务逻辑,我们不去关心

3.[重点]

实现broker server以及broker server内部的一些核心概念和核心api

核心概念:Virtual host,Exchange,Queue,Binding,Message

核心api:创建交换机,删除交换机,创建队列,删除队列,创建绑定,解除绑定,发送消息,订阅消息,应答消息

4.持久化

上述这些关键数据,在硬盘上怎么存储,什么格式存储,存储在数据库中,文件中?后续服务器重启了,如何读取当前数据,把内存中的内容恢复过来

上述要做的工作,最终目标就是实现一个"分布式系统"下这样的生产者消费者模型

消息队列中存在的核心概念

1.交换机        Exchange

2.队列.          Queue

3.绑定           Binding

4.消息           Message

针对交换机,队列,绑定,消息---内存中也需要存储(执行效率高),硬盘上也需要存储(持久化),有些交换机,队列,绑定等,需要持久化存储,但是有些则不需要,用户使用的时候,可以通过开关(boolean值)来决定是否真正需要持久化 

管理关键概念需要两个部分

1.内存

2.硬盘 ->数据库,文件

考虑使用MySQL本身比较重量,此处为了使用更方便,简化环境,采用更轻量的数据库,SQLite(一个完整的SQLite数据库,只有一个单独的可执行文件(不到1M),这个数据库相当于是直接操作本地的硬盘文件

SQLite应用非常广泛,在一些性能不高的设备上,使用数据库的首选,尤其是移动端和嵌入式设备Android系统,就是内置的SQLite(不用额外安装,直接使用maven,把SQLite的依赖引入进来即可,会自动加载jar和动态库文件

SQLite如何建库能

当把依赖和配置都搞好了,

难点1:

如何把

private Map<String,Object> arguments=new HashMap<>()转化成数据库中的字符串类型呢?

关键要点是:MyBatis在玩成数据库操作的时候,会自动调用对象的getter和setter

1.比如MyBatis往数据库中写数据,就会调用对象的getter方法拿到属性的值,再往数据库中写,如果这个过程让,getArguments得到的结果是String类型的,此时就可以把数据写到数据库中了

2.比如MyBatis从数据库读数据的时候,就会调用对象的setter方法,把数据库智能鼓独到的结果设置到对象的属性中,比如这个过程,让setArguments,参数是一个String,并且在setArguments内部针对字符串解析,解析成一个Map对象。

使用Mybatis来创建数据表的时候,需要先创建出数据库来,创建meta.db这个文件,由于data目录不存在,所以创建meta.db文件的操作,也就失败了。

Message,如何在硬盘上存储

1.消息操作并不涉及到复杂的增删改查

2.消息数量可能非常多,数据库访问效率是不高的

直接把消息存储在文件中

一下设定了消息如何在文件中存储

消息是依附于队列的

因此存储的时候,就把消息按照队列的维度展开

此处已经有了个data目录,meta.db就在这个目录中

在data中创建一些子目录,每个队列有一个子目录,子目录的名字,就是队列名

data->

         meta.db

         testQueue1               这几个也是目录,用来存储对应的消息

         testQueue2

         testQueue3

每个队列的子目录喜爱,再分配两个文件,来存储信息

第一个文件:queue_data.txt这里保存消息的内容

第二个文件:   queue_stat.txt这里保存消息的统计信息

queue_data这个文件是一个二进制格式的文件

做出以下约定~

这个文件中包含若干个消息,每个消息都以二进制的方式存储

每个消息都由这几个部分构成

二进制的格式是java标准库序列化来实现的

Message对象,是在内存中记录一份,硬盘上也记录一份,内存中这一份,要记录offsetBeg和offsetEnd随时找到内存中的Message对象,就能找到对应的硬盘上的Message对象了。

对于Broker Server来说,消息是需要新增,也是需要删除的,生产者生产一个消息过来,就得新增这个消息,消费者吧这个消息消费掉,这个消息就要删除

新增和删除,对于内存来说-好办(直接使用一些集合类)

但是在文件上就麻烦了,新增消息,可以直接把新的消息追加到文件末尾~删除消息不好处理,文件可以视为一个顺序表,这样的结构,如果此时直接删除中间元素,就需要涉及到类似于顺序表搬运这样的操作-效率非常低的是。

因此我们采用逻辑删除-isValid为1,有效消息,isValid为0-无效消息

随着时间的推移,这个消息文件可能会越来越大~并且,这里可能大部分都是无效的消息,针对这种情况,就需要考虑对当前消息数据文件,进行垃圾回收。

使用复制算法,针对消息数据文件中的垃圾,进行回收

如果某个队列中,消息特别多,而且都是有效消息,此时就会导致整个消息数据文件特别大,后续针对这个文件的各种操作,成本就会上升很多,假如这个文件大小是10G,此时如果出发一次GC,整体耗时就会非常高了

对于RabbitMQ来说,解决方案是把一个大的文件,拆成若干个小文件。

文件拆分:当单个文件长度到达一个阈值之后,就会拆分成两个文件(拆者拆着,就变成了很多文件)

文件合并:每个单独的文件都会进行GC,如果GC之后,发现文件变小了很多,就可能会和相邻的文件合并。

这样做,就可以在消息特别多的时候,也能保证性能上的及时响应

这一块逻辑比较复杂,因此此处不进行实现,咱们只考虑单个文件的情况

如果要实现这个机制,大概思路:需要专门的数据结构,来存储当前队列中有多少个数据文件,每个文件大小是多少,消息数目是多少,无效消息是多少

2.设计策略,什么时候出发文件的拆分,什么时候触发文件的合并

统计文件,读写比较简单

消息数据文件,比较复杂

消息序列化如何实现:

首先什么叫做序列化:把一个对象(结构化数据)转成一个字符串/字节数组->(降维打击了)

注意这样的序列化完成之后,对象的信息是不丢失的。这样后面才可以进行反序列化

序列化之后,方便存储和传输->(相当于干粮脱水)

一般是存储在文件中,文件只能存字符串/二进制数据,不能直接存对象

通过网络传输 JSON来完成序列化,反序列化

由于Message,里面存储的body是二进制数据,不太方便使用JSON进行序列化,JSON序列化得到的结果是文本数据,无法存储二进制。

我们准备直接使用二进制的序列化方式,针对Message对象进行序列化~

针对二进制序列化,也有很多种解决方案

1.JAVA标准库提高了序列化的方案,ObjectInputStream(用来序列化)和ObjectOutputStream(反序列化)

2.Hessian也是一个解决方案

3.protobuffer

4.thrift

咱们此处使用第一个方案,标准库自带的方案,好处:不必引入额外以来,学习成本低

遇到的并发问题。,此时写入文件之后,真实消息所在的位置,就和刚才计算的offset不匹配了!

出路:加锁

如果两个线程,是往同一个队列中写消息,此时需要阻塞等待

如果两个线程,往不同队列中写消息,此时不需要阻塞队列。(不同队列,对应不同的文件,各自写各自的,就不会产生冲突了)

实现消息文件的垃圾回收

由于当前会不停的往消息文件中写入消息,并且删除消息只是逻辑删除,这就可能导致消息文件越来越大,并且里面又包含大量无效消息

此处的垃圾回收,使用复制算法

判定,当文件中信息总数超过2000,并且有效消息的数目不足50%,就要触发垃圾回收~,就把文件中所有有效消息取出来,单独的在写入到一个新的文件中,删除旧文件,使用新文件代替

http://www.xdnf.cn/news/4358.html

相关文章:

  • Compose Multiplatform iOS 稳定版发布:可用于生产环境,并支持 hotload
  • C++命名空间、内联与捕获
  • linux环境安装docker
  • AI一键替换商品融入场景,5分钟打造专业级商品图
  • 自编码器(Autoencoder)
  • 遥感数据处理、机器学习建模与空间预测的全流程指南——涵盖R语言(随机森林、XGBoost、SVM等)、特征提取、模型优化及生态学案例分析
  • Ubuntu日志文件清空的三种方式
  • 力扣49. 字母异位词分组(哈希表)
  • suna界面实现原理分析(三):Terminal工具调用可视化
  • 强化学习是AI Agent的进化引擎还是技术枷锁呢?
  • ​Spring + Shiro 整合的核心要点及详细实现说明
  • vLLM 推理 Qwen2.5-VL-7B 视频
  • 2025最新vmware-17虚拟机安装教程(保姆级,图文讲解,带安装包)
  • 餐饮加盟店如何通过日事清全流程闭环管理实现进度同步与效率升级?
  • 强化学习策略梯度推导
  • 漏洞检测服务费用受哪些因素制约?费用区间是多少?
  • 蓝肽子序列--字符串+最长子序列的dp
  • 从零开始学java--集合类(2)
  • 职业短线交易系统
  • SAM详解2(初级应用)
  • Python入门(一)
  • 数字人驱动方向最新顶会期刊论文收集整理 | AAAI 2025
  • 系统级编程(一):内存的段页式管理
  • x-cmd install | Tuistash - Logstash 实时监控,告别图形界面,高效便捷!
  • VBA之Excel应用第四章第三节:Range对象内容的复制Copy粘贴Paste
  • 根据蓝牙名称自动匹配对应 UI
  • 逻辑越权--水平垂直越权(WEB漏洞)
  • 什么是原子变量
  • Linux死锁实验分析与总结
  • 安卓基础(拖拽)