您好,登錄后才能下訂單哦!
在很多互聯網應用系統中,請求處理異步化是提升系統性能一種常用的手段,而基于消息系統的異步處理由于具備高可靠性、高吞吐量的特點,因而在并發請求量比較高的互聯網系統中被廣泛應用。與此同時,這種方案也帶來了調用鏈路處理上的問題,因為大部分應用請求都會要求同步響應實時處理結果,而由于請求的處理過程已經通過消息異步解耦,所以整個調用鏈路就變成了異步鏈路,此時請求鏈路的發起者如何同步拿到響應結果,就需要進行額外的系統設計考慮。
為了更清晰地理解這個問題,小碼哥以最近正在做的共享單車的IOT系統為例,給大家來一張圖描述下,如圖所示:
在上述系統流程中,終端設備與服務端之間通過MQTT協議相連,而MQTT協議本質上是一種異步消息的連接方式,因此業務應用(如圖中的訂單系統)發起開鎖請求后,IOT應用系統會以MQTT協議的方式通過物聯網平臺(此處使用的是AWS IOT服務)向設備發起開鎖下行消息,而這一過程在IOT應用系統完成與物聯網平臺的交互后同步調用鏈路就結束了,因為物聯網平臺與鎖設備之間通過MQTT消息服務異步解耦了,當然物聯網平臺會通過一系列可靠消息機制來確保開鎖消息能夠發送到指定設備的監聽MQTT隊列。而至于鎖設備是否能夠及時接收到開鎖下行MQTT消息,則取決于鎖設備當時的移動網絡情況。
鎖設備在收到MQTT開鎖消息后,會通過嵌入式軟件系統觸發硬件設備完成開鎖動作,之后就需要通過MQTT上行消息將開鎖結果反饋到服務端,從而由服務端系統判斷是否創建騎行訂單并計算費用。這一過程需要物聯網平臺監聽指定鎖設備相應的MQTT上行消息隊列,由于物聯網平臺需要與上萬個設備進行連接,因而不可能將每一個鎖設備所產生的MQTT上行消息都直接轉發給Iot應用系統,因此在物聯網平臺可以將一類的設備MQTT消息轉發至特定的業務消息隊列中,例如開鎖上行消息,所有設備的MQTT開鎖響應上行消息都可以轉發到表示開鎖響應的Iot業務消息隊列,如“iot_upstream_lock_response”,這樣Iot業務系統則不需要再關注底層設備的MQTT消息,可以用更利于業務理解的方式去處理開鎖響應結果。
現在的問題是通過MQTT協議的開鎖下行消息、上行消息已經完全處于兩條不同的異步網絡鏈路,而鏈路的發起者此時卻需要同步等待開鎖結果,但是實際上同步鏈路早已在Iot應用系統向物聯網平臺發送開鎖消息后就已經完成,所以為了滿足調用方的同步請求/響應需要就需要在Iot應用系統的下發開鎖消息后進行額外的同步阻塞等待,并監聽開鎖響應的Iot業務消息隊列“iot_upstream_lock_response”關于此次開鎖請求的上行消息,之后再結束掉之前的同步阻塞等待邏輯,從而實現向業務調用方返回實時開鎖響應結果的同步調用效果。那么在上述流程中如何實施額外的同步阻塞以及如何進行回調消息的監聽呢?在接下來的內容中就和大家一起探討具體的實施方案!
以上問題在使用消息服務進行異步解耦的應用場景中是比較普遍的需求,由于異步調用鏈路非常長所以通用的解決思路是在調用鏈的起始端進行同步阻塞,而在調用鏈的結束端通過回調的方式來實現,如下圖所示:
在上述圖示中,鏈路起始隊列處在發送第一次異步消息后會開啟一個臨時隊列并同步阻塞監聽該臨時隊列的回調消息,而鏈路的結束隊列在完成邏輯處理后需要回調起始隊列監聽的臨時隊列,而由于請求線程一直處于阻塞監聽該臨時隊列的狀態,所以一旦收到回調消息就可以結束阻塞執行后續流程,從而完成整個鏈路的同步響應。
雖然常見的消息中間件都可以實現以上邏輯,例如小碼哥之前所在的公司就基于RabbitMQ通過臨時隊列的方式實現過消息鏈路的同步調用,但是基于消息中間件的方式多少還是顯得有些繁瑣,對于常見的消息中間件如RocketMQ、RabbitMQ來說異步消息才是其強項,如果以大量臨時隊列的創建和銷毀為代價來實現消息調用鏈路的同步,不僅從使用上來說顯得有些麻煩,并且也會對消息中間件的穩定性帶來一些不好的影響。
因此在前面提到的IOT系統中,我們采用了基于Redis的發布/訂閱功能來實現異步消息鏈路的同步化調用。而由于Redis的高性能以及Redis的應用場景非常豐富,并且非常適合數據頻繁變動的場景,在系統中既可以作為NoSQL數據庫來使用,同時還支持分布式鎖等功能,因而維護的性價比也相對較高。接下來我們就基于Spring Boot的開發框架來演示如何利用Redis的發布/訂閱來實現異步消息鏈路的同步回調!
Redis本身可以通過發布訂閱機制實現一定的消息隊列功能,在Redis中通過subscribe/publish等命令可以實現發布訂閱功能,基于此原先的IOT系統處理示意圖如下:
如上圖所示,在IOT應用端發送異步MQTT消息后會以消息ID組成的Key作為頻道**,并保持請求線程對該頻道的同步監聽**,直到收到Iot業務消息隊列的開鎖結果上行消息后,在消息隊列的消費端將該上行消息發布至同樣以消息requestId組成的頻道中,從而實現基于Redis發布訂閱機制的異步消息系統同步調用效果。
下面我們基于Spring Boot演示如何通過代碼進行實現,創建Spring Boot工程后引入Spring Boot Redis集成依賴包,如下:
<!--Redis依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
之后在項目的配置文件中添加Redis服務連接信息,如下所示:
spring:
redis:
host: 127.0.0.1
port: 6379
password: 123456
此時項目就具備了訪問Redis的能力,接下來我們通過具體的代碼實現來進行功能演示。訂閱監聽代碼如下:
@RestController
@RequestMapping("/iot")
public class IotController {
//注入Redis消息容器對象
@Autowired
RedisMessageListenerContainer redisMessageListenerContainer;
@RequestMapping(value = "/unLock", method = RequestMethod.POST)
public boolean unLock(@RequestParam(value = "thingName") String thingName,
@RequestParam(value = "requestId") String requestId)
throws InterruptedException, ExecutionException, TimeoutException {
//此處實現異步消息調用處理....
//生成監聽頻道Key
String key = "IOT_" + thingName + "_" + requestId;
//創建監聽Topic
ChannelTopic channelTopic = new ChannelTopic(key);
//創建消息任務對象
IotMessageTask iotMessageTask = new IotMessageTask();
//任務對象及監聽Topic添加到消息監聽容器
try {
redisMessageListenerContainer.addMessageListener(new IotMessageListener(iotMessageTask), channelTopic);
System.out.println("start redis subscribe listener->" + key);
//進入同步阻塞等待,超時時間設置為60秒
Message message = (Message) iotMessageTask.getIotMessageFuture().get(60000, TimeUnit.MILLISECONDS);
System.out.println("receive redis callback message->" + message.toString());
} finally {
//銷毀消息監聽對象
if (iotMessageTask != null) {
redisMessageListenerContainer.removeMessageListener(iotMessageTask.getMessageListener());
}
}
return true;
}
}
在上述代碼中,我們模擬了一個開鎖請求,在完成異步消息處理后會開啟Redis訂閱監聽,為了實現異步阻塞還需要我們創建消息任務對象,代碼如下:
public class IotMessageTask<T> {
//聲明線程異步阻塞對象(JDK 1.8新提供Api)
private CompletableFuture<T> iotMessageFuture = new CompletableFuture<>();
//聲明消息監聽對象
private MessageListener messageListener;
//聲明超時時間
private boolean isTimeout;
public IotMessageTask() {
}
public CompletableFuture<T> getIotMessageFuture() {
return iotMessageFuture;
}
public void setIotMessageFuture(CompletableFuture<T> iotMessageFuture) {
this.iotMessageFuture = iotMessageFuture;
}
public MessageListener getMessageListener() {
return messageListener;
}
public void setMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
public boolean isTimeout() {
return isTimeout;
}
public void setTimeout(boolean timeout) {
isTimeout = timeout;
}
}
在消息任務對象中我們通過JDK1.8新提供的CompletableFuture類實現線程阻塞效果,并通過定義消息監聽對象及超時時間完善處理機制。此外根據Controller層代碼還需要自定義定義消息監聽處理對象,代碼如下:
public class IotMessageListener implements MessageListener {
IotMessageTask iotMessageTask;
public IotMessageListener(IotMessageTask iotMessageTask) {
this.iotMessageTask = iotMessageTask;
}
//實現消息發布監聽處理方法
@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("subscribe redis iot task response:{}" + message.toString());
//線程阻塞完成
iotMessageTask.getIotMessageFuture().complete(message);
}
}
此時就完成了Redis服務訂閱這部分邏輯的編寫,在后續的邏輯處理中需要完成消息的發布才能正常結束此處的阻塞等待,接下來我們寫一段代碼來模擬消息發布,代碼如下:
@RestController
@RequestMapping("/iot")
public class IotCallBackController {
//引入Redis客戶端操作對象
@Autowired
StringRedisTemplate stringRedisTemplate;
@RequestMapping(value = "/unLockCallBack", method = RequestMethod.POST)
public boolean unLockCallBack(@RequestParam(value = "thingName") String thingName,
@RequestParam(value = "requestId") String requestId) {
//生成監聽頻道Key
String key = "IOT_" + thingName + "_" + requestId;
//模擬實現消息回調
stringRedisTemplate.convertAndSend(key, "this is a redis callback");
return true;
}
}
此時啟動Spring Boot應用調用開鎖模擬接口,邏輯就會暫時處于訂閱等待狀態;之后再模擬調用開鎖回調Redis消息發布邏輯,之前的阻塞等待就會因為監聽回調而完成同步返回。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。