当前位置: 首页 > ops >正文

MQTT:Java集成MQTT

目录

  • 一、原生java架构
    • 1.1 导入POM文件
    • 1.2 编写测试用例
  • 二、SpringBoot集成MQTT
    • 2.1 导入POM文件
    • 2.2 在YML文件中增加配置
    • 2.3 新建Properties配置文件映射配置
    • 2.4 创建连接工厂
    • 2.5 增加入站规则配置
    • 2.6 增加出站规则配置
    • 2.7 创建消息发送网关
    • 2.8 测试消息发送
    • 2.9 项目结构


一、原生java架构

1.1 导入POM文件

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>

1.2 编写测试用例

package com.ming;import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Test;/*** 使用java原生方法连接MQTT*/
public class MqttPahoTest {private final String serverURI = "tcp://localhost:1883";private final String clientId = "emqx_spring_client_132";/*** 建立连接* @throws MqttException*/@Testpublic MqttClient createConnection() throws MqttException {// 创建MQTT对象MqttClient client = new MqttClient(serverURI, clientId, new MemoryPersistence());// 发送建立连接的请求MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName("admin");mqttConnectOptions.setPassword("admin".toCharArray());mqttConnectOptions.setCleanSession(true);client.connect(mqttConnectOptions);return client;}/*** 发送消息* @throws MqttException*/@Testpublic void sendMsg() throws MqttException {// 创建对象MqttClient client = createConnection();// 发送消息MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(2);mqttMessage.setPayload("Hello World".getBytes());client.publish("java/a", mqttMessage);// 关闭连接client.disconnect();client.close();}/*** 接收消息* @throws MqttException*/@Testpublic void receiveMsg() throws MqttException {// 创建MQTT对象MqttClient client = new MqttClient(serverURI, clientId, new MemoryPersistence());// 发送建立连接的请求MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName("admin");mqttConnectOptions.setPassword("admin".toCharArray());mqttConnectOptions.setCleanSession(true);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {  // 当连接丢失时的回调System.out.println("Connection lost...");}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {  // 消息接收回调System.out.println(String.format("%s ---> %s", topic, new String(mqttMessage.getPayload())));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {  // 消息传输完毕System.out.println("Delivery complete");}});client.connect(mqttConnectOptions);// 订阅主题client.subscribe("java/b", 2);while (true);}
}

二、SpringBoot集成MQTT

2.1 导入POM文件

<!-- spring boot 项目集成消息中间件基础依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- spring boot 项目和MQTT客户端集成依赖 -->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.4.3</version>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
</dependency>

2.2 在YML文件中增加配置

spring:
  mqtt:
    username: admin
    password: admin
    url: tcp://localhost:1883
    subClientId: sub_client_id_123
    subTopic: atguigu/iot/lamp/line1,atguigu/iot/lamp/line2
    pubClientId: pub_client_id_123

2.3 新建Properties配置文件映射配置

package com.ming.properties;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;@Data
// Holder for the MQTT settings bound from the "spring.mqtt" configuration prefix.
// NOTE(review): how this class is registered as a bean (for example through
// @EnableConfigurationProperties) is not visible in this file - confirm.
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {

    /** User name passed to the broker connect options. */
    private String username;

    /** Password passed to the broker connect options. */
    private String password;

    /** Broker address, e.g. {@code tcp://localhost:1883}. */
    private String url;

    /** Client ID used by the inbound (subscribing) adapter. */
    private String subClientId;

    /** Topics to subscribe to, written as one comma-separated string. */
    private String subTopic;

    /** Client ID used by the outbound (publishing) handler. */
    private String pubClientId;
}

2.4 创建连接工厂

package com.ming.config;import com.ming.properties.MqttConfigurationProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;/*** 创建MQTT的配置类 配置连接工厂*/
/**
 * Spring configuration that exposes the Paho client factory used by the MQTT
 * inbound and outbound adapters.
 */
@Configuration
public class MqttConfiguration {

    @Autowired
    private MqttConfigurationProperties mqttConfigurationProperties;

