当前位置: 首页 > web >正文

【RelayMQ】基于 Java 实现轻量级消息队列(六)

目录

一. 数据存储方式

二. 交换机管理

三. 队列管理

四. 绑定管理

五. 消息管理 ! ! !

六.数据加载


文件负责数据的存储, 内存负责数据的管理, 为了保证MQ的传输效率, 所以注定无法使用硬盘管理数据

一. 数据存储方式

这里主要使用哈希表, 链表, 嵌套结构存储和管理数据

交换机管理

第一个key为交换名字(exchangeName), value为交换机对象,

    private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();

队列管理

第一个key为队列名(queueName), value为队列对象

private ConcurrentHashMap<String, MSGQueue> msgQueueMap = new ConcurrentHashMap<>();

绑定关系管理

第一个key为交换机名(exchangeName), value为HashMap,HashMap中存储的key为队列名(queueName),value为Binding对象

private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();

消息管理

第一个Key为消息Id(messageId),  value为消息对象 

  private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();

可以使用消息Id快速的查找对象

队列中的所有消息

第一个Key为队列名(queueName),  value为链表(链表中的每一个对象都是message对象)

private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();

便于对队列中的对象进行管理,适用于队列这种先来先服务模式

队列中未应答消息

第一个key为队列名(queueName),value为hashMap, 其中key为MessageId, value为消息对象

 private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitMap = new ConcurrentHashMap<>();

便于维护队列中已发送给消费者但还未被确认(ACK)的消息


总体上采用HashMap的方式, 可以快速的进行索引查询

二. 交换机管理

这里的添加操作和之前的不同, 这里是添加进入内存中的容器HashMap中, 并非数据库中的容器

//增加交换机public void insertExchange(String exchangeName,Exchange exchange){exchangeMap.put(exchangeName, exchange);System.out.println("[MemoryDataCenter] 交换机添加成功");}
//删除交换机public void deleteExchange(String exchangeName){exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 交换机删除成功");}//查找交换机public Exchange getExchange(String exchangeName){return exchangeMap.get(exchangeName);}

三. 队列管理

这里也是在内存中进行增加, 删除, 查找操作

  public void insertQueue(String queueName,MSGQueue queue){msgQueueMap.put(queueName, queue);System.out.println("[MemoryDataCenter] 队列添加成功");}public void deleteQueue(String queueName){msgQueueMap.remove(queueName);System.out.println("[MemoryDataCenter] 队列删除成功");}public MSGQueue getQueue(String queueName){return msgQueueMap.get(queueName);}

四. 绑定管理

这里采用由内到外的方式

添加绑定

  1. 先检查内部的HashMap中是否存在数据
  2. 不存在则创建, 存在则检查最外层的是否存在
  3. 不存在则创建, 存在则抛出异常,无法添加
  public void insertBinding(Binding binding) throws MqException {
//        1.检查这个交换机的value是否存在,如果不存在则需要创建(根据key检查value是否存在)
/*        ConcurrentHashMap<String, Binding> bindMap = bindingsMap.get(binding.getExchangeName());if(bindMap == null){bindMap = new ConcurrentHashMap<>();bindingsMap.put(binding.getExchangeName(),bindMap);}*///另一种写法ConcurrentHashMap<String, Binding> bindMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());synchronized (bindMap) {
//        2.如果根据交换机名和队列名,查询出来的关系存在,则抛出异常,因为已经两者之间已经存在关系,所以不能插入if (bindMap.get(binding.getQueueName()) != null) {throw new MqException("[MemoryDataCenter] 绑定关系已经存在,不能继续绑定");}bindMap.put(binding.getQueueName(), binding);System.out.println("[MemoryDataCenter] 绑定添加成功");}}

删除绑定

  1. 先检查内部绑定是否存在
  2. 不存在则无法删除,存在则删除里面的HashMap
  3. 然后删除外面的hashMap
  public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if(bindingMap == null){throw new MqException("[MemoryDataCenter]  绑定关系不存在,无法删除");}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 绑定删除成功");}

获取绑定(唯一)

这里采用从外到内的方式查询

  1. 先检查交换机是否存在
  2. 不存在则返回,存在则继续查看交换机
  3. 存在返回数据,不存在返回 null
  public Binding getBinding(String exchangeName ,String queueName){ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);//判断最外层value是否为空if(bindingMap==null){return null;}return bindingMap.get(queueName);}

获取所有的绑定

  1. 直接根据交换机进行查找并返回
  public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){return bindingsMap.get(exchangeName);}

