当前位置: 首页 > news >正文

【RabbitMQ】基于Spring Boot + RabbitMQ 完成应用通信

文章目录

  • 需求描述
  • 创建项目
  • 订单系统(生产者)
    • 完善配置
    • 声明队列
    • 下单接口
    • 启动服务
  • 物流系统(消费者)
    • 完善配置
    • 监听队列
    • 启动服务
  • 格式化发送消息对象
    • SimpleMessageConverter
      • 定义一个对象
      • 生产者代码
      • 消费者
      • 运行程序
    • JSON
      • 定义一个对象
      • 生产者代码
      • 定义转换器
      • 消费者代码
      • 运行程序

需求描述

用户下单成功之后,通知物流系统,进行发货(只涉及到应用通信,不做具体功能实现)
image.png

  • 订单系统——生产者
  • 物流系统——消费者

创建项目

通常情况下,订单系统和物流系统是不同团队来开发的,是两个独立的应用

  • 为了方便演示,就把两个项目创建到一个文件夹下

image.png|291

  • 图标没有发生变化,启动类也没有被识别出来
    • 因为 Maven 没有被识别出来
    • 我们手动加入 Maven
      • 在项目目录中右键点击 pom.xml 文件,选择:Add as Maven Projectimage.png|191

订单系统(生产者)

完善配置

image.png

spring.application.name=order-service
server.port=8080  
#amqp://username:password@Ip:port/virtual-host  
spring.rabbitmq.addresses=amqp://order:order@127.0.0.1:5672/order

声明队列

package org.example.order.config;  import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.core.QueueBuilder;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  @Configuration  
public class RabbitMQConfig {  // 使用简单模式来完成消息发送  @Bean  public Queue orderQueue(){  return QueueBuilder.durable("order.create").build();  }  
}

下单接口

package org.example.order.controller;  import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  import java.util.UUID;  @RequestMapping("/order")  
@RestController  
public class OrderController {  // 注入RabbitMQ的客户端  @Autowired  private RabbitTemplate rabbitTemplate;  // 下单的接口  @RequestMapping("/create")  public String create(){  // 参数校验、数据库保存等等...业务代码省略  // 发送消息  // 交换机、队列是什么routingKey就是什么、字符串信息  String orderId = UUID.randomUUID().toString();  rabbitTemplate.convertAndSend("", "order.create", "订单信息,订单ID: " + orderId);  return "下单成功";  }  
}
  • 下单成功之后,发送订单消息

启动服务

  1. 访问接口,模拟下单请求: http://127.0.0.1:8080/order/create

查看消息:image.png

物流系统(消费者)

RabbitMQ 中接收消息

完善配置

8080 端口号已经被订单系统占用了,修改物流系统的端口号为 9090

spring.application.name=logistics-service  
# 两边的端口号不能一样,他们是同时运行的  
server.port=9090  
#amqp://username:password@Ip:port/virtual-host  
spring.rabbitmq.addresses=amqp://guest:guest@127.0.0.1:5672/order

监听队列

package org.example.logistics.listener;  import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  @Component  
public class OrderListener {  @RabbitListener(queues = "order.create") // 指出需要监听的队列的名称  public void handMessage(String orderInfo) {  System.out.println("接收到订单消息: " + orderInfo);  // 收到消息后的处理,代码省略}  
}

启动服务

访问订单系统的接口,模拟下单请求: http://127.0.0.1:8080/order/create

在物流系统的日志中,可以观察到,通过 RabbitMQ,成功把下单信息传递给了物流系统image.png

格式化发送消息对象

如果通过 RabbitTemplate 发送⼀个对象作为消息, 我们需要对该对象进⾏序列化

SimpleMessageConverter

默认使用的是 SimpleMessageConverter 进行序列化

定义一个对象

package org.example.order.model;  import lombok.Data;  import java.io.Serializable;  @Data  
public class OrderInfo implements Serializable {  private String orderId;  private String name;  
}
  • 在发送消息的时候,信息需要经过 MessageConverter 进行转换 (默认是 SimpleMessageConverter)
  • SimpleMessageConverter 只支持 Stringbyte[]Serializable
  • 此接口将 OrderInfo 序列化,之后才能被正常接收 (OrderInfo 类型不被支持,会报 500 错误)

生产者代码

package org.example.order.controller;  import org.example.order.model.OrderInfo;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  import java.util.Random;  
import java.util.UUID;  @RequestMapping("/order")  
@RestController  
public class OrderController {  // 注入RabbitMQ的客户端  @Autowired  private RabbitTemplate rabbitTemplate;  // 下单的接口  @RequestMapping("/create2")  public String create2(){  // 发送对象  OrderInfo orderInfo = new OrderInfo();  orderInfo.setOrderId(UUID.randomUUID().toString());  orderInfo.setName("商品" + new Random().nextInt(100));  rabbitTemplate.convertAndSend("", "order.create", orderInfo);  return "下单成功";  }  
}

消费者

package org.example.logistics.listener;  import org.example.order.model.OrderInfo;  
import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  @Component  
@RabbitListener(queues = "order.create") // 指出需要监听的队列的名称  
public class OrderListener {  @RabbitHandler  public void handMessage(String orderInfo) {  System.out.println("接收到订单消息String: " + orderInfo);  // 收到消息后的处理,代码省略  }  
}

运行程序

访问订单系统的接口,模拟下单请求: http://127.0.0.1:8080/order/create2

观察发送的消息image.png