    /**
     * Builds the client factory from the {@code spring.mqtt} settings.
     *
     * <p>The password is applied only when one is configured. Previously a missing
     * {@code spring.mqtt.password} failed context startup with a
     * {@link NullPointerException} from {@code getPassword().toCharArray()}.
     *
     * @return a factory whose connect options carry the configured credentials and broker URL
     */
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttConfigurationProperties.getUsername());

        String password = mqttConfigurationProperties.getPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});

        DefaultMqttPahoClientFactory mqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        mqttPahoClientFactory.setConnectionOptions(options);
        return mqttPahoClientFactory;
    }
}

2.5 增加入站规则配置

package com.ming.config;import com.ming.handler.ReceiverMessageHandler;
import com.ming.properties.MqttConfigurationProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** MQTT入站规则配置类(接收消息)*/
/**
 * Inbound (receiving) MQTT configuration: declares the input channel, the
 * subscribing channel adapter and the handler that consumes received messages.
 */
@Configuration
public class MqttInboundConfiguration {

    @Autowired
    private MqttConfigurationProperties mqttConfigurationProperties;

    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    @Autowired
    private ReceiverMessageHandler receiverMessageHandler;

    /**
     * Channel that carries messages from the inbound adapter to the handler.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel messageInboundChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter: subscribes to the configured topics with QoS 1 and
     * forwards converted messages to {@link #messageInboundChannel()}.
     *
     * @return the configured message-driven channel adapter
     */
    @Bean
    public MessageProducer messageProducer() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttConfigurationProperties.getUrl(),
                mqttConfigurationProperties.getSubClientId(),
                mqttPahoClientFactory,
                splitTopics(mqttConfigurationProperties.getSubTopic()));
        adapter.setQos(1);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(messageInboundChannel());
        return adapter;
    }

    /**
     * Registers the receiver as the consumer of {@code messageInboundChannel}.
     *
     * @return the injected {@link ReceiverMessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = "messageInboundChannel")
    public MessageHandler messageHandler() {
        return receiverMessageHandler;
    }

    /**
     * Splits the comma-separated topic setting into individual topic names.
     *
     * <p>Whitespace around the commas and at both ends is dropped, so
     * {@code "a, b"} yields {@code a} and {@code b}. A plain {@code split(",")}
     * would keep the leading space and subscribe to {@code " b"} instead.
     */
    private static String[] splitTopics(String commaSeparatedTopics) {
        return commaSeparatedTopics.trim().split("\\s*,\\s*");
    }
}
package com.ming.handler;import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;/*** 当订阅的主题有消息时就会触发此处回调*/
@Component
public class ReceiverMessageHandler implements MessageHandler {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {Object payload = message.getPayload();  // 获取消息的内容System.out.println(message.getHeaders().get("mqtt_receivedTopic"));  // 主题名称System.out.println(payload);  // 消息主体}
}

2.6 增加出站规则配置

package com.ming.config;import com.ming.properties.MqttConfigurationProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** MQTT出站规则配置类(发送消息)*/
/**
 * Outbound (sending) MQTT configuration: declares the output channel and the
 * Paho message handler that consumes it.
 */
@Configuration
public class MqttOutboundConfiguration {

