java集成mqtt
官网:如何在 Java 中使用 Paho MQTT 客户端 | EMQ
1.引入依赖
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
2.配置文件
#MQTT客户端
publish:
  mqtt:
    host: tcp://broker.emqx.io:1883
    clientId: mqttx_9a36d31912
    options:
      #userName: GuoShun
      #password: 123456
      # 这里表示会话不过期
      cleanSession: false
      # 配置一个默认的主题,加载时不会用到,只能在需要时手动提取
      defaultTopic: devops
      timeout: 1000
      KeepAliveInterval: 10
      #断线重连方式,自动重新连接与会话不过期配合使用会导致
      #断线重新连接后会接收到断线期间的消息。需要更改设置请看password联系我
      automaticReconnect: true
      connectionTimeout: 3000
      # 最大链接数
      maxInflight: 100
3.MQTTConfigBuilder配置
package com.system.utils.mqtt;import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;@Configuration
@ConfigurationProperties(MQTTConfigBuilder.PREFIX)
@Data
public class MQTTConfigBuilder {//配置的名称public static final String PREFIX = "publish.mqtt";/*** 服务端地址*/private String host;/*** 客户端id*/private String clientId;/*** 配置链接项*/private MqttConnectOptions options;}
4.MQTTClientUtils工具类
package com.system.utils.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * Wrapper around a single Paho {@link MqttClient} that is created and connected
 * once the bean has been initialised, and that exposes publish / subscribe /
 * unsubscribe helpers.
 *
 * <p>Failures are logged rather than propagated: the publish methods report
 * them through their boolean return value, the other operations only log.
 */
@Slf4j
@Configuration
public class MQTTClientUtils {

    @Autowired
    private MQTTConfigBuilder mqttConfig;

    /** Underlying client; stays {@code null} when creation failed. */
    private MqttClient mqttClient;

    @Resource
    private MessageCallbackListener messageCallbackListener;

    /**
     * Initialisation hook: creates the client, connects it, subscribes the
     * {@link MessageCallbackListener} to {@code message/call/back} and publishes
     * two test messages to that topic.
     *
     * <p>When the client could not be created or connected, the subscription
     * and the test messages are skipped instead of failing bean initialisation.
     */
    @PostConstruct
    public void initMqttClient() {
        MQTTClientUtils mqttClientUtils = this.createDevOpsMQTTClient().connect();
        if (this.mqttClient == null || !this.mqttClient.isConnected()) {
            log.error("MQTTClient未连接,跳过启动时的订阅与发布!");
            return;
        }
        // Topics that should be subscribed at start-up go here, for example:
        // mqttClientUtils.subscribe("test/#", 2, new HeartBeatListener());
        // Messages received on this topic are handed to MessageCallbackListener.
        mqttClientUtils.subscribe("message/call/back", 2, messageCallbackListener);
        mqttClientUtils.publish("message/call/back", "测试111");
        mqttClientUtils.publish("message/call/back", "测试222");
    }

    /**
     * Creates the underlying client from the configured host and client id.
     *
     * @return this instance, to allow chaining
     */
    public MQTTClientUtils createDevOpsMQTTClient() {
        this.createMQTTClient();
        return this;
    }

