自定义rabbitmq的ConnectionFactory配置
文章目录
- 一、回顾一下spring自动装配时的写法
- 二、自己配置 ConnectionFactory
一、回顾一下spring自动装配时的写法
1、在pom文件中引入amqp相关依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、新增application.properties文件相关配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=xxxxx
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
#生产者消息确认
spring.rabbitmq.template.mandatory=true
#为队列设置过期消息自动删除的时间 (x-message-ttl) ,单位ms
spring.cloud.stream.rabbit.bindings.input.consumer.ttl=30000
3、写一个producer
直接使用org.springframework.amqp.rabbit.core.RabbitTemplate即可
package com.xxx;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;@Component
public class RabbitMqProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {private final RabbitTemplate rabbitTemplate;@Autowiredprivate ObjectMapper objectMapper;@Autowiredpublic RabbitMqProducer(RabbitTemplate rabbitTemplate, ObjectMapper objectMapper) {this.rabbitTemplate = rabbitTemplate;this.objectMapper = objectMapper;this.rabbitTemplate.setConfirmCallback(this);this.rabbitTemplate.setReturnsCallback(this);this.rabbitTemplate.setMandatory(true);}/*** 以String格式发送rmq消息* 如果消息不是String,则会通过json序列化工具进行序列化后才发送** @param routingKey rmq 路由* @param msg 消息体*/public void send(String routingKey, Object msg) {if (msg instanceof String) {this.sendString(routingKey, (String) msg);} else {try {String str = objectMapper.writeValueAsString(msg);this.sendString(routingKey, str);} catch (JsonProcessingException e) {LOGGER.errorWithErrorCode(ErrorCode.Common.ERR_DATA.getCode(), "convert msg from Object to String fail.");} catch (Exception e){LOGGER.errorWithErrorCode(ErrorCode.Common.ERR_DATA.getCode(), e.getMessage(), e);}}}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (LOGGER.isDebugEnabled()) {if (ack) {LOGGER.debug("rabbitmq message id: {}, confirm success", correlationData == null ? null : correlationData.getId());} else {LOGGER.debug("rabbitmq message id: {}, confirm fail, cause: {}", correlationData == null ? null : correlationData.getId(), cause);}}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("rabbitmq returned message:{},replyCode:{} ,replyText:{},exchange:{},routingKey:{}",new String(returnedMessage.getMessage().getBody(), StandardCharsets.UTF_8), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey());}}private void sendString(String routingKey, String msg) {CorrelationData cd = new CorrelationData(UUID.randomUUID().toString().replace("-", ""));this.rabbitTemplate.convertAndSend(CommonConstants.Rabbitmq.EXCHANGE_TRANS_URL, routingKey, msg, cd);LOGGER.debug("rabbitmq message send, routing key: {}, msg: {}, message id:{}", routingKey, msg, cd.getId());}
4、在service中注入RabbitMqProducer
@Autowired
private RabbitMqProducer rabbitSender;public void test(){rabbitSender.send(CommonConstants.Rabbitmq.ROUTING_KEY_XXX,xxxMqBO);
}
二、自己配置 ConnectionFactory
1、引入依赖、以及producer和service保持不变即可。
2、在启动类中关闭rabbitmq的自动配置
@SpringBootApplication(scanBasePackages = {"com.xxx", "com.yyy"}, exclude = {RabbitAutoConfiguration.class})//关闭自动配置
@EnableTransactionManagement
@EnableFeignClients
@EnableDiscoveryClient
@EnableScheduling
@EnableCaching
@EnableAsync
public class xxxApplication {public static void main(String[] args) {SpringApplication.run(xxxApplication.class, args);}
}
3、编写配置类,自定义ConnectionFactory
由于我这里的目的是使用ssl端口时跳过证书认证,所以会有这部分逻辑
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Base64Utils;import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import java.nio.charset.StandardCharsets;@Configuration
@EnableAutoConfiguration
@EnableConfigurationProperties(RabbitMqProperties.class)
public class RabbitConfiguration {@Autowiredprivate RabbitMqProperties rabbitMqProperties;@Bean(name = "rabbitConnectionFactory")public ConnectionFactory rabbitConnectionFactory() throws Exception {LOGGER.info("Creating RabbitMQ ConnectionFactory with host: {}", rabbitMqProperties.getHost());CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(rabbitMqProperties.getHost());boolean sslFlag = false;if (StringUtils.isNotEmpty(rabbitMqProperties.getAmqpSslPort())){connectionFactory.setPort(Integer.parseInt(rabbitMqProperties.getAmqpSslPort()));LOGGER.info("ConnectionFactory port:{}",rabbitMqProperties.getAmqpSslPort());sslFlag = true;}else if(StringUtils.isNotEmpty(rabbitMqProperties.getAmqpPort())){connectionFactory.setPort(Integer.parseInt(rabbitMqProperties.getAmqpPort()));}connectionFactory.setUsername(rabbitMqProperties.getUsername());connectionFactory.setPassword(rabbitMqProperties.getPassword());connectionFactory.setVirtualHost(rabbitMqProperties.getVirtualHost());connectionFactory.setConnectionTimeout(rabbitMqProperties.getConnectionTimeout());// 设置发布确认connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.valueOf(rabbitMqProperties.getPublisherConfirmType().toUpperCase()));connectionFactory.setPublisherReturns(rabbitMqProperties.isPublisherReturns());if(sslFlag){// 创建一个信任所有证书的 TrustManagerTrustManager[] trustAllCerts = new TrustManager[]{new NoCheckTrustManager()};SSLContext sslContext = SSLContext.getInstance("TLS");sslContext.init(null, trustAllCerts, null);connectionFactory.getRabbitConnectionFactory().useSslProtocol(sslContext);}else {LOGGER.info("Using plain AMQP connection for port: {}", rabbitMqProperties.getAmqpPort());}return connectionFactory;}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMandatory(rabbitMqProperties.getTemplate().isMandatory());return template;}
}
4、application.properties中的配置项,由原来的spring开头改为自定义风格,比如:
myApp.rabbitmq.host=127.0.0.1
myApp.rabbitmq.port=5672
myApp.rabbitmq.username=root
myApp.rabbitmq.password=xxxxx
myApp.rabbitmq.virtual-host=/
myApp.rabbitmq.connection-timeout=15000
myApp.rabbitmq.publisher-confirm-type=correlated
myApp.rabbitmq.publisher-returns=true
#生产者消息确认
myApp.rabbitmq.template.mandatory=true
#为队列设置过期消息自动删除的时间 (x-message-ttl) ,单位ms
myApp.rabbitmq.bindings.input.consumer.ttl=30000
对应的RabbitMqProperties:
@ConfigurationProperties(prefix = "myApp.rabbitmq")
@Data
@Component
public class RabbitMqProperties {private String host;private String amqpSslPort;private String amqpPort;private String username;private String password;private String virtualHost = "/";private int connectionTimeout = 15000;private String publisherConfirmType = "correlated";private boolean publisherReturns = true;private Template template = new Template();private Bindings bindings = new Bindings();@Datapublic static class Template {private boolean mandatory = true;}@Datapublic static class Bindings {private Input input = new Input();@Datapublic static class Input {private Consumer consumer = new Consumer();@Datapublic static class Consumer {private long ttl = 3600000L;}}}
}