您好,登錄后才能下訂單哦!
一.導入Netty依賴
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency>
二.搭建websocket服務器
@Component public class WebSocketServer { /** * 主線程池 */ private EventLoopGroup bossGroup; /** * 工作線程池 */ private EventLoopGroup workerGroup; /** * 服務器 */ private ServerBootstrap server; /** * 回調 */ private ChannelFuture future; public void start() { future = server.bind(9001); System.out.println("netty server - 啟動成功"); } public WebSocketServer() { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebsocketInitializer()); } }
三.初始化Websocket
public class WebsocketInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // ------------------ // 用于支持Http協議 // ------------------ // websocket基于http協議,需要有http的編解碼器 pipeline.addLast(new HttpServerCodec()); // 對寫大數據流的支持 pipeline.addLast(new ChunkedWriteHandler()); // 添加對HTTP請求和響應的聚合器:只要使用Netty進行Http編程都需要使用 //設置單次請求的文件的大小 pipeline.addLast(new HttpObjectAggregator(1024 * 64)); //webSocket 服務器處理的協議,用于指定給客戶端連接訪問的路由 :/ws pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 添加Netty空閑超時檢查的支持 // 1. 讀空閑超時(超過一定的時間會發送對應的事件消息) // 2. 寫空閑超時 // 3. 讀寫空閑超時 pipeline.addLast(new IdleStateHandler(4, 8, 12)); //添加心跳處理 pipeline.addLast(new HearBeatHandler()); // 添加自定義的handler pipeline.addLast(new ChatHandler()); } }
四.創建Netty監聽器
@Component public class NettyListener implements ApplicationListener<ContextRefreshedEvent> { @Resource private WebSocketServer websocketServer; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if(event.getApplicationContext().getParent() == null) { try { websocketServer.start(); } catch (Exception e) { e.printStackTrace(); } } } }
五.建立消息通道
public class UserChannelMap { /** * 用戶保存用戶id與通道的Map對象 */ // private static Map<String, Channel> userChannelMap; /* static { userChannelMap = new HashMap<String, Channel>(); }*/ /** * 定義一個channel組,管理所有的channel * GlobalEventExecutor.INSTANCE 是全局的事件執行器,是一個單例 */ private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 存放用戶與Chanel的對應信息,用于給指定用戶發送消息 */ private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>(); private UserChannelMap(){} /** * 添加用戶id與channel的關聯 * @param userNum * @param channel */ public static void put(String userNum, Channel channel) { userChannelMap.put(userNum, channel); } /** * 根據用戶id移除用戶id與channel的關聯 * @param userNum */ public static void remove(String userNum) { userChannelMap.remove(userNum); } /** * 根據通道id移除用戶與channel的關聯 * @param channelId 通道的id */ public static void removeByChannelId(String channelId) { if(!StringUtils.isNotBlank(channelId)) { return; } for (String s : userChannelMap.keySet()) { Channel channel = userChannelMap.get(s); if(channelId.equals(channel.id().asLongText())) { System.out.println("客戶端連接斷開,取消用戶" + s + "與通道" + channelId + "的關聯"); userChannelMap.remove(s); UserService userService = SpringUtil.getBean(UserService.class); userService.logout(s); break; } } } /** * 打印所有的用戶與通道的關聯數據 */ public static void print() { for (String s : userChannelMap.keySet()) { System.out.println("用戶id:" + s + " 通道:" + userChannelMap.get(s).id()); } } /** * 根據好友id獲取對應的通道 * @param receiverNum 接收人編號 * @return Netty通道 */ public static Channel get(String receiverNum) { return userChannelMap.get(receiverNum); } /** * 獲取channel組 * @return */ public static ChannelGroup getChannelGroup() { return channelGroup; } /** * 獲取用戶channel map * @return */ public static ConcurrentHashMap<String,Channel> getUserChannelMap(){ return userChannelMap; } }
六.自定義消息類型
public class Message { /** * 消息類型 */ private Integer type; /** * 聊天消息 */ private String message; /** * 擴展消息字段 */ private Object ext; public Integer getType() { return type; } public void setType(Integer type) { this.type = type; } public MarketChatRecord getChatRecord() { return marketChatRecord; } public void setChatRecord(MarketChatRecord chatRecord) { this.marketChatRecord = chatRecord; } public Object getExt() { return ext; } public void setExt(Object ext) { this.ext = ext; } @Override public String toString() { return "Message{" + "type=" + type + ", marketChatRecord=" + marketChatRecord + ", ext=" + ext + '}'; } }
七.創建處理消息的handler
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class); /** * 用來保存所有的客戶端連接 */ private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** *當Channel中有新的事件消息會自動調用 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 當接收到數據后會自動調用 // 獲取客戶端發送過來的文本消息 Gson gson = new Gson(); log.info("服務器收到消息:{}",msg.text()); System.out.println("接收到消息數據為:" + msg.text()); Message message = gson.fromJson(msg.text(), Message.class); //根據業務要求進行消息處理 switch (message.getType()) { // 處理客戶端連接的消息 case 0: // 建立用戶與通道的關聯 // 處理客戶端發送好友消息 break; case 1: // 處理客戶端的簽收消息 break; case 2: // 將消息記錄設置為已讀 break; case 3: // 接收心跳消息 break; default: break; } } // 當有新的客戶端連接服務器之后,會自動調用這個方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("handlerAdded 被調用"+ctx.channel().id().asLongText()); // 添加到channelGroup 通道組 UserChannelMap.getChannelGroup().add(ctx.channel()); // clients.add(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("{異常:}"+cause.getMessage()); // 刪除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); ctx.channel().close(); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("handlerRemoved 被調用"+ctx.channel().id().asLongText()); //刪除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); UserChannelMap.print(); } }
八.處理心跳
public class HearBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent)evt; if(idleStateEvent.state() == IdleState.READER_IDLE) { System.out.println("讀空閑事件觸發..."); } else if(idleStateEvent.state() == IdleState.WRITER_IDLE) { System.out.println("寫空閑事件觸發..."); } else if(idleStateEvent.state() == IdleState.ALL_IDLE) { System.out.println("---------------"); System.out.println("讀寫空閑事件觸發"); System.out.println("關閉通道資源"); ctx.channel().close(); } } } }
搭建完成后調用測試
1.頁面訪問http://localhost:9001/ws
2.端口號9001和訪問路徑ws都是我們在上邊配置的,然后傳入我們自定義的消息message類型。
3.大概流程:消息發送 :用戶1先連接通道,然后發送消息給用戶2,用戶2若是在線直接可以發送給用戶,若沒在線可以將消息暫存在redis或者通道里,用戶2鏈接通道的話,兩者可以直接通訊。
消息推送 :用戶1連接通道,根據通道id查詢要推送的人是否在線,或者推送給所有人,這里我只推送給指定的人。
到此這篇關于SpringBoot+Netty+WebSocket實現消息發送的示例代碼的文章就介紹到這了,更多相關SpringBoot Netty WebSocket消息發送內容請搜索億速云以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持億速云!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。