当前位置: 首页 > news >正文

在SpringBoot中使用MQTT实现消息的订阅

1 导入依赖包

修改pom.xml 文件,添加MQTT相关依赖,具体示例代码如下所示:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.3.2.RELEASE</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.4</version></dependency></dependencies>

2 修改配置文件
修改application.yml配置文件,增加MQTT相关配置。示例代码如下所示:

spring:application:name: consumer#MQTT配置信息mqtt:#MQTT服务端地址,端口默认为11883,如果有多个,用逗号隔开url: tcp://127.0.0.1:11883#用户名username: admin#密码password: public#客户端id(不能重复)client:id: consumer-id#MQTT默认的消息推送主题,实际可在调用接口时指定default:topic: topic
server:port: 8085

3 消费者客户端配置
创建消费者客户端配置类MqttConsumerConfig,读取application.yml中的相关配置,并初始化创建MQTT的连接。示例代码如下所示:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConsumerConfig {@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client.id}")private String clientId;@Value("${spring.mqtt.default.topic}")private String defaultTopic;@Autowiredprivate MqttConsumerCallBack mqttConsumerCallBack;/*** 客户端对象*/private MqttClient client;/*** 在bean初始化后连接到服务器*/@PostConstructpublic void init(){connect();}/*** 客户端连接服务端*/public void connect(){try {//创建MQTT客户端对象client = new MqttClient(hostUrl,clientId,new MemoryPersistence());//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接到服务端都是以新的身份//options.setCleanSession(true);//如果想要保持长连接,持续的接收消息,setCleanSession可以这样设置options.setAutomaticReconnect(true);  // 自动重连options.setCleanSession(false);       // 保留会话状态(避免重新订阅)//设置连接用户名options.setUserName(username);//设置连接密码options.setPassword(password.toCharArray());//设置超时时间,单位为秒options.setConnectionTimeout(100);//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);//设置回调client.setCallback(new MqttConsumerCallBack());//设置回调这里还可以才用另一种方式,上面使用注解导入mqttConsumerCallBack,再使用      mqttConsumerCallBack,new  的方式在回调函数中保存数据的时候会报错//client.setCallback(mqttConsumerCallBack);client.connect(options);//订阅主题//消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息int[] qos = {1,1};//主题String[] topics = {"topic1","topic2"};//订阅主题client.subscribe(topics,qos);} catch (MqttException e) {e.printStackTrace();}}/*** 断开连接*/public void disConnect(){try {client.disconnect();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅主题*/public void subscribe(String topic,int qos){try {client.subscribe(topic,qos);} catch (MqttException e) {e.printStackTrace();}}
}

4 消费者客户端消息回调
创建MqttConsumerCallBack类并继承MqttCallback,实现相关消息回调事件,示例代码如下图所示:

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MqttConsumerCallBack implements MqttCallback{/*** 客户端断开连接的回调*/@Overridepublic void connectionLost(Throwable throwable) {System.out.println("与服务器断开连接,可重连");}/*** 消息到达的回调*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println(String.format("接收消息主题 : %s",topic));System.out.println(String.format("接收消息Qos : %d",message.getQos()));System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));System.out.println(String.format("接收消息retained : %b",message.isRetained()));}/*** 消息发布成功的回调*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println(String.format("接收消息成功"));}
}

5 创建Controller控制器,实现MQTT连接的建立和断开
接下来,创建Controller控制器MqttController,并实现MQTT连接的建立和断开等方法。示例代码如下所示:

import com.weiz.mqtt.config.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;@Controller
public class MqttController {@Autowiredprivate MqttConsumerConfig client;@Value("${spring.mqtt.client.id}")private String clientId;@RequestMapping("/connect")@ResponseBodypublic String connect(){client.connect();return clientId + "连接到服务器";}@RequestMapping("/disConnect")@ResponseBodypublic String disConnect(){client.disConnect();return clientId + "与服务器断开连接";}
}

打开浏览器,输入地址http://localhost:18083/,在EMQX管理界面可以看到连接上来的两个客户端。如下图所示:
在这里插入图片描述

http://www.xdnf.cn/news/361405.html

相关文章:

  • Element-UI字体图标不显示
  • Oracle — 数据管理
  • LVGL源码学习之渲染、更新过程(2)---无效区域的处理
  • 电厂数据库未来趋势:时序数据库 + AI 驱动的自优化系统
  • 期货跟单软件如何对实盘进行风控?
  • go语言封装、继承与多态:
  • 【A2A】管中窥豹,google源码python-demo介绍
  • Go语言中 源文件开头的 // +build 注释的用法
  • 母亲节祝福网页制作
  • 推荐一个很方便的浏览器管理插件Wetab插件
  • 水印云:AI赋能,让图像处理变得简单高效
  • VSCode如何解决打开html页面中文乱码的问题
  • 工业软件自主化突围:RTOS 如何打破 “协议栈 - 控制器” 生态垄断
  • 零件画图实战提升案例(上)
  • 企业高性能WEB服务器—Nginx
  • 【论文阅读】基于客户端数据子空间主角度的聚类联邦学习分布相似性高效识别
  • 深度解析动态IP业务核心场景:从技术演进到行业实践
  • 住宅IP的深度解析与合理运用
  • 探索Stream流:高效数据处理的秘密武器
  • TOGAF 企业架构介绍(4A架构)
  • [javascript]取消异步请求
  • 26考研——中央处理器_指令执行过程(5)
  • qiankun微前端任意位置子应用
  • Kubernetes调度策略深度解析:NodeSelector与NodeAffinity的正确打开方式
  • 网络安全体系架构:核心框架与关键机制解析
  • kubernetes服务自动伸缩-HPA
  • C++ 访问者模式详解
  • Redis面试题
  • 力扣26——删除有序数组中的重复项
  • 【推荐笔记工具】思源笔记 - 隐私优先的个人知识管理系统,支持 Markdown 排版、块级引用和双向链接