您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關如何從構建分布式秒殺系統進行WebSocket推送通知,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
秒殺架構到后期,我們采用了消息隊列的形式實現搶購邏輯,那么之前拋出過這樣一個問題:消息隊列異步處理完每個用戶請求后,如何通知給相應用戶秒殺成功?
首先,我們舉一個生活中比較常見的例子:我們去銀行辦理業務,一般會選擇相關業務打印一個排號紙,然后就可以坐在小板凳上玩著手機,等待被小喇叭報號。當小喇叭喊到你所持有的號碼,就可以拿著排號紙去柜臺辦理自己的業務。
這里,假設當我們取排號紙的時候,銀行根據時間段內的排隊情況,比較人性化的提示用戶:排隊人數較多,您是否繼續等待?否的話我們可以換個時間段再來辦理。
由此我們把生活場景映射到真實的秒殺業務邏輯中來:
我們可以把柜臺比喻成商品下單處理邏輯單元
拿到排號紙說明你進入相應商品處理隊列
拿到排號紙的請求直接返回前臺,提示用戶搶購進行中
排號紙進入隊列后,等待商品業務處理邏輯
小喇叭叫到自己的排號相當于服務端通知用戶秒殺成功,這時候可以進行支付邏輯
那些拿不到票號的同學,相當于隊列已滿直接返回秒殺失敗
通過上面的場景,我們很容易能夠想到一種方案就是服務端通知,那么如何做到服務端異步通知的呢?下面,主角開始登場了,就是我們的Websocket。
WebSocket是HTML5開始提供的一種瀏覽器與服務器間進行全雙工通訊的網絡技術。依靠這種技術可以實現客戶端和服務器端的長連接,雙向實時通信。
HTTP VS WebSocket
特點:
異步、事件觸發
可以發送文本,圖片等流文件
數據格式比較輕量,性能開銷小,通信高效
使用ws或者wss協議的客戶端socket,能夠實現真正意義上的推送功能
缺點:
部分瀏覽器不支持,瀏覽器支持的程度與方式有區別,需要各種兼容寫法。
由于我們的秒殺架構項目案例中使用了SpringBoot,因此集成webSocket也是相對比較簡單的。
首先pom.xml引入以下依賴:
<!-- webSocket 秒殺通知--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
WebSocketConfig 配置:
/** * WebSocket配置 * 創建者 爪哇筆記 * 創建時間 2018年5月29日 */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
WebSocketServer 配置:
@ServerEndpoint("/websocket/{userId}") @Component public class WebSocketServer { private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class); //靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。 private static int onlineCount = 0; //concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。 private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>(); //與某個客戶端的連接會話,需要通過它來給客戶端發送數據 private Session session; //接收userId private String userId=""; /** * 連接建立成功調用的方法*/ @OnOpen public void onOpen(Session session,@PathParam("userId") String userId) { this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //在線數加1 log.info("有新窗口開始監聽:"+userId+",當前在線人數為" + getOnlineCount()); this.userId=userId; try { sendMessage("連接成功"); } catch (IOException e) { log.error("websocket IO異常"); } } /** * 連接關閉調用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this); //從set中刪除 subOnlineCount(); //在線數減1 log.info("有一連接關閉!當前在線人數為" + getOnlineCount()); } /** * 收到客戶端消息后調用的方法 * @param message 客戶端發送過來的消息*/ @OnMessage public void onMessage(String message, Session session) { log.info("收到來自窗口"+userId+"的信息:"+message); //群發消息 for (WebSocketServer item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } } /** * @param session * @param error */ OnError public void onError(Session session, Throwable error) { log.error("發生錯誤"); error.printStackTrace(); } /** * 實現服務器主動推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群發自定義消息 * */ public static void sendInfo(String message,@PathParam("userId") String userId){ log.info("推送消息到窗口"+userId+",推送內容:"+message); for (WebSocketServer item : webSocketSet) { try { //這里可以設定只推送給這個userId的,為null則全部推送 if(userId==null) { item.sendMessage(message); }else if(item.userId.equals(userId)){ item.sendMessage(message); } } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
KafkaConsumer 消費配置,通知用戶是否秒殺成功:
/** * 消費者 spring-kafka 2.0 + 依賴JDK8 * @author 科幫網 By https://blog.52itstyle.com */ @Component public class KafkaConsumer { @Autowired private ISeckillService seckillService; private static RedisUtil redisUtil = new RedisUtil(); /** * 監聽seckill主題,有消息就讀取 * @param message */ @KafkaListener(topics = {"seckill"}) public void receiveMessage(String message){ //收到通道的消息之后執行秒殺操作 String[] array = message.split(";"); if(redisUtil.getValue(array[0])!=null){//control層已經判斷了,其實這里不需要再判斷了 Result result = seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1])); if(result.equals(Result.ok())){ WebSocketServer.sendInfo(array[0].toString(), "秒殺成功");//推送給前臺 } else{ WebSocketServer.sendInfo(array[0].toString(), "秒殺失敗");//推送給前臺 redisUtil.cacheValue(array[0], "ok");//秒殺結束 } }else{ WebSocketServer.sendInfo(array[0].toString(), "秒殺失敗");//推送給前臺 } } }
webSocket.js 前臺通知邏輯:
$(function(){ socket.init();}); var basePath = "ws://localhost:8080/seckill/"; socket = { webSocket : "", init : function() { //userId:自行追加 if ('WebSocket' in window) { webSocket = new WebSocket(basePath+'websocket/1'); } else if ('MozWebSocket' in window) { webSocket = new MozWebSocket(basePath+"websocket/1"); } else { webSocket = new SockJS(basePath+"sockjs/websocket"); } webSocket.onerror = function(event) { alert("websockt連接發生錯誤,請刷新頁面重試!") }; webSocket.onopen = function(event) { }; webSocket.onmessage = function(event) { var message = event.data; alert(message)//判斷秒殺是否成功、自行處理邏輯 }; } }
send() 向遠程服務器發送數據
close() 關閉該websocket鏈接
onopen 當網絡連接建立時觸發該事件
onerror 當網絡發生錯誤時觸發該事件
onclose 當websocket被關閉時觸發該事件
onmessage 當websocket接收到服務器發來的消息的時觸發的事件,也是通信中最重要的一個監聽事件。msg.data
這個屬性可以返回websocket所處的狀態。
CONNECTING(0) websocket正嘗試與服務器建立連接
OPEN(1) websocket與服務器已經建立連接
CLOSING(2) websocket正在關閉與服務器的連接
CLOSED(3) websocket已經關閉了與服務器的連接
GoEasy實時Web推送,支持后臺推送和前臺推送兩種:后臺推送可以選擇Java SDK、 Restful API支持所有開發語言;前臺推送:JS推送。無論選擇哪種方式推送代碼都十分簡單(10分鐘可搞定)。由于它支持websocket 和polling兩種連接方式所以兼顧大多數主流瀏覽器,低版本的IE瀏覽器也是支持的。
Pushlets 是通過長連接方式實現“推”消息的。推送模式分為:Poll(輪詢)、Pull(拉)。
Pushlet 是一個開源的 Comet 框架,Pushlet 使用了觀察者模型:客戶端發送請求,訂閱感興趣的事件;服務器端為每個客戶端分配一個會話 ID 作為標記,事件源會把新產生的事件以多播的方式發送到訂閱者的事件隊列里。
其實前面有提過,盡管WebSocket有諸多優點,但是,如果服務端維護很多長連接也是挺耗費資源的,服務器集群以及覽器或者客戶端兼容性問題,也會帶來了一些不確定性因素。大體了解了一下各大廠的做法,大多數都還是基于輪詢的方式實現的,比如:騰訊PC端微信掃碼登錄、京東商城支付成功通知等等。
有些小伙伴可能會問了,輪詢豈不是會更耗費資源?其實在我看來,有些輪詢是不可能穿透到后端數據庫查詢服務的,比如秒殺,一個緩存標記位就可以判定是否秒殺成功。相對于WS的長連接以及其不確定因素,在秒殺場景下,輪詢還是相對比較合適的。
上述就是小編為大家分享的如何從構建分布式秒殺系統進行WebSocket推送通知了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。