当前位置: 首页 > ops >正文

Spring整合MQTT使用

MQTT使用

消息发送和接收

引入依赖

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

编写发送消息客户端

clientId不能重复,重复的clientId会导致已连接的客户端下线

    /*** 消息发送* @throws MqttException*/public void send() throws MqttException {//url和clientId请修改为自己的MqttClient client = new MqttClient("tcp://101.43.94.164:1883", "testuser1-send1");//设置回调client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {System.out.println("连接丢失");}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println("消息到达");}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("消息发送完成");}});//连接参数设置MqttConnectOptions connectOptions = new MqttConnectOptions();//用户名和密码请修改为自己的connectOptions.setUserName("testuser1");connectOptions.setPassword("testuser1".toCharArray());client.connect(connectOptions);//主题String topic = "topic/test2";int qos = 1;String msg = "Hello MQTT 1";MqttMessage message = new MqttMessage(msg.getBytes());message.setQos(qos);client.publish(topic, message);client.disconnect();client.close();}

消息接收

请先启动消息接收客户端,再开启消息发送客户端,否则消息发送后没有客户端接收,消息会被丢失

    public void recevie() throws MqttException, InterruptedException {MqttClient client = new MqttClient("tcp://101.43.94.164:1883", "testuser1-receive123");MqttConnectOptions connectOptions = new MqttConnectOptions();connectOptions.setUserName("testuser1");connectOptions.setPassword("testuser1".toCharArray());connectOptions.setAutomaticReconnect(true);client.connect(connectOptions);String topic = "topic/test2";CountDownLatch countDownLatch = new CountDownLatch(1);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {System.out.println("连接丢失");}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println("消息到达:" + mqttMessage.toString() + " " + s);countDownLatch.countDown();}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("消息发送完成");}});client.subscribe(topic, 1);countDownLatch.await();client.disconnect();client.close();}

topic(主题)

MQTT 协议规定主题的长度为两个字节,因此主题最多可包含 65,535 个字符。
MQTT中的t主题不需要用户提前创建,如果代码中使用了不存在的主题,MQTT会自动创建这个主题。

mqtt通过/对消息分层。为了便于理解,不建议使用/chat和chat/。

topic订阅规则

订阅一个主题很简单,如果主题是topic,那么那么使用/topic就可以订阅了;但是如果想一次订阅多个主题时,就需要用到通配符了。
常用通配符:

  1. +:用于单个主题层级匹配的通配符
test/+/ip   //有效
test+       //无效
test/+      //无效

如果客户端订阅了test/+/ip主题,那么会收到以下主题的消息:

test/1/ip
test/2/ip
test/3/ip

不会收到以下主题的消息:

test/ip
test/wind/1/ip
  1. #:用于匹配主题中任意层级的通配符;多层通配符表示它的父级和任意数量的子层级,在使用多层通配符时,它必须占据整个层级并且必须是主题的最后一个字符

如果订阅了test/#主题,那么会收到以下主题的消息:

test
test/ip
test/1/ip
  1. :以:以:以$SYS开头的主题为系统主题,系统主题主要用户获取MQTT服务器自身运行状态、消息统计、客户端上下线事件等数据

通配符主题订阅的性能弱于普通主题订阅,且会消耗更多的服务器资源,用户可根据实际业务情况选择订阅类型。

MQTT和Spring的整合

引入依赖

        <dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>

核心配置:

@Configuration
public class MQTTConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});// 连接断开时自动重连options.setAutomaticReconnect(true);// MQTT服务器认证用户名options.setUserName(username);// MQTT服务器认证密码options.setPassword(password.toCharArray());factory.setConnectionOptions(options);System.out.println("Connecting to broker: " + brokerUrl + " OK.");return factory;}@Beanpublic MqttPahoMessageDrivenChannelAdapter mqttInbound() {// 创建入站适配器:监听MQTT消息并转发到Spring Integration通道MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", // 入站客户端ID(唯一,避免与其他客户端冲突)mqttClientFactory(),    // 使用前面定义的客户端工厂"testSub/#"  // 订阅的MQTT主题("#"为通配符,表示订阅"testSub/"下所有子主题));// 消息接收后转发到的通道名称adapter.setOutputChannelName("mqttInputChannel");// 设置订阅QoS级别(1表示确保消息至少到达一次)adapter.setQos(1);return adapter;}/***  网关指定通过该网关发送的消息,默认会被路由到名为 指定的消息通道(这里是配置类中定义的 mqttOutboundChannel Bean),*  最终由通道连接的 MqttPahoMessageHandler 处理并发送到 MQTT 服务器。* @return*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "-outbound",mqttClientFactory());handler.setAsync(true);handler.setDefaultQos(1);return handler;}@Beanpublic MessageChannel mqttInputChannel() {// 入站消息通道(接收MQTT消息)return new DirectChannel();}@Beanpublic MessageChannel mqttOutboundChannel() {// 出站消息通道(发送MQTT消息)return new DirectChannel();}
}

