当前位置: 首页 > news >正文

自定义rabbitmq的ConnectionFactory配置

文章目录

  • 一、回顾一下 Spring Boot 自动配置时的写法
  • 二、自己配置 ConnectionFactory

一、回顾一下 Spring Boot 自动配置时的写法

1、在pom文件中引入amqp相关依赖

<!-- Spring Boot AMQP starter dependency; no version is given in this snippet. -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、新增application.properties文件相关配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=xxxxx
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
# Producer-side message confirmation
spring.rabbitmq.template.mandatory=true
# Time after which expired messages in the queue are deleted automatically (x-message-ttl), in ms
# NOTE(review): this key lives in the spring.cloud.stream namespace, not spring.rabbitmq - confirm that something in the project actually reads it
spring.cloud.stream.rabbit.bindings.input.consumer.ttl=30000

3、写一个producer
直接使用org.springframework.amqp.rabbit.core.RabbitTemplate即可

package com.xxx;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;@Component
public class RabbitMqProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {private final RabbitTemplate rabbitTemplate;@Autowiredprivate ObjectMapper objectMapper;@Autowiredpublic RabbitMqProducer(RabbitTemplate rabbitTemplate, ObjectMapper objectMapper) {this.rabbitTemplate = rabbitTemplate;this.objectMapper = objectMapper;this.rabbitTemplate.setConfirmCallback(this);this.rabbitTemplate.setReturnsCallback(this);this.rabbitTemplate.setMandatory(true);}/*** 以String格式发送rmq消息* 如果消息不是String,则会通过json序列化工具进行序列化后才发送** @param routingKey rmq 路由* @param msg 消息体*/public void send(String routingKey, Object msg) {if (msg instanceof String) {this.sendString(routingKey, (String) msg);} else {try {String str = objectMapper.writeValueAsString(msg);this.sendString(routingKey, str);} catch (JsonProcessingException e) {LOGGER.errorWithErrorCode(ErrorCode.Common.ERR_DATA.getCode(), "convert msg from Object to String fail.");} catch (Exception e){LOGGER.errorWithErrorCode(ErrorCode.Common.ERR_DATA.getCode(), e.getMessage(), e);}}}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (LOGGER.isDebugEnabled()) {if (ack) {LOGGER.debug("rabbitmq message id: {}, confirm success", correlationData == null ? null : correlationData.getId());} else {LOGGER.debug("rabbitmq message id: {}, confirm fail, cause: {}", correlationData == null ? 
null : correlationData.getId(), cause);}}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("rabbitmq returned message:{},replyCode:{} ,replyText:{},exchange:{},routingKey:{}",new String(returnedMessage.getMessage().getBody(), StandardCharsets.UTF_8), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey());}}private void sendString(String routingKey, String msg) {CorrelationData cd = new CorrelationData(UUID.randomUUID().toString().replace("-", ""));this.rabbitTemplate.convertAndSend(CommonConstants.Rabbitmq.EXCHANGE_TRANS_URL, routingKey, msg, cd);LOGGER.debug("rabbitmq message send, routing key: {}, msg: {}, message id:{}", routingKey, msg, cd.getId());}

4、在service中注入RabbitMqProducer

/** Producer bean injected by Spring. */
@Autowired
private RabbitMqProducer rabbitSender;

/** Publishes {@code xxxMqBO} using the routing key {@code CommonConstants.Rabbitmq.ROUTING_KEY_XXX}. */
public void test() {
    this.rabbitSender.send(CommonConstants.Rabbitmq.ROUTING_KEY_XXX, xxxMqBO);
}

二、自己配置 ConnectionFactory

1、依赖的引入方式,以及 producer 和 service 的写法都保持不变。
2、在启动类中关闭rabbitmq的自动配置

/**
 * Application entry point.
 *
 * <p>{@code RabbitAutoConfiguration} is excluded so that the RabbitMQ auto-configuration is
 * switched off.
 */
@SpringBootApplication(
        scanBasePackages = {"com.xxx", "com.yyy"},
        exclude = RabbitAutoConfiguration.class)
@EnableTransactionManagement
@EnableFeignClients
@EnableDiscoveryClient
@EnableScheduling
@EnableCaching
@EnableAsync
public class xxxApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(xxxApplication.class, args);
    }
}

