目录 一、原生java架构 1.1 导入POM文件 1.2 编写测试用例 二、SpringBoot集成MQTT 2.1 导入POM文件 2.2 在YML文件中增加配置 2.3 新建Properties配置文件映射配置 2.4 创建连接工厂 2.5 增加入站规则配置 2.6 增加出站规则配置 2.7 创建消息发送网关 2.8 测试消息发送 2.9 项目结构
一、原生java架构
1.1 导入POM文件
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
1.2 编写测试用例
package com.ming;

import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Test;
public class MqttPahoTest { private final String serverURI = "tcp://localhost:1883" ; private final String clientId = "emqx_spring_client_132" ; @Test public MqttClient createConnection ( ) throws MqttException { MqttClient client = new MqttClient ( serverURI, clientId, new MemoryPersistence ( ) ) ; MqttConnectOptions mqttConnectOptions = new MqttConnectOptions ( ) ; mqttConnectOptions. setUserName ( "admin" ) ; mqttConnectOptions. setPassword ( "admin" . toCharArray ( ) ) ; mqttConnectOptions. setCleanSession ( true ) ; client. connect ( mqttConnectOptions) ; return client; } @Test public void sendMsg ( ) throws MqttException { MqttClient client = createConnection ( ) ; MqttMessage mqttMessage = new MqttMessage ( ) ; mqttMessage. setQos ( 2 ) ; mqttMessage. setPayload ( "Hello World" . getBytes ( ) ) ; client. publish ( "java/a" , mqttMessage) ; client. disconnect ( ) ; client. close ( ) ; } @Test public void receiveMsg ( ) throws MqttException { MqttClient client = new MqttClient ( serverURI, clientId, new MemoryPersistence ( ) ) ; MqttConnectOptions mqttConnectOptions = new MqttConnectOptions ( ) ; mqttConnectOptions. setUserName ( "admin" ) ; mqttConnectOptions. setPassword ( "admin" . toCharArray ( ) ) ; mqttConnectOptions. setCleanSession ( true ) ; client. setCallback ( new MqttCallback ( ) { @Override public void connectionLost ( Throwable throwable) { System . out. println ( "Connection lost..." ) ; } @Override public void messageArrived ( String topic, MqttMessage mqttMessage) throws Exception { System . out. println ( String . format ( "%s ---> %s" , topic, new String ( mqttMessage. getPayload ( ) ) ) ) ; } @Override public void deliveryComplete ( IMqttDeliveryToken iMqttDeliveryToken) { System . out. println ( "Delivery complete" ) ; } } ) ; client. connect ( mqttConnectOptions) ; client. subscribe ( "java/b" , 2 ) ; while ( true ) ; }
}
二、SpringBoot集成MQTT
2.1 导入POM文件
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.4.3</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
2.2 在YML文件中增加配置
spring:
  mqtt:
    username: admin
    password: admin
    url: tcp://localhost:1883
    subClientId: sub_client_id_123
    subTopic: atguigu/iot/lamp/line1,atguigu/iot/lamp/line2
    pubClientId: pub_client_id_123
2.3 新建Properties配置文件映射配置
package com. ming. properties ; import lombok. Data ;
import org. springframework. boot. context. properties. ConfigurationProperties ; @Data
@ConfigurationProperties ( prefix = "spring.mqtt" )
/**
 * Holder for the MQTT settings used by the connection factory and by the inbound and
 * outbound adapters.
 */
public class MqttConfigurationProperties {

    /** User name sent to the broker when connecting. */
    private String username;

    /** Password sent to the broker when connecting. */
    private String password;

    /** Broker URL, used both as the server URI and as the adapter URL. */
    private String url;

    /** Client ID used by the inbound (subscribing) adapter. */
    private String subClientId;

    /** Comma-separated list of topics the inbound adapter subscribes to. */
    private String subTopic;

    /** Client ID used by the outbound (publishing) handler. */
    private String pubClientId;
}
2.4 创建连接工厂
package com. ming. config ; import com. ming. properties. MqttConfigurationProperties ;
import org. eclipse. paho. client. mqttv3. MqttConnectOptions ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. integration. mqtt. core. DefaultMqttPahoClientFactory ;
import org. springframework. integration. mqtt. core. MqttPahoClientFactory ;
/**
 * Declares the Paho client factory bean that the inbound and outbound MQTT adapters share.
 */
