您好,登錄后才能下訂單哦!
本篇內容主要講解“netty的怎么實現及運用到gmq中”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“netty的怎么實現及運用到gmq中”吧!
書接上文手寫MQ框架(三)-客戶端實現 ,前面通過web的形式實現了mq的服務端和客戶端,現在計劃使用netty來改造一下。前段時間學習了一下netty的使用。大概有一些想法。
netty封裝了socket的使用,我們通過簡單的調用即可構建高性能的網絡應用。我計劃采用以下例子來對gmq進行改造。
Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
netty是一個java框架,是網絡編程框架,支持異步、事件驅動的特性,所以性能表現很好。
Handler是處理器,handler 是由 Netty 生成用來處理 I/O 事件的。
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; public class SimpleServerHandler extends SimpleChannelInboundHandler<String> { public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("[SERVER] - " + incoming.remoteAddress() + " 加入\n"); channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("[SERVER] - " + incoming.remoteAddress() + " 離開\n"); channels.remove(ctx.channel()); } @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { Channel incoming = ctx.channel(); System.out.println("[" + incoming.remoteAddress() + "]" + s); if(s == null || s.length() == 0) { incoming.writeAndFlush("消息是空的呀!\n"); } else { // MqRouter<?> mqRouter = JSONObject.parseObject(s, MqRouter.class); // System.out.println(mqRouter.getUri()); String responseMsg = "收到了," + s + "\n"; incoming.writeAndFlush(responseMsg); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在線"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常"); cause.printStackTrace(); ctx.close(); } }
SimpleServerInitializer 用來增加多個的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleServerHandler 等。
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class SimpleServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleServerHandler()); System.out.println("SimpleChatClient:" + ch.remoteAddress() + "連接上"); } }
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class SimpleServer { private int port; public SimpleServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new SimpleServerInitializer()).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); System.out.println("SimpleChatServer 啟動了"); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); System.out.println("SimpleChatServer 關閉了"); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new SimpleServer(port).run(); } }
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class SimpleClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { System.out.println("收到的信息:" + s); } }
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleClientHandler()); } }
package me.lovegao.netty.learnw3c.mqdemo; import java.io.BufferedReader; import java.io.InputStreamReader; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class SimpleClient { private final String host; private final int port; public SimpleClient(String host, int port) { this.host = host; this.port = port; } public static void main(String[] args) throws Exception { new SimpleClient("localhost", 8080).run(); } public void run() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new SimpleClientInitializer()); Channel channel = bootstrap.connect(host, port).sync().channel(); BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); while(true) { String line = in.readLine(); if(line.equals("exit!")) { break; } channel.writeAndFlush(line + "\r\n"); } } catch(Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
在我把教程上的代碼略微改了一下,測試時發現客戶端能夠發出消息,服務端能夠收到消息,服務端也走到了回復客戶端的流程,但是客戶端卻收不到消息。還原代碼后是正常的,想了半天,最后才發現是改代碼的的時候漏掉了“\n”這個標識,以此導致客戶端始終不打印消息。
netty只封裝了網絡交互,gmq整體使用了gmvc框架,而gmvc框架目前還無法脫離servlet。而我又不太想把之前寫的代碼全部改為自己new的方式。
1)改造gmvc框架
對gmvc框架進行重構,使得能夠脫離servlet使用。也就是將IOC功能剝離開。
優點:一步到位,符合整體的規劃。
缺點:gmq的迭代會延遲一段時間。
2)暫時拋棄gmvc框架
暫時將目前依賴的gmvc框架給去除掉,優先完成gmq的迭代。待后期gmvc框架改造完成后再進行改造。
優點:能夠盡早的完成gmq的功能。
缺點:先移除框架,后期再套上框架,相當于做了兩次多余的功。費時費力。
到此,相信大家對“netty的怎么實現及運用到gmq中”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。