当前位置: 首页 > backend >正文

rabbitmq--默认模式(点对点)

导入包:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml 

springrabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /  # 默认虚拟主机listener:simple:acknowledge-mode: manual # 可根据需要设置为autopublisher-confirm-type: correlated

RabbitMQConfig.java
package com.example.dyreportapi.config;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@Configuration
public class RabbitMQConfig {public static final String DECLARE_QUEUE = "*****替换正式队列名***"; // 申报队列public static final String RESPONSE_QUEUE = "***替换正式队列名**"; // 回执队列public static final String RK_QUEUE = "***替换正式队列名**"; // 出入库队列@Beanpublic Queue declareQueue() {return new Queue(DECLARE_QUEUE, true); // durable}@Beanpublic Queue responseQueue() {return new Queue(RESPONSE_QUEUE, true);}public Queue rkQUEUE() {return new Queue(RK_QUEUE, true);}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息成功投递至 RabbitMQ Broker");} else {System.out.println("消息未投递成功:" + cause);}});return template;}}

ResponseMessageListener.java 

import com.example.dyreportapi.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import javax.xml.parsers.ParserConfigurationException;
import org.xml.sax.SAXException;@Component
public class ResponseMessageListener {@RabbitListener(queues = RabbitMQConfig.RESPONSE_QUEUE)public void receiveMessage(String message, Channel channel, Message amqpMessage) throws IOException {System.out.println("接收到回执消息: " + message);long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();try {// 解析XML消息if (isXmlMessage(message)) {parseXmlMessage(message);}// 判断是否是某个业务消息的回执if (message.contains("03355813-a4c0-49b8-955b-edd0e2934275") || message.contains("LT072303")) {System.out.println("✅ 匹配到本次发送的车辆备案回执!");// 正常确认消息channel.basicAck(deliveryTag, false);} else {// 不匹配的消息,拒绝并重新入队给其他消费者处理channel.basicNack(deliveryTag, false, true); // requeue=true}} catch (Exception e) {System.err.println("处理消息时发生错误: " + e.getMessage());// 处理异常时,可以选择重新入队或不重新入队channel.basicNack(deliveryTag, false, true);}}/*** 判断消息是否为XML格式*/private boolean isXmlMessage(String message) {return message.trim().startsWith("<") && message.trim().endsWith(">");}/*** 解析XML消息并提取关键标签*/private void parseXmlMessage(String xmlMessage) {try {DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();DocumentBuilder builder = factory.newDocumentBuilder();Document document = builder.parse(new ByteArrayInputStream(xmlMessage.getBytes("UTF-8")));// 获取根元素Element root = document.getDocumentElement();System.out.println("根元素: " + root.getNodeName());// 解析常见的回执标签示例parseCommonResponseTags(document);} catch (ParserConfigurationException | SAXException | IOException e) {System.err.println("XML解析失败: " + e.getMessage());}}/*** 解析常见的回执标签*/private void parseCommonResponseTags(Document document) {// 解析PRE_NO标签NodeList preNoNodes = document.getElementsByTagName("PRE_NO");if (preNoNodes.getLength() > 0) {String preNo = preNoNodes.item(0).getTextContent();System.out.println("PRE_NO: " + preNo);}// 解析MESSAGE_ID标签NodeList messageIdNodes = document.getElementsByTagName("MESSAGE_ID");if (messageIdNodes.getLength() > 0) {String messageId = messageIdNodes.item(0).getTextContent();System.out.println("MESSAGE_ID: " + messageId);}// 解析RESPONSE_CODE标签NodeList responseCodeNodes = document.getElementsByTagName("RESPONSE_CODE");if (responseCodeNodes.getLength() > 0) {String responseCode = responseCodeNodes.item(0).getTextContent();System.out.println("RESPONSE_CODE: " + responseCode);}// 解析RESPONSE_MESSAGE标签NodeList responseMessageNodes = document.getElementsByTagName("RESPONSE_MESSAGE");if (responseMessageNodes.getLength() > 0) {String responseMessage = responseMessageNodes.item(0).getTextContent();System.out.println("RESPONSE_MESSAGE: " + responseMessage);}// 解析NK_STATUS标签NodeList nkStatusNodes = document.getElementsByTagName("NK_STATUS");if (nkStatusNodes.getLength() > 0) {String nkStatus = nkStatusNodes.item(0).getTextContent();System.out.println("NK_STATUS: " + nkStatus);}}
}
package com.example.dyreportapi.controller;import com.example.dyreportapi.service.VehicleDeclareSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class VehicleDeclareController {@Autowiredprivate VehicleDeclareSender sender;@GetMapping("/sendVehicle")public String sendVehicleDeclare() {sender.sendVehicleDeclareMessage();return "车辆备案报文已发送";}@GetMapping("/sendVehicleOut")public String sendVehicleOut() {sender.sendVehicleHfdMessage();return "车辆出厂报文已发送";}
}
package com.example.dyreportapi.service;import com.example.dyreportapi.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;@Service
public class VehicleDeclareSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 车辆备案报文*/public void sendVehicleDeclareMessage() {String messageId = UUID.randomUUID().toString();String messageDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));System.out.println(messageId);System.out.println(messageDate);String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +"<DECLARE_DATA>\n" +"  <MESSAGE_HEAD>\n" +"    <MESSAGE_TYPE>VEH101</MESSAGE_TYPE>\n" +// 报文类型"    <MESSAGE_ID>" + messageId + "</MESSAGE_ID>\n" +// 报文ID"    <MESSAGE_TYPE>VEH101</MESSAGE_TYPE>\n" +// 报文类型"    <PRE_NO>LT072303</PRE_NO>\n" +// 预录入编号"    <B2B_NO>LT072303</B2B_NO>\n" +// 预录入编号"    <MESSAGE_DATE>" + messageDate + "</MESSAGE_DATE>\n" +// 报文时间"    <SENDER_ID>3408660A06</SENDER_ID>\n" +// 发送方ID
//                "    <SEND_ADDRESS>安庆</SEND_ADDRESS>\n" +// 发送方地址"    <RECEIVER_ID>3329</RECEIVER_ID>\n" +// 接收方ID"  </MESSAGE_HEAD>\n" +"  <VEH101>\n" +"    <VEH_BSC>\n" +"      <VEHICLE_NO>皖A12345</VEHICLE_NO>\n" +// 车牌号"      <B2B_NO>皖A12345</B2B_NO>\n" +// 车牌号"      <VEHICLE_TYPE>3</VEHICLE_TYPE>\n" +// 车辆类型"      <VEHICLE_WT>12345.67</VEHICLE_WT>\n" +// 车辆重量"      <CUSTOMS_CODE>3329</CUSTOMS_CODE>\n" +// 关区代码 备注:3329安庆综保
//                "      <NOTE>测试车辆备案</NOTE>\n" +// 备注"      <DCL_TYPECD>1</DCL_TYPECD>\n" +// 申报类型 1:备案;2:变更;
//                "      <STEP_ID></STEP_ID>\n" +// 当前环节"      <DECLARE_CODE>3408660A06</DECLARE_CODE>\n" + // 申报公司编号"      <DECLARE_NAME>安庆振新汽车有限公司</DECLARE_NAME>\n" +// 申报公司名称"      <CREATE_BY>admin</CREATE_BY>\n" +// 创建人"      <CREATE_TIME>" + messageDate + "</CREATE_TIME>\n" +// 创建时间"      <DECLARE_BY>admin</DECLARE_BY>\n" +// 申报人"      <DECLARE_TIME>" + messageDate + "</DECLARE_TIME>\n" +// 申报时间
//                "      <APPROVE_TIME></APPROVE_TIME>\n" +// 审核时间"    </VEH_BSC>\n" +"  </VEH101>\n" +"</DECLARE_DATA>";System.out.println("已发送车辆备案报文:" + xml);rabbitTemplate.convertAndSend(RabbitMQConfig.DECLARE_QUEUE, xml);System.out.println("已发送车辆备案报文至队列:" + RabbitMQConfig.DECLARE_QUEUE);}/*** 整车出厂报文*/public void sendVehicleHfdMessage() {String messageId = UUID.randomUUID().toString();String messageDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));System.out.println(messageId);System.out.println(messageDate);/*** <?xml version="1.0" encoding="UTF-8" standalone="yes"?>* <JEERP_BZ>*     <MESSAGE_HEAD>*         <MESSAGE_TYPE>HFDB2B</MESSAGE_TYPE>*         <MESSAGE_ID>232ab09e-75d0-4fbe-a94e-c02f752c6cf6-20250729101240</MESSAGE_ID>*         <MESSAGE_TIME>2025-07-29T10:12:40</MESSAGE_TIME>*         <SENDER_ADDRESS></SENDER_ADDRESS>*         <RECEIVER_ID>JGEQ</RECEIVER_ID>*         <RECEIVER_ADDRESS></RECEIVER_ADDRESS>*         <CUSTOM_CODE>3329</CUSTOM_CODE>*         <SEQNO>HFD202507297719</SEQNO>*         <NOTE></NOTE>*         <POCKET_ID>null232ab09e-75d0-4fbe-a94e-c02f752c6cf6-20250729101240</POCKET_ID>*         <TOTAL_POCKET_QTY>1</TOTAL_POCKET_QTY>*         <CUR_POCKET_NO>1</CUR_POCKET_NO>*         <ROUTE_CODE></ROUTE_CODE>*     </MESSAGE_HEAD>*     <MESSAGE_BODY>*         <JG_HFD_HEAD_B2B>*             <ERP_NO>HFD202507297719</ERP_NO>*             <DCL_TYPECD>1</DCL_TYPECD>*             <PASSPORT_TYPECD>5</PASSPORT_TYPECD>*             <IO_TYPECD>I</IO_TYPECD>*             <MASTER_CUSCD>3329</MASTER_CUSCD>*             <AREAIN_ETPSNO>3408660A06</AREAIN_ETPSNO>*             <DCL_ETPSNO>3408660A06</DCL_ETPSNO>*             <INPUT_CODE>3408660A06</INPUT_CODE>*             <CREATE_USER>hxy0154</CREATE_USER>*             <VEHICLE_NO>LT072302</VEHICLE_NO>*             <VEHICLE_WT>10000</VEHICLE_WT>*             <TOTAL_GROSS_WT>560</TOTAL_GROSS_WT>*             <TOTAL_NET_WT>510</TOTAL_NET_WT>*             <CREATE_DATE>2025-07-29T10:12:40</CREATE_DATE>*             <TRAILER_NO>LT072302</TRAILER_NO>*             <PLAT_TYPECD>0</PLAT_TYPECD>*             <COL1>0</COL1>*             <IS_PP>0</IS_PP>*             <ETPS_PREENT_NO>HFD202507297719</ETPS_PREENT_NO>*         </JG_HFD_HEAD_B2B>*         <JG_HFD_LIST_B2B>*             <GUID_L>HFD202507297719_1</GUID_L>*             <PASSPORT_SEQNO>1</PASSPORT_SEQNO>*             <GDECD>LLDB1</GDECD>*             <GDS_NM>领料调拨</GDS_NM>*             <DCL_UNITCD>007</DCL_UNITCD>*             <DCL_QTY>30</DCL_QTY>*             <GROSS_WT>560</GROSS_WT>*             <NET_WT>510</NET_WT>*             <IS_GOODS>0</IS_GOODS>*         </JG_HFD_LIST_B2B>*     </MESSAGE_BODY>* </JEERP_BZ>*/// 构建基础的XML头部和尾部StringBuilder xmlBuilder = new StringBuilder();xmlBuilder.append("<?xml version=\"1.0\" encoding=\"UTF-8\"  standalone=\"yes\" ?>\n").append("<JEERP_BZ>\n").append("  <MESSAGE_HEAD>\n")//报文类型.append("    <MESSAGE_TYPE>HFDB2B</MESSAGE_TYPE>\n")//报文ID.append("    <MESSAGE_ID>").append(messageId).append("</MESSAGE_ID>\n")//报文发送时间.append("    <MESSAGE_TIME>").append(messageDate).append("</MESSAGE_TIME>\n")//发送方地址.append("    <SENDER_ADDRESS></SENDER_ADDRESS>\n")//申报公司编号.append("    <RECEIVER_ID>JGEQ</RECEIVER_ID>\n")//接收方地址.append("    <RECEIVER_ADDRESS></RECEIVER_ADDRESS>\n")//使用您提供的安庆关区代码.append("    <CUSTOM_CODE>3329</CUSTOM_CODE>\n")//业务单号,唯一.append("    <SEQNO>HFD202507245559</SEQNO>\n")//备注.append("    <NOTE></NOTE>\n").append("    <POCKET_ID>").append(messageId).append("</POCKET_ID>\n").append("    <TOTAL_POCKET_QTY>1</TOTAL_POCKET_QTY>\n").append("    <CUR_POCKET_NO>1</CUR_POCKET_NO>\n").append("    <ROUTE_CODE></ROUTE_CODE>\n").append("  </MESSAGE_HEAD>\n").append("  <MESSAGE_BODY>\n").append("  <JG_HFD_HEAD_B2B>\n")//业务单号,唯一.append("    <ERP_NO>HFD202507245559</ERP_NO>\n")//申报类型 1备案  3作废.append("    <DCL_TYPECD>1</DCL_TYPECD>\n")//核放单类型 5-卡口登记货物 6-空车进出区.append("    <PASSPORT_TYPECD>5</PASSPORT_TYPECD>\n")//进出标志 I-进区,E-出区.append("    <IO_TYPECD>E</IO_TYPECD>\n")//主管海关.append("    <MASTER_CUSCD>3329</MASTER_CUSCD>\n")//国内企业编码.append("    <AREAIN_ETPSNO>3408660A06</AREAIN_ETPSNO>\n")//申报单位代码.append("    <DCL_ETPSNO>3408660A06</DCL_ETPSNO>\n")//录入单位代码.append("    <INPUT_CODE>3408660A06</INPUT_CODE>\n")//创建.append("    <CREATE_USER>hxy0154</CREATE_USER>\n")//车牌号.append("    <VEHICLE_NO>YQY99999</VEHICLE_NO>\n")//车自重.append("    <VEHICLE_WT>1000.19</VEHICLE_WT>\n")//货物总毛重.append("    <TOTAL_GROSS_WT>560</TOTAL_GROSS_WT>\n")//货物总净重.append("    <TOTAL_NET_WT>510</TOTAL_NET_WT>\n")//创建时间.append("    <CREATE_DATE>").append(messageDate).append("</CREATE_DATE>\n")//挂车车牌号.append("    <TRAILER_NO>YQY99999</TRAILER_NO>\n")//是否无车辆运输货物 0 否 1是.append("    <PLAT_TYPECD>0</PLAT_TYPECD>\n")//到货确认标记.append("    <COL1>0</COL1>\n")//是否拼票业务.append("    <IS_PP>0</IS_PP>\n")//企业内部编号.append("    <ETPS_PREENT_NO>HFD202507245559</ETPS_PREENT_NO>\n").append("  </JG_HFD_HEAD_B2B>\n");// 使用for循环生成多个JG_HFD_LIST_B2B项for (int i = 1; i <= 1; i++) {xmlBuilder.append("  <JG_HFD_LIST_B2B>\n")// 报文ID.append("    <GUID_L>HFD202507245559_").append(i).append("</GUID_L>\n")// 序号 从1开始递增的正整数.append("    <PASSPORT_SEQNO>1</PASSPORT_SEQNO>\n")// 商品编码.append("    <GDECD>LLDB1</GDECD>\n")// 商品名称.append("    <GDS_NM>车辆1</GDS_NM>\n")// 申报计量单位.append("    <DCL_UNITCD>007</DCL_UNITCD>\n")// 申报数量.append("    <DCL_QTY>3</DCL_QTY>\n")// 申报毛重.append("    <GROSS_WT>560</GROSS_WT>\n")// 申报净重.append("    <NET_WT>510</NET_WT>\n")// 是否商品.append("    <IS_GOODS>0</IS_GOODS>\n").append("  </JG_HFD_LIST_B2B>\n");}// 添加XML尾部xmlBuilder.append("  </MESSAGE_BODY>\n").append("</JEERP_BZ>");String xml = xmlBuilder.toString();System.out.println("已生成金关核放单报文:" + xml);rabbitTemplate.convertAndSend(RabbitMQConfig.RK_QUEUE, xml);System.out.println("已发送车辆出入库消息:" + RabbitMQConfig.RK_QUEUE);}
}

http://www.xdnf.cn/news/16615.html

相关文章:

