- 实体
public class MqttMessagePayload {private String traceId;private String mdcSn;
}
- 配置
@ConfigurationProperties(prefix = "sdk.mqtt")
public class MqttProperties {private boolean enabled = false;private String brokerUrl;private String clientId;private String topic;
}
sdk:mqtt:enabled: truebroker-url: tcp://localhost:1883client-id: my-client-idtopic: my-topic
- 配置类
@Configuration
@ConditionalOnProperty(name = "sdk.mqtt.enabled", havingValue = "true")
@EnableConfigurationProperties(MqttProperties.class)
public class MqttAutoConfiguration {@Beanpublic MqttOutboundChannelAdapter mqttOutbound(MqttProperties properties) {MqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(properties.getClientId(), factory);messageHandler.setAsync(true);messageHandler.setDefaultTopic(properties.getTopic());return new MqttOutboundChannelAdapter(messageHandler);}
}
- service
public class MqttMessageSender {private final MessageChannel mqttOutboundChannel;public MqttMessageSender(MessageChannel mqttOutboundChannel) {this.mqttOutboundChannel = mqttOutboundChannel;}public void sendMessage(MqttMessagePayload payload) {try {GenericMessage<String> message = new GenericMessage<>(new ObjectMapper().writeValueAsString(payload));mqttOutboundChannel.send(message);} catch (Exception e) {throw new MqttSendException("MQTT消息发送失败", e);}}
}
- SDK集成
sdk.mqtt.enabled: true
@Autowired(required = false)
private MqttMessageSender mqttMessageSender;public void someBusinessLogic() {if (mqttMessageSender != null) {mqttMessageSender.sendMessage(new MqttMessagePayload("trace123", "mdc456"));}
}