当前位置: 首页 > news >正文

SpringBoot整合RocketMQ(阿里云ONS)

目录

配置pom.xml

配置application.properties 

MQ配置类

注入消息生产者配置信息

消息生产者

消费者配置

 消息消费者


配置pom.xml

<!-- 政务云MQ版本是4.x的,请使用对应的SDK ons-client-1.8.8.x -->
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.8.8.Final</version>
</dependency>

配置application.properties 

rocketmq.accessKey=xxx
rocketmq.secretKey=zzz
rocketmq.nameSrvAddr=http://yyy.aliyun.com:9876
rocketmq.groupId=GID_abc
rocketmq.topic=abcTopic
rocketmq.tag=TagA

MQ配置类

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQConfig {private String accessKey;private String secretKey;private String nameSrvAddr;private String groupId;private String topic;private String tag;public Properties getMqPropertie() {Properties properties = new Properties();properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);return properties;}
}

注入消息生产者配置信息

ProducerBean用于将Producer集成至SpringBoot中

import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RocketMQProducerClient {@Autowiredprivate RocketMQConfig mqConfig;@Bean(initMethod = "start", destroyMethod = "shutdown")public ProducerBean buildProducer() {ProducerBean producer = new ProducerBean();producer.setProperties(mqConfig.getMqPropertie());return producer;}
}

消息生产者

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class RocketMQProducer {private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);@Autowiredprivate ProducerBean producer;@Autowiredprivate RocketMQConfig mqConfig;public SendResult send(String body) {Message msg = new Message(mqConfig.getTopic(), mqConfig.getTag(), body.getBytes());msg.setKey(null);LOGGER.info("准备发送消息为:{}", body);try {SendResult sendResult = producer.send(msg);if (sendResult != null) {LOGGER.info("发送MQ消息成功");return sendResult;} else {LOGGER.error("发送MQ消息失败");return null;}} catch (ONSClientException e) {LOGGER.error("发送MQ消息失败");LOGGER.error("Exception -- : ", e);}return null;}
}

消费者配置

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RocketMQConsumerClient {@Autowiredprivate RocketMQConfig mqConfig;@Autowiredprivate MqReceiver messageListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean buildConsumer() {ConsumerBean consumerBean = new ConsumerBean();Properties properties = mqConfig.getMqPropertie();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, "-1");consumerBean.setProperties(properties);Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();Subscription subscription = new Subscription();subscription.setTopic(mqConfig.getTopic());subscription.setExpression(mqConfig.getTag());subscriptionTable.put(subscription, messageListener);consumerBean.setSubscriptionTable(subscriptionTable);return consumerBean;}
}

 消息消费者

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;@Component
public class MqReceiver implements MessageListener {private static final Logger LOGGER = LoggerFactory.getLogger(MqReceiver.class);@Overridepublic Action consume(Message message, ConsumeContext consumeContext) {if (Objects.nonNull(message)) {LOGGER.info("MQ接收到的消息为:" + message.toString());try {String data = new String(message.getBody(), StandardCharsets.UTF_8);LOGGER.info("MQ消息topic={},  消息内容={}", message.getTopic(), data);} catch (Exception e) {LOGGER.error("获取MQ消息内容异常{}", e.getMessage());return Action.CommitMessage;}}return Action.CommitMessage;}
}

http://www.xdnf.cn/news/1208197.html

相关文章:

  • CentOS安装ffmpeg并转码视频为mp4
  • 【腾讯云】EdgeOne免费版实现网站加速与安全防护
  • 通缩漩涡中的测量突围:新启航如何以国产 3D 白光干涉仪劈开半导体成本困局?
  • 橡胶制品加工:塑造生活的柔韧力量
  • SketchUp纹理贴图插件Architextures安装使用图文教程
  • 【Linux】环境变量
  • 字符串函数安全解析成执行函数
  • 【Spring Boot 快速入门】三、分层解耦
  • 论文阅读--射频电源在半导体领域的应用
  • 【nerf处理视频数据】Instant-NGP项目NeRF模型训练数据集准备指南
  • 机器学习线性回归:从基础到实践的入门指南
  • Golang语言如何高效使用字符串
  • VLA--Gemini Robotics On-Device: 将AI带到本地机器人设备上
  • 字节序详解
  • Windows下基于 SenseVoice模型的本地语音转文字工具
  • 重塑浏览器!微软在Edge加入AI Agent,自动化搜索、预测、整合
  • 数据结构【红黑树】
  • SeeMoE:从零开始实现一个MoE视觉语言模型
  • 【学习笔记】Lean4 定理证明 ing
  • OCR 技术识别全解析:原理、主流方案与实战应用
  • 基于JavaWeb的兼职发布平台的设计与实现
  • React函数组件的“生活管家“——useEffect Hook详解
  • [学习记录]URP流程解析(2)--初始化阶段
  • Rust 实战二 | 开发简易版命令行工具 grep
  • Java程序数据库连接满问题排查指南
  • napping-1.0.1靶机练习
  • SQLAlchemy 全方位指南:从入门到精通
  • RabbitMQ面试精讲 Day 7:消息持久化与过期策略
  • 【C++算法】78.BFS解决FloodFill算法_算法简介
  • umijs局域网访问警告Disconnected from the devServer,trying to reconnect...