  • 应用药品 GMP 证书识别技术,实现证书信息的自动化、精准化提取与核验
  • 【动态规划算法】斐波那契数列模型
  • Linux730 tr:-d /-s;sort:-r,-n,-R,-o,-t,-k,-u;bash;cut:-d,-c;tee -a;uniq -c -i
  • 独立站如何吃掉平台蛋糕?DTC模式下的成本重构与利润跃升
  • sqli-labs:Less-6关卡详细解析
  • KONG API Gateway中的核心概念
  • 图像处理中级篇 [1]—— 彩色照相机的效果与预处理
  • SpringBoot之整合SSM步骤
  • PHP语法高级篇(七):MySQL数据库
  • [论文阅读] 人工智能 + 软件工程 | 增强RESTful API测试:针对MongoDB的搜索式模糊测试新方法
  • 【LINUX网络】使用TCP简易通信
  • 【STM32-HAL】 SPI通信与Flash数据写入实战
  • 国产化再进一步,杰和科技推出搭载国产芯片的主板
  • 【CF】Day115——杂题 (构造 | 区间DP | 思维 + 贪心 | 图论 + 博弈论 | 构造 + 位运算 | 贪心 + 构造 | 计数DP)
  • 代码随想录算法训练营第五十五天|图论part5
  • 【音视频】WebRTC-Web 音视频采集与播放
  • 如何利用 Redis 的原子操作(INCR, DECR)实现分布式计数器?
  • CSS-in-JS 动态主题切换与首屏渲染优化
  • IBM Watsonx BI:AI赋能的下一代商业智能平台
  • 领域驱动设计(DDD)在分布式系统中的架构实践
  • jenkins连接docker失败【还是没解决】
  • 基于SpringBoot+MyBatis+MySQL+VUE实现的便利店信息管理系统(附源码+数据库+毕业论文+远程部署)
  • 计算机网络基础(一) --- (网络通信三要素)
  • 【C++算法】77.优先级队列_数据流的中位数
  • PHP云原生架构:容器化、Kubernetes与Serverless实践
  • 机器学习笔记(四)——聚类算法KNN、Kmeans、Dbscan
  • 深入理解 Qt 元对象系统 (Meta-Object System)
  • 架构实战——互联网架构模板(“用户层”和“业务层”技术)
  • 【Linux系统编程】Ext2文件系统
  • 【C++】指针