@Configuration
public class MqttConfiguration {

    @Autowired
    private MqttConfigurationProperties mqttConfigurationProperties;

    /**
     * Builds a client factory whose connect options carry the configured user name,
     * password and broker URL.
     *
     * @return the configured {@link MqttPahoClientFactory}
     */
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
        final MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setUserName(mqttConfigurationProperties.getUsername());

        final char[] secret = mqttConfigurationProperties.getPassword().toCharArray();
        connectOptions.setPassword(secret);

        final String[] brokerUris = { mqttConfigurationProperties.getUrl() };
        connectOptions.setServerURIs(brokerUris);

        final DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setConnectionOptions(connectOptions);
        return clientFactory;
    }
}
2.5 增加入站规则配置
package com. ming. config ; import com. ming. handler. ReceiverMessageHandler ;
import com. ming. properties. MqttConfigurationProperties ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. integration. annotation. ServiceActivator ;
import org. springframework. integration. channel. DirectChannel ;
import org. springframework. integration. core. MessageProducer ;
import org. springframework. integration. mqtt. core. MqttPahoClientFactory ;
import org. springframework. integration. mqtt. inbound. MqttPahoMessageDrivenChannelAdapter ;
import org. springframework. integration. mqtt. support. DefaultPahoMessageConverter ;
import org. springframework. messaging. MessageChannel ;
import org. springframework. messaging. MessageHandler ;
/**
 * Inbound (subscribing) wiring: an MQTT message-driven adapter feeds
 * {@code messageInboundChannel}, and {@link ReceiverMessageHandler} consumes from it.
 */
@Configuration
public class MqttInboundConfiguration {

    @Autowired
    private MqttConfigurationProperties mqttConfigurationProperties;

    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    @Autowired
    private ReceiverMessageHandler receiverMessageHandler;

    /**
     * Channel that carries messages received from the broker.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel messageInboundChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the adapter that subscribes to the configured topics with QoS 1 and sends
     * converted messages to {@link #messageInboundChannel()}.
     *
     * @return the inbound channel adapter
     */
    @Bean
    public MessageProducer messageProducer() {
        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        mqttConfigurationProperties.getUrl(),
                        mqttConfigurationProperties.getSubClientId(),
                        mqttPahoClientFactory,
                        subscribedTopics());
        mqttPahoMessageDrivenChannelAdapter.setQos(1);
        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel());
        return mqttPahoMessageDrivenChannelAdapter;
    }

    /**
     * Registers the receiver as the handler for {@code messageInboundChannel}.
     *
     * @return the injected {@link ReceiverMessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = "messageInboundChannel")
    public MessageHandler messageHandler() {
        return receiverMessageHandler;
    }

    /**
     * Splits the comma-separated {@code subTopic} setting into individual topic filters.
     *
     * <p>Whitespace around the commas and at both ends is discarded, so a value such as
     * {@code "a/b, c/d"} yields {@code "a/b"} and {@code "c/d"} rather than a second filter
     * with a leading space that would never match.
     */
    private String[] subscribedTopics() {
        return mqttConfigurationProperties.getSubTopic().trim().split("\\s*,\\s*");
    }
}
package com. ming. handler ; import org. springframework. messaging. Message ;
import org. springframework. messaging. MessageHandler ;
import org. springframework. messaging. MessagingException ;
import org. springframework. stereotype. Component ;
/**
 * Message handler that prints the {@code mqtt_receivedTopic} header and the payload of each
 * message it is given.
 */
@Component
public class ReceiverMessageHandler implements MessageHandler {

