当前位置: 首页 > news >正文

java集成mqtt

官网:如何在 Java 中使用 Paho MQTT 客户端 | EMQ

1.引入依赖

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>

2.配置文件

#MQTT客户端
publish:mqtt:host: tcp://broker.emqx.io:1883clientId: mqttx_9a36d31912options:#userName: GuoShun#password: 123456# 这里表示会话不过期cleanSession: false# 配置一个默认的主题,加载时不会用到,只能在需要时手动提取defaultTopic: devopstimeout: 1000KeepAliveInterval: 10#断线重连方式,自动重新连接与会话不过期配合使用会导致#断线重新连接后会接收到断线期间的消息。需要更改设置请看password联系我automaticReconnect: trueconnectionTimeout: 3000# 最大链接数maxInflight: 100

3.MQTTConfigBuilder配置

package com.system.utils.mqtt;import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;@Configuration
@ConfigurationProperties(MQTTConfigBuilder.PREFIX)
@Data
public class MQTTConfigBuilder {//配置的名称public static final String PREFIX = "publish.mqtt";/*** 服务端地址*/private String host;/*** 客户端id*/private String clientId;/*** 配置链接项*/private MqttConnectOptions options;}

4.MQTTClientUtils工具类

package com.system.utils.mqtt;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Slf4j
@Configuration
public class MQTTClientUtils {@Autowiredprivate MQTTConfigBuilder mqttConfig;private MqttClient mqttClient;@Resourceprivate MessageCallbackListener messageCallbackListener;//这里是初始化方法@PostConstructpublic void initMqttClient(){//创建连接MQTTClientUtils mqttClientUtils = this.createDevOpsMQTTClient().connect();//这里主要是项目启动时订阅一些主题。看个人需要使用//mqttClientUtils.subscribe("test/#", 2, new HeartBeatListener());//MessageCallbackListener订阅主题,接受到该主题消息后交给MessageCallbackListener去处理mqttClientUtils.subscribe("message/call/back", 2, messageCallbackListener);mqttClientUtils.publish("message/call/back", "测试111");mqttClientUtils.publish("message/call/back", "测试222");}public MQTTClientUtils createDevOpsMQTTClient() {this.createMQTTClient();return this;}private MQTTClientUtils connect() {try {this.mqttClient.connect(mqttConfig.getOptions());if(this.mqttClient.isConnected()) {this.mqttClient.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {log.info("callback connectionLost: " + throwable.getMessage());}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {log.info("callback topic: " + s + ",qos: " + mqttMessage.getQos() + ",message content: " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.info("callback deliveryComplete: " + iMqttDeliveryToken.isComplete());}});}log.info("MQTTClient连接成功!");}catch (MqttException mqttException){mqttException.printStackTrace();log.error("MQTTClient连接失败!");}return this;}private MqttClient createMQTTClient() {try{this.mqttClient = new MqttClient( mqttConfig.getHost(), mqttConfig.getClientId());log.info("MQTTClient创建成功!");return this.mqttClient;}catch (MqttException exception){exception.printStackTrace();log.error("MQTTClient创建失败!");return null;}}/*** 消息发送* @param topicName* @param message* @return*/public boolean publish(String topicName, String message) {log.info("发送消息-主题名:{}, message:{}", topicName, message);MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));try {this.mqttClient.publish(topicName, mqttMessage);return true;}catch (MqttException exception){exception.printStackTrace();return false;}}/*** 消息发送 : retained 默认为 false* "retained message" 指的是 Broker 会保留的最后一条发布到某个主题的消息。* 当新的订阅者连接到该主题时,Broker 会将这条保留消息立即发送给订阅者,即使在订阅者订阅时该消息并未被重新发布。* 这对于一些需要初始状态或者最后一次已知状态的应用场景非常有用。* @param topicName* @param message* @param qos* @return*/public boolean publish(String topicName, int qos, String message) {log.info("发送消息-主题名:{}, qos:{}, message:{}", topicName, qos, message);MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));try {this.mqttClient.publish(topicName, mqttMessage.getPayload(), qos, false);return true;}catch (MqttException exception){exception.printStackTrace();return false;}}/*** 订阅某个主题* @param qos* @param topicName* @param qos*/public void subscribe(String topicName, int qos) {log.info("订阅主题名:{}, qos:{}", topicName, qos);try {this.mqttClient.subscribe(topicName, qos);} catch (MqttException e) {e.printStackTrace();}}/*** 订阅某个主题** @param topicName* @param qos 0 – 最多交付一次 1 – 至少交付一次  2 – 只交付一次  无特殊情况写2*/public void subscribe(String topicName, int qos, IMqttMessageListener messageListener) {log.info("订阅主题名:{}, qos:{}, Listener类:{}", topicName, qos, messageListener.getClass());try {this.mqttClient.subscribe(topicName, qos, messageListener);} catch (MqttException e) {e.printStackTrace();}}/*** 取消订阅主题* @param topicName 主题名称*/public void cleanTopic(String topicName) {log.info("取消订阅主题名:{}", topicName);try {this.mqttClient.unsubscribe(topicName);} catch (MqttException e) {e.printStackTrace();}}}

5.MessageCallbackListener

package com.system.utils.mqtt;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;/*** @description 消息回调返回*/
@Component
@Slf4j
public class MessageCallbackListener implements IMqttMessageListener {@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String messageBody = new String(message.getPayload(), StandardCharsets.UTF_8);log.info("MessageCallbackListener-收到消息-主题:"+topic+", 消息内容是:"+ messageBody);}
}

http://www.xdnf.cn/news/537661.html

相关文章:

  • 停等协议(Stop-and-Wait Protocol)
  • AI人工智能写作平台:AnKo助力内容创作变革!
  • 铅铋环境下应力腐蚀的疲劳试验装置
  • 什么业务需要用到waf
  • 20. 自动化测试框架开发之Excel配置文件的IO开发
  • 【monai 教程】transform之CropPad详解
  • 磁流体 磁性流体 磁液
  • 封装一个基于 WangEditor 的富文本编辑器组件(Vue 3 + TypeScript 实战)
  • UEFI Spec 学习笔记---33 - Human Interface Infrastructure Overview---33.2.6 Strings
  • Oracle 中 open_cursors 参数详解:原理、配置与性能测试
  • 一键无损批量压缩图片 保留高清细节 开源免费!支持 10 + 格式转换
  • HashMap 的特点及应用场景
  • GraphQL 接口设计
  • SRS流媒体服务器(6)源码分析之推流篇
  • 2025.05.19【Barplot】柱状图的多样性绘制
  • Linux句柄数过多问题排查
  • stm32如何触摸屏设置显示按钮
  • c#将json字符串转换为对象数组
  • Linux-进程信号
  • Python 与 Java 在 Web 开发中的深度对比:从语言特性到生态选型
  • GPU状态监控
  • MPCount: 人群计数的单域泛化
  • 【成品设计】基于 STM32 的智能鞋柜系统
  • TransmittableThreadLocal实现上下文传递-笔记
  • 「HHT(希尔伯特黄变换)——ECG信号处理-第十三课」2025年5月19日
  • 院校机试刷题第七天:1828西交-矩阵相加、1822计算圆周率、1823学生成绩排序
  • 基于PetaLinux的Zynq PS应用自启动全攻略
  • 开发指南116-font-size: 0的使用
  • 深入解析 Oracle session_cached_cursors 参数及性能对比实验
  • python动漫论坛管理系统