当前位置: 首页 > news >正文

SpringBoot整合RocketMQ(rocketmq-client.jar)

目录

配置pom.xml

配置application.properties

生产者配置 MQProducerConfig.java

消费者配置MQConsumerConfig.java

消费者监听 MQConsumeMsgListenerProcessor.java

发送消息


Springboot集成RocketMQ:通过直接引入rocketmq-client依赖实现基础集成,需手动配置生产者和消费者。

配置pom.xml

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.0</version>
</dependency>

配置application.properties

# 是否开启自动配置
rocketmq.producer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.producer.groupName=GID_abc
# mq的nameserver地址
rocketmq.producer.namesrvAddr=localhost:9876
# 消息最大长度 默认 1024 * 4 (4M)
rocketmq.producer.maxMessageSize = 4096
# 发送消息超时时间,默认 3000
rocketmq.producer.sendMsgTimeOut=3000
# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2# 是否开启自动配置
rocketmq.consumer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.consumer.groupName=GID_abc
# mq的nameserver地址
rocketmq.consumer.namesrvAddr=localhost:9876
# 消费者订阅的主题topic和tags(*标识订阅该主题下所有的tags),格式: topic~tag1||tag2||tags3;
rocketmq.consumer.topics=abcTopic~*
# 消费者线程数据量
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
# 设置一次消费信心的条数,默认1
rocketmq.consumer.consumeMessageBatchMaxSize=1

生产者配置 MQProducerConfig.java

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Getter
@Setter
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MQProducerConfig {public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfig.class);private String groupName;private String namesrvAddr;// 消息最大值private Integer maxMessageSize;// 消息发送超时时间private Integer sendMsgTimeOut;// 发送失败重试次数private Integer retryTimesWhenSendFailed;@Bean@ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")public DefaultMQProducer defaultProducer() throws MQClientException {LOGGER.info("-----defaultProducer 正在创建-----");DefaultMQProducer producer = new DefaultMQProducer(groupName);producer.setNamesrvAddr(namesrvAddr);producer.setVipChannelEnabled(false);producer.setMaxMessageSize(maxMessageSize);producer.setSendMsgTimeout(sendMsgTimeOut);producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);producer.start();LOGGER.info("-----rocketmq producer server 开启成功-----");return producer;}
}

消费者配置MQConsumerConfig.java

import com.bestone.online.consult.receiverMq.MQConsumeMsgListenerProcessor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Getter
@Setter
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MQConsumerConfig {public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfig.class);private String groupName;private String namesrvAddr;private String topics;// 消费者线程数据量private Integer consumeThreadMin;private Integer consumeThreadMax;private Integer consumeMessageBatchMaxSize;@Autowiredprivate MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;@Bean@ConditionalOnProperty(prefix = "rocketmq.consumer", value = "isOnOff", havingValue = "on")public DefaultMQPushConsumer defaultConsumer() {LOGGER.info("-----defaultConsumer 正在创建-----");DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr(namesrvAddr);consumer.setConsumeThreadMin(consumeThreadMin);consumer.setConsumeThreadMax(consumeThreadMax);consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);// 设置监听consumer.registerMessageListener(consumeMsgListenerProcessor);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);try {// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,String[] topicArr = topics.split(";");for (String topic : topicArr) {String[] tagArr = topic.split("~");consumer.subscribe(tagArr[0], tagArr[1]);}consumer.start();LOGGER.info("-----consumer 创建成功 groupName={}, topics={}, namesrvAddr={}-----", groupName, topics, namesrvAddr);} catch (MQClientException e) {LOGGER.error("-----consumer 创建失败!-----");}return consumer;}
}

消费者监听 MQConsumeMsgListenerProcessor.java

import com.alibaba.fastjson.JSONObject;
import java.util.Date;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;/*** 消费者监听*/
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {private static final Logger LOGGER = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);/*** 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息* 不要抛异常,如果没有return CONSUME_SUCCESS,consumer会重新消费该消息,直到return CONSUME_SUCCESS*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList,ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (CollectionUtils.isEmpty(msgList)) {LOGGER.info("MQ接收消息为空,直接返回成功");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt messageExt = msgList.get(0);LOGGER.info("MQ接收到的消息为:" + messageExt.toString());try {String topic = messageExt.getTopic();String tags = messageExt.getTags();String data = new String(messageExt.getBody(), "utf-8");LOGGER.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, data);// TODO 处理业务逻辑} catch (Exception e) {LOGGER.error("获取MQ消息内容异常{}", e);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}

发送消息

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/mqProducer")
public class MQProducerController {public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerController.class);@AutowiredDefaultMQProducer defaultMQProducer;/*** 发送简单的MQ消息*/@GetMapping("/send")public void send(String msg)throws InterruptedException, RemotingException, MQClientException, MQBrokerException {LOGGER.info("发送MQ消息内容:" + msg);Message msg = new Message("abcTopic",   msg.getBytes());// 延时级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"// 设置消息延迟级别为3,也就是延迟10s。msg.setDelayTimeLevel(3);// 定时发送消息,5秒之后发送msg.setDeliveryTimestamp(System.currentTimeMillis() + Duration.ofSeconds(5).toMillis());// 默认3秒超时SendResult sendResult = defaultMQProducer.send(msg);LOGGER.info("消息发送响应:" + sendResult.toString());}
}
http://www.xdnf.cn/news/1208395.html

相关文章:

  • C++ AI流处理核心算法实战
  • MOGA(多目标遗传算法)求解 ZDT1 双目标优化问题
  • 沪铝本周想法
  • 智能编队重构职场生态:Agentic AI 协同时代来临
  • 基于Blazor进销存管理系统
  • 对College数据进行多模型预测(R语言)
  • thingsboard 自定义动作JS编程
  • 【高阶版】R语言空间分析、模拟预测与可视化高级应用
  • 【C++算法】82.BFS解决FloodFill算法_被围绕的区域
  • Java抽Oracle数据时编码问题
  • SpringBoot整合RocketMQ(阿里云ONS)
  • CentOS安装ffmpeg并转码视频为mp4
  • 【腾讯云】EdgeOne免费版实现网站加速与安全防护
  • 通缩漩涡中的测量突围:新启航如何以国产 3D 白光干涉仪劈开半导体成本困局?
  • 橡胶制品加工:塑造生活的柔韧力量
  • SketchUp纹理贴图插件Architextures安装使用图文教程
  • 【Linux】环境变量
  • 字符串函数安全解析成执行函数
  • 【Spring Boot 快速入门】三、分层解耦
  • 论文阅读--射频电源在半导体领域的应用
  • 【nerf处理视频数据】Instant-NGP项目NeRF模型训练数据集准备指南
  • 机器学习线性回归:从基础到实践的入门指南
  • Golang语言如何高效使用字符串
  • VLA--Gemini Robotics On-Device: 将AI带到本地机器人设备上
  • 字节序详解
  • Windows下基于 SenseVoice模型的本地语音转文字工具
  • 重塑浏览器!微软在Edge加入AI Agent,自动化搜索、预测、整合
  • 数据结构【红黑树】
  • SeeMoE:从零开始实现一个MoE视觉语言模型
  • 【学习笔记】Lean4 定理证明 ing