    /**
     * Writes the received topic header and then the payload to standard output, one per line.
     *
     * @param message the message to print
     * @throws MessagingException declared by the {@link MessageHandler} contract
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        final Object body = message.getPayload();
        final Object receivedTopic = message.getHeaders().get("mqtt_receivedTopic");

        System.out.println(receivedTopic);
        System.out.println(body);
    }
}
2.6 增加出站规则配置
package com. ming. config ; import com. ming. properties. MqttConfigurationProperties ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. integration. annotation. ServiceActivator ;
import org. springframework. integration. channel. DirectChannel ;
import org. springframework. integration. mqtt. core. MqttPahoClientFactory ;
import org. springframework. integration. mqtt. outbound. MqttPahoMessageHandler ;
import org. springframework. messaging. MessageChannel ;
import org. springframework. messaging. MessageHandler ;
/**
 * Outbound (publishing) wiring: messages sent to {@code mqttOutboundChannel} are handed to
 * a Paho message handler.
 */
@Configuration
public class MqttOutboundConfiguration {

    @Autowired
    private MqttConfigurationProperties mqttConfigurationProperties;

    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    /**
     * Channel that accepts messages to be published.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the handler attached to {@code mqttOutboundChannel}, configured with default
     * QoS 0, default topic {@code "default"} and async mode switched on.
     *
     * @return the outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutboundMessageHandler() {
        final String brokerUrl = mqttConfigurationProperties.getUrl();
        final String publisherId = mqttConfigurationProperties.getPubClientId();

        final MqttPahoMessageHandler publisher =
                new MqttPahoMessageHandler(brokerUrl, publisherId, mqttPahoClientFactory);
        publisher.setDefaultQos(0);
        publisher.setDefaultTopic("default");
        publisher.setAsync(true);
        return publisher;
    }
}
2.7 创建消息发送网关
package com. ming. getway ; import org. springframework. integration. annotation. MessagingGateway ;
import org. springframework. integration. mqtt. support. MqttHeaders ;
import org. springframework. messaging. handler. annotation. Header ;
/**
 * Messaging gateway whose default request channel is {@code mqttOutboundChannel}.
 *
 * <p>Topic and QoS arguments are passed as the {@link MqttHeaders#TOPIC} and
 * {@link MqttHeaders#QOS} message headers; the remaining argument is the payload.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGetWay {

    /**
     * Sends a payload to the given topic.
     *
     * @param topic   value for the {@link MqttHeaders#TOPIC} header
     * @param payload message payload
     */
    void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a payload to the given topic with an explicit QoS.
     *
     * @param topic   value for the {@link MqttHeaders#TOPIC} header
     * @param qos     value for the {@link MqttHeaders#QOS} header
     * @param payload message payload
     */
    void sendMsgToMqtt(
            @Header(value = MqttHeaders.TOPIC) String topic,
            @Header(value = MqttHeaders.QOS) int qos,
            String payload);
}
package com. ming. service ; import com. ming. getway. MqttGetWay ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. stereotype. Component ; @Component
/**
 * Thin service wrapper that forwards send requests to {@link MqttGetWay}.
 */
public class MqttMessageSender {

    @Autowired
    private MqttGetWay mqttGetWay;

    /**
     * Forwards a message to the gateway without specifying a QoS.
     *
     * @param topic   destination topic
     * @param message payload to send
     */
    public void sendMsg(String topic, String message) {
        this.mqttGetWay.sendMsgToMqtt(topic, message);
    }

    /**
     * Forwards a message to the gateway with an explicit QoS.
     *
     * @param topic   destination topic
     * @param qos     QoS value passed to the gateway
     * @param message payload to send
     */
    public void sendMsg(String topic, int qos, String message) {
        this.mqttGetWay.sendMsgToMqtt(topic, qos, message);
    }
}
2.8 测试消息发送
package com. ming ; import com. ming. service. MqttMessageSender ;
import org. junit. jupiter. api. Test ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. boot. test. context. SpringBootTest ; @SpringBootTest ( classes = SpringMqttDemoApplication . class )
/**
 * Test that sends one message through {@link MqttMessageSender}.
 */
public class MqttMessageSenderTest {

    @Autowired
    private MqttMessageSender mqttMessageSender;

    /** Sends a fixed greeting to topic {@code java/c}. */
    @Test
    public void sendToMsg() {
        final String topic = "java/c";
        final String body = "hello mqtt spring boot ...";
        mqttMessageSender.sendMsg(topic, body);
    }
}
2.9 项目结构