当前位置: 首页 > news >正文

【阿里云实战】基于MQTT的Java SDK收发消息-终端和终端消息收发

文章目录

    • 前提条件
    • 架构流程图
    • 调用Java SDK收发消息

前提条件

  • 创建资源:MQTT实例、topic、group
  • 获取AK、SK
  • 安装IDE/VScode,本文以VScode为例

架构流程图

在这里插入图片描述

调用Java SDK收发消息

  1. 下载阿里云云消息队列 MQTT 版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo。
  2. 克隆示例代码至您指定的文件夹。
  3. 在vscode中,打开下载的代码,并确认pom.xml中已包含以下依赖。
<dependencies><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-onsmqtt</artifactId><version>1.0.3</version></dependency><dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-core</artifactId><version>4.5.0</version></dependency>
</dependencies>

我们主要是用绿色框中的两个文件+pom.xml文件
在这里插入图片描述

  1. 按代码注释说明填写上述两个文件的相应参数,主要涉及您已在创建资源中所创建的MQTT资源信息。示例代码如下。
package com.aliyun.openservices.lmq.example.demo;import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;/*** 本代码提供签名鉴权模式下 MQ4IOT 客户端发送消息到 MQ4IOT 客户端的示例,其中初始化参数请根据实际情况修改* 签名模式即使用阿里云账号系统提供的 AccessKey 和 SecretKey 对每个客户端计算出一个独立的签名供客户端识别使用。* 对于实际业务场景使用过程中,考虑到私钥 SecretKey 的隐私性,可以将签名过程放在受信任的环境完成。** 完整 demo 工程,参考https://github.com/AliwareMQ/lmq-demo* */public class MQ4IoTConsumerDemo {public static void main(String[] args) throws Exception {/*** MQ4IOT 实例 ID,购买后控制台获取*/String instanceId = "XXXXX"; // **输入你自己的实例ID**/*** 接入点地址,购买 MQ4IOT 实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用 IP 地址直接连接,否则可能会导致客户端异常。*/String endPoint = "XXXXX";  // **输入自己的公网接入点**/*** 账号 accesskey,从账号系统控制台获取* 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。* 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。* 本示例以把AccessKey ID和AccessKey Secret保存在环境变量为例说明。运行本代码示例之前,请先配置环境变量MQTT_AK_ENV和MQTT_SK_ENV* 例如:export MQTT_AK_ENV=<access_key_id>*      export MQTT_SK_ENV=<access_key_secret>* 需要将<access_key_id>替换为已准备好的AccessKey ID,<access_key_secret>替换为AccessKey Secret。*/String accessKey = System.getenv("MQTT_AK_ENV");  // **AK、SK使用环境变量,方式暴露**/*** 账号 secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置*/String secretKey = System.getenv("MQTT_SK_ENV");// 检查环境变量是否设置if (accessKey == null || accessKey.isEmpty() || secretKey == null || secretKey.isEmpty()) {System.err.println("环境变量MQTT_AK_ENV和MQTT_SK_ENV必须设置");System.exit(1);}/*** MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。* clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。*/String clientId = "GID_XXXXX@@@XXXXX"; // **控制台创建的groupid,clientID自定义,consumer与provider需要设置不同的,这里consumer为demo1,provider为demo2**/*** MQ4IOT 消息的一级 topic,需要在控制台申请才能使用。* 如果使用了没有申请或者没有被授权的 topic 会导致鉴权失败,服务端会断开客户端连接。*/final String parentTopic = "XXXXX";  // **父级topic**/*** MQ4IOT支持子级 topic,用来做自定义的过滤,此处为示意,可以填写任何字符串,具体参考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3* 需要注意的是,完整的 topic 参考 https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.554.21a37f05ynxokW。*/final String mq4IotTopic = parentTopic + "/" + "testMq4Iot"; /*** QoS参数代表传输质量,可选0,1,2,根据实际需求合理设置,具体参考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3*/final int qosLevel = 0;ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);final MemoryPersistence memoryPersistence = new MemoryPersistence();/*** 客户端使用的协议和端口必须匹配,具体参考文档 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB* 如果是 SSL 加密则设置ssl://endpoint:8883*/final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);/*** 客户端设置好发送超时时间,防止无限阻塞*/mqttClient.setTimeToWait(5000);final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {/*** 客户端连接成功后就需要尽快订阅需要的 topic*/System.out.println("connect success");executorService.submit(new Runnable() {@Overridepublic void run() {try {final String topicFilter[] = {mq4IotTopic};final int[] qos = {qosLevel};mqttClient.subscribe(topicFilter, qos);} catch (MqttException e) {e.printStackTrace();}}});}@Overridepublic void connectionLost(Throwable throwable) {System.err.println("Connection lost: " + throwable.getMessage());throwable.printStackTrace();}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {/*** 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。* 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。超时时间约定参考限制* https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj*/System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);}});try {mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());System.out.println("MQTT client connected successfully");// 添加关闭钩子以优雅地关闭连接Runtime.getRuntime().addShutdownHook(new Thread(() -> {try {if (mqttClient.isConnected()) {mqttClient.disconnect();System.out.println("MQTT client disconnected");}executorService.shutdown();if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (Exception e) {System.err.println("Error while shutting down: " + e.getMessage());}}));// 保持程序运行while (true) {Thread.sleep(1000);}} catch (MqttException e) {System.err.println("Failed to connect to MQTT broker: " + e.getMessage());e.printStackTrace();}}
}
  1. 先运行consumer,再运行provider,成功后可以去MQTT控制台查看消息轨迹。
// Consumer 运行命令:
cd /Users/basumin/Desktop/项目/MQTT-demo/mqtt-demo/lmq-java-demo && MQTT_AK_ENV=XXXXXX MQTT_SK_ENV=XXXXXX java -cp "classes:lib/*" com.aliyun.openservices.lmq.example.demo.MQ4IoTConsumerDemo// producer 运行命令:
cd /Users/basumin/Desktop/项目/MQTT-demo/mqtt-demo/lmq-java-demo && MQTT_AK_ENV=XXXXXX MQTT_SK_ENV=XXXXXX java -cp "classes:lib/*" com.aliyun.openservices.lmq.example.demo.MQ4IoTProducerDemo

启动如下图:
在这里插入图片描述
在这里插入图片描述
6. 控制台查看消息轨迹,输入group ID,与device ID,点击查询。
在这里插入图片描述

参考链接:
快速使用MQTT的Java SDK收发消息(终端和终端消息收发)

http://www.xdnf.cn/news/1418977.html

相关文章:

  • 汽车曲柄连杆机构cad+ea113+设计说明书
  • 深入理解Java虚拟机:JVM高级特性与最佳实践(第3版)第八章知识点问答(18题)
  • 从理论到RTL,实战实现高可靠ECC校验(附完整开源代码/脚本)(3) RTL实现实战
  • DBeaver社区版AI助手(AI Assistant)设置
  • 基于Hadoop与层次聚类技术的电子游戏销售分析系统的设计与实现
  • 机器翻译:python库PyGTranslator的详细使用
  • (论文速读)3DTopia-XL:高质量3D资产生成技术
  • FOUPK3云服务平台旗下产品
  • ARM-进阶汇编指令
  • linux安装gitlab详细教程,本地管理源代码
  • 存储掉电强制拉库引起ORA-01555和ORA-01189/ORA-01190故障处理---惜分飞
  • 英伟达Newton与OpenTwins如何重构具身智能“伴随式数采”范式
  • 【ElasticSearch实用篇-04】Boost权重底层原理和基本使用
  • Ruoyi项目MyBatis升级MyBatis-Plus指南
  • linux:离线/无网环境安装docker
  • 从Java全栈开发到微服务架构:一次真实的面试实录
  • (Arxiv-2025)HunyuanCustom:一种面向多模态驱动的定制化视频生成架构
  • vizard-将长视频变成适合社交的短视频AI工具
  • 【JavaWeb】之HTML(对HTML细节的一些总结)
  • vue3使用路由router
  • 大规模异构数据挖掘与数据架构
  • C++ STL序列容器-------list
  • 【LeetCode】3524. 求出数组的 X 值 I (动态规划)
  • 机器学习(四)KNN算法-分类
  • 13 选 list 还是 vector?C++ STL list 扩容 / 迭代器失效问题 + 模拟实现,对比后再做选择
  • MVC、三层架构
  • 手写MyBatis第46弹:多插件责任链模式的实现原理与执行顺序奥秘--MyBatis插件架构深度解析
  • 2025 数字化转型期,值得关注的 10 项高价值证书解析
  • T507 音频调试
  • Redis--Lua脚本以及在SpringBoot中的使用