Spring整合MQTT使用
MQTT使用
消息发送和接收
引入依赖
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
编写发送消息客户端
clientId不能重复,重复的clientId会导致已连接的客户端下线
/*** 消息发送* @throws MqttException*/public void send() throws MqttException {//url和clientId请修改为自己的MqttClient client = new MqttClient("tcp://101.43.94.164:1883", "testuser1-send1");//设置回调client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {System.out.println("连接丢失");}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println("消息到达");}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("消息发送完成");}});//连接参数设置MqttConnectOptions connectOptions = new MqttConnectOptions();//用户名和密码请修改为自己的connectOptions.setUserName("testuser1");connectOptions.setPassword("testuser1".toCharArray());client.connect(connectOptions);//主题String topic = "topic/test2";int qos = 1;String msg = "Hello MQTT 1";MqttMessage message = new MqttMessage(msg.getBytes());message.setQos(qos);client.publish(topic, message);client.disconnect();client.close();}
消息接收
请先启动消息接收客户端,再开启消息发送客户端,否则消息发送后没有客户端接收,消息会被丢失
public void recevie() throws MqttException, InterruptedException {MqttClient client = new MqttClient("tcp://101.43.94.164:1883", "testuser1-receive123");MqttConnectOptions connectOptions = new MqttConnectOptions();connectOptions.setUserName("testuser1");connectOptions.setPassword("testuser1".toCharArray());connectOptions.setAutomaticReconnect(true);client.connect(connectOptions);String topic = "topic/test2";CountDownLatch countDownLatch = new CountDownLatch(1);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {System.out.println("连接丢失");}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println("消息到达:" + mqttMessage.toString() + " " + s);countDownLatch.countDown();}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("消息发送完成");}});client.subscribe(topic, 1);countDownLatch.await();client.disconnect();client.close();}
topic(主题)
MQTT 协议规定主题的长度为两个字节,因此主题最多可包含 65,535 个字符。
MQTT中的t主题不需要用户提前创建,如果代码中使用了不存在的主题,MQTT会自动创建这个主题。
mqtt通过/对消息分层。为了便于理解,不建议使用/chat和chat/。
topic订阅规则
订阅一个主题很简单,如果主题是topic,那么那么使用/topic就可以订阅了;但是如果想一次订阅多个主题时,就需要用到通配符了。
常用通配符:
- +:用于单个主题层级匹配的通配符
test/+/ip //有效
test+ //无效
test/+ //无效
如果客户端订阅了test/+/ip主题,那么会收到以下主题的消息:
test/1/ip
test/2/ip
test/3/ip
不会收到以下主题的消息:
test/ip
test/wind/1/ip
- #:用于匹配主题中任意层级的通配符;多层通配符表示它的父级和任意数量的子层级,在使用多层通配符时,它必须占据整个层级并且必须是主题的最后一个字符
如果订阅了test/#主题,那么会收到以下主题的消息:
test
test/ip
test/1/ip
- :以:以:以$SYS开头的主题为系统主题,系统主题主要用户获取MQTT服务器自身运行状态、消息统计、客户端上下线事件等数据
通配符主题订阅的性能弱于普通主题订阅,且会消耗更多的服务器资源,用户可根据实际业务情况选择订阅类型。
MQTT和Spring的整合
引入依赖
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
核心配置:
@Configuration
public class MQTTConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});// 连接断开时自动重连options.setAutomaticReconnect(true);// MQTT服务器认证用户名options.setUserName(username);// MQTT服务器认证密码options.setPassword(password.toCharArray());factory.setConnectionOptions(options);System.out.println("Connecting to broker: " + brokerUrl + " OK.");return factory;}@Beanpublic MqttPahoMessageDrivenChannelAdapter mqttInbound() {// 创建入站适配器:监听MQTT消息并转发到Spring Integration通道MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", // 入站客户端ID(唯一,避免与其他客户端冲突)mqttClientFactory(), // 使用前面定义的客户端工厂"testSub/#" // 订阅的MQTT主题("#"为通配符,表示订阅"testSub/"下所有子主题));// 消息接收后转发到的通道名称adapter.setOutputChannelName("mqttInputChannel");// 设置订阅QoS级别(1表示确保消息至少到达一次)adapter.setQos(1);return adapter;}/*** 网关指定通过该网关发送的消息,默认会被路由到名为 指定的消息通道(这里是配置类中定义的 mqttOutboundChannel Bean),* 最终由通道连接的 MqttPahoMessageHandler 处理并发送到 MQTT 服务器。* @return*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "-outbound",mqttClientFactory());handler.setAsync(true);handler.setDefaultQos(1);return handler;}@Beanpublic MessageChannel mqttInputChannel() {// 入站消息通道(接收MQTT消息)return new DirectChannel();}@Beanpublic MessageChannel mqttOutboundChannel() {// 出站消息通道(发送MQTT消息)return new DirectChannel();}
}
MessageChannel 是 Spring Integration 的核心概念,相当于消息流转的“管道”:
- mqttInputChannel:入站适配器(mqttInbound)接收的 MQTT 消息会流入此通道,后续可通过 @ServiceActivator 绑定处理器消费消息(当前代码未实现,需补充消息处理逻辑)
- mqttOutboundChannel:业务代码通过 MqttMessageGateway 发送的消息会先流入此通道,再由出站处理器(mqttOutbound)发送到 MQTT 服务器。
发送消息
/*** Description: @MessagingGateway 注解的接口,用于发送 MQTT 消息,会自动生成代理类,代理类会将方法参数转换为 Spring Integration 消息(Message),* 通过 defaultRequestChannel = "mqttOutboundChannel" 指定: 所有通过该网关发送的消息,默认会被路由到名为 mqttOutboundChannel* 的消息通道(对应配置类中定义的 mqttOutboundChannel Bean),最终由通道连接的 MqttPahoMessageHandler 处理并发送到 MQTT 服务器。*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMessageGateway {void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);
}
接收消息
@Component
public class MqttMessageListener {@Bean@ServiceActivator(inputChannel = "mqttInputChannel") //绑定消息入站通道public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();String payload = message.getPayload().toString();Logger log = LoggerFactory.getLogger(MqttMessageListener.class);log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);try {if (topic.startsWith("testSub/")) {//进行业务处理}} else if (topic.startsWith("report/allpoints")) {//进行业务处理}}} catch (Exception e) {log.error("[MQTT] 消息处理失败: {}", e.getMessage());}}};}
}
- 接收消息: MQTT服务器 → mqttInbound(入站适配器,订阅主题)→ mqttInputChannel(通道)→ 消息处理器(需补充,如存储/业务处理)。
- 发送消息: 业务代码 → MqttMessageGateway(消息网关)→ mqttOutboundChannel(通道)→ mqttOutbound(出站处理器)→ MQTT服务器。
参考
- MQTT 主题与通配符(Topics & Wildcards)入门手册
- Java对接MQTT协议的完整实现