    @Autowired
    private MqttConfigurationProperties mqttConfigurationProperties;

    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    /**
     * Channel that outgoing messages are sent to.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Outbound handler consuming {@code mqttOutboundChannel}. It is configured
     * with default QoS 0, default topic {@code "default"} and async mode enabled.
     *
     * @return the configured Paho message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutboundMessageHandler() {
        String brokerUrl = mqttConfigurationProperties.getUrl();
        String publisherClientId = mqttConfigurationProperties.getPubClientId();

        MqttPahoMessageHandler handler =
                new MqttPahoMessageHandler(brokerUrl, publisherClientId, mqttPahoClientFactory);
        handler.setAsync(true);
        handler.setDefaultTopic("default");
        handler.setDefaultQos(0);
        return handler;
    }
}

2.7 创建消息发送网关

package com.ming.getway;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;/*** MQTT发送消息的网关*/
/**
 * Messaging gateway whose calls are sent to {@code mqttOutboundChannel}.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGetWay {

    /**
     * Sends a payload, passing the topic as the {@link MqttHeaders#TOPIC} header.
     *
     * @param topic   value of the topic header
     * @param payload message body
     */
    void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a payload, passing the topic and QoS as message headers.
     *
     * @param topic   value of the {@link MqttHeaders#TOPIC} header
     * @param qos     value of the {@link MqttHeaders#QOS} header
     * @param payload message body
     */
    void sendMsgToMqtt(
            @Header(value = MqttHeaders.TOPIC) String topic,
            @Header(value = MqttHeaders.QOS) int qos,
            String payload);
}
package com.ming.service;import com.ming.getway.MqttGetWay;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
// Thin wrapper that forwards send requests to the MqttGetWay messaging gateway.
public class MqttMessageSender {

    @Autowired
    private MqttGetWay mqttGetWay;

    /**
     * Forwards the message to the gateway without specifying a QoS.
     *
     * @param topic   topic to send to
     * @param message message body
     */
    public void sendMsg(String topic, String message) {
        this.mqttGetWay.sendMsgToMqtt(topic, message);
    }

    /**
     * Forwards the message to the gateway with an explicit QoS.
     *
     * @param topic   topic to send to
     * @param qos     QoS value passed to the gateway
     * @param message message body
     */
    public void sendMsg(String topic, int qos, String message) {
        this.mqttGetWay.sendMsgToMqtt(topic, qos, message);
    }
}

2.8 测试消息发送

package com.ming;import com.ming.service.MqttMessageSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest(classes = SpringMqttDemoApplication.class)
// Test that sends one message through the injected MqttMessageSender.
public class MqttMessageSenderTest {

    @Autowired
    private MqttMessageSender mqttMessageSender;

    /** Sends a fixed greeting to the {@code java/c} topic. */
    @Test
    public void sendToMsg() {
        final String topic = "java/c";
        final String payload = "hello mqtt spring boot ...";
        mqttMessageSender.sendMsg(topic, payload);
    }
}

2.9 项目结构

在这里插入图片描述

http://www.xdnf.cn/news/17490.html

相关文章:

  • 【LLM】OpenAI开源GPT级模型,120B及20B参数GPT-OSS
  • 调用springboot接口返回403,问题定位及总结
  • Java 大视界 -- Java 大数据机器学习模型在电商商品销量预测与库存精准管理中的应用(391)
  • 安装1panel之后如何通过nginx代理访问
  • 展锐平台(Android15)WLAN热点名称修改不生效问题分析
  • 【Docker实战】Spring Boot应用容器化
  • Chat2DB入门教程
  • JavaSE:入门
  • 【图像算法 - 11】基于深度学习 YOLO 与 ByteTrack 的目标检测与多目标跟踪系统(系统设计 + 算法实现 + 代码详解 + 扩展调优)
  • MySQL的隔离级别及MVCC原理解析
  • SpringCloud详细笔记
  • reinterpret_cast and static cast
  • 【PyTorch】单目标检测项目
  • 深度解析1688关键字搜索API接口:技术实现与应用探索
  • crc32算法php版----crc32.php
  • 什么是ABA问题?
  • 【牛客刷题】REAL800 棋盘
  • 随想记——excel报表
  • WinForm之TreeView控件
  • Excel版经纬度和百分度互转v1.1
  • 复现论文《多无人机协同任务分配算法设计与实现》
  • 【YOLO11改进 - C3k2融合】C3k2融合EBlock(Encoder Block):低光增强编码器块,利用傅里叶信息增强图像的低光条件
  • Spring 依赖注入、AOP代理
  • Stlink识别不到-安装驱动
  • Redis基本原理,性能优化和参数调优简述
  • Lua基础+Lua数据类型
  • 正则表达式常用语法参考
  • es查询小结
  • 机械学习--DBSCAN 算法(附实战案例)
  • 本地WSL部署接入 whisper + ollama qwen3:14b 总结字幕校对增强版