  • 可以看到消息的可读性太差
  • 所以我们使用 JSON 序列化

JSON

使用 SimpleMessageConverter 序列化可读性太差,Spring AMQP 推荐使用 JSON 序列化

  • Spring AMQP 提供了 Jsckson2JsonMessageConverterMappingJackson2MessageConverter 等转换器
  • 我们需要把一个 MessageConverter 设置到 RabbitTemplate

定义一个对象

package org.example.order.model;  import lombok.Data;  @Data  
public class OrderInfo  {  private String orderId;  private String name;  
}

生产者代码

package org.example.order.controller;  import org.example.order.model.OrderInfo;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  import java.util.Random;  
import java.util.UUID;  @RequestMapping("/order")  
@RestController  
public class OrderController {  // 注入RabbitMQ的客户端  @Autowired  private RabbitTemplate rabbitTemplate;  // 下单的接口  @RequestMapping("/create2")  public String create2(){  // 发送对象  OrderInfo orderInfo = new OrderInfo();  orderInfo.setOrderId(UUID.randomUUID().toString());  orderInfo.setName("商品" + new Random().nextInt(100));  rabbitTemplate.convertAndSend("", "order.create", orderInfo);  return "下单成功";  }  
}
  • 和前面的使用默认转换器代码一样

定义转换器

package org.example.order.config;  import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.core.QueueBuilder;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  @Configuration  
public class RabbitMQConfig {  // 使用简单模式来完成消息发送  @Bean  public Queue orderQueue(){  return QueueBuilder.durable("order.create").build();  }  /**  * 创建一个 rabbitTemplate 对象  * @return  */  @Bean  public Jackson2JsonMessageConverter jsonMessageConverter(){  return new Jackson2JsonMessageConverter();  }  @Bean  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jsonMessageConverter) {  RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  rabbitTemplate.setMessageConverter(jsonMessageConverter);  return rabbitTemplate;  }  
}
  • 创建出一个 rabbitTemplate 对象进行使用
  • 生产者(order-service)和消费者(logistics-service)都需要
    • 不然还是拿不到消息

消费者代码

package org.example.logistics.listener;  import org.example.order.model.OrderInfo;  
import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  @Component  
@RabbitListener(queues = "order.create") // 指出需要监听的队列的名称  
public class OrderListener {  @RabbitHandler  public void handMessage2(OrderInfo orderInfo) {  System.out.println("接收到订单消息OrderInfo: " + orderInfo);  // 收到消息后的处理,代码省略  }  
}

@RabbitListener(queues = "order.create") 可以加在类上,也可以加在方法上,用于定义一个类或者方法作为消息的监听器
@RabbitHandler 是一个方法级别的注解,当使用 @RabbitHandler 注解时,这个方法将被调用处理特定的消息

  • 根据调用的类型,调用相关方法

运行程序

访问订单系统的接口,模拟下单请求: http://127.0.0.1:8080/order/create2

image.png

前面的 String 也还可以接收到,只要模拟下单请求: http://127.0.0.1:8080/order/create 即可image.png

  • 归功于 @RabbitHandler指哪打哪
http://www.xdnf.cn/news/650197.html

相关文章:

  • 七、【前端路由篇】掌控全局:Vue Router 实现页面导航、动态路由与权限控制
  • 2025/5/26 学习日记 基本/扩展正则表达式 linux三剑客之grep
  • [ARM][架构] 02.AArch32 程序状态
  • 3DVR拍摄指南:从理论到实践
  • [特殊字符] next-intl 服务端 i18n getTranslations 教程
  • 三分钟了解 MCP 概念(Model Context Protocol,模型上下文协议)
  • CLAM完整流程。patches-feature-split-train-eval
  • 5.26 面经整理 360共有云 golang
  • Java大师成长计划之第31天:Docker与Java应用容器化
  • 基于matlab版本的三维直流电法反演算法
  • 论文阅读: 2023 NeurIPS Jailbroken: How does llm safety training fail?
  • 支持selenium的chrome driver更新到136.0.7103.113
  • C++寻位映射的究极密码:哈希扩展
  • ubuntu 22.04 配置静态IP、网关、DNS
  • 鸿蒙OSUniApp 实现的日期选择器与时间选择器组件#三方框架 #Uniapp
  • 对数的运算困惑
  • 鸿蒙OSUniApp 开发带有通知提示的功能组件#三方框架 #Uniapp
  • Linux《基础IO》
  • 深入Java TCP流套接字编程:高效服务器构建与高并发实战优化指南​
  • Kafka自定义分区策略实战避坑指南
  • 论文阅读笔记:YOLO-World: Real-Time Open-Vocabulary Object Detection
  • nginx安全防护与https部署实战
  • 简述各类机器学习问题
  • 机器学习k近邻,高斯朴素贝叶斯分类器
  • html使用JS实现账号密码登录的简单案例
  • uboot常用命令之eMMC/SD卡命令
  • rpm安装jenkins-2.452
  • 关于vue结合elementUI输入框回车刷新问题
  • API Gateway CLI 实操入门笔记(基于 LocalStack)
  • SQL注入原理及防护方案