    /**
     * Connects the client using the configured connection options.
     *
     * <p>The default callback is registered before connecting so that it is
     * already in place when the first messages are delivered.
     *
     * @return this instance, to allow chaining
     */
    private MQTTClientUtils connect() {
        if (this.mqttClient == null) {
            // Creation failed earlier; there is nothing to connect.
            log.error("MQTTClient连接失败!");
            return this;
        }
        try {
            this.mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    // Pass the throwable itself so the full cause is logged.
                    log.warn("callback connectionLost: {}",
                            throwable == null ? null : throwable.getMessage(), throwable);
                }

                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    log.info("callback topic: {},qos: {},message content: {}",
                            s, mqttMessage.getQos(),
                            new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    log.info("callback deliveryComplete: {}", iMqttDeliveryToken.isComplete());
                }
            });
            this.mqttClient.connect(mqttConfig.getOptions());
            if (this.mqttClient.isConnected()) {
                log.info("MQTTClient连接成功!");
            }
        } catch (MqttException mqttException) {
            log.error("MQTTClient连接失败!", mqttException);
        }
        return this;
    }

    /**
     * Instantiates the Paho client and stores it in {@link #mqttClient}.
     *
     * @return the created client, or {@code null} when creation failed
     */
    private MqttClient createMQTTClient() {
        try {
            this.mqttClient = new MqttClient(mqttConfig.getHost(), mqttConfig.getClientId());
            log.info("MQTTClient创建成功!");
            return this.mqttClient;
        } catch (MqttException exception) {
            log.error("MQTTClient创建失败!", exception);
            return null;
        }
    }

    /**
     * Reports whether the client is missing, logging the rejected operation.
     *
     * @param action short description of the operation that was attempted
     * @return {@code true} when no client exists and the operation must be skipped
     */
    private boolean clientUnavailable(String action) {
        if (this.mqttClient == null) {
            log.error("MQTTClient未创建,无法执行操作: {}", action);
            return true;
        }
        return false;
    }

    /**
     * Publishes a UTF-8 encoded message with the default settings of
     * {@link MqttMessage}.
     *
     * @param topicName topic to publish to
     * @param message   message text
     * @return {@code true} when the publish call succeeded, {@code false} otherwise
     */
    public boolean publish(String topicName, String message) {
        log.info("发送消息-主题名:{}, message:{}", topicName, message);
        if (clientUnavailable("publish")) {
            return false;
        }
        MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
        try {
            this.mqttClient.publish(topicName, mqttMessage);
            return true;
        } catch (MqttException exception) {
            log.error("消息发送失败-主题名:{}", topicName, exception);
            return false;
        }
    }

    /**
     * Publishes a UTF-8 encoded, non-retained message with the given QoS.
     *
     * <p>A "retained message" is the last message published to a topic that the
     * broker keeps; a new subscriber to that topic receives it immediately, even
     * though it was not published again. This method always passes
     * {@code retained = false}.
     *
     * @param topicName topic to publish to
     * @param qos       quality of service level
     * @param message   message text
     * @return {@code true} when the publish call succeeded, {@code false} otherwise
     */
    public boolean publish(String topicName, int qos, String message) {
        log.info("发送消息-主题名:{}, qos:{}, message:{}", topicName, qos, message);
        if (clientUnavailable("publish")) {
            return false;
        }
        try {
            this.mqttClient.publish(topicName, message.getBytes(StandardCharsets.UTF_8), qos, false);
            return true;
        } catch (MqttException exception) {
            log.error("消息发送失败-主题名:{}", topicName, exception);
            return false;
        }
    }

    /**
     * Subscribes to a topic without a dedicated listener.
     *
     * @param topicName topic to subscribe to
     * @param qos       quality of service level
     */
    public void subscribe(String topicName, int qos) {
        log.info("订阅主题名:{}, qos:{}", topicName, qos);
        if (clientUnavailable("subscribe")) {
            return;
        }
        try {
            this.mqttClient.subscribe(topicName, qos);
        } catch (MqttException e) {
            log.error("订阅主题失败-主题名:{}", topicName, e);
        }
    }

    /**
     * Subscribes to a topic and registers a dedicated listener for it.
     *
     * @param topicName       topic to subscribe to
     * @param qos             0 - at most once, 1 - at least once, 2 - exactly once;
     *                        use 2 unless there is a specific reason not to
     * @param messageListener listener registered for the topic
     */
    public void subscribe(String topicName, int qos, IMqttMessageListener messageListener) {
        log.info("订阅主题名:{}, qos:{}, Listener类:{}", topicName, qos, messageListener.getClass());
        if (clientUnavailable("subscribe")) {
            return;
        }
        try {
            this.mqttClient.subscribe(topicName, qos, messageListener);
        } catch (MqttException e) {
            log.error("订阅主题失败-主题名:{}", topicName, e);
        }
    }

    /**
     * Unsubscribes from a topic.
     *
     * @param topicName topic name
     */
    public void cleanTopic(String topicName) {
        log.info("取消订阅主题名:{}", topicName);
        if (clientUnavailable("unsubscribe")) {
            return;
        }
        try {
            this.mqttClient.unsubscribe(topicName);
        } catch (MqttException e) {
            log.error("取消订阅主题失败-主题名:{}", topicName, e);
        }
    }
}
5.MessageCallbackListener
package com.system.utils.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * Message callback listener: logs every message it receives.
 */
@Component
@Slf4j
public class MessageCallbackListener implements IMqttMessageListener {

    /**
     * Decodes the payload as UTF-8 and writes the topic and the text to the log.
     *
     * @param topic   topic the message arrived on
     * @param message received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        final byte[] rawPayload = message.getPayload();
        final String decodedText = new String(rawPayload, StandardCharsets.UTF_8);
        final String logLine = "MessageCallbackListener-收到消息-主题:" + topic
                + ", 消息内容是:" + decodedText;
        log.info(logLine);
    }
}