当前位置: 首页 > java >正文

RocketMQ 的详细使用教程

以下是 RocketMQ 的详细使用教程,涵盖安装配置、基础操作、高级功能及高可用部署等内容:


一、安装与配置

1. 单机模式安装
  1. 下载与解压
    • 从 RocketMQ 官网 下载对应版本,解压后进入 bin 目录15。

示例命令:

unzip rocketmq-all-5.1.4-bin-release.zip
cd rocketmq-5.1.4
  1. 启动服务

NameServer:RocketMQ 的注册中心。

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log  # 查看启动日志

Broker:消息代理服务。

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
  1. 验证安装

使用内置工具测试消息收发:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer  # 生产者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer  # 消费者
2. 集群部署(高可用)
  1. 双主双从架构

在两台服务器上分别部署 NameServer 和 Broker,配置文件 broker-master.propertiesbroker-slave.properties,关键参数:

namesrvAddr=192.168.100.43:9876;192.168.100.44:9876  # 多 NameServer 地址
brokerRole=SYNC_MASTER  # 主节点同步复制
flushDiskType=ASYNC_FLUSH  # 异步刷盘

启动命令:

nohup sh bin/mqbroker -c conf/broker-master.properties &
nohup sh bin/mqbroker -c conf/broker-slave.properties &
  1. 配置说明
    • brokerId=0 表示主节点,>0 为从节点5。
    • fileReservedTime=48 设置消息保留时间(小时)5。

二、基础使用

1. 创建 Topic

命令行创建

  • bash复制sh bin/mqadmin createTopic -n localhost:9876 -c DefaultCluster -t testTopic
  • 控制台创建:访问 http://IP:8080/admin,输入默认账号 admin/admin,在主题管理页面创建4。
2. 生产者发送消息
// 初始化生产者
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();// 发送消息
Message msg = new Message("testTopic", "tagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);
producer.shutdown();
3. 消费者订阅消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("testTopic", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("Received: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

三、高级功能

1. 事务消息

实现两阶段提交

TransactionMQProducer producer = new TransactionMQProducer("transactionGroup");
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务(如数据库操作)return LocalTransactionState.COMMIT_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return LocalTransactionState.UNKNOW;}
});
producer.sendMessageInTransaction(msg, null);
2. 延迟消息

支持 18 个延迟级别(1s~2h):

Message msg = new Message("testTopic", "tagA", "Delayed Message".getBytes());
msg.setDelayTimeLevel(3);  // 10秒延迟
producer.send(msg);
3. 消息过滤
  • Tag 过滤:消费者订阅时指定 Tag(如 consumer.subscribe("testTopic", "tagA || tagB"))。
  • SQL 过滤:通过消息属性筛选(需 Broker 配置 enablePropertyFilter=true)。

四、高可用与性能优化

1. 消息存储机制
  • CommitLog:顺序写入消息内容,提升吞吐量(顺序写速度可达 600MB/s)2。
  • ConsumeQueue:逻辑队列,存储消息索引,加速消费2。
2. 刷盘策略
  • 同步刷盘:消息写入磁盘后才返回成功,数据可靠性高,性能较低。
  • 异步刷盘:消息写入 PageCache 后立即返回,性能高,适合高吞吐场景25。
3. 负载均衡
  • 生产者负载均衡:默认轮询发送到不同 Broker 的队列2。
  • 消费者负载均衡:集群模式下自动分配队列,支持 AllocateMessageQueueAveragely(平均分配)和 AllocateMessageQueueByCircle(环状分配)2。

五、监控与管理

1. 控制台功能
  • 实时监控:查看消息堆积、TPS、消费者状态等4。
  • 主题管理:动态创建/删除 Topic,调整队列数量。
  • 日志查看:支持按时间范围检索 Broker 和消费者日志。
2. 日志配置
  • 日志路径:~/logs/rocketmqlogs/,可通过 logback.xml 调整日志级别和格式。

六、常见问题

  1. 消息重复消费
    • 解决方案:消费者实现幂等性(如通过唯一业务 ID 去重)2。
  1. Broker 启动失败
    • 检查端口冲突(默认 NameServer 端口 9876,Broker 端口 10911)5。

总结

RocketMQ 的核心使用流程包括安装部署、Topic 管理、消息生产与消费,高级功能涵盖事务消息、延迟消息和高可用集群配置。实际应用中需根据业务场景选择刷盘策略(同步/异步)和复制方式(同步/异步主从),并结合控制台监控优化性能。更多配置细节可参考 官方文档7。

http://www.xdnf.cn/news/130.html

相关文章:

  • 【多目标进化算法】NSGA-II 算法(结合例子)
  • 【C++】 —— 笔试刷题day_19
  • Web3架构下的数据隐私与保护
  • 【数据结构_10】二叉树(2)
  • HarmonyOS:1.4 - HarmonyOS应用程序框架基础
  • Python(21)Python日期时间完全指南:从基础到实战注意事项
  • QT 文件和文件夹操作
  • 基于SpringBoot成绩管理系统设计与实现(源码+文档+部署讲解)
  • SAP系统中MD01与MD02区别
  • 如何使用Python进行自动化的系统管理?
  • 《软件设计师》复习笔记(14.2)——统一建模语言UML、事务关系图
  • TCL 亮相北京 InfoComm China 2025,引领商显智能化变革浪潮
  • AI数据分析与BI可视化结合:解锁企业决策新境界
  • Java 高并发核心:线程池使用详解 + 自定义参数配置全剖析(附源码+面试解析)
  • 基于ubuntu24.10安装NACOS2.5.1的简介
  • PHP腾讯云人脸核身获取Access Token
  • 【ESP32-IDF笔记】06-触摸传感IO配置
  • day1-小白学习JAVA(mac版)---(jdk安装和环境变量配置)
  • 《软件设计师》复习笔记(14.3)——设计模式
  • Java ThreadLocal内存泄漏分析
  • Docker Image export and load and tag
  • 所见即所得的前端 AI 工具 Readdy.ai
  • ubantu18.04(Hadoop3.1.3)之MapReduce编程
  • 算法-堆+单调栈
  • Python爬虫第16节-动态渲染页面抓取之Selenium使用上篇
  • 文件包含(详解)
  • 力扣每日打卡 2176. 统计数组中相等且可以被整除的数对(简单)
  • 基于 React 和 CodeMirror 实现自定义占位符编辑器
  • java 设计模式之模板方法模式
  • OpenVINO怎么用