SpringBoot整合RocketMQ(阿里云ONS)
目录
配置pom.xml
配置application.properties
MQ配置类
注入消息生产者配置信息
消息生产者
消费者配置
消息消费者
配置pom.xml
<!-- 政务云MQ版本是4.x的,请使用对应的SDK ons-client-1.8.8.x -->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.8.8.Final</version>
</dependency>
配置application.properties
rocketmq.accessKey=xxx
rocketmq.secretKey=zzz
rocketmq.nameSrvAddr=http://yyy.aliyun.com:9876
rocketmq.groupId=GID_abc
rocketmq.topic=abcTopic
rocketmq.tag=TagA
MQ配置类
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Holds the settings bound from configuration keys that start with {@code rocketmq}.
 *
 * <p>Accessors and mutators for every field are generated by Lombok's
 * {@code @Getter} / {@code @Setter}.
 */
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQConfig {

    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String groupId;
    private String topic;
    private String tag;

    /**
     * Builds the connection properties shared by the producer and the consumer.
     *
     * <p>A new {@link Properties} object is created on every call, so callers may add
     * their own entries to the result without affecting each other.
     *
     * @return properties containing the access key, secret key and name server address
     */
    public Properties getMqPropertie() {
        Properties connectionProps = new Properties();
        connectionProps.setProperty(PropertyKeyConst.AccessKey, accessKey);
        connectionProps.setProperty(PropertyKeyConst.SecretKey, secretKey);
        connectionProps.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        return connectionProps;
    }
}
注入消息生产者配置信息
ProducerBean用于将Producer集成至SpringBoot中
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Registers the {@link ProducerBean} in the Spring context.
 */
@Configuration
public class RocketMQProducerClient {

    @Autowired
    private RocketMQConfig mqConfig;

    /**
     * Creates the producer bean from the shared connection properties.
     *
     * <p>The bean definition names {@code start} as its init method and
     * {@code shutdown} as its destroy method.
     *
     * @return a producer configured with {@link RocketMQConfig#getMqPropertie()}
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        final ProducerBean producerBean = new ProducerBean();
        producerBean.setProperties(mqConfig.getMqPropertie());
        return producerBean;
    }
}
消息生产者
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Sends text messages to the topic and tag configured in {@link RocketMQConfig}.
 */
@Component
public class RocketMQProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);

    @Autowired
    private ProducerBean producer;

    @Autowired
    private RocketMQConfig mqConfig;

    /**
     * Sends {@code body} as a single message.
     *
     * <p>The payload is encoded as UTF-8 explicitly so that it does not depend on the
     * JVM's default charset.
     *
     * @param body message text; must not be {@code null}
     * @return the send result, or {@code null} when the producer returned no result or
     *         threw an {@link ONSClientException}
     */
    public SendResult send(String body) {
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        Message msg = new Message(mqConfig.getTopic(), mqConfig.getTag(), payload);
        // No business key is assigned to the message.
        msg.setKey(null);
        LOGGER.info("准备发送消息为:{}", body);
        try {
            SendResult sendResult = producer.send(msg);
            if (sendResult == null) {
                LOGGER.error("发送MQ消息失败");
                return null;
            }
            LOGGER.info("发送MQ消息成功");
            return sendResult;
        } catch (ONSClientException e) {
            // Log the failure together with its stack trace in a single entry.
            LOGGER.error("发送MQ消息失败", e);
            return null;
        }
    }
}
消费者配置
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Registers the {@link ConsumerBean} in the Spring context and binds it to
 * {@link MqReceiver}.
 */
@Configuration
public class RocketMQConsumerClient {

    @Autowired
    private RocketMQConfig mqConfig;

    @Autowired
    private MqReceiver messageListener;

    /**
     * Creates the consumer bean, subscribing the listener to the configured topic and
     * tag expression.
     *
     * <p>The bean definition names {@code start} as its init method and
     * {@code shutdown} as its destroy method.
     *
     * @return the configured consumer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        // Connection settings plus the consumer-specific entries.
        Properties consumerProps = mqConfig.getMqPropertie();
        consumerProps.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
        consumerProps.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerProps.setProperty(PropertyKeyConst.MaxReconsumeTimes, "-1");

        // Single subscription: configured topic filtered by the configured tag.
        Subscription topicSubscription = new Subscription();
        topicSubscription.setTopic(mqConfig.getTopic());
        topicSubscription.setExpression(mqConfig.getTag());

        Map<Subscription, MessageListener> listeners = new HashMap<>();
        listeners.put(topicSubscription, messageListener);

        ConsumerBean consumer = new ConsumerBean();
        consumer.setProperties(consumerProps);
        consumer.setSubscriptionTable(listeners);
        return consumer;
    }
}
消息消费者
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;@Component
public class MqReceiver implements MessageListener {private static final Logger LOGGER = LoggerFactory.getLogger(MqReceiver.class);@Overridepublic Action consume(Message message, ConsumeContext consumeContext) {if (Objects.nonNull(message)) {LOGGER.info("MQ接收到的消息为:" + message.toString());try {String data = new String(message.getBody(), StandardCharsets.UTF_8);LOGGER.info("MQ消息topic={}, 消息内容={}", message.getTopic(), data);} catch (Exception e) {LOGGER.error("获取MQ消息内容异常{}", e.getMessage());return Action.CommitMessage;}}return Action.CommitMessage;}
}