【RabbitWQ】基于 Java 实现轻量级消息队列(二)
目录
一. 数据库创建
1.1 导入Maven
1.2 配置信息
1.3 建表
1.3.1 核心API
1.3.2 Mybatis
二. 数据库管理
2.1 初始化操作
2.2 删除数据库
2.3 交换机操作
2.4 队列操作
2.5 绑定操作
2.6 补充方法
一. 数据库创建
考虑到项目的轻便性,元数据规模适中,读多写少,关系模型简单,所以这里使用的数据库是SQLite,SQLite主要用于元数据存储 ,负责保存交换机(Exchange)、队列(MSGQueue)和绑定关系(Binding)等核心配置信息
1.1 导入Maven
<dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version></dependency>
这个时候SQlite的优势就体现出来了,不需要额外安装,不需要启动服务,只需要导入Maven就可以使用了
1.2 配置信息
spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBC
SQLite不需要账号密码,数据存放在本地文件上,跟网络无关,只有本地主机才可以访问
SQLite中的 url 就是数据的存储位置
1.3 建表
这里的数据库就是url中指定的文件,配置和依赖没有问题的话,程序启动就会自动建库
这里根据交换机,队列,绑定的核心操作,创建出对应的SQL语句
- 交换机——创建交换机,增加交换机,删除交换机,查询所有交换机
- 队列——创建队列,增加队列,删除队列,查询所有队列
- 绑定——创建绑定,增加绑定,删除绑定,查询所有绑定
1.3.1 核心API
数据库操作的核心API
@Mapper
public interface IMetaMapper {//创建表void createExchangeTable();void createMSGQueueTable();void createBindingTable();// 交换机增删查操作void insertExchange(Exchange exchange);void deleteExchange(String exchangeName);List<Exchange> selectAllExchange();// 队列增删查操作void insertMSGQueue(MSGQueue queue);void deleteMSGQueue(String queueName);List<MSGQueue> selectAllMSGQueue();// 绑定增删查操作void insertBinding(Binding binding);void deleteBinding(Binding binding);List<Binding> selectAllBinding();
}
1.3.2 Mybatis
Mybatis的yml配置信息
mybatis:mapper-locations: classpath:mapper/**Mapper.xml# Mysql注释log-impl: org.apache.ibatis.logging.stdout.StdOutImpl# 自动转换驼峰map-underscore-to-camel-case: true
使用Mybatis编写sql语句
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.example.mq.mqserver.mapper.IMetaMapper"><update id="createExchangeTable">create table if not exists exchange (name varchar(50) primary key,type int,durable boolean,auto_delete boolean,arguments varchar(1024));</update><update id="createMSGQueueTable">create table if not exists msg_queue(name varchar(50) primary key,durable boolean,exclusive boolean,auto_delete boolean,arguments varchar(1024));</update><update id="createBindingTable">CREATE TABLE IF NOT EXISTS binding(exchange_name VARCHAR(50) NOT NULL,queue_name VARCHAR(50) NOT NULL,binding_key VARCHAR(256),PRIMARY KEY (exchange_name, queue_name))</update><insert id="insertExchange" parameterType="org.example.mq.mqserver.core.Exchange">insert into exchange values (#{name},#{type},#{durable},#{autoDelete},#{arguments});</insert><delete id="deleteExchange" parameterType="java.lang.String">delete from exchange where name = #{name};</delete><insert id="insertMSGQueue" parameterType="org.example.mq.mqserver.core.MSGQueue">insert into msg_queue values (#{name},#{durable},#{exclusive},#{autoDelete},#{arguments});</insert><delete id="deleteMSGQueue" parameterType="java.lang.String">delete from msg_queue where name = #{name};</delete><insert id="insertBinding" parameterType="org.example.mq.mqserver.core.Binding">INSERT OR IGNORE INTO binding(exchange_name, queue_name, binding_key)VALUES (#{exchangeName},#{queueName},#{bindingKey, jdbcType=VARCHAR})</insert><delete id="deleteBinding" parameterType="org.example.mq.mqserver.core.Binding">delete from bindingwhere exchange_name = #{exchangeName}and queue_name = #{queueName}<if test="bindingKey != null">and binding_key = #{bindingKey}</if></delete><select id="selectAllExchange" resultType="org.example.mq.mqserver.core.Exchange">select * from exchange</select><select id="selectAllMSGQueue" resultType="org.example.mq.mqserver.core.MSGQueue">select * from msg_queue</select><select id="selectAllBinding" resultType="org.example.mq.mqserver.core.Binding">SELECTexchange_name as "exchangeName",queue_name as "queueName",binding_key as "bindingKey"FROM binding</select>
</mapper>
二. 数据库管理
对数据库进行统一管理,封装数据库的初始化操作和删除数据库操作
2.1 初始化操作
先判断数据库是否存在
- 数据库如果存在,不做任何操作
- 数据库如果不存在,创建库,创建表,构造默认数据
public void init(){iMetaMapper = MqApplication.context.getBean(IMetaMapper.class);File fileDir = new File("./data");fileDir.mkdirs();if(!checkDBExists()){createTable();createDefaultData();System.out.println("数据加载完成");}else {System.out.println("数据已存在");}}
fileDir.mkdirs();这个创建目录的操作并不是多此一举,SQLite不能创建父目录,只能创建文件,如果没有这个创建目录的操作,直接创建表,可能会出现IO异常,重复创建没事,只是会返回false目录创建失败,对程序没有负面影响
2.2 删除数据库
先判断数据库是否存在
- 数据库如果存在,执行删除数据库,删除父目录操作
- 数据库如果不存在,检查父目录是否存在,存在则删除
public void deleteDB(){File file = new File("./data/meta.db");if(file.exists()){file.delete();System.out.println("数据文件删除成功");}else {System.out.println("数据文件不存在");}File fileDir = new File("./data");if(fileDir.exists()){fileDir.delete();System.out.println("data文件删除成功");}else {System.out.println("data文件不存在");}}
递归删除的方式,先删除文件后删除目录
2.3 交换机操作
封装交换机的增删查操作
public void insertExchange(Exchange exchange){iMetaMapper.insertExchange(exchange);}public void deleteExchange(String exchangeName){iMetaMapper.deleteExchange(exchangeName);}public List<Exchange> selectAllExchange(){return iMetaMapper.selectAllExchange();}
2.4 队列操作
封装队列的增删差操作
public void insertMSGQueue(MSGQueue queue){iMetaMapper.insertMSGQueue(queue);}public void deleteMSGQueue(String queueName){iMetaMapper.deleteMSGQueue(queueName);}public List<MSGQueue> selectAllMSGQueue(){return iMetaMapper.selectAllMSGQueue();}
2.5 绑定操作
封装绑定的增删差操作
public void insertBinding(Binding binding){iMetaMapper.insertBinding(binding);}public void deleteBinding(Binding binding){iMetaMapper.deleteBinding(binding);}public List<Binding> selectAllBinding(){return iMetaMapper.selectAllBinding();}
2.6 补充方法
检查文件是否存在,封装后的创建表和初始化数据操作
private boolean checkDBExists() {File file = new File("./data/meta.db");if(file.exists()){return true;}else {return false;}}private void createTable() {iMetaMapper.createExchangeTable();iMetaMapper.createMSGQueueTable();;iMetaMapper.createBindingTable();System.out.println("数据表创建完成");}private void createDefaultData() {Exchange exchange = new Exchange();exchange.setName("");exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);iMetaMapper.insertExchange(exchange);System.out.println("初始化已完成…");}