MessageChannel 是 Spring Integration 的核心概念,相当于消息流转的“管道”:

  • mqttInputChannel:入站适配器(mqttInbound)接收的 MQTT 消息会流入此通道,后续可通过 @ServiceActivator 绑定处理器消费消息(当前代码未实现,需补充消息处理逻辑)
  • mqttOutboundChannel:业务代码通过 MqttMessageGateway 发送的消息会先流入此通道,再由出站处理器(mqttOutbound)发送到 MQTT 服务器。

发送消息

/*** Description: @MessagingGateway 注解的接口,用于发送 MQTT 消息,会自动生成代理类,代理类会将方法参数转换为 Spring Integration 消息(Message),*    通过 defaultRequestChannel = "mqttOutboundChannel" 指定: 所有通过该网关发送的消息,默认会被路由到名为 mqttOutboundChannel*    的消息通道(对应配置类中定义的 mqttOutboundChannel Bean),最终由通道连接的 MqttPahoMessageHandler 处理并发送到 MQTT 服务器。*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMessageGateway {void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);
}

接收消息

@Component
public class MqttMessageListener {@Bean@ServiceActivator(inputChannel = "mqttInputChannel") //绑定消息入站通道public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();String payload = message.getPayload().toString();Logger log = LoggerFactory.getLogger(MqttMessageListener.class);log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);try {if (topic.startsWith("testSub/")) {//进行业务处理}} else if (topic.startsWith("report/allpoints")) {//进行业务处理}}} catch (Exception e) {log.error("[MQTT] 消息处理失败: {}", e.getMessage());}}};}
}
  1. 接收消息: MQTT服务器 → mqttInbound(入站适配器,订阅主题)→ mqttInputChannel(通道)→ 消息处理器(需补充,如存储/业务处理)。
  2. 发送消息: 业务代码 → MqttMessageGateway(消息网关)→ mqttOutboundChannel(通道)→ mqttOutbound(出站处理器)→ MQTT服务器。

参考

  1. MQTT 主题与通配符(Topics & Wildcards)入门手册
  2. Java对接MQTT协议的完整实现
http://www.xdnf.cn/news/20098.html

相关文章:

  • ai连接怡和达进行非标选型 抓包失败
  • 阿瓦隆 A1566HA 2U 480T矿机参数解析:性能与能效深入分析
  • 问题三ai思路
  • 项目经理为什么要有一张PMP®认证?
  • 力扣hot100:旋转图像(48)(详细图解以及核心思路剖析)
  • 代码可读性的详细入门
  • 1.10 虚拟内存管理机制
  • 【面板数据】全球数字贸易规则一体化程度数据及参考文献(2000-2024年)
  • 11.1.7 cpp客户端上传测试和文件引用计数测试
  • 【数据结构——哈夫曼树】
  • 理解UE4中C++17的...符号及enable_if_t的用法及SFINAE思想
  • Redis到底什么,该怎么用
  • JavaWeb —— 登录校验
  • AOI 检测准、机床运行稳?杰和 AR707 撑起工控 “精准 + 高效”
  • biocmanager安装 库 老是提示网络连接错误 才尝试各种办法
  • 「数据获取」《中国劳动统计年鉴》(1991-2024)
  • linux inotify 功能详解
  • MySQL锁篇-锁类型
  • 解析豆科系统发育冲突原因
  • 无字母数字命令执行
  • UC Berkeley 开源大世界模型(LWM):多模态大模型领域世界模型技术新进展
  • 鹿客发布旗舰新品AI智能锁V6 Max,打造AI家庭安全领域新标杆
  • keil 5 STM32工程介绍
  • 写给大学生的
  • 【开题答辩全过程】以 在线教育系统为例,包含答辩的问题和答案
  • 从安装到应用:GISBox与GeoServer的关键区别及用户适配指南
  • Gradle Task 进阶:Task 依赖关系、输入输出、增量构建原理
  • 一种用geoserver发布复杂样式矢量服务的方法
  • [bat-cli] 语法映射 | SyntaxMapping
  • 机器学习-决策树(下)