3、编写配置类,自定义ConnectionFactory
由于我这里的目的是在使用 SSL 端口时跳过证书校验,所以配置类中会包含"信任所有证书"的逻辑。其中 NoCheckTrustManager 未在文中给出,需要自行实现。注意:跳过证书校验会失去对服务端身份的验证,仅建议在测试环境或可信内网中使用。

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Base64Utils;import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import java.nio.charset.StandardCharsets;@Configuration
@EnableAutoConfiguration
@EnableConfigurationProperties(RabbitMqProperties.class)
public class RabbitConfiguration {@Autowiredprivate RabbitMqProperties rabbitMqProperties;@Bean(name = "rabbitConnectionFactory")public ConnectionFactory rabbitConnectionFactory() throws Exception {LOGGER.info("Creating RabbitMQ ConnectionFactory with host: {}", rabbitMqProperties.getHost());CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(rabbitMqProperties.getHost());boolean sslFlag = false;if (StringUtils.isNotEmpty(rabbitMqProperties.getAmqpSslPort())){connectionFactory.setPort(Integer.parseInt(rabbitMqProperties.getAmqpSslPort()));LOGGER.info("ConnectionFactory port:{}",rabbitMqProperties.getAmqpSslPort());sslFlag = true;}else if(StringUtils.isNotEmpty(rabbitMqProperties.getAmqpPort())){connectionFactory.setPort(Integer.parseInt(rabbitMqProperties.getAmqpPort()));}connectionFactory.setUsername(rabbitMqProperties.getUsername());connectionFactory.setPassword(rabbitMqProperties.getPassword());connectionFactory.setVirtualHost(rabbitMqProperties.getVirtualHost());connectionFactory.setConnectionTimeout(rabbitMqProperties.getConnectionTimeout());// 设置发布确认connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.valueOf(rabbitMqProperties.getPublisherConfirmType().toUpperCase()));connectionFactory.setPublisherReturns(rabbitMqProperties.isPublisherReturns());if(sslFlag){// 创建一个信任所有证书的 TrustManagerTrustManager[] trustAllCerts = new TrustManager[]{new NoCheckTrustManager()};SSLContext sslContext = SSLContext.getInstance("TLS");sslContext.init(null, trustAllCerts, null);connectionFactory.getRabbitConnectionFactory().useSslProtocol(sslContext);}else {LOGGER.info("Using plain AMQP connection for port: {}", rabbitMqProperties.getAmqpPort());}return connectionFactory;}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMandatory(rabbitMqProperties.getTemplate().isMandatory());return template;}
}

4、application.properties中的配置项,由原来的spring开头改为自定义风格,比如:

myApp.rabbitmq.host=127.0.0.1
# Plain AMQP port. The key is amqp-port (field amqpPort in RabbitMqProperties); a key named
# "port" has no matching field and would be ignored.
myApp.rabbitmq.amqp-port=5672
# To connect over the SSL port instead, set amqp-ssl-port; it takes precedence over amqp-port.
#myApp.rabbitmq.amqp-ssl-port=5671
myApp.rabbitmq.username=root
myApp.rabbitmq.password=xxxxx
myApp.rabbitmq.virtual-host=/
myApp.rabbitmq.connection-timeout=15000
myApp.rabbitmq.publisher-confirm-type=correlated
myApp.rabbitmq.publisher-returns=true
# Producer-side message confirmation
myApp.rabbitmq.template.mandatory=true
# Time after which expired messages in the queue are deleted automatically (x-message-ttl), in ms
# NOTE(review): RabbitConfiguration does not read this value - confirm where it is applied
myApp.rabbitmq.bindings.input.consumer.ttl=30000

对应的RabbitMqProperties:

/**
 * Typed holder for the custom RabbitMQ settings.
 *
 * <p>The prefix is written in canonical form (lower-case, kebab-case), which
 * {@code @ConfigurationProperties} requires. Thanks to relaxed binding, keys written as
 * {@code myApp.rabbitmq.*} in a properties file still bind to it.
 */
@ConfigurationProperties(prefix = "my-app.rabbitmq")
@Data
@Component
public class RabbitMqProperties {

    /** Broker host name or address. */
    private String host;

    /** SSL port, kept as text; when non-empty it takes precedence over {@link #amqpPort}. */
    private String amqpSslPort;

    /** Plain AMQP port, kept as text. */
    private String amqpPort;

    private String username;

    private String password;

    private String virtualHost = "/";

    /** Connection timeout passed to the connection factory. */
    private int connectionTimeout = 15000;

    /** Name of the publisher confirm type; matched case-insensitively against the enum constants. */
    private String publisherConfirmType = "correlated";

    private boolean publisherReturns = true;

    private Template template = new Template();

    private Bindings bindings = new Bindings();

    /** Settings under {@code template.*}. */
    @Data
    public static class Template {

        private boolean mandatory = true;
    }

    /** Settings under {@code bindings.*}. */
    @Data
    public static class Bindings {

        private Input input = new Input();

        /** Settings under {@code bindings.input.*}. */
        @Data
        public static class Input {

            private Consumer consumer = new Consumer();

            /** Settings under {@code bindings.input.consumer.*}. */
            @Data
            public static class Consumer {

                private long ttl = 3600000L;
            }
        }
    }
}
http://www.xdnf.cn/news/1344313.html

相关文章:

  • RabbitMQ深度剖析:从基础到高级进阶实战
  • 乐迪信息:AI摄像机+刮板机人员入侵检测:杜绝井下安全事故
  • 爬虫基础学习-配置代理、以及项目实践
  • 关于爬虫的基本步骤说明【爬虫七步骤】
  • jenkins实现分布式构建并自动发布到远程服务器上 jenkins实现自动打包编译发布远程服务器
  • Laravel分布式全链路追踪实战
  • 【机器学习深度学习】LMDeploy的分布式推理实现
  • selenium爬虫
  • 布隆过滤器:用微小的空间代价换取高效的“可能存在”判定
  • TCP/UDP详解(一)
  • 微服务的编程测评系统14-C端题目列表功能-个人中心
  • Redis面试精讲 Day 27:Redis 7.0/8.0新特性深度解析
  • 高通Camx相机dump yuv和raw图的抓取方式和查看
  • 【iOS】YYModel第三方库源码
  • 笔试——Day46
  • 恢复性测试:定义、重要性及实施方法
  • 深入解析CNAME记录:域名管理的隐形枢纽
  • 几个element-plus的UI,及环境配置
  • 三格电子——ModbusTCP 转 Profinet 主站网关应用实例
  • 【TrOCR】根据任务特性设计词表vocab.json
  • RabbitMQ面试精讲 Day 27:常见故障排查与分析
  • 【数据结构C语言】顺序表
  • 四十一、【高级特性篇】API 文档驱动:OpenAPI/Swagger 一键导入测试用例
  • Design Compiler:层次模型(Block Abstraction)的简介
  • memcmp 函数的使用及其模拟实现
  • 数学建模--Topsis
  • 分布式与微服务
  • [特殊字符] 潜入深渊:探索 Linux 内核源码的奇幻之旅与生存指南
  • LeetCode Hot 100 第一天
  • 相机曝光调节与自动曝光控制详解