当前位置: 首页 > java >正文

rabbitmq-spring-boot-start版本优化升级

文章目录

  • 1.前言
  • 2.优化升级内容
  • 3.依赖
  • 4.使用
    • 4.1发送消息代码示例
    • 4.2消费监听代码示例
    • 4.3 brock中的消息
  • 5.RabbmitMq的MessageConverter消息转换器
    • 5.1默认行为
    • 5.2JDK 序列化的缺点
    • 5.3使用 JSON 进行序列化
  • 6.总结

1.前言

    由于之前手写了一个好用的rabbitmq-spring-boot-start启动器,虽然实现了相关的功能,但是还有一点小问题,这个也是最近在项目中使用我写的这个组件发现的一个问题,所以修复升级了,maven依赖已经退推送到中央仓库上了,只需要引入依赖,然后简单的配置就可以实现发消息了,使用上会有一点需要注意的,请听下文分解。

2.优化升级内容

    升级了RabbitService类中相关发送MQ消息方法上的String msg的类型改为了Object msg,之前在项目中使用的时候发送消息的时候这个参数是String类型会有啥问题,如果要发送一个对象,将这个对象序列化成JSON字符串,最终消费者监听的地方反序列化解析不来,因为收到过的是一个String不是一个JSON的字符串,这个字符串中会含有转义\,还有就是字符串的前后会有一个引号("),导致使用FastJson反序列化解析失败,下面是发送消息的demo代码

 Student student = new Student();student.setName("张三");student.setSex("男");student.setAge(18);RabbitTemplate rabbitTemplate = (RabbitTemplate) ZlfMqSpringUtils.getBean(ZlfMqRegistrarBeanNamePrefix.rabbitTemplatePrefix + 0);rabbitService.sendDelayed(rabbitTemplate, "zlf.delay.test1", "delay.test1.key", JSON.toJSONString(student), 60);

    消费者监听的地方解析代码:

    @RabbitHandler@RabbitListener(queues = "zlf.delay.test1", containerFactory = ZlfMqRegistrarBeanNamePrefix.simpleRabbitListenerContainerFactory + 0)public void mqConsumer(Message message, Channel channel) throws IOException {String msg = new String(message.getBody(), "UTF-8");try {log.info("mqConsumer=====>msg1:{}", msg);msg = StringEscapeUtils.unescapeJava(msg);log.info("mqConsumer=====>msg2:{}", msg);msg = msg.substring(1, msg.length() - 1);log.info("mqConsumer=====>msg3:{}", msg);if (StringUtils.isNotEmpty(msg)) {Student student = JSON.parseObject(message.getBody(), Student.class);log.info("mqConsumer=====>studen:{}", JSON.toJSONString(student));//TODO 业务处理}} catch (Exception e) {log.error("mqConsumer消费异常:{}", e.getMessage());} finally {channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}//channel.basicNack(); 不ack//channel.basicReject(); 拒绝}

    这种方法虽然可以解决这个消费者反序列化的问题,但是只是对于发送FastJson序列化的对象中的字段不含有特殊的转义字符,如果有会被StringEscapeUtils.unescapeJava方法去除转义,我这种处理是业务上发送那个FastJson序列化的对象字段没有特殊的转义字符,所以这种搞是可以的,于是乎我就抽了一点时间优化升级了下,然后也写了个demo测试了一下,非常的耐思。

3.依赖

    下面两个依赖任选其一引入就可以使用了:

<dependency><groupId>io.gitee.bigbigfeifei</groupId><artifactId>rabbitmq-spring-boot-start</artifactId><version>1.1</version>
</dependency>
或者
<dependency><groupId>io.github.bigbigfeifei</groupId><artifactId>rabbitmq-spring-boot-start</artifactId><version>1.1</version>
</dependency>

4.使用

4.1发送消息代码示例

    只需要发送一个对象就可以了

 Student student = new Student();student.setName("张三");student.setSex("男");student.setAge(18);RabbitTemplate rabbitTemplate = (RabbitTemplate) ZlfMqSpringUtils.getBean(ZlfMqRegistrarBeanNamePrefix.rabbitTemplatePrefix + 0);rabbitService.sendDelayed(rabbitTemplate, "zlf.delay.test1", "delay.test1.key", student, 60);

4.2消费监听代码示例

/*** 延迟插件实现延迟队列监听队列消息** @param message* @param channel* @throws IOException*/@RabbitHandler@RabbitListener(queues = "delay.test1", containerFactory = ZlfMqRegistrarBeanNamePrefix.simpleRabbitListenerContainerFactory + 0)public void mqConsumer1(Message message, Channel channel) throws IOException {try {Student student = JSON.parseObject(message.getBody(), Student.class);log.info("mqConsumer1=====>student:{}", JSON.toJSONString(student));//TODO 业务处理} catch (Exception e) {log.error("mqConsumer1消费异常:{}", e.getMessage());} finally {channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}//channel.basicNack(); 不ack//channel.basicReject(); 拒绝}

4.3 brock中的消息

image-20250423164138770

    本个start使用的是Jackson2JsonMessageConverter消息转换器来序列化发送消息实体为标准的JSON数据,源码在ZlfRabbitMqRegistrar类中registerBeanDefinitions方法中,代码如下:

 //构建发送的RabbitTemplate实例关联连接工厂Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);

    为每一个对应的rabbitTemplate对象Bean设置了一个Jackson2JsonMessageConverter消息转换器,这样就可以支持JSON数据的序列化和反序列化了。

5.RabbmitMq的MessageConverter消息转换器

    RabbmitMq默认实现是 SimpleMessageConverter,该类基于 JDK 的 ObjectOutputStream 完成序列化。

5.1默认行为

发送消息时:消息会通过 SimpleMessageConverter 转换为字节流。
接收消息时:消息会被自动反序列化回Java对象。
默认的序列化方式:Spring AMQP默认使用JDK自带的序列化机制。消息格式为 application/x-java-serialized-object。

5.2JDK 序列化的缺点

    默认的 JDK 序列化虽然可以转换任意对象,但它存在几个显著的问题:

    安全漏洞:JDK序列化存在反序列化攻击的风险。
    体积过大:序列化后的字节流体积较大,占用带宽和存储空间。
    可读性差:序列化后的字节流不可读,无法直观查看消息内容。

5.3使用 JSON 进行序列化

    为了避免JDK序列化的缺点,可以使用JSON作为消息的序列化格式,这样:

    数据体积较小。
    数据可读性好。
    提高了消息的安全性。

    所以默认的 JDK 的 ObjectOutputStream 完成序列化时候该如何处理:

    //  字节码转化为对象public Object getObjectFromBytes(byte[] objBytes) throws Exception {if (objBytes == null || objBytes.length == 0) {return null;}ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);ObjectInputStream oi = new ObjectInputStream(bi);return oi.readObject();}

    发送消息时:

Student student = new Student();student.setName("张三");student.setSex("男");student.setAge(18);
byte[] e = getBytesFromObject(student);
Message message = MessageBuilder.withBody(e).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();
//或者直接发一个对象,然后消费监听的时候从byte[]中读取反序列化为一个java对象

    消费者监听调用getObjectFromBytes方法反序列化为一个java对象

 Student student = (Student)getObjectFromBytes(message.getBody());

    这种原始的方式不推荐使用,还是推荐使用JSON的方式。

    MessageConverter接口还有很多的实现类,可以根据你自己的需求去使用或者是自定义一个消息转换器,这个难度也比较的大,所以也不推荐去搞。

image-20250423170056153

    SimpleRabbitListenerContainerFactory、DirectRabbitListenerContainerFactory、RabbitTemplate这三个类中都有setMessageConverter的方法,只要从容器中获取对应的Bean,让然把消息转换器的对象设置进去就可以了,切入点可以使用一个类实现implements InitializingBean接口,在afterPropertiesSet中取出对应的bean来设置

public void afterPropertiesSet() throws Exception {//设置SimpleRabbitListenerContainerFactory的消息转换器为Jackson2JsonMessageConverter支持json格式的数据转换SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = (SimpleRabbitListenerContainerFactory) ZlfMqSpringUtils.getBean(ZlfMqRegistrarBeanNamePrefix.simpleRabbitListenerContainerFactory + 0);simpleRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());}

    或者:

RabbitTemplate rabbitTemplate = (RabbitTemplate) ZlfMqSpringUtils.getBean(ZlfMqRegistrarBeanNamePrefix.rabbitTemplatePrefix + 0);rabbitService.sendDelayed(rabbitTemplate, "zlf.delay.test1", "delay.test1.key", JSON.toJSONString(student), 60);

    ZlfMqSpringUtils工具类之前分享的文章里面也有有的叫SpringUtils,有的叫xxxxSpringUtils,都是重复的代码,复制过去换个名字而已。

6.总结

    虽然只改动了一个参数的类型,但是给使用带来了很大的便利性,使用更加丝滑,按照这个套路搞一下,分分钟就集成搞定了,只用专注处理业务了,本次分享到此结束,希望我的分享对你有所启发和帮助,请一键三连,么么么哒!

http://www.xdnf.cn/news/1446.html

相关文章:

  • 算力租赁:重构数字经济的基础设施革命
  • 线程入门3
  • 格雷希尔气瓶充装连接器:广泛应用于工业气体充注站的快速充装,及气瓶生产厂家的气密性测试
  • 从Nacos derby RCE学习derby数据库的利用
  • 【源码分析】Linux内核ov13850.c
  • [HCTF 2018]WarmUp
  • ospf综合练习
  • 【编译原理】第三章 习题
  • 文件【Linux操作系统】
  • Dhtmlx Gantt教程
  • uniapp实现app自动更新
  • ollama本地搭建大模型
  • 伺服器用什么语言开发呢?做什么用什么?
  • Python流程控制
  • 前端面试场景题
  • Java标识符与关键字终极指南:从基础到高级应用
  • 影刀RPA怎么制作文生图,把网站上图片获取到本地文件夹工作流
  • Flutter 学习之旅 之 flutter 使用 【验证码】输入组件的简单封装
  • 安装Jupyter Notebook 之不断报错 差点放弃版
  • 基于Python将MongoDB文本数据通过text2vec-large-chinese模型向量化并存储到Milvus数据库的完整实现方案
  • “在中国,为中国” 英飞凌汽车业务正式发布中国本土化战略
  • 【调优】log日志海量数据分表后查询速度调优
  • 语法长难句
  • 破茧成蝶:阿里云应用服务器让传统 J2EE 应用无缝升级 AI 原生时代
  • 汽车可变转向比系统的全面认识
  • Python3(7) 数字类型
  • 穿越链路的旅程:深入理解计算机网络中的数据链路层
  • OpenVINO教程(五):实现YOLOv11+OpenVINO实时视频目标检测
  • Qt实战之将自定义插件(minGW)显示到Qt Creator列表的方法
  • Stable Baselines3 结合 gym 训练 CartPole 倒立摆