五. 消息管理 ! ! !

添加消息

    public void insertMessage(Message message){messageMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter] 添加消息成功");}

查询消息(根据MessageID查询消息)

    //根据messageID查询消息public Message getMessage(String messageId){return messageMap.get(messageId);}

删除消息(根据MessageId删除消息)

  1. 先检查消息是否存在
  2. 如果存在才能进行删除操作
    //根据messageId删除消息public void deleteMessage(String messageId) throws MqException {Message message = messageMap.get(messageId);if(message==null){throw new MqException("[MemoryDataCenter] 消息不存在,无法删除");}messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 删除消息成功");}

队列中的消息管理

发送消息到队列

  1. 检查队列中是否存在数据列表
  2. 存在则向其中添加数据, 不存在则创建一个链表
  3. 将消息直接添加到队列中
  4. 在总消息统计中也需要添加
    public void sendMessage(MSGQueue queue,Message message){LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k->new LinkedList<>());synchronized (messages){messages.add(message);}//在队列中添加,在总的消息管理中也需要添加//因为messageID相同,所有也不用担心内容不一样insertMessage(message);System.out.println("[MemoryDataCenter] 消息成功放入队列中");}

从队列中取出消息

  1. 检查是否存在列表
  2. 如果不存在, 则直接返回 null, 如果存在判断里面是否存在数据
  3. 如果存在数据,则返回第一个,如果不存在则返回 null
    public Message pollMessage(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);//判断队列中有没有消息if(messages==null){return null;}synchronized (messages){if(messages.size()==0){return null;}//采用从头取出(头插法)Message curmessage = messages.remove(0);System.out.println("[MemoryDataCenter] 消息成功从队列中取出");return curmessage;}}

获取队列中消息的个数

  • 判断链表是否存在
  • 如果不存在,直接返回0,
  • 如果存在则返回消息个数
    public int getQueueMessageCount(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);//判断队列中有没有消息if(messages == null){return 0;}synchronized (messages){return messages.size();}}

队列中未确认的消息管理

添加未确认消息

  1. 检查HashMap是否存在,
  2. 如果不存在,则创建一个,如果存在,则向里面添加未处理的消息
    public void insertMessageWaitAck(MSGQueue queue,Message message) {ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.computeIfAbsent(queue.getName(), k -> new ConcurrentHashMap<>());messageWaitAckMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 未处理消息添加成功");}

删除未确认的消息

  1. 检查HashMap是否存在
  2. 如果不存在, 则返回null, 如果存在,则进行删除操作
    public void deleteMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.get(queueName);if(messageWaitAckMap==null){return;}messageWaitAckMap.remove(messageId);System.out.println("[MemoryDataCenter] 未处理消息删除成功");}

获取指定的未确认消息

  1. 检查HashMap是否存在
  2. 如果不存在, 则返回null, 如果存在, 则返回其中具体消息(根据MessageId返回)
    public Message getMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.get(queueName);if(messageWaitAckMap==null){return null;}return messageWaitAckMap.get(messageId);}

六.数据加载

