【RelayMQ】基于 Java 实现轻量级消息队列(六)
目录
一. 数据存储方式
二. 交换机管理
三. 队列管理
四. 绑定管理
五. 消息管理 ! ! !
六.数据加载
文件负责数据的存储, 内存负责数据的管理, 为了保证MQ的传输效率, 所以注定无法使用硬盘管理数据
一. 数据存储方式
这里主要使用哈希表, 链表, 嵌套结构存储和管理数据
交换机管理
第一个key为交换名字(exchangeName), value为交换机对象,
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
队列管理
第一个key为队列名(queueName), value为队列对象
private ConcurrentHashMap<String, MSGQueue> msgQueueMap = new ConcurrentHashMap<>();
绑定关系管理
第一个key为交换机名(exchangeName), value为HashMap,HashMap中存储的key为队列名(queueName),value为Binding对象
private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
消息管理
第一个Key为消息Id(messageId), value为消息对象
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
可以使用消息Id快速的查找对象
队列中的所有消息
第一个Key为队列名(queueName), value为链表(链表中的每一个对象都是message对象)
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
便于对队列中的对象进行管理,适用于队列这种先来先服务模式
队列中未应答消息
第一个key为队列名(queueName),value为hashMap, 其中key为MessageId, value为消息对象
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitMap = new ConcurrentHashMap<>();
便于维护队列中已发送给消费者但还未被确认(ACK)的消息
总体上采用HashMap的方式, 可以快速的进行索引查询
二. 交换机管理
这里的添加操作和之前的不同, 这里是添加进入内存中的容器HashMap中, 并非数据库中的容器
//增加交换机public void insertExchange(String exchangeName,Exchange exchange){exchangeMap.put(exchangeName, exchange);System.out.println("[MemoryDataCenter] 交换机添加成功");}
//删除交换机public void deleteExchange(String exchangeName){exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 交换机删除成功");}//查找交换机public Exchange getExchange(String exchangeName){return exchangeMap.get(exchangeName);}
三. 队列管理
这里也是在内存中进行增加, 删除, 查找操作
public void insertQueue(String queueName,MSGQueue queue){msgQueueMap.put(queueName, queue);System.out.println("[MemoryDataCenter] 队列添加成功");}public void deleteQueue(String queueName){msgQueueMap.remove(queueName);System.out.println("[MemoryDataCenter] 队列删除成功");}public MSGQueue getQueue(String queueName){return msgQueueMap.get(queueName);}
四. 绑定管理
这里采用由内到外的方式
添加绑定
- 先检查内部的HashMap中是否存在数据
- 不存在则创建, 存在则检查最外层的是否存在
- 不存在则创建, 存在则抛出异常,无法添加
public void insertBinding(Binding binding) throws MqException {
// 1.检查这个交换机的value是否存在,如果不存在则需要创建(根据key检查value是否存在)
/* ConcurrentHashMap<String, Binding> bindMap = bindingsMap.get(binding.getExchangeName());if(bindMap == null){bindMap = new ConcurrentHashMap<>();bindingsMap.put(binding.getExchangeName(),bindMap);}*///另一种写法ConcurrentHashMap<String, Binding> bindMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());synchronized (bindMap) {
// 2.如果根据交换机名和队列名,查询出来的关系存在,则抛出异常,因为已经两者之间已经存在关系,所以不能插入if (bindMap.get(binding.getQueueName()) != null) {throw new MqException("[MemoryDataCenter] 绑定关系已经存在,不能继续绑定");}bindMap.put(binding.getQueueName(), binding);System.out.println("[MemoryDataCenter] 绑定添加成功");}}
删除绑定
- 先检查内部绑定是否存在
- 不存在则无法删除,存在则删除里面的HashMap
- 然后删除外面的hashMap
public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if(bindingMap == null){throw new MqException("[MemoryDataCenter] 绑定关系不存在,无法删除");}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 绑定删除成功");}
获取绑定(唯一)
这里采用从外到内的方式查询
- 先检查交换机是否存在
- 不存在则返回,存在则继续查看交换机
- 存在返回数据,不存在返回 null
public Binding getBinding(String exchangeName ,String queueName){ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);//判断最外层value是否为空if(bindingMap==null){return null;}return bindingMap.get(queueName);}
获取所有的绑定
- 直接根据交换机进行查找并返回
public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){return bindingsMap.get(exchangeName);}
五. 消息管理 ! ! !
添加消息
public void insertMessage(Message message){messageMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter] 添加消息成功");}
查询消息(根据MessageID查询消息)
//根据messageID查询消息public Message getMessage(String messageId){return messageMap.get(messageId);}
删除消息(根据MessageId删除消息)
- 先检查消息是否存在
- 如果存在才能进行删除操作
//根据messageId删除消息public void deleteMessage(String messageId) throws MqException {Message message = messageMap.get(messageId);if(message==null){throw new MqException("[MemoryDataCenter] 消息不存在,无法删除");}messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 删除消息成功");}
队列中的消息管理
发送消息到队列
- 检查队列中是否存在数据列表
- 存在则向其中添加数据, 不存在则创建一个链表
- 将消息直接添加到队列中
- 在总消息统计中也需要添加
public void sendMessage(MSGQueue queue,Message message){LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k->new LinkedList<>());synchronized (messages){messages.add(message);}//在队列中添加,在总的消息管理中也需要添加//因为messageID相同,所有也不用担心内容不一样insertMessage(message);System.out.println("[MemoryDataCenter] 消息成功放入队列中");}
从队列中取出消息
- 检查是否存在列表
- 如果不存在, 则直接返回 null, 如果存在判断里面是否存在数据
- 如果存在数据,则返回第一个,如果不存在则返回 null
public Message pollMessage(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);//判断队列中有没有消息if(messages==null){return null;}synchronized (messages){if(messages.size()==0){return null;}//采用从头取出(头插法)Message curmessage = messages.remove(0);System.out.println("[MemoryDataCenter] 消息成功从队列中取出");return curmessage;}}
获取队列中消息的个数
- 判断链表是否存在
- 如果不存在,直接返回0,
- 如果存在则返回消息个数
public int getQueueMessageCount(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);//判断队列中有没有消息if(messages == null){return 0;}synchronized (messages){return messages.size();}}
队列中未确认的消息管理
添加未确认消息
- 检查HashMap是否存在,
- 如果不存在,则创建一个,如果存在,则向里面添加未处理的消息
public void insertMessageWaitAck(MSGQueue queue,Message message) {ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.computeIfAbsent(queue.getName(), k -> new ConcurrentHashMap<>());messageWaitAckMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 未处理消息添加成功");}
删除未确认的消息
- 检查HashMap是否存在
- 如果不存在, 则返回null, 如果存在,则进行删除操作
public void deleteMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.get(queueName);if(messageWaitAckMap==null){return;}messageWaitAckMap.remove(messageId);System.out.println("[MemoryDataCenter] 未处理消息删除成功");}
获取指定的未确认消息
- 检查HashMap是否存在
- 如果不存在, 则返回null, 如果存在, 则返回其中具体消息(根据MessageId返回)
public Message getMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.get(queueName);if(messageWaitAckMap==null){return null;}return messageWaitAckMap.get(messageId);}
六.数据加载
将文件数据加载到内存中
- 保证内存存储数据为空
- 加载交换机数据
- 加载队列数据
- 加载绑定关系
- 加载消息数据 (1. 所有的消息 2. 队列下的消息)
public void recovery(DiskDataCenter dataCenter) throws IOException, MqException, ClassNotFoundException {
// 1.清空之前的数据exchangeMap.clear();msgQueueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageWaitMap.clear();
// 2.加载交换机数据List<Exchange> exchanges = dataCenter.selectAllExchange();for (Exchange exchange :exchanges){exchangeMap.put(exchange.getName(),exchange);}
// 3.加载队列数据List<MSGQueue> msgQueues = dataCenter.selectAllMSGQueue();for (MSGQueue queue:msgQueues){msgQueueMap.put(queue.getName(),queue);}
// 4.加载绑定关系(将一个个绑定添加进里面)//先检查交互机是否存在,不存在则创建对应的hashMap,然后添加其中的内容List<Binding> bindings = dataCenter.selectAllBinding();for (Binding binding:bindings){ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(), binding);}
// 5.恢复所有的消息数据(添加每一个队列中的消息)(这里有两个,1.管理所有的消息,2.管理队列下的消息)for(MSGQueue queue: msgQueues){LinkedList<Message> messages = dataCenter.selectAllMessage(queue);//队列下的消息queueMessageMap.put(queue.getName(), messages);for (Message message:messages){messageMap.put(message.getMessageId(),message);}}
// 6.未被确认的消息//这里未被确认的消息,并不会被加载进入内存,考虑情况:服务器如果重启,那么未被确认的消息,再一次被加载,实际上被当做没有取出的消息//这时候只需要重新取出未被取出的消息,那么和方法5(加载数据)是一样的}
未被确认的消息怎么处理?
在本项目中, 数据加载的过程, 多数出现在重启服务器的场景中, 未被确认的消息并不会直接存储进入内存, 而是被当做普通消息加载进入内存, 已经被确认的数据会被直接删除掉, 未被确认的数据会被当做新数据加载进入内存
缺点: 造成消息的冗余发送 优点: 实现方便
优化建议:
1. 数据的存储持久化
引入一个标志位, 表示消息是否处于发送且未确认状态, 在数据加载的时候, 根据标志位将数据加载放入内存存储
2. 超时重投机制
如果消息发送给消费者, 消费者消费数据时, 未发送确认就崩溃, 那么消息会一直处于发送未确认状态, 会造成资源浪费
检测超时未确认的消息, 超过一定时间间隔会触发重新发送消息
3. 重投次数限制和死信队列
避免由于消费者的持续异常状态(依赖不可用等 )造成的无限重投, 可以规定最多重投次数, 超过该次数, 则丢到死信队列中, 方便人为去分析处理这些未被确认的消息