【项目篇之统一内存操作】仿照RabbitMQ模拟实现消息队列
我们的操作分为两种,一种是在内存上进行统一的操作,一种是在硬盘上面操作,今天我写的文章是编写了一个MemoryDataCenter类来实现了 在内存上面的统一操作:
实现统一内存操作
- 如何使用内存来组织数据
- 创建一个类来统一管理内存上的所有数据
- 针对交换机的操作
- 针对队列进行操作
- 针对绑定进行操作
- 新增绑定
- 获取绑定
- 删除绑定
- 针对消息进行操作
- 添加消息
- 查询信息
- 删除消息
- 发送消息到指定队列
- 从队列中取出到消息
- 获取指定队列中消息的个数
- 针对未确认消息进行操作
- 添加未确认消息
- 删除未确认的消息
- 获取指定的未确认消息
- 恢复数据到内存中去
- 总代码如下所示:
- 内存管理数据的总结
对于MQ来说,是以在内存上存储数据为主的,
在硬盘上存储数据为辅的
在硬盘上存储数据主要是为了进行持久化保存,重启之后,数据不会丢失
但是我们在真正去进行消息转发的的过程中,各种核心的逻辑都还是以内存上存储的数据为主的
因为访问内存比访问硬盘要快得多,所以我们还是要使用内存来组织数据
如何使用内存来组织数据
在交换机上使用一个数据结构:HashMap来组织数据:key是交换机的name,value是交换机对象
队列: 直接使用HashMap来组织数据: key是队列的name,value是队列对象
绑定: 使用一个嵌套的HashMap来组织数据:key是exchangeName,value是一个HashMap2,这个HashMap2中也有一个key1:key1是队列的name,HashMap2中的value2是绑定对象。
去查询这个绑定的时候,先根据交换机名字去查询,查询到的结果也还是一个HashMap2,表示的是该交换机都绑定了哪些队列,然后再进一步根据队列名字去HashMap2中查询绑定对象
消息: 也使用一个HashMap去进行管理数据:key是messageId,value是Message对象
消息是存放在队列中的,消息和队列之间还有一个归属关系,
通过MessageId可以查询到是哪一个Message对象,同时我们还得知道当前这个消息对象是放到哪一个队列中的
映射:
所以再做出一个映射:表示队列和消息之间的关联:
表示出每个队列中都有哪些消息:
如何表示:
使用嵌套的HashMap来组织,key是queueName,value是一个LinkedList,这个LinkedList里面的每一个元素都是一个Message对象
表示“未被确认”的消息:
首先,为什么会有这个未被确认的消息,是和我们的MQ的特性有关:
表示“未被确认”的消息:这里面就存储了当前队列中哪些消息被消费者取走了,但是还没有应答
我们使用嵌套的HashMap来组织这个"未被确认的消息"数据:key是queueName,value是一个HashMap
这个value的HashMap中的key是messageId,value是Message对象
后续实现消息确认的逻辑是
需要根据ACK相应的内容,这里会提供一个确认的messageId,根据这个messageId来把上述结构中的Message对象找到并进行移除
所以这里使用HashMap去查找更好一些
创建一个类来统一管理内存上的所有数据
创建一个类来统一管理内存上的所有数据
/*
使用这个类来统一管理内存上的所有数据 */
public class MemoryDataCenter { //key是exchangeName,value是exchange对象 private HashMap<String, Exchange> exchangeMap = new HashMap<>(); }
注意:这个类之后会提供一些增删改查的操作,让上一层代码去进行调用,也就是我们的brokerServer要处理很多的请求,此时有些请求涉及到去创建交换机,创建绑定,创建队列,此时服务器处理多个请求,
这个类提供的一些方法,会在多线程环境下使用,所以就需要去处理线程不安全的问题,
此时就会涉及到多线程的问题,也就会涉及到线程安全的问题,但是HashMap是一个线程不安全的数据结构,所以就需要换一个数据结构来操作,使用另一个线程安全的数据结构:ConcurrentHashMap
所以代码修改如下:
//key是exchangeName,value是exchange对象
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
//存储交换机
//key是exchangeName,value是exchange对象
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>(); //存储队列
//key是queueName,value是MSGQueue对象
private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>(); //存储绑定
//第一个key是exchangeName,第二个key是queueName
private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>(); //存储消息
//key是messageId,value是Message对象
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>(); //存储: 消息和队列之间的从属关系
//key是队列的名字queueName,value是一个Message的链表
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>(); //待确认的消息
//第一个key是queueName,第二个key是messageId
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitMap = new ConcurrentHashMap<>();
针对交换机的操作
//插入交换机
public void insertExchange(Exchange exchange){ //把交换机插入到HashMap表中即可 exchangeMap.put(exchange.getName(),exchange); System.out.println("[MemoryDatCenter] 交换机创建成功了 exchangeName = "+exchange.getName());
} //查找交换机: 根据交换机名字查找交换机
public Exchange getExchange(String exchangeName){ //在HashMap表中查找交换机 return exchangeMap.get(exchangeName);
} //删除交换机:
public void deleteExchange(String exchangeName){ exchangeMap.remove(exchangeName); System.out.println("[MemoryDatCenter] 交换机被删除成功了 exchangeName = "+exchangeName);
}
针对队列进行操作
//插入队列
public void insertQueue(MSGQueue queue){ queueMap.put(queue.getName(), queue); System.out.println("[MemoryDatCenter] 队列被成功了 queueName= "+queue.getName());
} //查找队列
public MSGQueue getQueue(String queueName){ return queueMap.get(queueName);
} //删除队列
public void deleteQueue(String queueName){ queueMap.remove(queueName); System.out.println("[MemoryDatCenter] 队列被删除成功了 queueName= "+queueName);
}
针对绑定进行操作
新增绑定
- 查询:根据exchangeName查询哈希表是否存在
- 查询:根据queueName查询绑定是否存在
- 插入:在内置的哈希表中插入queueName和绑定
多个线程去进行插入绑定的方法时候,要保证是线程安全的
第一步的查询操作中是从哈希表中查询出数据,本身是线程安全的
但是第二步和第三步的操作有线程安全的风险:
会出现下面的情况:
虽然哈希表本身是线程安全的,同时哈希表的查询和插入单独拎出来一个也是线程安全的,但是这两步查询操作和插入操作放在一起进行联动,那就需要将这两步合在一起,成为一个原子性的操作,进行加锁
//新增绑定 public void insertBinding(Binding binding) throws MqException { //1.先使用exchangeName查询一下,看看对应的哈希表是否存在,不存在就创建一个 ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName()); if(bindingMap == null){ //不存在就创建出来 bindingMap =new ConcurrentHashMap<>(); //创建完毕之后就放入bindingsMap中去 bindingsMap.put(binding.getExchangeName(), bindingMap); } //上面这段代码有点多,我们也可以使用下面这段代码来代替 //ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), // k -> new ConcurrentHashMap<>()); synchronized (bindingMap){ //2.接着根据queueName查询一下绑定是否存在,若存在,抛异常,不存在才可以插入绑定 if(bindingMap.get(binding.getQueueName()) != null){ throw new MqException("[MemoryDataCenter] 绑定已经存在,exchangeName="+binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } //3.查询到绑定不存在,就可以插入绑定了 bindingMap.put(binding.getQueueName(), binding); } System.out.println("[MemoryDatCenter] 绑定添加成功了 exchangeName = " + binding.getExchangeName() +", queueName = " +binding.getQueueName()); }
获取绑定
//获取绑定:写两个版本
// 1. 根据exchangeName和queueName确定唯一一个Binding
// 2. 根据exchangeName获取到所有的Binding // 1. 根据exchangeName和queueName确定唯一一个Binding
public Binding getBinding(String exchangeName, String queueName){ ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName); if(bindingMap == null){ return null; } return bindingMap.get(queueName);
} // 2. 根据exchangeName获取到所有的Binding
public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){ return bindingsMap.get(exchangeName);
}
删除绑定
//删除绑定:
public void deleteBinding(Binding binding) throws MqException { ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName()); if(bindingMap == null){ //该交换机没有绑定任何队列,抛出异常 throw new MqException("[MemoryDataCenter] 绑定不存在,exchangeName=" + binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } bindingMap.remove(binding.getQueueName()); System.out.println("[MemoryDataCenter] 绑定删除成功了 exchangeName = " + binding.getExchangeName() + ", queueName = " + binding.getQueueName());
}
针对消息进行操作
添加消息
//添加消息
public void addMessage(Message message){ messageMap.put(message.getMessageId(), message); System.out.println("[MemoryDataCenter] 新消息添加成功!messageId = " + message.getMessageId());
}
查询信息
我们是根据MessagId来查询消息的:
public Message getMessage(String messageId){ return messageMap.get(messageId);
}
删除消息
我们是根据MessageId来删除消息的
public void removeMessage(String messageId){ messageMap.remove(messageId); System.out.println("[MemoryDataCenter] 消息被删除成功了 messageId= "+messageId);
}
发送消息到指定队列
//发送消息到指定队列
public void sendMessage(MSGQueue queue, Message message){ //把消息放到对应的数据结构中 //先根据队列的名字找到该队列对应的消息链表,然后把消息放到消息链表中 LinkedList<Message> messages =queueMessageMap.get(queue.getName()); if(messages == null){ //如果没有这个消息链表,就创建出这个链表 messages = new LinkedList<>(); //创建出链表之后,就把这个链表放到队列中去 queueMessageMap.put(queue.getName(), messages); } //再把消息数据加到messages这个链表里面 //由于链表本身是线程不安全的,所以要加锁 synchronized (messages){ //把消息数据加到messages这个链表里面 messages.add(message); } //接着把该消息也插入到总的哈希表中,也就是消息数据中心中插入 //如果这个消息已经在消息中心中存在了,这里再次插入消息也没事, // 主要是相同messageId对应的message内容是一样的 addMessage(message); System.out.println("[MemeoryDataCenter] 消息被投递到队列中了! messageId = " + message.getMessageId()); }
从队列中取出到消息
//从队列中获取到消息
public Message pollMessage(String queueName){ //先根据队列名去查找一下。对应的队列的消息链表 //如果没找到,说明队列中没有消息 LinkedList<Message> messages = queueMessageMap.get(queueName); if(messages == null){ return null; } synchronized (messages){ if(messages.size() == 0){ return null; } //如果链表中有元素,那就进行头删操作//消息取出来之后,就要在messages中删除,因为消息已经被取走了 Message currentMessage = messages.remove(0); System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId= " +currentMessage.getMessageId()); return currentMessage; }
}
获取指定队列中消息的个数
//获取指定队列中消息的个数
public int getMessageCount(String queueName){ LinkedList<Message> messages = queueMessageMap.get(queueName); if(messages == null){ //队列中没有消息 return 0; } synchronized (messages){ return messages.size(); }
}
针对未确认消息进行操作
添加未确认消息
public void addMessageWaitAck(String queueName, Message message){ ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitMap.computeIfAbsent(queueName, k -> new ConcurrentHashMap<>()); messageHashMap.put(message.getMessageId(),message); System.out.println("[MemoryDataCenter] 消息进入了待确认队列! messageId = " +message.getMessageId()); }
删除未确认的消息
//删除未确认的消息(消息已经确认了)
public void removeMessageWaitAck(String queueName, String messageId){ ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitMap.get(queueName); if(messageHashMap == null){ return; } messageHashMap.remove(messageId); System.out.println("[MemoryDataCenter] 消息从待确认队列中删除 messageId = " +messageId); }
获取指定的未确认消息
//删除未确认的消息(消息已经确认了)
public void removeMessageWaitAck(String queueName, String messageId){ ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitMap.get(queueName); if(messageHashMap == null){ return; } messageHashMap.remove(messageId); System.out.println("[MemoryDataCenter] 消息从待确认队列中删除 messageId = " +messageId); }
恢复数据到内存中去
//这个方法就是从硬盘上读取数据,把硬盘中之前持久化存储的各个维度的数据都恢复到内存中
//通过DiskDataCenter来获取到硬盘上面的数据
public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException { //0. 在恢复之间,先把里面的所有数据全部清空 exchangeMap.clear(); queueMap.clear(); bindingsMap.clear(); messageMap.clear(); queueMessageMap.clear(); //1.恢复所有的交换机数据 List<Exchange> exchanges = diskDataCenter.selectAllExchanges(); for(Exchange exchange : exchanges){ exchangeMap.put(exchange.getName(), exchange); } //2.恢复所有的队列数据 // 2. 恢复所有的队列数据 List<MSGQueue> queues = diskDataCenter.selectAllQueues(); for (MSGQueue queue : queues) { queueMap.put(queue.getName(), queue); } //3.恢复所有的绑定数据 List<Binding> bindings = diskDataCenter.selectAllBindings(); for(Binding binding : bindings){ ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>()); } //4.恢复所有的消息数据 //遍历所有的队列,然后根据每个队列的名字获取到所有的消息 for(MSGQueue queue : queues){ LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName()); queueMessageMap.put(queue.getName(),messages); for(Message message : messages){ messageMap.put(message.getMessageId(), message); } } //注意,针对“未确认消息”这部分内存中的数据,不需要从硬盘恢复,之前考虑硬盘存储的时候,也没有设定这一块 //一旦在等待ack的过程中,服务器重启了,此时这些“未被确认的消息”就恢复成“未被取走的消息” //这个消息在硬盘上存储的时候,就是当做了“未被取走” }
总代码如下所示:
package org.example.mqtexxt.mqserver.datacenter; import org.example.mqtexxt.common.MqException;
import org.example.mqtexxt.mqserver.core.Binding;
import org.example.mqtexxt.mqserver.core.Exchange;
import org.example.mqtexxt.mqserver.core.MSGQueue;
import org.example.mqtexxt.mqserver.core.Message; import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap; /*
使用这个类来统一管理内存上的所有数据 */public class MemoryDataCenter { //存储交换机 //key是exchangeName,value是exchange对象 private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>(); //存储队列 //key是queueName,value是MSGQueue对象 private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>(); //存储绑定 //第一个key是exchangeName,第二个key是queueName private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>(); //存储消息 //key是messageId,value是Message对象 private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>(); //存储: 消息和队列之间的从属关系 //key是队列的名字queueName,value是一个Message的链表 private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>(); //待确认的消息 //第一个key是queueName,第二个key是messageId private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitMap = new ConcurrentHashMap<>(); //针对交换机进行操作: //插入交换机 public void insertExchange(Exchange exchange){ //把交换机插入到HashMap表中即可 exchangeMap.put(exchange.getName(),exchange); System.out.println("[MemoryDatCenter] 交换机创建成功了 exchangeName = "+exchange.getName()); } //查找交换机: 根据交换机名字查找交换机 public Exchange getExchange(String exchangeName){ //在HashMap表中查找交换机 return exchangeMap.get(exchangeName); } //删除交换机: public void deleteExchange(String exchangeName){ exchangeMap.remove(exchangeName); System.out.println("[MemoryDatCenter] 交换机被删除成功了 exchangeName = "+exchangeName); } //针对队列进行操作 //插入队列 public void insertQueue(MSGQueue queue){ queueMap.put(queue.getName(), queue); System.out.println("[MemoryDatCenter] 队列被成功了 queueName= "+queue.getName()); } //查找队列 public MSGQueue getQueue(String queueName){ return queueMap.get(queueName); } //删除队列 public void deleteQueue(String queueName){ queueMap.remove(queueName); System.out.println("[MemoryDatCenter] 队列被删除成功了 queueName= "+queueName); } //针对绑定的操作 //新增绑定 public void insertBinding(Binding binding) throws MqException { //1.先使用exchangeName查询一下,看看对应的哈希表是否存在,不存在就创建一个 ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName()); if(bindingMap == null){ //不存在就创建出来 bindingMap =new ConcurrentHashMap<>(); //创建完毕之后就放入bindingsMap中去 bindingsMap.put(binding.getExchangeName(), bindingMap); } //上面这段代码有点多,我们也可以使用下面这段代码来代替 //ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), // k -> new ConcurrentHashMap<>()); synchronized (bindingMap){ //2.接着根据queueName查询一下绑定是否存在,若存在,抛异常,不存在才可以插入绑定 if(bindingMap.get(binding.getQueueName()) != null){ throw new MqException("[MemoryDataCenter] 绑定已经存在,exchangeName="+binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } //3.查询到绑定不存在,就可以插入绑定了 bindingMap.put(binding.getQueueName(), binding); } System.out.println("[MemoryDatCenter] 绑定添加成功了 exchangeName = " + binding.getExchangeName() +", queueName = " +binding.getQueueName()); } //获取绑定:写两个版本 // 1. 根据exchangeName和queueName确定唯一一个Binding // 2. 根据exchangeName获取到所有的Binding // 1. 根据exchangeName和queueName确定唯一一个Binding public Binding getBinding(String exchangeName, String queueName){ ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName); if(bindingMap == null){ return null; } return bindingMap.get(queueName); } // 2. 根据exchangeName获取到所有的Binding public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){ return bindingsMap.get(exchangeName); } //删除绑定 public void deleteBinding(Binding binding) throws MqException { ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName()); if(bindingMap == null){ //该交换机没有绑定任何队列,抛出异常 throw new MqException("[MemoryDataCenter] 绑定不存在,exchangeName=" + binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } bindingMap.remove(binding.getQueueName()); System.out.println("[MemoryDataCenter] 绑定删除成功了 exchangeName = " + binding.getExchangeName() + ", queueName = " + binding.getQueueName()); } //添加消息 public void addMessage(Message message){ messageMap.put(message.getMessageId(), message); System.out.println("[MemoryDataCenter] 新消息添加成功!messageId = " + message.getMessageId()); } //根据MessageId查询信息 public Message getMessage(String messageId){ return messageMap.get(messageId); } //根据MessageId删除消息 public void removeMessage(String messageId){ messageMap.remove(messageId); System.out.println("[MemoryDatCenter] 消息被删除成功了 messageId= "+messageId); } //发送消息到指定队列 public void sendMessage(MSGQueue queue, Message message){ //把消息放到对应的数据结构中 //先根据队列的名字找到该队列对应的消息链表,然后把消息放到消息链表中 LinkedList<Message> messages =queueMessageMap.get(queue.getName()); if(messages == null){ //如果没有这个消息链表,就创建出这个链表 messages = new LinkedList<>(); //创建出链表之后,就把这个链表放到队列中去 queueMessageMap.put(queue.getName(), messages); } //再把消息数据加到messages这个链表里面 //由于链表本身是线程不安全的,所以要加锁 synchronized (messages){ //把消息数据加到messages这个链表里面 messages.add(message); } //接着把该消息也插入到总的哈希表中,也就是消息数据中心中插入 //如果这个消息已经在消息中心中存在了在,这里再次插入消息也没事, // 主要是相同messageId对应的message内容是一样的 addMessage(message); System.out.println("[MemeoryDataCenter] 消息被投递到队列中了! messageId = " + message.getMessageId()); } //从队列中获取到消息 public Message pollMessage(String queueName){ //先根据队列名去查找一下。对应的队列的消息链表 //如果没找到,说明队列中没有消息 LinkedList<Message> messages = queueMessageMap.get(queueName); if(messages == null){ return null; } synchronized (messages){ if(messages.size() == 0){ return null; } //如果链表中有元素,那就进行头删操作 Message currentMessage = messages.remove(0); System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId= " +currentMessage.getMessageId()); return currentMessage; } } //获取指定队列中消息的个数 public int getMessageCount(String queueName){ LinkedList<Message> messages = queueMessageMap.get(queueName); if(messages == null){ //队列中没有消息 return 0; } synchronized (messages){ return messages.size(); } } //添加未确认消息 public void addMessageWaitAck(String queueName, Message message){ ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitMap.computeIfAbsent(queueName, k -> new ConcurrentHashMap<>()); messageHashMap.put(message.getMessageId(),message); System.out.println("[MemoryDataCenter] 消息进入了待确认队列! messageId = " +message.getMessageId()); } //删除未确认的消息(消息已经确认了) public void removeMessageWaitAck(String queueName, String messageId){ ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitMap.get(queueName); if(messageHashMap == null){ return; } messageHashMap.remove(messageId); System.out.println("[MemoryDataCenter] 消息从待确认队列中删除 messageId = " +messageId); } //获取指定的未确认消息 public Message getMessageWaitAck(String queueName, String messageId){ ConcurrentHashMap<String,Message> messageConcurrentHashMap = queueMessageWaitMap.get(queueName); if(messageId == null){ return null; } return messageConcurrentHashMap.get(messageId); } //这个方法就是从硬盘上读取数据,把硬盘中之前持久化存储的各个维度的数据都恢复到内存中 //通过DiskDataCenter来获取到硬盘上面的数据 public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException { //0. 在恢复之间,先把里面的所有数据全部清空 exchangeMap.clear(); queueMap.clear(); bindingsMap.clear(); messageMap.clear(); queueMessageMap.clear(); //1.恢复所有的交换机数据 List<Exchange> exchanges = diskDataCenter.selectAllExchanges(); for(Exchange exchange : exchanges){ exchangeMap.put(exchange.getName(), exchange); } //2.恢复所有的队列数据 // 2. 恢复所有的队列数据 List<MSGQueue> queues = diskDataCenter.selectAllQueues(); for (MSGQueue queue : queues) { queueMap.put(queue.getName(), queue); } //3.恢复所有的绑定数据 List<Binding> bindings = diskDataCenter.selectAllBindings(); for(Binding binding : bindings){ ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>()); } //4.恢复所有的消息数据 //遍历所有的队列,然后根据每个队列的名字获取到所有的消息 for(MSGQueue queue : queues){ LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName()); queueMessageMap.put(queue.getName(),messages); for(Message message : messages){ messageMap.put(message.getMessageId(), message); } } //注意,针对“未确认消息”这部分内存中的数据,不需要从硬盘恢复,之前考虑硬盘存储的时候,也没有设定这一块 //一旦在等待ack的过程中,服务器重启了,此时这些“未被确认的消息”就恢复成“未被取走的消息” //这个消息在硬盘上存储的时候,就是当做了“未被取走” } }
内存管理数据的逻辑已经编写得差不多了,下面进行简单的总结
内存管理数据的总结
内存管理这一块主要是使用到了一系列的数据结构,保存和管理,交换机,队列,绑定,消息
我们广泛地使用到了哈希表,链表,嵌套的结构
线程安全问题
涉及到加锁操作
要不要加锁?
锁加到哪里?
使用哪个对象作为锁对象?
是没有统一的加锁规则的,只能要具体问题具体分析
总的加锁原则是:
如果代码不加锁,会造成什么样的后果和问题
后果是否很严重?
你能不能接受后果?