【项目篇之统一硬盘操作】仿照RabbitMQ模拟实现消息队列
统一硬盘操作
- 创建出实例
- 封装交换机的操作
- 封装队列的操作
- 封装绑定的操作
- 封装消息的操作
- 总的完整代码:
我们之前已经使用了数据库去管理交换机,绑定,队列
还使用了数据文件去管理消息
此时我们就搞一个类去把上述两个部分都整合在一起,对上层提供统一的一套接口
在项目文件中的datacenter这个包下面创建一个新的类:DiskDataCenter
使用这个类来管理所有的硬盘上的数据:
一个是数据库:交换机,绑定,队列
一个是数据文件:消息
上层逻辑如果需要操作硬盘,统一都通过这个类来使用(上层代码不在乎当前数据是在数据文件中还是数据库中)
创建出实例
我们先去创建出数据库实例和数据文件的实例:
//把数据库实例创建出来 private DataBaseMapper dataBaseMapper = new DataBaseMapper(); //把数据文件的实力创建出来 private MessageFileManager messageFileManager = new MessageFileManager();
接着对这两个实例进行初始化:
//初始化方法:针对上面的两个实例进行初始化 public void init(){ dataBaseMapper.init(); //下面这个是空的方法,后续要扩展再写 messageFileManager.init(); }
封装交换机的操作
我们这里是使用刚刚创建出来的数据库的实例dataBaseMapper去封装了插入交换机,删除交换机,查询交换机:
//封装交换机的三个操作 //插入交换机 public void insertExchange(Exchange exchange){ dataBaseMapper.insertExchange(exchange); } //删除交换机 public void deleteExchange(String exchangeName){ dataBaseMapper.deleteExchange(exchangeName); } //查询交换机 public List<Exchange> selectAllExchanges(){ return dataBaseMapper.selectAllexchanges(); }
封装队列的操作
我们这里是使用刚刚创建出来的数据库的实例dataBaseMapper去封装了插入队列,删除队列,查询队列:
//封装队列的三个操作 //插入队列 public void insertQueue(MSGQueue queue){ dataBaseMapper.insertQueue(queue); } //删除队列 public void deleteQueue(String queueName){ dataBaseMapper.deleteQueue(queueName); } //查询队列 public List<MSGQueue> selectAllQueue(){ return dataBaseMapper.selectAllQueues(); }
封装绑定的操作
我们这里是使用刚刚创建出来的数据库的实例dataBaseMapper去封装了插入绑定,删除绑定,查询绑定:
//封装绑定的三个操作 //插入绑定 public void insertBinding(Binding binding){ dataBaseMapper.insertBinding(binding); } //删除绑定 public void deleteBinding(Binding binding){ dataBaseMapper.deleteBinding(binding); } //查询绑定 public List<Binding> selectAllBindings(){ return dataBaseMapper.selectAllBindings(); }
封装消息的操作
我们这里是使用刚刚创建出来的数据文件的实例messageFileManager去封装了发送消息,删除消息,加载队列中的所有消息:
//封装消息操作 //发送消息 public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException { messageFileManager.sendMessage(queue,message); } //删除消息 //考虑删除了之后,多了一个无效消息,看看是不是要进行垃圾回收 public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException { messageFileManager.deleteMessage(queue,message); if(messageFileManager.checkGC(queue.getName())){ messageFileManager.gc(queue); } } //加载队列中的所有消息public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException { return messageFileManager.loadAllMessageFromQueue(queueName); }
总的完整代码:
这个DiskDataCenter类的总代码如下所示:
package org.example.mqtexxt.mqserver.datacenter; import org.example.mqtexxt.common.MqException;
import org.example.mqtexxt.mqserver.core.Binding;
import org.example.mqtexxt.mqserver.core.Exchange;
import org.example.mqtexxt.mqserver.core.MSGQueue;
import org.example.mqtexxt.mqserver.core.Message; import java.io.IOException;
import java.util.LinkedList;
import java.util.List; /*
使用这个类来管理所有的硬盘上的数据:
一个是数据库:交换机,绑定,队列
一个是数据文件:消息
上层逻辑如果需要操作硬盘,统一都通过这个类来使用(上层代码不在乎当前数据是在数据文件中还是数据库中) */public class DiskDataCenter { //把数据库实例创建出来 private DataBaseMapper dataBaseMapper = new DataBaseMapper(); //把数据文件的实力创建出来 private MessageFileManager messageFileManager = new MessageFileManager(); //初始化方法:针对上面的两个实例进行初始化 public void init(){ dataBaseMapper.init(); //下面这个是空的方法,后续要扩展再写 messageFileManager.init(); } //封装交换机的三个操作 //插入交换机 public void insertExchange(Exchange exchange){ dataBaseMapper.insertExchange(exchange); } //删除交换机 public void deleteExchange(String exchangeName){ dataBaseMapper.deleteExchange(exchangeName); } //查询交换机 public List<Exchange> selectAllExchanges(){ return dataBaseMapper.selectAllexchanges(); } //封装队列的三个操作 //插入队列 public void insertQueue(MSGQueue queue){ dataBaseMapper.insertQueue(queue); } //删除队列 public void deleteQueue(String queueName){ dataBaseMapper.deleteQueue(queueName); } //查询队列 public List<MSGQueue> selectAllQueue(){ return dataBaseMapper.selectAllQueues(); } //封装绑定的三个操作 //插入绑定 public void insertBinding(Binding binding){ dataBaseMapper.insertBinding(binding); } //删除绑定 public void deleteBinding(Binding binding){ dataBaseMapper.deleteBinding(binding); } //查询绑定 public List<Binding> selectAllBindings(){ return dataBaseMapper.selectAllBindings(); } //封装消息操作 //发送消息 public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException { messageFileManager.sendMessage(queue,message); } //删除消息 //考虑删除了之后,多了一个无效消息,看看是不是要进行垃圾回收 public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException { messageFileManager.deleteMessage(queue,message); if(messageFileManager.checkGC(queue.getName())){ messageFileManager.gc(queue); } } //加载队列中的所有消息public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException { return messageFileManager.loadAllMessageFromQueue(queueName); }
}
DiskDataCenter类主要就是去封装了消息的基本操作,其实也就是把之前的MessageFileManager类和DataBaseMapper类的关键方法统一进行了封装操作
后续的代码中,上层代码就不用直接去调用MessageFileManager类和DataBaseMapper类了
上层代码而是直接去调用这个DiskDataCenter类即可