您好,登錄后才能下訂單哦!
本篇內容主要講解“redis流數據推送多用戶的方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“redis流數據推送多用戶的方法是什么”吧!
1 當用戶幾百 幾千個時 如何推送?取締線程池 采用單線程異步同步推送。
2 現在的邏輯:
每次項目重新啟動: 初始化channel 、服務端斷開連接-重新連接。當有服務連接不上的時候定時器連接。
這樣會產生一個問題:異步連接的任務特別多 導致服務奔潰
/** * 關閉連接時 */@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); int port = ipSocket.getPort(); String host = ipSocket.getHostString(); String serverUrl = host + ":" + port; String prev = redisTemplate.opsForValue().get(SOCKET_CONNECT_PREFIX + serverUrl); Integer cur = Math.toIntExact(System.currentTimeMillis() / 1000); if (null == prev || cur - Integer.parseInt(prev) > 20 * 60) { redisTemplate.opsForValue().set(SOCKET_CONNECT_PREFIX + serverUrl, cur + ""); log.info("服務端斷開連接=====" + host + port + "20分鐘之后 重新連接"); final EventLoop eventLoop = ctx.channel().eventLoop(); Bootstrap bootstrap = defaultProcessHandler.getBootstrap(eventLoop); eventLoop.schedule( () -> defaultProcessHandler.doConnect(bootstrap, host + ":" + port, new AtomicInteger(0)), 20, TimeUnit.MINUTES); } else { log.warn(serverUrl + "距離上次斷開連接不足20分鐘==" + (cur - Integer.parseInt(prev)) + "s"); } super.channelInactive(ctx);}
在系統啟動初始化 連接10次:
3 如何保證發送數據的完整性 TCP 粘包問題
服務端添加按換行符分隔的解碼器;
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
//此方法每次客戶端連接都會調用,是為通道初始化的方法
//獲得通道channel中的管道鏈(執行鏈、handler鏈)
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(Short.MAX_VALUE * 10));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringHandler());
log.info("success to initHandler!");
}
});
ChannelFuture future = channels.get(callbackUrl.getUrl());if (null != future) { try { boolean result = sendMsg(future, pushDataStr); this.redisTemplate.opsForValue().set(callbackUrl.getUrl(), offset.toString()); if (!result) { log.error("this channel push failed {}", callbackUrl.getUrl()); returnVal = false; } } catch (Exception e) { log.error("push exception", e); returnVal = false; }}return returnVal;
當推送成功 會記錄次用戶 此次的數據游標數據到redis.
當推送服務掛斷之后,會進行任務的初始化 此時會從redis中讀取每個客戶上次讀取的位置offest 提交到任務線程池
這個問題同樣解決了服務重啟之后,依然可以從上次讀取結束的位置接著讀取。讀取任務的開始游標位置 :是上次服務成功處理后的游標。
BaseReceiver 接口 handler 方法 每次接收到數據 返回當前游標值。當進行業務處理成功之后 返回true .會自動進行后續數據讀取。當客戶那邊的接收數據服務端掛了之后 首先會進行自動重連操作,此時讀取datahub數據的線程依然在返回數據 但是不能推送成功,所以游標值 不會后移。
到此,相信大家對“redis流數據推送多用戶的方法是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。