物联网之对接MQTT最佳实践
小伙伴们,你们好呀,我是老寇,跟我一起学习对接MQTT
安装EMQX
采用docker-compose一键式,启动!!!
还没有安装docker的朋友,请参考下面两篇文章
# Ubuntu20.04安装Docker
# Centos7安装Docker 23.0.6
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
# docker-compose definition for a single EMQX 5.4.1 broker.
services:
  emqx:
    image: emqx/emqx:5.4.1
    container_name: emqx
    # Keep the container running even when no daemon process is attached
    tty: true
    restart: always
    privileged: true
    ports:
      - "1883:1883"
      - "8083:8083"
      - "8883:8883"
      - "18083:18083"
    environment:
      TZ: Asia/Shanghai
    volumes:
      # Mount the data directory
      - ./emqx/data:/opt/emqx/data
      # Mount the log directory
      - ./emqx/log:/opt/emqx/log
    networks:
      - laokou_network

networks:
  laokou_network:
    driver: bridge
访问 http://127.0.0.1:18083 设置密码
EMQX MQTT【摘抄自官方文档】
EMQX官方文档
MQTT 是物联网 (IoT) 的 OASIS 标准消息传递协议。它被设计为一种极轻量的发布/订阅消息传输协议,非常适合以较小的代码占用空间和极低的网络带宽连接远程设备。MQTT 目前广泛应用于汽车、制造、电信、石油和天然气等众多行业。
EMQX 完全兼容 MQTT 5.0 和 3.x,本节将介绍 MQTT 相关功能的基本配置项,包括基本 MQTT 设置、订阅设置、会话设置、强制关闭设置和强制垃圾回收设置等
客户端对接
本文章采用三种客户端对接
维度 | Paho | Hivemq-MQTT-Client | Vert.x MQTT Client |
---|---|---|---|
协议支持 | MQTT 3.1.1(5.0 实验性) | MQTT 5.0 完整支持 | MQTT 3.1.1(客户端暂不支持 5.0) |
性能 | 中(同步模式) | 高(异步非阻塞) | 极高(响应式架构) |
依赖复杂度 | 低 | 中(仅 Netty) | 高(需 Vert.x 生态) |
社区资源 | 丰富 | 较少 | 中等 |
适用场景 | 传统 IoT、跨语言项目 | 企业级 MQTT 5.0、高吞吐 | 响应式系统、高并发微服务 |
Paho【不推荐,连接不稳定】
Paho代码地址
引入依赖
<dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
</dependencies>
项目集成
PahoProperties
/*** @author laokou*/
@Data
public class PahoProperties {private boolean auth = true;private String username = "emqx";private String password = "laokou123";private String host = "127.0.0.1";private int port = 1883;private String clientId;private int subscribeQos = 1;private int publishQos = 0;private int willQos = 1;private int connectionTimeout = 60;private boolean manualAcks = false;// @formatter:off/*** 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息.* clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息.* <a href="https://github.com/hivemq/hivemq-mqtt-client/issues/627">...</a>*/// @formatter:onprivate boolean clearStart = false;private int receiveMaximum = 10000;private int maximumPacketSize = 10000;// @formatter:off/*** 默认会话保留一天.* 最大值,4294967295L,会话过期时间【永不过期,单位秒】.* 定义客户端断开后会话保留的时间(仅在 Clean Session = false 时生效).*/private long sessionExpiryInterval = 86400L;// @formatter:on/*** 心跳包每隔60秒发一次.*/private int keepAliveInterval = 60;private boolean automaticReconnect = true;private Set<String> topics = new HashSet<>(0);}
PahoMqttClientMessageCallbackV5
/*** @author laokou*/
@Slf4j
@RequiredArgsConstructor
public class PahoMqttClientMessageCallbackV5 implements MqttCallback {private final List<MessageHandler> messageHandlers;@Overridepublic void disconnected(MqttDisconnectResponse disconnectResponse) {log.error("【Paho-V5】 => MQTT关闭连接");}@Overridepublic void mqttErrorOccurred(MqttException ex) {log.error("【Paho-V5】 => MQTT报错,错误信息:{}", ex.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) {for (MessageHandler messageHandler : messageHandlers) {if (messageHandler.isSubscribe(topic)) {log.info("【Paho-V5】 => MQTT接收到消息,Topic:{}", topic);messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic));}}}@Overridepublic void deliveryComplete(IMqttToken token) {log.info("【Paho-V5】 => MQTT消息发送成功,消息ID:{}", token.getMessageId());}@Overridepublic void connectComplete(boolean reconnect, String uri) {if (reconnect) {log.info("【Paho-V5】 => MQTT重连成功,URI:{}", uri);}else {log.info("【Paho-V5】 => MQTT建立连接,URI:{}", uri);}}@Overridepublic void authPacketArrived(int reasonCode, MqttProperties properties) {log.info("【Paho-V5】 => 接收到身份验证数据包:{}", reasonCode);}}
PahoV5MqttClientTest
/*** @author laokou*/
@SpringBootTest
@RequiredArgsConstructor
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class PahoV5MqttClientTest {private final List<MessageHandler> messageHandlers;@Testvoid testMqttClient() throws InterruptedException {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);PahoProperties pahoProperties = new PahoProperties();pahoProperties.setClientId("test-client-3");pahoProperties.setTopics(Set.of("/test-topic-3/#"));PahoMqttClientV5 pahoMqttClientV5 = new PahoMqttClientV5(pahoProperties, messageHandlers, scheduledExecutorService);pahoMqttClientV5.open();Thread.sleep(1000);pahoMqttClientV5.publish("/test-topic-3/789", "Hello World789".getBytes());}}
PahoMqttClientMessageCallbackV3
/*** @author laokou*/
@Slf4j
@RequiredArgsConstructor
public class PahoMqttClientMessageCallbackV3 implements MqttCallback {private final List<MessageHandler> messageHandlers;@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.info("【Paho-V3】 => MQTT消息发送成功,消息ID:{}", iMqttDeliveryToken.getMessageId());}@Overridepublic void connectionLost(Throwable throwable) {log.error("【Paho-V3】 => MQTT关闭连接");}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {for (MessageHandler messageHandler : messageHandlers) {if (messageHandler.isSubscribe(topic)) {log.info("【Paho-V3】 => MQTT接收到消息,Topic:{}", topic);messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic));}}}
}
PahoV3MqttClientTest
/*** @author laokou*/
@SpringBootTest
@RequiredArgsConstructor
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class PahoV3MqttClientTest {private final List<MessageHandler> messageHandlers;@Testvoid testMqttClient() throws InterruptedException {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);PahoProperties pahoProperties2 = new PahoProperties();pahoProperties2.setClientId("test-client-4");pahoProperties2.setTopics(Set.of("/test-topic-4/#"));PahoMqttClientV3 pahoMqttClientV3 = new PahoMqttClientV3(pahoProperties2, messageHandlers, scheduledExecutorService);pahoMqttClientV3.open();Thread.sleep(1000);pahoMqttClientV3.publish("/test-topic-4/000", "Hello World000".getBytes());}}
Hivemq-MQTT-Client【不推荐】
注意:订阅一段时间后会收不到数据。该客户端实现的是标准 MQTT 5.0 协议,与 EMQX Broker 的 MQTT 5.0 实现不兼容
Hivemq代码地址
引入依赖
<dependencies><dependency><groupId>com.hivemq</groupId><artifactId>hivemq-mqtt-client-reactor</artifactId><version>1.3.5</version></dependency><dependency><groupId>com.hivemq</groupId><artifactId>hivemq-mqtt-client-epoll</artifactId><version>1.3.5</version><type>pom</type></dependency>
</dependencies>
项目集成
HivemqProperties
/*** @author laokou*/
@Data
public class HivemqProperties {private boolean auth = true;private String username = "emqx";private String password = "laokou123";private String host = "127.0.0.1";private int port = 1883;private String clientId;private int subscribeQos = 1;private int publishQos = 0;private int willQos = 1;// @formatter:off/*** 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息.* clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息.* <a href="https://github.com/hivemq/hivemq-mqtt-client/issues/627">...</a>*/// @formatter:onprivate boolean clearStart = false;private int receiveMaximum = 10000;private int sendMaximum = 10000;private int maximumPacketSize = 10000;private int sendMaximumPacketSize = 10000;private int topicAliasMaximum = 1024;private int sendTopicAliasMaximum = 2048;private long messageExpiryInterval = 86400L;private boolean requestProblemInformation = true;private boolean requestResponseInformation = true;// @formatter:off/*** 默认会话保留一天.* 最大值,4294967295L,会话过期时间【永不过期,单位秒】.* 定义客户端断开后会话保留的时间(仅在 Clean Session = false 时生效).*/private long sessionExpiryInterval = 86400L;// @formatter:on/*** 心跳包每隔60秒发一次.*/private int keepAliveInterval = 60;private boolean automaticReconnect = true;private long automaticReconnectMaxDelay = 5;private long automaticReconnectInitialDelay = 1;private Set<String> topics = new HashSet<>(0);private int nettyThreads = 32;private boolean retain = false;private boolean noLocal = false;}
HivemqClientV5
/*** @author laokou*/
@Slf4j
public class HivemqClientV5 {/*** 响应主题.*/private final String RESPONSE_TOPIC = "response/topic";/*** 服务下线数据.*/private final byte[] WILL_PAYLOAD = "offline".getBytes(UTF_8);/*** 相关数据.*/private final byte[] CORRELATION_DATA = "correlationData".getBytes(UTF_8);private final HivemqProperties hivemqProperties;private final List<MessageHandler> messageHandlers;private volatile Mqtt5RxClient client;private final Object lock = new Object();private volatile Disposable connectDisposable;private volatile Disposable subscribeDisposable;private volatile Disposable unSubscribeDisposable;private volatile Disposable publishDisposable;private volatile Disposable disconnectDisposable;private volatile Disposable consumeDisposable;public HivemqClientV5(HivemqProperties hivemqProperties, List<MessageHandler> messageHandlers) {this.hivemqProperties = hivemqProperties;this.messageHandlers = messageHandlers;}public void open() {if (Objects.isNull(client)) {synchronized (lock) {if (Objects.isNull(client)) {client = getMqtt5ClientBuilder().buildRx();}}}connect();consume();}public void close() {if (!Objects.isNull(client)) {disconnectDisposable = client.disconnectWith().sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval()).applyDisconnect().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? 
-1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(() -> log.info("【Hivemq-V5】 => MQTT断开连接成功,客户端ID:{}", hivemqProperties.getClientId()),e -> log.error("【Hivemq-V5】 => MQTT断开连接失败,错误信息:{}", e.getMessage(), e));}}public void subscribe() {String[] topics = getTopics();subscribe(topics, getQosArray(topics));}public String[] getTopics() {return hivemqProperties.getTopics().toArray(String[]::new);}public int[] getQosArray(String[] topics) {return Stream.of(topics).mapToInt(item -> hivemqProperties.getSubscribeQos()).toArray();}public void subscribe(String[] topics, int[] qosArray) {checkTopicAndQos(topics, qosArray);if (!Objects.isNull(client)) {List<Mqtt5Subscription> subscriptions = new ArrayList<>(topics.length);for (int i = 0; i < topics.length; i++) {subscriptions.add(Mqtt5Subscription.builder().topicFilter(topics[i]).qos(getMqttQos(qosArray[i])).retainAsPublished(hivemqProperties.isRetain()).noLocal(hivemqProperties.isNoLocal()).build());}subscribeDisposable = client.subscribeWith().addSubscriptions(subscriptions).applySubscribe().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? 
-1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V5】 => MQTT订阅成功,主题: {}", String.join("、", topics)), e -> log.error("【Hivemq-V5】 => MQTT订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e));}}public void unSubscribe() {String[] topics = hivemqProperties.getTopics().toArray(String[]::new);unSubscribe(topics);}public void unSubscribe(String[] topics) {checkTopic(topics);if (!Objects.isNull(client)) {List<MqttTopicFilter> matchedTopics = new ArrayList<>(topics.length);for (String topic : topics) {matchedTopics.add(MqttTopicFilter.of(topic));}unSubscribeDisposable = client.unsubscribeWith().addTopicFilters(matchedTopics).applyUnsubscribe().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V5】 => MQTT取消订阅成功,主题:{}", String.join("、", topics)), e -> log.error("【Hivemq-V5】 => MQTT取消订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e));}}public void publish(String topic, byte[] payload, int qos) {if (!Objects.isNull(client)) {publishDisposable = client.publish(Flowable.just(Mqtt5Publish.builder().topic(topic).qos(getMqttQos(qos)).payload(payload).noMessageExpiry().retain(hivemqProperties.isRetain()).messageExpiryInterval(hivemqProperties.getMessageExpiryInterval()).correlationData(CORRELATION_DATA).payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8).contentType("text/plain").responseTopic(RESPONSE_TOPIC).build())).subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? 
-1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V5】 => MQTT消息发布成功,topic:{}", topic),e -> log.error("【Hivemq-V5】 => MQTT消息发布失败,topic:{},错误信息:{}", topic, e.getMessage(), e));}}public void publish(String topic, byte[] payload) {publish(topic, payload, hivemqProperties.getPublishQos());}public void dispose(Disposable disposable) {if (!Objects.isNull(disposable) && !disposable.isDisposed()) {// 显式取消订阅disposable.dispose();}}public void dispose() {dispose(connectDisposable);dispose(subscribeDisposable);dispose(unSubscribeDisposable);dispose(publishDisposable);dispose(consumeDisposable);dispose(disconnectDisposable);}public void reSubscribe() {log.info("【Hivemq-V5】 => MQTT重新订阅开始");dispose(subscribeDisposable);subscribe();log.info("【Hivemq-V5】 => MQTT重新订阅结束");}private MqttQos getMqttQos(int qos) {return MqttQos.fromCode(qos);}private void connect() {connectDisposable = 
client.connectWith().keepAlive(hivemqProperties.getKeepAliveInterval()).cleanStart(hivemqProperties.isClearStart()).sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval()).willPublish().topic("will/topic").payload(WILL_PAYLOAD).qos(getMqttQos(hivemqProperties.getWillQos())).retain(true).messageExpiryInterval(100).delayInterval(10).payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8).contentType("text/plain").responseTopic(RESPONSE_TOPIC).correlationData(CORRELATION_DATA).applyWillPublish().restrictions().receiveMaximum(hivemqProperties.getReceiveMaximum()).sendMaximum(hivemqProperties.getSendMaximum()).maximumPacketSize(hivemqProperties.getMaximumPacketSize()).sendMaximumPacketSize(hivemqProperties.getSendMaximumPacketSize()).topicAliasMaximum(hivemqProperties.getTopicAliasMaximum()).sendTopicAliasMaximum(hivemqProperties.getSendTopicAliasMaximum()).requestProblemInformation(hivemqProperties.isRequestProblemInformation()).requestResponseInformation(hivemqProperties.isRequestResponseInformation()).applyRestrictions().applyConnect().toFlowable().firstElement().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V5】 => MQTT连接成功,主机:{},端口:{},客户端ID:{}", hivemqProperties.getHost(),hivemqProperties.getPort(), hivemqProperties.getClientId()),e -> log.error("【Hivemq-V5】 => MQTT连接失败,错误信息:{}", e.getMessage(), e));}private void consume() {if (!Objects.isNull(client)) {consumeDisposable = client.publishes(MqttGlobalPublishFilter.ALL).onBackpressureBuffer(8192).observeOn(Schedulers.computation(), false, 8192).doOnSubscribe(subscribe -> {log.info("【Hivemq-V5】 => MQTT开始订阅消息,请稍候。。。。。。");reSubscribe();}).subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? 
-1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(publish -> {for (MessageHandler messageHandler : messageHandlers) {if (messageHandler.isSubscribe(publish.getTopic().toString())) {log.info("【Hivemq-V5】 => MQTT接收到消息,Topic:{}", publish.getTopic());messageHandler.handle(new MqttMessage(publish.getPayloadAsBytes(), publish.getTopic().toString()));}}}, e -> log.error("【Hivemq-V5】 => MQTT消息处理失败,错误信息:{}", e.getMessage(), e),() -> log.info("【Hivemq-V5】 => MQTT订阅消息结束,请稍候。。。。。。"));}}private Mqtt5ClientBuilder getMqtt5ClientBuilder() {Mqtt5ClientBuilder builder = Mqtt5Client.builder().addConnectedListener(listener -> {Optional<? extends MqttClientConnectionConfig> config = Optional.of(listener.getClientConfig().getConnectionConfig()).get();config.ifPresent(mqttClientConnectionConfig -> log.info("【Hivemq-V5】 => MQTT连接保持时间:{}ms",mqttClientConnectionConfig.getKeepAlive()));log.info("【Hivemq-V5】 => MQTT已连接,客户端ID:{}", hivemqProperties.getClientId());}).addDisconnectedListener(listener -> log.error("【Hivemq-V5】 => MQTT已断开连接,客户端ID:{}", hivemqProperties.getClientId())).identifier(hivemqProperties.getClientId()).serverHost(hivemqProperties.getHost()).serverPort(hivemqProperties.getPort()).executorConfig(MqttClientExecutorConfig.builder().nettyExecutor(ThreadUtils.newVirtualTaskExecutor()).nettyThreads(hivemqProperties.getNettyThreads()).applicationScheduler(Schedulers.from(ThreadUtils.newVirtualTaskExecutor())).build());// 开启重连if (hivemqProperties.isAutomaticReconnect()) {builder.automaticReconnect().initialDelay(hivemqProperties.getAutomaticReconnectInitialDelay(), TimeUnit.SECONDS).maxDelay(hivemqProperties.getAutomaticReconnectMaxDelay(), TimeUnit.SECONDS).applyAutomaticReconnect();}if (hivemqProperties.isAuth()) {builder.simpleAuth().username(hivemqProperties.getUsername()).password(hivemqProperties.getPassword().getBytes()).applySimpleAuth();}return 
builder;}private void checkTopicAndQos(String[] topics, int[] qosArray) {if (topics == null || qosArray == null) {throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics and QoS arrays cannot be null");}if (topics.length != qosArray.length) {throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics and QoS arrays must have the same length");}if (topics.length == 0) {throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics array cannot be empty");}}private void checkTopic(String[] topics) {if (topics.length == 0) {throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics array cannot be empty");}}}
HivemqV5MqttClientTest
/*** @author laokou*/
@SpringBootTest
@RequiredArgsConstructor
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class HivemqV5MqttClientTest {private final List<MessageHandler> messageHandlers;@Testvoid testMqttClient() throws InterruptedException {HivemqProperties hivemqProperties = new HivemqProperties();hivemqProperties.setClientId("test-client-1");hivemqProperties.setTopics(Set.of("/test-topic-1/#"));HivemqClientV5 hivemqClientV5 = new HivemqClientV5(hivemqProperties, messageHandlers);hivemqClientV5.open();hivemqClientV5.publish("/test-topic-1/123", "Hello World123".getBytes());}}
HivemqClientV3
/*** @author laokou*/
@Slf4j
public class HivemqClientV3 {/*** 服务下线数据.*/private final byte[] WILL_PAYLOAD = "offline".getBytes(UTF_8);private final HivemqProperties hivemqProperties;private final List<MessageHandler> messageHandlers;private volatile Mqtt3RxClient client;private final Object lock = new Object();private volatile Disposable connectDisposable;private volatile Disposable subscribeDisposable;private volatile Disposable unSubscribeDisposable;private volatile Disposable publishDisposable;private volatile Disposable disconnectDisposable;private volatile Disposable consumeDisposable;public HivemqClientV3(HivemqProperties hivemqProperties, List<MessageHandler> messageHandlers) {this.hivemqProperties = hivemqProperties;this.messageHandlers = messageHandlers;}public void open() {if (Objects.isNull(client)) {synchronized (lock) {if (Objects.isNull(client)) {client = getMqtt3ClientBuilder().buildRx();}}}connect();consume();}public void close() {if (!Objects.isNull(client)) {disconnectDisposable = client.disconnect().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? 
-1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(() -> log.info("【Hivemq-V3】 => MQTT断开连接成功,客户端ID:{}", hivemqProperties.getClientId()),e -> log.error("【Hivemq-V3】 => MQTT断开连接失败,错误信息:{}", e.getMessage(), e));}}public void subscribe() {String[] topics = getTopics();subscribe(topics, getQosArray(topics));}public String[] getTopics() {return hivemqProperties.getTopics().toArray(String[]::new);}public int[] getQosArray(String[] topics) {return Stream.of(topics).mapToInt(item -> hivemqProperties.getSubscribeQos()).toArray();}public void subscribe(String[] topics, int[] qosArray) {checkTopicAndQos(topics, qosArray);if (!Objects.isNull(client)) {List<Mqtt3Subscription> subscriptions = new ArrayList<>(topics.length);for (int i = 0; i < topics.length; i++) {subscriptions.add(Mqtt3Subscription.builder().topicFilter(topics[i]).qos(getMqttQos(qosArray[i])).build());}subscribeDisposable = client.subscribeWith().addSubscriptions(subscriptions).applySubscribe().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? 
-1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V3】 => MQTT订阅成功,主题: {}", String.join("、", topics)), e -> log.error("【Hivemq-V3】 => MQTT订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e));}}public void unSubscribe() {String[] topics = hivemqProperties.getTopics().toArray(String[]::new);unSubscribe(topics);}public void unSubscribe(String[] topics) {checkTopic(topics);if (!Objects.isNull(client)) {List<MqttTopicFilter> matchedTopics = new ArrayList<>(topics.length);for (String topic : topics) {matchedTopics.add(MqttTopicFilter.of(topic));}unSubscribeDisposable = client.unsubscribeWith().addTopicFilters(matchedTopics).applyUnsubscribe().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(() -> log.info("【Hivemq-V3】 => MQTT取消订阅成功,主题:{}", String.join("、", topics)), e -> log.error("【Hivemq-V3】 => MQTT取消订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e));}}public void publish(String topic, byte[] payload, int qos) {if (!Objects.isNull(client)) {publishDisposable = client.publish(Flowable.just(Mqtt3Publish.builder().topic(topic).qos(getMqttQos(qos)).payload(payload).retain(hivemqProperties.isRetain()).build())).subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? 
-1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V3】 => MQTT消息发布成功,topic:{}", topic),e -> log.error("【Hivemq-V3】 => MQTT消息发布失败,topic:{},错误信息:{}", topic, e.getMessage(), e));}}public void publish(String topic, byte[] payload) {publish(topic, payload, hivemqProperties.getPublishQos());}public void dispose(Disposable disposable) {if (!Objects.isNull(disposable) && !disposable.isDisposed()) {// 显式取消订阅disposable.dispose();}}public void dispose() {dispose(connectDisposable);dispose(subscribeDisposable);dispose(unSubscribeDisposable);dispose(publishDisposable);dispose(consumeDisposable);dispose(disconnectDisposable);}public void reSubscribe() {log.info("【Hivemq-V3】 => MQTT重新订阅开始");dispose(subscribeDisposable);subscribe();log.info("【Hivemq-V3】 => MQTT重新订阅结束");}private MqttQos getMqttQos(int qos) {return MqttQos.fromCode(qos);}private void connect() {connectDisposable = client.connectWith().keepAlive(hivemqProperties.getKeepAliveInterval()).willPublish().topic("will/topic").payload(WILL_PAYLOAD).qos(getMqttQos(hivemqProperties.getWillQos())).retain(true).applyWillPublish().restrictions().sendMaximum(hivemqProperties.getSendMaximum()).sendMaximumPacketSize(hivemqProperties.getSendMaximumPacketSize()).applyRestrictions().applyConnect().toFlowable().firstElement().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? 
-1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V3】 => MQTT连接成功,主机:{},端口:{},客户端ID:{}", hivemqProperties.getHost(),hivemqProperties.getPort(), hivemqProperties.getClientId()),e -> log.error("【Hivemq-V3】 => MQTT连接失败,错误信息:{}", e.getMessage(), e));}private void consume() {if (!Objects.isNull(client)) {consumeDisposable = client.publishes(MqttGlobalPublishFilter.ALL).onBackpressureBuffer(8192).observeOn(Schedulers.computation(), false, 8192).doOnSubscribe(subscribe -> {log.info("【Hivemq-V3】 => MQTT开始订阅消息,请稍候。。。。。。");reSubscribe();}).subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(publish -> {for (MessageHandler messageHandler : messageHandlers) {if (messageHandler.isSubscribe(publish.getTopic().toString())) {log.info("【Hivemq-V3】 => MQTT接收到消息,Topic:{}", publish.getTopic());messageHandler.handle(new MqttMessage(publish.getPayloadAsBytes(), publish.getTopic().toString()));}}}, e -> log.error("【Hivemq-V3】 => MQTT消息处理失败,错误信息:{}", e.getMessage(), e),() -> log.info("【Hivemq-V3】 => MQTT订阅消息结束,请稍候。。。。。。"));}}private Mqtt3ClientBuilder getMqtt3ClientBuilder() {Mqtt3ClientBuilder builder = Mqtt3Client.builder().addConnectedListener(listener -> {Optional<? 
extends MqttClientConnectionConfig> config = Optional.of(listener.getClientConfig().getConnectionConfig()).get();config.ifPresent(mqttClientConnectionConfig -> log.info("【Hivemq-V5】 => MQTT连接保持时间:{}ms",mqttClientConnectionConfig.getKeepAlive()));log.info("【Hivemq-V3】 => MQTT已连接,客户端ID:{}", hivemqProperties.getClientId());}).addDisconnectedListener(listener -> log.error("【Hivemq-V3】 => MQTT已断开连接,客户端ID:{}", hivemqProperties.getClientId())).identifier(hivemqProperties.getClientId()).serverHost(hivemqProperties.getHost()).serverPort(hivemqProperties.getPort()).executorConfig(MqttClientExecutorConfig.builder().nettyExecutor(ThreadUtils.newVirtualTaskExecutor()).nettyThreads(hivemqProperties.getNettyThreads()).applicationScheduler(Schedulers.from(ThreadUtils.newVirtualTaskExecutor())).build());// 开启重连if (hivemqProperties.isAutomaticReconnect()) {builder.automaticReconnect().initialDelay(hivemqProperties.getAutomaticReconnectInitialDelay(), TimeUnit.SECONDS).maxDelay(hivemqProperties.getAutomaticReconnectMaxDelay(), TimeUnit.SECONDS).applyAutomaticReconnect();}if (hivemqProperties.isAuth()) {builder.simpleAuth().username(hivemqProperties.getUsername()).password(hivemqProperties.getPassword().getBytes()).applySimpleAuth();}return builder;}private void checkTopicAndQos(String[] topics, int[] qosArray) {if (topics == null || qosArray == null) {throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics and QoS arrays cannot be null");}if (topics.length != qosArray.length) {throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics and QoS arrays must have the same length");}if (topics.length == 0) {throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics array cannot be empty");}}private void checkTopic(String[] topics) {if (topics.length == 0) {throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics array cannot be empty");}}}
HivemqV3MqttClientTest
/*** @author laokou*/
@SpringBootTest
@RequiredArgsConstructor
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class HivemqV3MqttClientTest {private final List<MessageHandler> messageHandlers;@Testvoid testMqttClient() throws InterruptedException {HivemqProperties hivemqProperties2 = new HivemqProperties();hivemqProperties2.setClientId("test-client-2");hivemqProperties2.setTopics(Set.of("/test-topic-2/#"));HivemqClientV3 hivemqClientV3 = new HivemqClientV3(hivemqProperties2, messageHandlers);hivemqClientV3.open();hivemqClientV3.publish("/test-topic-2/456", "Hello World456".getBytes());}}
Vert.x MQTT Client【推荐,只兼容mqtt3.1.1】
# Vert.x MQTT文档
引入依赖
<dependencies><dependency><groupId>io.vertx</groupId><artifactId>vertx-mqtt</artifactId><version>4.5.14</version></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.7.5</version></dependency>
</dependencies>
项目集成
MqttClientProperties
/*** @author laokou*/
@Data
public class MqttClientProperties {private boolean auth = true;private String username = "emqx";private String password = "laokou123";private String host = "127.0.0.1";private int port = 1883;private String clientId = UUIDGenerator.generateUUID();// @formatter:off/*** 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息.* clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息.*/// @formatter:onprivate boolean clearSession = false;private int receiveBufferSize = Integer.MAX_VALUE;private int maxMessageSize = -1;/*** 心跳包每隔60秒发一次.*/private int keepAliveInterval = 60;private boolean autoKeepAlive = true;private long reconnectInterval = 1000;private int reconnectAttempts = Integer.MAX_VALUE;private Map<String, Integer> topics = new HashMap<>(0);private int willQos = 1;private boolean willRetain = false;private int ackTimeout = -1;private boolean autoAck = true;/*** 服务下线主题.*/private String willTopic = "/will";/*** 服务下线数据.*/private String willPayload = "offline";}
VertxConfig
/**
 * Spring configuration that exposes a shared {@link Vertx} instance.
 *
 * @author laokou
 */
@Configuration
public class VertxConfig {

	/**
	 * Creates the {@link Vertx} bean with a 60 second maximum execute time for both
	 * event-loop and worker threads, preferring the native transport.
	 * @return the configured Vert.x instance
	 */
	@Bean
	public Vertx vertx() {
		VertxOptions options = new VertxOptions().setMaxEventLoopExecuteTime(60)
			.setMaxWorkerExecuteTime(60)
			.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS)
			.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS)
			.setPreferNativeTransport(true);
		return Vertx.vertx(options);
	}

}
VertxMqttClient
注意:vertx-mqtt不支持客户端自动断线重连,当网络不通畅或连接关闭时,需要自己手动重新调用连接方法来实现重连功能!!!
/*** @author laokou*/
@Slf4j
public class VertxMqttClient {private final Sinks.Many<MqttPublishMessage> messageSink = Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);private final MqttClient mqttClient;private final Vertx vertx;private final MqttClientProperties mqttClientProperties;private final List<MessageHandler> messageHandlers;private final List<Disposable> disposables;private final AtomicBoolean isConnected = new AtomicBoolean(false);private final AtomicBoolean isLoaded = new AtomicBoolean(false);private final AtomicBoolean isReconnected = new AtomicBoolean(true);public VertxMqttClient(final Vertx vertx, final MqttClientProperties mqttClientProperties,final List<MessageHandler> messageHandlers) {this.vertx = vertx;this.mqttClientProperties = mqttClientProperties;this.mqttClient = MqttClient.create(vertx, getOptions());this.messageHandlers = messageHandlers;this.disposables = Collections.synchronizedList(new ArrayList<>());}public void open() {mqttClient.closeHandler(v -> {isConnected.set(false);log.error("【Vertx-MQTT】 => MQTT连接断开,客户端ID:{}", mqttClientProperties.getClientId());reconnect();}).publishHandler(messageSink::tryEmitNext)// 仅接收QoS1和QoS2的消息.publishCompletionHandler(id -> {// log.info("【Vertx-MQTT】 => 接收MQTT的PUBACK或PUBCOMP数据包,数据包ID:{}", id);}).subscribeCompletionHandler(ack -> {// log.info("【Vertx-MQTT】 => 接收MQTT的SUBACK数据包,数据包ID:{}", ack.messageId());}).unsubscribeCompletionHandler(id -> {// log.info("【Vertx-MQTT】 => 接收MQTT的UNSUBACK数据包,数据包ID:{}", id);}).pingResponseHandler(s -> {// log.info("【Vertx-MQTT】 => 接收MQTT的PINGRESP数据包");}).connect(mqttClientProperties.getPort(), mqttClientProperties.getHost(), connectResult -> {if (connectResult.succeeded()) {isConnected.set(true);log.info("【Vertx-MQTT】 => MQTT连接成功,主机:{},端口:{},客户端ID:{}", mqttClientProperties.getHost(),mqttClientProperties.getPort(), mqttClientProperties.getClientId());resubscribe();}else {isConnected.set(false);Throwable ex = connectResult.cause();log.error("【Vertx-MQTT】 => 
MQTT连接失败,原因:{},客户端ID:{}", ex.getMessage(),mqttClientProperties.getClientId(), ex);reconnect();}});}public void close() {disconnect();}/*** Sends the PUBLISH message to the remote MQTT server.* @param topic topic on which the message is published* @param payload message payload* @param qos QoS level* @param isDup if the message is a duplicate* @param isRetain if the message needs to be retained*/public void publish(String topic, int qos, String payload, boolean isDup, boolean isRetain) {mqttClient.publish(topic, Buffer.buffer(payload), convertQos(qos), isDup, isRetain);}private void reconnect() {if (isReconnected.get()) {log.info("【Vertx-MQTT】 => MQTT尝试重连");vertx.setTimer(mqttClientProperties.getReconnectInterval(),handler -> ThreadUtils.newVirtualTaskExecutor().execute(this::open));}}private void subscribe() {Map<String, Integer> topics = mqttClientProperties.getTopics();checkTopicAndQos(topics);mqttClient.subscribe(topics, subscribeResult -> {if (subscribeResult.succeeded()) {log.info("【Vertx-MQTT】 => MQTT订阅成功,主题: {}", String.join("、", topics.keySet()));}else {Throwable ex = subscribeResult.cause();log.error("【Vertx-MQTT】 => MQTT订阅失败,主题:{},错误信息:{}", String.join("、", topics.keySet()), ex.getMessage(),ex);}});}private void resubscribe() {if (isConnected.get() || mqttClient.isConnected()) {ThreadUtils.newVirtualTaskExecutor().execute(this::subscribe);}if (isLoaded.compareAndSet(false, true)) {ThreadUtils.newVirtualTaskExecutor().execute(this::consume);}}private void consume() {Disposable disposable = messageSink.asFlux().doOnNext(mqttPublishMessage -> {String topic = mqttPublishMessage.topicName();log.info("【Vertx-MQTT】 => MQTT接收到消息,Topic:{}", topic);for (MessageHandler messageHandler : messageHandlers) {if (messageHandler.isSubscribe(topic)) {messageHandler.handle(new MqttMessage(mqttPublishMessage.payload(), topic));}}}).subscribeOn(Schedulers.boundedElastic()).subscribe();disposables.add(disposable);}private void disposable() {for (Disposable disposable : 
disposables) {if (ObjectUtils.isNotNull(disposable) && !disposable.isDisposed()) {disposable.dispose();}}}private void disconnect() {isReconnected.set(false);mqttClient.disconnect(disconnectResult -> {if (disconnectResult.succeeded()) {disposable();log.info("【Vertx-MQTT】 => MQTT断开连接成功");disposables.clear();}else {Throwable ex = disconnectResult.cause();log.error("【Vertx-MQTT】 => MQTT断开连接失败,错误信息:{}", ex.getMessage(), ex);}});}private void unsubscribe(List<String> topics) {checkTopic(topics);mqttClient.unsubscribe(topics, unsubscribeResult -> {if (unsubscribeResult.succeeded()) {log.info("【Vertx-MQTT】 => MQTT取消订阅成功,主题:{}", String.join("、", topics));}else {Throwable ex = unsubscribeResult.cause();log.error("【Vertx-MQTT】 => MQTT取消订阅失败,主题:{},错误信息:{}", String.join("、", topics), ex.getMessage(), ex);}});}private MqttClientOptions getOptions() {MqttClientOptions options = new MqttClientOptions();options.setClientId(mqttClientProperties.getClientId());options.setCleanSession(mqttClientProperties.isClearSession());options.setAutoKeepAlive(mqttClientProperties.isAutoKeepAlive());options.setKeepAliveInterval(mqttClientProperties.getKeepAliveInterval());options.setReconnectAttempts(mqttClientProperties.getReconnectAttempts());options.setReconnectInterval(mqttClientProperties.getReconnectInterval());options.setWillQoS(mqttClientProperties.getWillQos());options.setWillTopic(mqttClientProperties.getWillTopic());options.setAutoAck(mqttClientProperties.isAutoAck());options.setAckTimeout(mqttClientProperties.getAckTimeout());options.setWillRetain(mqttClientProperties.isWillRetain());options.setWillMessageBytes(Buffer.buffer(mqttClientProperties.getWillPayload()));options.setReceiveBufferSize(mqttClientProperties.getReceiveBufferSize());options.setMaxMessageSize(mqttClientProperties.getMaxMessageSize());if (mqttClientProperties.isAuth()) {options.setPassword(mqttClientProperties.getPassword());options.setUsername(mqttClientProperties.getUsername());}return options;}private void 
checkTopicAndQos(Map<String, Integer> topics) {topics.forEach((topic, qos) -> {if (StringUtils.isEmpty(topic) || ObjectUtils.isNull(qos)) {throw new IllegalArgumentException("【Vertx-MQTT】 => Topic and QoS cannot be null");}});}private void checkTopic(List<String> topics) {if (CollectionUtils.isEmpty(topics)) {throw new IllegalArgumentException("【Vertx-MQTT】 => Topics list cannot be empty");}}private MqttQoS convertQos(int qos) {return MqttQoS.valueOf(qos);}}
VertxMqttClientTest
/*** @author laokou*/
@SpringBootTest
@RequiredArgsConstructor
@ContextConfiguration(classes = { DefaultMessageHandler.class, VertxConfig.class })
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class VertxMqttClientTest {private final List<MessageHandler> messageHandlers;private final Vertx vertx;@Testvoid testMqttClient() throws InterruptedException {MqttClientProperties properties = new MqttClientProperties();properties.setHost("127.0.0.1");properties.setPort(1883);properties.setUsername("emqx");properties.setPassword("laokou123");properties.setClientId("test-client-1");properties.setTopics(Map.of("/test-topic-1/#", 1));VertxMqttClient vertxMqttClient = new VertxMqttClient(vertx, properties, messageHandlers);Assertions.assertDoesNotThrow(vertxMqttClient::open);Thread.sleep(500);Assertions.assertDoesNotThrow(() -> vertxMqttClient.publish("/test-topic-1/test", 1, "test", false, false));Thread.sleep(500);Assertions.assertDoesNotThrow(vertxMqttClient::close);Thread.sleep(500);}}
详细代码请点击
非常推荐使用vertx-mqtt,项目平稳运行好用!!!
但是,需要注意的是:项目部署到 Linux 系统时,最少需要分配 -Xmx2100m -Xms2100m 的内存,不然连接会被关闭!
我是老寇,我们下次再见啦~