您好,登錄后才能下訂單哦!
本篇內容主要講解“spring integration怎么連接MQTT”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“spring integration怎么連接MQTT”吧!
MQTT一種物聯網數據傳輸協議,構建在TCP之上,采用發布與訂閱的模式進行數據交互,發布與訂閱是兩個獨立的連接通道,這里采用spring-integration-mqt來實現發布與訂閱MQTT,與直接采用MQTT的SDK相對要簡單許多,服務端采用ActiveMQ來支持MQTT的消息服務并實現消息轉發。
這里只需要引入這一個包即可。
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.3.1.RELEASE</version> </dependency>
和spring-integration集成一樣,需要配置相對應的入站、出站就可以了
具體配置如下:
package org.noka.serialservice.config; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.noka.serialservice.service.MsgSendService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.event.MqttSubscribedEvent; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.support.MessageBuilder; /**-------------------------------------------------------------- * MQTT 數據轉發服務 * mqtt.services MQTT服務地址不配置時,不會啟用該服務 * 檢測mqtt.services這個參數是否配置,以確定是否啟用MQTT服務 * @author xiefangjian@163.com * @version 1.0.0 **------------------------------------------------------------*/ @EnableIntegration @Configuration @ConditionalOnProperty("mqtt.services") public class MQTTConfig implements ApplicationListener<ApplicationEvent> { private static Logger logger = LoggerFactory.getLogger(MQTTConfig.class); private final MsgSendService msgSendService;//發布消息到消息中間件接口 @Value("${mqtt.appid:mqtt_id}") private String appid;//客戶端ID @Value("${mqtt.input.topic:mqtt_input_topic}") private String[] inputTopic;//訂閱主題,可以是多個主題 @Value("${mqtt.out.topic:mqtt_out_topic}") private String[] outTopic;//發布主題,可以是多個主題 @Value("${mqtt.services:#{null}}") private String[] mqttServices;//服務器地址以及端口 @Value("${mqtt.user:#{null}}") private String user;//用戶名 @Value("${mqtt.password:#{null}}") private String password;//密碼 @Value("${mqtt.KeepAliveInterval:300}") private Integer KeepAliveInterval;//心跳時間,默認為5分鐘 @Value("${mqtt.CleanSession:false}") private Boolean CleanSession;//是否不保持session,默認為session保持 @Value("${mqtt.AutomaticReconnect:true}") private Boolean AutomaticReconnect;//是否自動重聯,默認為開啟自動重聯 @Value("${mqtt.CompletionTimeout:30000}") private Long CompletionTimeout;//連接超時,默認為30秒 @Value("${mqtt.Qos:1}") private Integer Qos;//通信質量,詳見MQTT協議 public MQTTConfig(MsgSendService msgSendService) { this.msgSendService = msgSendService; } /** * MQTT連接配置 * @return 連接工廠 */ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//連接工廠類 MqttConnectOptions options = new MqttConnectOptions();//連接參數 options.setServerURIs(mqttServices);//連接地址 if(null!=user) { options.setUserName(user);//用戶名 } if(null!=password) { options.setPassword(password.toCharArray());//密碼 } options.setKeepAliveInterval(KeepAliveInterval);//心跳時間 options.setAutomaticReconnect(AutomaticReconnect);//斷開是否自動重聯 options.setCleanSession(CleanSession);//保持session factory.setConnectionOptions(options); return factory; } /** * 入站管道 * @param mqttPahoClientFactory * @return */ @Bean public MessageProducerSupport mqttInput(MqttPahoClientFactory mqttPahoClientFactory){ MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid, mqttPahoClientFactory, inputTopic);//建立訂閱連接 DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true);//bytes類型接收 adapter.setCompletionTimeout(CompletionTimeout);//連接超時的時間 adapter.setConverter(converter); adapter.setQos(Qos);//消息質量 adapter.setOutputChannelName(ChannelName.INPUT_DATA);//輸入管道名稱 return adapter; } /** * 向服務器發送數據管道綁定 * @param connectionFactory tcp連接工廠類 * @return 消息管道對象 */ @Bean @ServiceActivator(inputChannel = ChannelName.OUTPUT_DATA_MQTT) public AbstractMqttMessageHandler MQTTOutAdapter(MqttPahoClientFactory connectionFactory) { //創建一個新的出站管道,由于MQTT的發布與訂閱是兩個獨立的連接,因此客戶端的ID(即APPID)不能與訂閱時所使用的ID一樣,否則在服務端會認為是同一個客戶端,而造成連接失敗 MqttPahoMessageHandler outGate = new MqttPahoMessageHandler(appid + "_put", connectionFactory); DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true);//bytes類型接收 outGate.setAsync(true); outGate.setCompletionTimeout(CompletionTimeout);//設置連接超時時時 outGate.setDefaultQos(Qos);//設置通信質量 outGate.setConverter(converter); return outGate; } /** * MQTT連接時調用的方法 * @param event */ @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof MqttSubscribedEvent) { String msg = "OK"; /**------------------連接時需要發送起始消息,寫在這里-------------**/ msgSendService.send(MessageBuilder.withPayload(msg.getBytes()).build()); } } }
來標識入站、出站管道的名稱,以便在其它需要的地方使用,實現方法如下:
/** ----------------------------------------- * 管道名稱常量類 * @author xiefangjian@163.com * @version 1.0.0 ** ---------------------------------------**/ public class ChannelName { public final static String INPUT_DATA="input_data";//入站管道 public final static String OUTPUT_DATA_TCP="output_data_TCP";//TCP出站管道 public final static String OUTPUT_DATA_MQTT="output_data_MQTT";//mqtt出站管道名稱 }
此時所有配置完成,接下來需要做的就是處理接收到的數據和發布數據,以上配置完成以后,接收和發送數據都是通過數據管道來完成,配置的是數據管道名稱。
用于向指定的數據管道里面發送數據,實現如下:
package org.noka.serialservice.service; import org.noka.serialservice.config.ChannelName; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; /**---------------------------------------------------------------- * 發送消息網關,其它需要發向服務器發送消息時,調用該接口 * @author xiefangjian@163.com * @version 1.0.0 **--------------------------------------------------------------**/ @MessagingGateway @Component public interface MsgGateway { /** * MQTT 發送網關 * @param a 主題,可以指定不同的數據發布主題,在消息中間件里面體現為不同的消息隊列 * @param out 消息內容 */ @Gateway(requestChannel = ChannelName.OUTPUT_DATA_MQTT) void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out); }
在需要的地方,可以向下面這樣調用這個接口,向MQTT服務器發送消息
//topic為主題名稱,out為消息內容 msgGateway.send(topic, out);
會自動調將數據放入配置的入站數據管道中,在需要接收數據的地方,向下面這樣配置即可
/** * 服務器有數據下發 * 用ServiceActivator配置需要接收的數據管道名稱,當該管道里面的數據時,會自動調用該方法 * @param in 服務器有數據下發時,序列化后的對象,這里使用byte數組 */ @ServiceActivator(inputChannel = ChannelName.INPUT_DATA) public void upCase(Message<byte[]> in) { logger.info("[net service data]========================================"); logger.info("[net dow data]"+new String(in.getPayload()));//字符串方式打印服務器下發的數據 logger.info("[net dow hex]"+ Hex.encodeHexString(in.getPayload(),false));//16進制方式打印服務器下發的數據 serialService.send(in.getPayload());//將服務器下發的數據轉發給串口 }
#--------MQTT--------------------------- #設備ID,唯一標識 mqtt.appid=mqtt_id #訂閱主題,多個主題用逗號分隔 mqtt.input.topic=mqtt_input_topic #發布主題 mqtt.out.topic=mqtt_out_topic,aac #MQTT服務器地址,可以是多個地址 mqtt.services=tcp://47.244.191.41:1883 #mqtt用戶名,默認無 #mqtt.user=guest #mqtt密碼,默認無 #mqtt.password=guest #心跳間隔時間,默認3000 #mqtt.KeepAliveInterval=3000 #是否不保持session,默認false #mqtt.CleanSession=false #是否自動連接,默認true #mqtt.AutomaticReconnect=true #連接超時,默認30000 #mqtt.CompletionTimeout=30000 #傳輸質量,默認1 #mqtt.Qos=1
到此,相信大家對“spring integration怎么連接MQTT”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。