将文件数据加载到内存中

  1. 保证内存存储数据为空
  2. 加载交换机数据
  3. 加载队列数据
  4. 加载绑定关系
  5. 加载消息数据 (1. 所有的消息  2. 队列下的消息)
    public void recovery(DiskDataCenter dataCenter) throws IOException, MqException, ClassNotFoundException {
//        1.清空之前的数据exchangeMap.clear();msgQueueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageWaitMap.clear();
//        2.加载交换机数据List<Exchange> exchanges = dataCenter.selectAllExchange();for (Exchange exchange :exchanges){exchangeMap.put(exchange.getName(),exchange);}
//        3.加载队列数据List<MSGQueue> msgQueues = dataCenter.selectAllMSGQueue();for (MSGQueue queue:msgQueues){msgQueueMap.put(queue.getName(),queue);}
//        4.加载绑定关系(将一个个绑定添加进里面)//先检查交互机是否存在,不存在则创建对应的hashMap,然后添加其中的内容List<Binding> bindings = dataCenter.selectAllBinding();for (Binding binding:bindings){ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(), binding);}
//        5.恢复所有的消息数据(添加每一个队列中的消息)(这里有两个,1.管理所有的消息,2.管理队列下的消息)for(MSGQueue queue: msgQueues){LinkedList<Message> messages = dataCenter.selectAllMessage(queue);//队列下的消息queueMessageMap.put(queue.getName(), messages);for (Message message:messages){messageMap.put(message.getMessageId(),message);}}
//        6.未被确认的消息//这里未被确认的消息,并不会被加载进入内存,考虑情况:服务器如果重启,那么未被确认的消息,再一次被加载,实际上被当做没有取出的消息//这时候只需要重新取出未被取出的消息,那么和方法5(加载数据)是一样的}

未被确认的消息怎么处理?

在本项目中, 数据加载的过程, 多数出现在重启服务器的场景中, 未被确认的消息并不会直接存储进入内存, 而是被当做普通消息加载进入内存,  已经被确认的数据会被直接删除掉, 未被确认的数据会被当做新数据加载进入内存

缺点: 造成消息的冗余发送  优点: 实现方便

优化建议: 

1. 数据的存储持久化

引入一个标志位, 表示消息是否处于发送且未确认状态,  在数据加载的时候, 根据标志位将数据加载放入内存存储

2. 超时重投机制

如果消息发送给消费者, 消费者消费数据时, 未发送确认就崩溃, 那么消息会一直处于发送未确认状态, 会造成资源浪费

检测超时未确认的消息, 超过一定时间间隔会触发重新发送消息

3. 重投次数限制和死信队列

避免由于消费者的持续异常状态(依赖不可用等 )造成的无限重投, 可以规定最多重投次数, 超过该次数, 则丢到死信队列中, 方便人为去分析处理这些未被确认的消息

http://www.xdnf.cn/news/20252.html

相关文章:

  • React Fiber 风格任务调度库
  • 2025Android开发面试题
  • 目标检测双雄:一阶段与二阶段检测器全解析
  • Nextcloud 实战:打造属于你的私有云与在线协作平台
  • Oracle 数据库:视图与索引
  • 没 iCloud, 如何数据从iPhone转移到iPhone
  • ZooKeeper架构深度解析:分布式协调服务的核心设计与实现
  • Conda环境隔离和PyCharm配置,完美同时运行PaddlePaddle和PyTorch
  • 机器学习(七)决策树-分类
  • [论文阅读] 人工智能 + 软件工程 | 当ISO 26262遇上AI:电动车安全标准的新玩法
  • 中国移动浪潮云电脑CD1000-系统全分区备份包-可瑞芯微工具刷机-可救砖
  • 乐观并发: TCP 与编程实践
  • 华锐视点VR风电场培训课件:多模块全面覆盖风机知识与操作​
  • UniApp 页面通讯方案全解析:从 API 到状态管理的最佳实践
  • 【Docker-Day 24】K8s网络解密:深入NodePort与LoadBalancer,让你的应用走出集群
  • B 题 碳化硅外延层厚度的确定
  • 【Linux学习笔记】信号的深入理解之软件条件产生信号
  • Docker在Windows与Linux系统安装的一体化教学设计
  • AI 基础设施新范式,百度百舸 5.0 技术深度解析
  • 【AI编程工具】快速搭建图书管理系统
  • 9.5 递归函数+常见算法
  • Preprocessing Model in MPC 7 - Matrix Triples and Convolutions Lookup Tables
  • LinuxC++项目开发日志——高并发内存池(1-定长内存池)
  • finally 与 return的执行顺序
  • Web相关知识(草稿)
  • MySQL高可用之组复制(MGR)
  • Web基础、HTTP/HTTPS协议与Nginx详解
  • 商城系统——项目测试
  • JUC的安全并发包机制
  • Python 值传递 (Pass by Value) 和引用传递 (Pass by Reference)