您好,登錄后才能下訂單哦!
這篇文章主要講解了“怎么寫一個RPC框架”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么寫一個RPC框架”吧!
RPC 框架應該長什么樣子
我們首先來看一下:一個 RPC 框架是什么東西?我們最直觀的感覺就是:
集成了 RPC 框架之后,通過配置一個注冊中心的地址。一個應用(稱為服務提供者)將某個接口(interface)“暴露”出去,另外一個應用(稱為服務消費者)通過“引用”這個接口(interface),然后調用了一下,就很神奇的可以調用到另外一個應用的方法了
給我們的感覺就好像調用了一個本地方法一樣。即便兩個應用不是在同一個 JVM 中甚至兩個應用都不在同一臺機器中。
那他們是如何做到的呢?當我們的服務消費者調用某個 RPC 接口的方法之后,它的底層會通過動態代理,然后經過網絡調用,去到服務提供者的機器上,然后執行對應的方法。
接著方法的結果通過網絡傳輸返回到服務消費者那里,然后就可以拿到結果了。
整個過程如下圖:
那么這個時候,可能有人會問了:服務消費者怎么知道服務提供者在哪臺機器的哪個端口呢?
這個時候,就需要“注冊中心”登場了,具體來說是這樣子的:
服務提供者在啟動的時候,將自己應用所在機器的信息提交到注冊中心上面。
服務消費者在啟動的時候,將需要消費的接口所在機器的信息抓回來。
這樣一來,服務消費者就有了一份服務提供者所在的機器列表了。
“服務消費者”拿到了“服務提供者”的機器列表就可以通過網絡請求來發起請求了。
網絡客戶端,我們應該采用什么呢?有幾種選擇:
使用 JDK 原生 BIO(也就是 ServerSocket 那一套)。阻塞式 IO 方法,無法支撐高并發。
使用 JDK 原生 NIO(Selector、SelectionKey 那一套)。非阻塞式 IO,可以支持高并發,但是自己實現復雜,需要處理各種網絡問題。
使用大名鼎鼎的 NIO 框架 Netty,天然支持高并發,封裝好,API 易用。
“服務消費者”拿到了“服務提供者”的機器列表就可以通過網絡請求來發起請求了。
作為一個有追求的程序員,我們要求開發出來的框架要求支持高并發、又要求簡單、還要快。
當然是選擇 Netty 來實現了,使用 Netty 的一些很基本的 API 就能滿足我們的需求。
網絡協議定義
當然了,既然我們要使用網絡傳輸數據。我們首先要定義一套網絡協議出來。
你可能又要問了,啥叫網絡協議?網絡協議,通俗理解,意思就是說我們的客戶端發送的數據應該長什么樣子,服務端可以去解析出來知道要做什么事情。
話不多說,上代碼,假設我們現在服務提供者有兩個類:
// com.study.rpc.test.producer.HelloService public interface HelloService { String sayHello(TestBean testBean); } // com.study.rpc.test.producer.TestBean public class TestBean { private String name; private Integer age; public TestBean(String name, Integer age) { this.name = name; this.age = age; } // getter setter }
現在我要調用 HelloService.sayHello(TestBean testBean) 這個方法。
作為“服務消費者”,應該怎么定義我們的請求,從而讓服務端知道我是要調用這個方法呢?
這需要我們將這個接口信息產生一個唯一的標識:這個標識會記錄了接口名、具體是那個方法、然后具體參數是什么!
然后將這些信息組織起來發送給服務端,我這里的方式是將信息保存為一個 JSON 格式的字符串來傳輸。
比如上面的接口我們傳輸的數據大概是這樣的:
{ "interfaces": "interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean", "requestId": "3", "parameter": { "com.study.rpc.test.producer.TestBean": { "age": 20, "name": "張三" } } }
嗯,我這里用一個 JSON 來標識這次調用是調用哪個接口的哪個方法,其中 interface 標識了唯一的類,parameter 標識了里面具體有哪些參數, 其中 key 就是參數的類全限定名,value 就是這個類的 JSON 信息。
可能看到這里,大家可能有意見了:數據不一定用 JSON 格式傳輸啊,而且使用 JSON 也不一定性能最高啊。
你使用 JDK 的 Serializable 配合 Netty 的 ObjectDecoder 來實現,這當然也可以,其實這里是一個拓展點,我們應該要提供多種序列化方式來供用戶選擇。
但是這里選擇了 JSON 的原因是因為它比較直觀,對于寫文章來說比較合理。
開發服務提供者
嗯,搞定了網絡協議之后,我們開始開發“服務提供者”了。對于服務提供者,因為我們這里是寫一個簡單版本的 RPC 框架,為了保持簡潔。
我們不會引入類似 Spring 之類的容器框架,所以我們需要定義一個服務提供者的配置類,它用于定義這個服務提供者是什么接口,然后它具體的實例對象是什么:
public class ServiceConfig{ public Class type; public T instance; public ServiceConfig(Classtype, T instance) { this.type = type; this.instance = instance; } public ClassgetType() { return type; } public void setType(Classtype) { this.type = type; } public T getInstance() { return instance; } public void setInstance(T instance) { this.instance = instance; } }
有了這個東西之后,我們就知道需要暴露哪些接口了。為了框架有一個統一的入口,我定義了一個類叫做 ApplicationContext,可以認為這是一個應用程序上下文,他的構造函數,接收 2 個參數。
代碼如下:
public ApplicationContext(String registryUrl, ListserviceConfigs){ // 1. 保存需要暴露的接口配置 this.serviceConfigs = serviceConfigs == null ? new ArrayList<>() : serviceConfigs; // step 2: 實例化注冊中心 initRegistry(registryUrl); // step 3: 將接口注冊到注冊中心,從注冊中心獲取接口,初始化服務接口列表 RegistryInfo registryInfo = null; InetAddress addr = InetAddress.getLocalHost(); String hostname = addr.getHostName(); String hostAddress = addr.getHostAddress(); registryInfo = new RegistryInfo(hostname, hostAddress, port); doRegistry(registryInfo); // step 4:初始化Netty服務器,接受到請求,直接打到服務提供者的service方法中 if (!this.serviceConfigs.isEmpty()) { // 需要暴露接口才暴露 nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods); nettyServer.init(port); } }
注冊中心設計
這里分為幾個步驟,首先保存了接口配置,接著初始化注冊中心,因為注冊中心可能會提供多種來供用戶選擇,所以這里需要定義一個注冊中心的接口:
public interface Registry { /** * 將生產者接口注冊到注冊中心 * * @param clazz 類 * @param registryInfo 本機的注冊信息 */ void register(Class clazz, RegistryInfo registryInfo) throws Exception; }
這里我們提供一個注冊的方法,這個方法的語義是將 clazz 對應的接口注冊到注冊中心。
接收兩個參數,一個是接口的 class 對象,另一個是注冊信息,里面包含了本機的一些基本信息,如下:
public class RegistryInfo { private String hostname; private String ip; private Integer port; public RegistryInfo(String hostname, String ip, Integer port) { this.hostname = hostname; this.ip = ip; this.port = port; } // getter setter }
好了,定義好注冊中心,回到之前的實例化注冊中心的地方,代碼如下:
/** * 注冊中心 */ private Registry registry; private void initRegistry(String registryUrl) { if (registryUrl.startsWith("zookeeper://")) { registryUrl = registryUrl.substring(12); registry = new ZookeeperRegistry(registryUrl); } else if (registryUrl.startsWith("multicast://")) { registry = new MulticastRegistry(registryUrl); } }
這里邏輯也非常簡單,就是根據 url 的 schema 來判斷是那個注冊中心,注冊中心這里實現了 2 個實現類,分別使用 Zookeeper 作為注冊中心,另外一個是使用廣播的方式作為注冊中心。
廣播注冊中心這里僅僅是做個示范,內部沒有實現。我們主要是實現了 Zookeeper 的注冊中心。
當然了,如果有興趣,可以實現更多的注冊中心供用戶選擇,比如 Redis 之類的,這里只是為了保持“拓展點”。
那么實例化完注冊中心之后,回到上面的代碼。
注冊服務提供者
// step 3: 將接口注冊到注冊中心,從注冊中心獲取接口,初始化服務接口列表 RegistryInfo registryInfo = null; InetAddress addr = InetAddress.getLocalHost(); String hostname = addr.getHostName(); String hostAddress = addr.getHostAddress(); registryInfo = new RegistryInfo(hostname, hostAddress, port); doRegistry(registryInfo);
這里邏輯很簡單,就是獲取本機的的基本信息構造成 RegistryInfo,然后調用了 doRegistry 方法:
/** * 接口方法對應method對象 */ private MapinterfaceMethods = new ConcurrentHashMap<>(); private void doRegistry(RegistryInfo registryInfo) throws Exception { for (ServiceConfig config : serviceConfigs) { Class type = config.getType(); registry.register(type, registryInfo); Method[] declaredMethods = type.getDeclaredMethods(); for (Method method : declaredMethods) { String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method); interfaceMethods.put(identify, method); } } }
這里做了兩件事情:
將接口注冊到注冊中心中。
對于每一個接口的每一個方法,生成一個唯一標識,保存在 interfaceMethods 集合中。
下面分別分析這兩件事情,首先是注冊方法:因為我們用到了 Zookeeper,為了方便,引入了 Zookeeper 的客戶端框架 Curator。
<dependency> <groupId>org.apache.curatorgroupId> <artifactId>curator-recipesartifactId> <version>2.3.0version> dependency>
接著看代碼:
public class ZookeeperRegistry implements Registry { private CuratorFramework client; public ZookeeperRegistry(String connectString) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.newClient(connectString, retryPolicy); client.start(); try { Stat myRPC = client.checkExists().forPath("/myRPC"); if (myRPC == null) { client.create() .creatingParentsIfNeeded() .forPath("/myRPC"); } System.out.println("Zookeeper Client初始化完畢......"); } catch (Exception e) { e.printStackTrace(); } } @Override public void register(Class clazz, RegistryInfo registryInfo) throws Exception { // 1. 注冊的時候,先從zk中獲取數據 // 2. 將自己的服務器地址加入注冊中心中 // 為每一個接口的每一個方法注冊一個臨時節點,然后key為接口方法的唯一標識,data為服務地址列表 Method[] declaredMethods = clazz.getDeclaredMethods(); for (Method method : declaredMethods) { String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method); String path = "/myRPC/" + key; Stat stat = client.checkExists().forPath(path); ListregistryInfos; if (stat != null) { // 如果這個接口已經有人注冊過了,把數據拿回來,然后將自己的信息保存進去 byte[] bytes = client.getData().forPath(path); String data = new String(bytes, StandardCharsets.UTF_8); registryInfos = JSONArray.parseArray(data, RegistryInfo.class); if (registryInfos.contains(registryInfo)) { // 正常來說,zk的臨時節點,斷開連接后,直接就沒了,但是重啟會經常發現存在節點,所以有了這樣的代碼 System.out.println("地址列表已經包含本機【" + key + "】,不注冊了"); } else { registryInfos.add(registryInfo); client.setData().forPath(path, JSONArray.toJSONString(registryInfos).getBytes()); System.out.println("注冊到注冊中心,路徑為:【" + path + "】 信息為:" + registryInfo); } } else { registryInfos = new ArrayList<>(); registryInfos.add(registryInfo); client.create() .creatingParentsIfNeeded() // 臨時節點,斷開連接就關閉 .withMode(CreateMode.EPHEMERAL) .forPath(path, JSONArray.toJSONString(registryInfos).getBytes()); System.out.println("注冊到注冊中心,路徑為:【" + path + "】 信息為:" + registryInfo); } } } }
Zookeeper 注冊中心在初始化的時候,會建立好連接。然后注冊的時候,針對 clazz 接口的每一個方法,都會生成一個唯一標識。
這里使用了InvokeUtils.buildInterfaceMethodIdentify方法:
public static String buildInterfaceMethodIdentify(Class clazz, Method method) { Map<String, String> map = new LinkedHashMap<>(); map.put("interface", clazz.getName()); map.put("method", method.getName()); Parameter[] parameters = method.getParameters(); if (parameters.length > 0) { StringBuilder param = new StringBuilder(); for (int i = 0; i < parameters.length; i++) { Parameter p = parameters[i]; param.append(p.getType().getName()); if (i < parameters.length - 1) { param.append(","); } } map.put("parameter", param.toString()); } return map2String(map); } public static String map2String(Map<String, String> map) { StringBuilder sb = new StringBuilder(); Iterator<map.entry<string, <="" span="">String>> iterator = map.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, String> entry = iterator.next(); sb.append(entry.getKey() + "=" + entry.getValue()); if (iterator.hasNext()) { sb.append("&"); } } return sb.toString(); }
其實就是對接口的方法使用他們的限定名和參數來組成一個唯一的標識,比如 HelloService#sayHello(TestBean) 生成的大概是這樣的:
interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean
接下來的邏輯就簡單了,在 Zookeeper 中的 /myRPC 路徑下面建立臨時節點,節點名稱為我們上面的接口方法唯一標識,數據內容為機器信息。
之所以采用臨時節點是因為:如果機器宕機了,連接斷開之后,消費者可以通過 Zookeeper 的 watcher 機制感知到。
大概看起來是這樣的:
/myRPC/interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean [ { "hostname":peer1, "port":8080 }, { "hostname":peer2, "port":8081 } ]
通過這樣的方式,在服務消費的時候就可以拿到這樣的注冊信息,然后知道可以調用那臺機器的那個端口。
好了,注冊中心弄完了之后,我們回到前面說的注冊方法做的第二件事情,我們將每一個接口方法標識的方法放入了一個 map 中:
/** * 接口方法對應method對象 */ private Map<String, Method> interfaceMethods = new ConcurrentHashMap<>();
這個的原因是因為,我們在收到網絡請求的時候,需要調用反射的方式調用 Method 對象,所以存起來。
啟動網絡服務端接受請求
接下來我們就可以看第四步了:
// step 4:初始化Netty服務器,接受到請求,直接打到服務提供者的service方法中 if (!this.serviceConfigs.isEmpty()) { // 需要暴露接口才暴露 nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods); nettyServer.init(port); }
因為這里使用 Netty 來做的所以需要引入 Netty 的依賴:
<dependency> <groupId>io.nettygroupId> <artifactId>netty-allartifactId> <version>4.1.30.Finalversion> dependency>
接著來分析:
public class NettyServer { /** * 負責調用方法的handler */ private RpcInvokeHandler rpcInvokeHandler; public NettyServer(ListserverConfigs, MapinterfaceMethods)throws InterruptedException { this.rpcInvokeHandler = new RpcInvokeHandler(serverConfigs, interfaceMethods); } public int init(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer(){ @Override protected void initChannel(SocketChannel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$$"); // 設置按照分隔符“&&”來切分消息,單條消息限制為 1MB ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast().addLast(rpcInvokeHandler); } }); ChannelFuture sync = b.bind(port).sync(); System.out.println("啟動NettyService,端口為:" + port); return port; } }
這部分主要的都是 Netty 的 API,我們不做過多的說明,就簡單的說一下:
我們通過“&&”作為標識符號來區分兩條信息,然后一條信息的最大長度為 1MB。
所有邏輯都在 RpcInvokeHandler 中,這里面傳進去了配置的服務接口實例,以及服務接口實例每個接口方法唯一標識對應的 Method 對象的 Map 集合。
public class RpcInvokeHandler extends ChannelInboundHandlerAdapter { /** * 接口方法唯一標識對應的Method對象 */ private Map<String, Method> interfaceMethods; /** * 接口對應的實現類 */ private Map<class, Object> interfaceToInstance; /** * 線程池,隨意寫的,不要吐槽 */ private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadFactory() { AtomicInteger m = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "IO-thread-" + m.incrementAndGet()); } }); public RpcInvokeHandler(ListserviceConfigList, Map<String, Method> interfaceMethods) { this.interfaceToInstance = new ConcurrentHashMap<>(); this.interfaceMethods = interfaceMethods; for (ServiceConfig config : serviceConfigList) { interfaceToInstance.put(config.getType(), config.getInstance()); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String message = (String) msg; // 這里拿到的是一串JSON數據,解析為Request對象, // 事實上這里解析網絡數據,可以用序列化方式,定一個接口,可以實現JSON格式序列化,或者其他序列化 // 但是demo版本就算了。 System.out.println("接收到消息:" + msg); RpcRequest request = RpcRequest.parse(message, ctx); threadPoolExecutor.execute(new RpcInvokeTask(request)); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("發生了異常..." + cause); cause.printStackTrace(); ctx.close(); } public class RpcInvokeTask implements Runnable { private RpcRequest rpcRequest; RpcInvokeTask(RpcRequest rpcRequest) { this.rpcRequest = rpcRequest; } @Override public void run() { try { /* * 數據大概是這樣子的 * {"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello¶meter=com * .study.rpc.test.producer.TestBean","requestId":"3","parameter":{"com.study.rpc.test.producer * .TestBean":{"age":20,"name":"張三"}}} */ // 這里希望能拿到每一個服務對象的每一個接口的特定聲明 String interfaceIdentity = rpcRequest.getInterfaceIdentity(); Method method = interfaceMethods.get(interfaceIdentity); Map<String, String> map = string2Map(interfaceIdentity); String interfaceName = map.get("interface"); Class interfaceClass = Class.forName(interfaceName); Object o = interfaceToInstance.get(interfaceClass); String parameterString = map.get("parameter"); Object result; if (parameterString != null) { String[] parameterTypeClass = parameterString.split(","); Map<String, Object> parameterMap = rpcRequest.getParameterMap(); Object[] parameterInstance = new Object[parameterTypeClass.length]; for (int i = 0; i < parameterTypeClass.length; i++) { String parameterClazz = parameterTypeClass[i]; parameterInstance[i] = parameterMap.get(parameterClazz); } result = method.invoke(o, parameterInstance); } else { result = method.invoke(o); } // 寫回響應 ChannelHandlerContext ctx = rpcRequest.getCtx(); String requestId = rpcRequest.getRequestId(); RpcResponse response = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId); String s = JSONObject.toJSONString(response) + "$$"; ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes()); ctx.writeAndFlush(byteBuf); System.out.println("響應給客戶端:" + s); } catch (Exception e) { e.printStackTrace(); } } public static Map<String, String> string2Map(String str) { String[] split = str.split("&"); Map<String, String> map = new HashMap<>(16); for (String s : split) { String[] split1 = s.split("="); map.put(split1[0], split1[1]); } return map; } } }
這里說明一下上面的邏輯:channelRead 方法用于接收消息,接收到的就是我們前面分析的那個 JSON 格式的數據,接著我們將消息解析成 RpcRequest。
public class RpcRequest { private String interfaceIdentity; private Map<String, Object> parameterMap = new HashMap<>(); private ChannelHandlerContext ctx; private String requestId; public static RpcRequest parse(String message, ChannelHandlerContext ctx) throws ClassNotFoundException { /* * { * "interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello2¶meter=java.lang * .String,com.study.rpc.test.producer.TestBean", * "parameter":{ * "java.lang.String":"haha", * "com.study.rpc.test.producer.TestBean":{ * "name":"小王", * "age":20 * } * } * } */ JSONObject jsonObject = JSONObject.parseObject(message); String interfaces = jsonObject.getString("interfaces"); JSONObject parameter = jsonObject.getJSONObject("parameter"); Set<String> strings = parameter.keySet(); RpcRequest request = new RpcRequest(); request.setInterfaceIdentity(interfaces); Map<String, Object> parameterMap = new HashMap<>(16); String requestId = jsonObject.getString("requestId"); for (String key : strings) { if (key.equals("java.lang.String")) { parameterMap.put(key, parameter.getString(key)); } else { Class clazz = Class.forName(key); Object object = parameter.getObject(key, clazz); parameterMap.put(key, object); } } request.setParameterMap(parameterMap); request.setCtx(ctx); request.setRequestId(requestId); return request; } }
接著從 request 中解析出來需要調用的接口,然后通過反射調用對應的接口,得到結果后我們將響應封裝成 PrcResponse 寫回給客戶端:
public class RpcResponse { private String result; private String interfaceMethodIdentify; private String requestId; public String getResult() { return result; } public void setResult(String result) { this.result = result; } public static RpcResponse create(String result, String interfaceMethodIdentify, String requestId) { RpcResponse response = new RpcResponse(); response.setResult(result); response.setInterfaceMethodIdentify(interfaceMethodIdentify); response.setRequestId(requestId); return response; } }
里面包含了請求的結果 JSON 串,接口方法唯一標識,請求 ID。數據大概看起來這個樣子:
{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean","requestId":"3", "result":"\"牛逼,我收到了消息:TestBean{name='張三', age=20}\""}
通過這樣的信息,客戶端就可以通過響應結果解析出來。
測試服務提供者
既然我們代碼寫完了,現在需要測試一把,首先我們先寫一個 HelloService 的實現類出來:
public class HelloServiceImpl implements HelloService { @Override public String sayHello(TestBean testBean) { return "牛逼,我收到了消息:" + testBean; } }
接著編寫服務提供者代碼:
public class TestProducer { public static void main(String[] args) throws Exception { String connectionString = "zookeeper://localhost1:2181,localhost2:2181,localhost3:2181"; HelloService service = new HelloServiceImpl(); ServiceConfig config = new ServiceConfig<>(HelloService.class, service); ListserviceConfigList = new ArrayList<>(); serviceConfigList.add(config); ApplicationContext ctx = new ApplicationContext(connectionString, serviceConfigList, null, 50071); } }
接著啟動起來,看到日志:
Zookeeper Client初始化完畢...... 注冊到注冊中心,路徑為:【/myRPC/interface=com.study.rpc.test.producer.HelloService& method=sayHello¶meter=com.study.rpc.test.producer.TestBean】 信息為:RegistryInfo{hostname='localhost', ip='192.168.16.7', port=50071} 啟動NettyService,端口為:50071
這個時候,我們期望用 NettyClient 發送請求:
{ "interfaces": "interface=com.study.rpc.test.producer.HelloService& method=sayHello¶meter=com.study.rpc.test.producer.TestBean", "requestId": "3", "parameter": { "com.study.rpc.test.producer.TestBean": { "age": 20, "name": "張三" } } }
得到的響應應該是:
{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean","requestId":"3", "result":"\"牛逼,我收到了消息:TestBean{name='張三', age=20}\""}
那么,可以編寫一個測試程序(這個程序僅僅用于中間測試用,讀者不必理解):
public class NettyClient { public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new NettyClientHandler()); } }); ChannelFuture sync = b.connect("127.0.0.1", 50071).sync(); sync.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } private static class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { JSONObject jsonObject = new JSONObject(); jsonObject.put("interfaces", "interface=com.study.rpc.test.producer" + ".HelloService&method=sayHello¶meter=com.study.rpc.test.producer.TestBean"); JSONObject param = new JSONObject(); JSONObject bean = new JSONObject(); bean.put("age", 20); bean.put("name", "張三"); param.put("com.study.rpc.test.producer.TestBean", bean); jsonObject.put("parameter", param); jsonObject.put("requestId", 3); System.out.println("發送給服務端JSON為:" + jsonObject.toJSONString()); String msg = jsonObject.toJSONString() + "$$"; ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length); byteBuf.writeBytes(msg.getBytes()); ctx.writeAndFlush(byteBuf); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("收到消息:" + msg); } } }
啟動之后,看到控制臺輸出:
發送給服務端JSON為:{"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello& parameter=com.study.rpc.test.producer.TestBean","requestId":3, "parameter":{"com.study.rpc.test.producer.TestBean":{"name":"張三","age":20}}} 收到消息:{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService& method=sayHello¶meter=com.study.rpc.test.producer.TestBean","requestId":"3", "result":"\"牛逼,我收到了消息:TestBean{name='張三', age=20}\""}
Bingo,完美實現了 RPC 的服務提供者。接下來我們只需要實現服務消費者就完成了。
開發服務消費者
服務消費者是同樣的處理,我們同樣要定義一個消費者的配置:
public class ReferenceConfig{ private Class type; public ReferenceConfig(Classtype) { this.type = type; } public ClassgetType() { return type; } public void setType(Classtype) { this.type = type; } }
然后我們是統一入口,在 ApplicationContext 中修改代碼:
public ApplicationContext(String registryUrl, ListserviceConfigs, ListreferenceConfigs, int port) throws Exception { // step 1: 保存服務提供者和消費者 this.serviceConfigs = serviceConfigs == null ? new ArrayList<>() : serviceConfigs; this.referenceConfigs = referenceConfigs == null ? new ArrayList<>() : referenceConfigs; // .... } private void doRegistry(RegistryInfo registryInfo) throws Exception { for (ServiceConfig config : serviceConfigs) { Class type = config.getType(); registry.register(type, registryInfo); Method[] declaredMethods = type.getDeclaredMethods(); for (Method method : declaredMethods) { String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method); interfaceMethods.put(identify, method); } } for (ReferenceConfig config : referenceConfigs) { ListregistryInfos = registry.fetchRegistry(config.getType()); if (registryInfos != null) { interfacesMethodRegistryList.put(config.getType(), registryInfos); initChannel(registryInfos); } } }
在注冊的時候,我們需要將需要消費的接口,通過注冊中心抓取出來,所以注冊中心要增加一個接口方法:
public interface Registry { /** * 將生產者接口注冊到注冊中心 * * @param clazz 類 * @param registryInfo 本機的注冊信息 */ void register(Class clazz, RegistryInfo registryInfo) throws Exception; /** * 為服務提供者抓取注冊表 * * @param clazz 類 * @return 服務提供者所在的機器列表 */ ListfetchRegistry(Class clazz) throws Exception; }
獲取服務提供者的機器列表
具體在 Zookeeper 中的實現如下:
@Override public ListfetchRegistry(Class clazz) throws Exception { Method[] declaredMethods = clazz.getDeclaredMethods(); ListregistryInfos = null; for (Method method : declaredMethods) { String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method); String path = "/myRPC/" + key; Stat stat = client.checkExists() .forPath(path); if (stat == null) { // 這里可以添加watcher來監聽變化,這里簡化了,沒有做這個事情 System.out.println("警告:無法找到服務接口:" + path); continue; } if (registryInfos == null) { byte[] bytes = client.getData().forPath(path); String data = new String(bytes, StandardCharsets.UTF_8); registryInfos = JSONArray.parseArray(data, RegistryInfo.class); } } return registryInfos; }
其實就是去 Zookeeper 獲取節點中的數據,得到接口所在的機器信息,獲取到的注冊信息諸侯,就會調用以下代碼:
if (registryInfos != null) { // 保存接口和服務地址 interfacesMethodRegistryList.put(config.getType(), registryInfos); // 初始化網絡連接 initChannel(registryInfos); } private void initChannel(ListregistryInfos) throws InterruptedException { for (RegistryInfo info : registryInfos) { if (!channels.containsKey(info)) { System.out.println("開始建立連接:" + info.getIp() + ", " + info.getPort()); NettyClient client = new NettyClient(info.getIp(), info.getPort()); client.setMessageCallback(message -> { // 這里收單服務端返回的消息,先壓入隊列 RpcResponse response = JSONObject.parseObject(message, RpcResponse.class); responses.offer(response); synchronized (ApplicationContext.this) { ApplicationContext.this.notifyAll(); } }); // 等待連接建立 ChannelHandlerContext ctx = client.getCtx(); channels.put(info, ctx); } } }
我們會針對每一個唯一的 RegistryInfo 建立一個連接,然后有這樣一段代碼:
client.setMessageCallback(message -> { // 這里收單服務端返回的消息,先壓入隊列 RpcResponse response = JSONObject.parseObject(message, RpcResponse.class); responses.offer(response); synchronized (ApplicationContext.this) { ApplicationContext.this.notifyAll(); } });
設置一個 callback,用于收到消息的時候,回調這里的代碼,這部分我們后面再分析。
然后在 client.getCtx() 的時候,同步阻塞直到連接完成,建立好連接后通過,NettyClient 的代碼如下:
public class NettyClient { private ChannelHandlerContext ctx; private MessageCallback messageCallback; public NettyClient(String ip, Integer port) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$$".getBytes()); // 設置按照分隔符“&&”來切分消息,單條消息限制為 1MB ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new NettyClientHandler()); } }); ChannelFuture sync = b.connect(ip, port).sync(); } catch (Exception e) { e.printStackTrace(); } } public void setMessageCallback(MessageCallback callback) { this.messageCallback = callback; } public ChannelHandlerContext getCtx() throws InterruptedException { System.out.println("等待連接成功..."); if (ctx == null) { synchronized (this) { wait(); } } return ctx; } private class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String message = (String) msg; if (messageCallback != null) { messageCallback.onMessage(message); } } finally { ReferenceCountUtil.release(msg); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { NettyClient.this.ctx = ctx; System.out.println("連接成功:" + ctx); synchronized (NettyClient.this) { NettyClient.this.notifyAll(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } } public interface MessageCallback { void onMessage(String message); } }
這里主要是用了 wait() 和 notifyAll() 來實現同步阻塞等待連接建立。建立好連接后,我們保存到集合中:
// 等待連接建立 ChannelHandlerContext ctx = client.getCtx(); channels.put(info, ctx);
發送請求
好了,到了這里我們為每一個需要消費的接口建立了網絡連接,接下來要做的事情就是提供一個接口給用戶獲取服務提供者實例。
我把這個方法寫在 ApplicationContext 中:
/** * 負責生成requestId的類 */ private LongAdder requestIdWorker = new LongAdder(); /** * 獲取調用服務 */ @SuppressWarnings("unchecked") publicT getService(Classclazz){ return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); if ("equals".equals(methodName) || "hashCode".equals(methodName)) { throw new IllegalAccessException("不能訪問" + methodName + "方法"); } if ("toString".equals(methodName)) { return clazz.getName() + "#" + methodName; } // step 1: 獲取服務地址列表 ListregistryInfos = interfacesMethodRegistryList.get(clazz); if (registryInfos == null) { throw new RuntimeException("無法找到服務提供者"); } // step 2: 負載均衡 RegistryInfo registryInfo = loadBalancer.choose(registryInfos); ChannelHandlerContext ctx = channels.get(registryInfo); String identify = InvokeUtils.buildInterfaceMethodIdentify(clazz, method); String requestId; synchronized (ApplicationContext.this) { requestIdWorker.increment(); requestId = String.valueOf(requestIdWorker.longValue()); } Invoker invoker = new DefaultInvoker(method.getReturnType(), ctx, requestId, identify); inProgressInvoker.put(identify + "#" + requestId, invoker); return invoker.invoke(args); } }); }
這里主要是通過動態代理來實現的,首先通過 class 來獲取對應的機器列表,接著通過 loadBalancer 來選擇一個機器。
這個 LoaderBalance 是一個接口:
public interface LoadBalancer { /** * 選擇一個生產者 * * @param registryInfos 生產者列表 * @return 選中的生產者 */ RegistryInfo choose(ListregistryInfos); }
在 ApplicationContext 初始化的時候可以選擇不同的實現,我這里主要實現了一個簡單的隨機算法(后續可以拓展為其他的,比如 RoundRobin 之類的):
public class RandomLoadbalancer implements LoadBalancer { @Override public RegistryInfo choose(ListregistryInfos){ Random random = new Random(); int index = random.nextInt(registryInfos.size()); return registryInfos.get(index); } }
接著構造接口方法的唯一標識 identify,還有一個 requestId。為什么需要一個 requestId 呢?
這是因為我們在處理響應的時候,需要找到某個響應是對應的哪個請求,但是僅僅使用 identify 是不行的,因為我們同一個應用程序中可能會有多個線程同時調用同一個接口的同一個方法,這樣的 identify 是相同的。
所以我們需要用 identify+requestId 的方式來判斷,reqeustId 是一個自增的 LongAddr。服務端在響應的時候會將 requestId 返回。
接著我們構造了一個 Invoker,把它放入 inProgressInvoker 的集合中。調用了其 invoke 方法:
Invoker invoker = new DefaultInvoker(method.getReturnType(), ctx, requestId, identify); inProgressInvoker.put(identify + "#" + requestId, invoker); // 阻塞等待結果 return invoker.invoke(args); public class DefaultInvokerimplements Invoker{ private ChannelHandlerContext ctx; private String requestId; private String identify; private ClassreturnType; private T result; DefaultInvoker(ClassreturnType, ChannelHandlerContext ctx, String requestId, String identify){ this.returnType = returnType; this.ctx = ctx; this.requestId = requestId; this.identify = identify; } @SuppressWarnings("unckecked") @Override public T invoke(Object[] args) { JSONObject jsonObject = new JSONObject(); jsonObject.put("interfaces", identify); JSONObject param = new JSONObject(); if (args != null) { for (Object obj : args) { param.put(obj.getClass().getName(), obj); } } jsonObject.put("parameter", param); jsonObject.put("requestId", requestId); System.out.println("發送給服務端JSON為:" + jsonObject.toJSONString()); String msg = jsonObject.toJSONString() + "$$"; ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length); byteBuf.writeBytes(msg.getBytes()); ctx.writeAndFlush(byteBuf); waitForResult(); return result; } @Override public void setResult(String result) { synchronized (this) { this.result = JSONObject.parseObject(result, returnType); notifyAll(); } } private void waitForResult() { synchronized (this) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
我們可以看到調用 Invoker 的 invoke 方法之后,會運行到 waitForResult() 這里,這里已經把請求通過網絡發送出去了,但是就會被卡住。
這是因為我們的網絡請求的結果不是同步返回的,有可能是客戶端同時發起很多個請求,所以我們不可能在這里讓他同步阻塞等待的。
接受響應
那么對于服務消費者而言,把請求發送出去但是卡住了,這個時候當服務端處理完之后,會把消息返回給客戶端。
返回的入口在 NettyClient 的 onChannelRead 中:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String message = (String) msg; if (messageCallback != null) { messageCallback.onMessage(message); } } finally { ReferenceCountUtil.release(msg); } }
這里通過 callback 回調出去。是否還記的我們在初始化 NettyClient 的時候,會設置一個 callback?
/** * 響應隊列 */ private ConcurrentLinkedQueueresponses = new ConcurrentLinkedQueue<>(); client.setMessageCallback(message -> { // 這里收單服務端返回的消息,先壓入隊列 RpcResponse response = JSONObject.parseObject(message, RpcResponse.class); responses.offer(response); synchronized (ApplicationContext.this) { ApplicationContext.this.notifyAll(); } });
這里接受消息之后,解析成為一個 RpcResponse 對象,然后壓入 responses 隊列中,這樣我們就把所有的請求響應放入隊列中。
但是這樣一來,我們應該怎么把響應結果返回給調用的地方呢?
我們可以這樣做:起一個或多個后臺線程,然后從隊列中拿出響應,然后根據響應從我們之前保存的 inProcessInvoker 中找出對應的 Invoker,然后把結果返回回去。
public ApplicationContext(....){ //..... // step 5:啟動處理響應的processor initProcessor(); } private void initProcessor() { // 事實上,這里可以通過配置文件讀取,啟動多少個processor int num = 3; processors = new ResponseProcessor[num]; for (int i = 0; i < 3; i++) { processors[i] = createProcessor(i); } } /** * 處理響應的線程 */ private class ResponseProcessor extends Thread { @Override public void run() { System.out.println("啟動響應處理線程:" + getName()); while (true) { // 多個線程在這里獲取響應,只有一個成功 RpcResponse response = responses.poll(); if (response == null) { try { synchronized (ApplicationContext.this) { // 如果沒有響應,先休眠 ApplicationContext.this.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println("收到一個響應:" + response); String interfaceMethodIdentify = response.getInterfaceMethodIdentify(); String requestId = response.getRequestId(); String key = interfaceMethodIdentify + "#" + requestId; Invoker invoker = inProgressInvoker.remove(key); invoker.setResult(response.getResult()); } } } }
這里面如果從隊列中拿不到數據,就會調用 wait() 方法等待。這里需要注意的是,在 callbak 中獲取到響應的時候我們是會調用 notifyAll() 來喚醒這里的線程的:
responses.offer(response); synchronized (ApplicationContext.this) { ApplicationContext.this.notifyAll(); }
這里被喚醒之后,就會有多個線程去爭搶那個響應,因為隊列是線程安全的,所以這里多個線程可以獲取到響應結果。
接著拿到結果之后,通過 identify+requestId 構造成唯一的請求標識,從 inProgressInvoker 中獲取對應的 invoker,然后通過 setResult 將結果設置進去:
String key = interfaceMethodIdentify + "#" + requestId; Invoker invoker = inProgressInvoker.remove(key); invoker.setResult(response.getResult()); @Override public void setResult(String result) { synchronized (this) { this.result = JSONObject.parseObject(result, returnType); notifyAll(); } }
這里設置進去之后,就會將結果用 json 反序列化成為用戶需要的結果,然后調用其 notifyAll 方法喚醒 invoke 方法被阻塞的線程:
@SuppressWarnings("unckecked") @Override public T invoke(Object[] args) { JSONObject jsonObject = new JSONObject(); jsonObject.put("interfaces", identify); JSONObject param = new JSONObject(); if (args != null) { for (Object obj : args) { param.put(obj.getClass().getName(), obj); } } jsonObject.put("parameter", param); jsonObject.put("requestId", requestId); System.out.println("發送給服務端JSON為:" + jsonObject.toJSONString()); String msg = jsonObject.toJSONString() + NettyServer.DELIMITER; ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length); byteBuf.writeBytes(msg.getBytes()); ctx.writeAndFlush(byteBuf); // 這里被喚醒 waitForResult(); return result; }
然后就可以返回結果了,返回的結果就會返回給用戶了。
整體測試
到了這里我們的生產者和消費者的代碼都寫完了,我們來整體測試一遍。生產者的代碼是和之前的一致的:
public class TestProducer { public static void main(String[] args) throws Exception { String connectionString = "zookeeper://localhost1:2181,localhost2:2182,localhost3:2181"; HelloService service = new HelloServiceImpl(); ServiceConfig config = new ServiceConfig<>(HelloService.class, service); ListserviceConfigList = new ArrayList<>(); serviceConfigList.add(config); ApplicationContext ctx = new ApplicationContext(connectionString, serviceConfigList, null, 50071); } }
消費者測試代碼:
public class TestConsumer { public static void main(String[] args) throws Exception { String connectionString = "zookeeper://localhost1:2181,localhost2:2182,localhost3:2181"; ReferenceConfigconfig = new ReferenceConfig<>(HelloService.class); ApplicationContext ctx = new ApplicationContext(connectionString, null, Collections.singletonList(config), 50070); HelloService helloService = ctx.getService(HelloService.class); System.out.println("sayHello(TestBean)結果為:" + helloService.sayHello(new TestBean("張三", 20))); } }
接著啟動生產者,然后啟動消費者。生產者得到的日志如下:
Zookeeper Client初始化完畢...... 注冊到注冊中心,路徑為:【/myRPC/interface=com.study.rpc.test.producer.HelloService& method=sayHello¶meter=com.study.rpc.test.producer.TestBean】 信息為:RegistryInfo{hostname='localhost', ip='192.168.16.7', port=50071} 啟動NettyService,端口為:50071 啟動響應處理線程:Response-processor-0 啟動響應處理線程:Response-processor-2 啟動響應處理線程:Response-processor-1 接收到消息:{"interfaces":"interface=com.study.rpc.test.producer.HelloService& method=sayHello¶meter=com.study.rpc.test.producer.TestBean","requestId":"1", "parameter":{"com.study.rpc.test.producer.TestBean":{"age":20,"name":"張三"}}} 響應給客戶端:{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService& method=sayHello¶meter=com.study.rpc.test.producer.TestBean","requestId":"1", "result":"\"牛逼,我收到了消息:TestBean{name='張三', age=20}\""}
消費者得到的日志為:
Zookeeper Client初始化完畢...... 開始建立連接:192.168.16.7, 50071 等待連接成功... 啟動響應處理線程:Response-processor-1 啟動響應處理線程:Response-processor-0 啟動響應處理線程:Response-processor-2 連接成功:ChannelHandlerContext(NettyClient$NettyClientHandler#0, [id: 0xb7a59701, L:/192.168.16.7:58354 - R:/192.168.16.7:50071]) 發送給服務端JSON為:{"interfaces":"interface=com.study.rpc.test.producer.HelloService& method=sayHello¶meter=com.study.rpc.test.producer.TestBean","requestId":"1", "parameter":{"com.study.rpc.test.producer.TestBean":{"age":20,"name":"張三"}}} 收到一個響應:RpcResponse{result='"牛逼,我收到了消息:TestBean{name='張三', age=20}"', interfaceMethodIdentify='interface=com.study.rpc.test.producer.HelloService& method=sayHello¶meter=com.study.rpc.test.producer.TestBean', requestId='1'} sayHello(TestBean)結果為:牛逼,我收到了消息:TestBean{name='張三', age=20}
感謝各位的閱讀,以上就是“怎么寫一個RPC框架”的內容了,經過本文的學習后,相信大家對怎么寫一個RPC框架這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。