当前位置: 首页 > news >正文

RocketMQ介绍与部署

RocketMQ介绍

MQ:MessageQueue,消息队列。
MQ的作⽤主要:

  • 异步能提⾼系统的响应速度、吞吐量。
  • 解耦:服务之间进⾏解耦,才可以减少服务之间的影响。解耦后可以实现数据分发,⽣产者发送⼀个消息后,可以由⼀个或者多个消费者进⾏消费,并且消费者的增加或者减少对⽣产者没有影响。
  • 削峰:以稳定的系统资源应对突发的流量冲击。

RocketMQ部署

Binary 下载

RocketMQ官网

修改默认配置

测试环境配置不高,修改choose_gc_options函数内的jvm相关参数

vim bin/runserver.sh

修改choose_gc_log_directory下一行的jvm相关参数

vim bin/runbroker.sh

启动

启动NameServer

nohup bin/mqnamesrv &

启动Broker

  • 先修改环境变量,让Broker知道NameServer的地址端口
vim ~/.bash_profileexport NAMESRV_ADDR='localhost:9876'source ~/.bash_profile
  • 启动Broker
nohup bin/mqbroker &

消息测试

  • 生产消息
 bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 
  • 消费消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

搭建Java客户端项目

  • 在pom.xml中引⼊以下核⼼依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>对应的服务端的版本</version></dependenc>
  • 创建⼀个简单的消息⽣产者
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//初始化一个消息生产者DefaultMQProducer producer = new DefaultMQProducer("DemoProducer");// 指定nameserver地址producer.setNamesrvAddr("192.168.234.141:9876");//设置发送超时时间和客户端api超时时间producer.setSendMsgTimeout(10000);producer.setMqClientApiTimeout(10000);// 启动消息生产者服务producer.start();for (int i = 0; i < 1000; i++) {try {// 创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送消息,获取发送结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//消息发送完后,停止消息生产者服务。producer.shutdown();}
}
  • 创建⼀个消息消费者
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//构建一个消息消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DemoConsumer");//指定nameserver地址consumer.setNamesrvAddr("192.168.234.141:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 订阅一个感兴趣的话题,这个话题需要与消息的topic一致consumer.subscribe("TopicTest", "*");// 注册一个消息回调函数,消费到消息后就会触发回调。consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//设置客户端api超时时间10000毫秒consumer.setMqClientApiTimeout(10000);//启动消费者服务consumer.start();System.out.print("Consumer Started");}
}

RocketMQ可视化管理服务

  • 源码下载
    官网Dashboard服务下载
  • 编译
 mvn clean package -Dmaven.test.skip=true
  • 修改配置文件
    在jar包所在的⽬录下创建⼀个application.yml配置⽂件
rocketmq: config: namesrvAddrs: - 192.168.65.112:9876
  • 运行
java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar 1>dashboard.log 2>&1 &

在这里插入图片描述

http://www.xdnf.cn/news/774289.html

相关文章:

  • 动中通天线跟踪性能指标的测试
  • 显示即战略:铁电液晶如何成为 “数字中国” 的 “像素基石”?
  • Python训练营打卡 Day43
  • 【数据集】不同情景下全球城市扩张(2050年)
  • 嵌入式开发之STM32学习笔记day16
  • 初识Linux指令(笔记2)
  • Python_day43
  • 408考研逐题详解:2009年第28题
  • MCP调研
  • 揭秘 CompletedFuture 的设计精髓(基础)
  • 打卡day43
  • 第12次09:展示收货地址和新增地址
  • 基于vue3-elemenyui的动态列案例
  • 【C语言入门级教学】assert断⾔和指针的使用
  • linux学习第18天(fork函数)
  • 代码随想录算法训练营第六天| 242.有效的字母异位词 、 349. 两个数组的交集 、 202. 快乐数 、1. 两数之和
  • Cesium使用primitive添加点线面(贴地)
  • 【文献阅读】Learning Transferable Visual Models From Natural Language Supervision
  • 【网络信息安全体系结构】知识点总结
  • 每日算法-250602
  • 复变函数 $w = z^2$ 的映射图像演示
  • 电商 API 开发实战:唯品会商品详情页实时数据接口接入与调试
  • 【Python 进阶2】抽象方法和实例调用方法
  • 激光雷达的强度像和距离像误差与噪声分析(2)2025.6.2
  • ps反相调整
  • 西红柿番茄成熟度目标检测数据集介绍
  • RSCUcaller
  • C语言进阶知识:深入探索编程的奥秘
  • 免费的硬盘工具
  • c++ 赋值函数和拷贝构造函数的调用时机