您好,登錄后才能下訂單哦!
這篇文章主要介紹“RPC框架和Tars-Java客戶端介紹”,在日常操作中,相信很多人在RPC框架和Tars-Java客戶端介紹問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RPC框架和Tars-Java客戶端介紹”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
一、基本RPC框架簡介
1.1、RPC調用流程
二、Tars Java客戶端設計介紹
2.1、Tars Java客戶端初始化過程
2.2、使用范例
2.3、代理生成
2.4、遠程服務尋址方法
2.5、網絡模型
2.6、遠程調用交互模型
2.6.1、寫 IO 流程
2.6.2、同步和異步調用的底層技術實現
三、總結
在分布式計算中,遠程過程調用(Remote Procedure Call,縮寫 RPC)允許運行于一臺計算機的程序調用另一個地址空間計算機的程序,就像調用本地程序一樣,無需額外地為這個交互作用涉及到的代理對象構建、網絡協議等進行編程。
一般RPC架構,有至少三種結構,分別為注冊中心,服務提供者和服務消費者。如圖1.1所示,注冊中心提供注冊服務和注冊信息變更的通知服務,服務提供者運行在服務器來提供服務,服務消費者使用服務提供者的服務。
服務提供者(RPC Server),運行在服務端,提供服務接口定義與服務實現類,并對外暴露服務接口。注冊中心(Registry),運行在服務端,負責記錄服務提供者的服務對象,并提供遠程服務信息的查詢服務和變更通知服務。服務消費者(RPC Client),運行在客戶端,通過遠程代理對象調用遠程服務。
如下圖所示,描述了RPC的調用流程,其中IDL(Interface Description Language)為接口描述語言,使得在不同平臺上運行的程序和用不同語言編寫的程序可以相互通信交流。
1)客戶端調用客戶端樁模塊。該調用是本地過程調用,其中參數以正常方式推入堆棧。
2)客戶端樁模塊將參數打包到消息中,并進行系統調用以發送消息。打包參數稱為編組。
3)客戶端的本地操作系統將消息從客戶端計算機發送到服務器計算機。
4)服務器計算機上的本地操作系統將傳入的數據包傳遞到服務器樁模塊。
5)服務器樁模塊從消息中解包出參數。解包參數稱為解組。
6)最后,服務器樁模塊執行服務器程序流程。回復是沿相反的方向執行相同的步驟。
Tars Java客戶端整體設計與主流的RPC框架基本一致。我們先介紹Tars Java客戶端初始化過程。
如圖2.1所示,描述了Tars Java的初始化過程。
1)先出創建一個CommunicatorConfig配置項,命名為communicatorConfig,其中按需設置locator, moduleName, connections等參數。
2)通過上述的CommunicatorConfig配置項,命名為config,那么調用CommunicatorFactory.getInstance().getCommunicator(config),創建一個Communicator對象,命名為communicator。
3)假設objectName="MESSAGE.ControlCenter.Dispatcher",需要生成的代理接口為Dispatcher.class,調用communicator.stringToProxy(objectName, Dispatcher.class)方法來生成代理對象的實現類。
4)在stringToProxy()方法里,首先通過初始化QueryHelper代理對象,調用getServerNodes()方法獲取遠程服務對象列表,并設置該返回值到communicatorConfig的objectName字段里。具體的代理對象的代碼分析,見下文中的“2.3 代理生成”章節。
5)判斷在之前調用stringToProxy是否有設置LoadBalance參數,如果沒有的話,就生成默認的采用RR輪訓算法的DefaultLoadBalance對象。
6)創建TarsProtocolInvoker協議調用對象,其中過程有通過解析communicatorConfig中的objectName和simpleObjectName來獲取URL列表,其中一個URL對應一個遠程服務對象,TarsProtocolInvoker初始化各個URL對應的ServantClient對象,其中一個URL根據communicatorConfig的connections配置項確認生成多少個ServantClient對象。然后使用ServantClients等參數初始化TarsInvoker對象,并將這些TarsInvoker對象集合設置到TarsProtocolInvoker的allInvokers成員變量中,其中每個URL對應一個TarsInvoker對象。上述分析表明,一個遠程服務節點對應一個TarsInvoker對象,一個TarsInvoker對象包含connections個ServantClient對象,對于TCP協議,那么就是一個ServantClient對象對應一個TCP連接。
7)使用api, objName, servantProxyConfig,loadBalance,protocolInvoker, this.communicator參數生成一個實現JDK代理接口InvocationHandler的ObjectProxy對象。
8)生成ObjectProxy對象的同時進行初始化操作,首先會執行loadBalancer.refresh()方法刷新遠程服務節點到負載均衡器中便于后續tars遠程調用進行路由。
9)然后注冊統計信息上報器,其中是上報方法采用JDK的ScheduledThreadPoolExecutor進行定時輪訓上報。
10)注冊服務列表刷新器,采用的技術方法和上述統計信息上報器基本一致。
以下代碼為最簡化示例,其中CommunicatorConfig里的配置采用默認值,communicator通過CommunicatorConfig配置生成后,直接指定遠程服務對象的具體服務對象名、IP和端口生成一個遠程服務代理對象。
Tars Java代碼使用范例// 先初始化基本Tars配置CommunicatorConfig cfg = new CommunicatorConfig();// 通過上述的CommunicatorConfig配置生成一個Communicator對象。Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);// 指定Tars遠程服務的服務對象名、IP和端口生成一個遠程服務代理對象。
// 先初始化基本Tars配置 CommunicatorConfig cfg = new CommunicatorConfig(); // 通過上述的CommunicatorConfig配置生成一個Communicator對象。 Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg); // 指定Tars遠程服務的服務對象名、IP和端口生成一個遠程服務代理對象。 HelloPrx proxy = communicator.stringToProxy(HelloPrx.class, "TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 18601 -t 60000"); //同步調用,阻塞直到遠程服務對象的方法返回結果 String ret = proxy.hello(3000, "Hello World"); System.out.println(ret); //異步調用,不關注異步調用最終的情況 proxy.async_hello(null, 3000, "Hello World"); //異步調用,注冊一個實現TarsAbstractCallback接口的回執處理對象,該實現類分別處理調用成功,調用超時和調用異常的情況。 proxy.async_hello(new HelloPrxCallback() { @Override public void callback_expired() { //超時事件處理 } @Override public void callback_exception(Throwable ex) { //異常事件處理 } @Override public void callback_hello(String ret) { //調用成功事件處理 Main.logger.info("invoke async method successfully {}", ret); } }, 1000, "Hello World");
在上述例子中,演示了常見的兩種調用方式,分別為同步調用和異步調用。其中異步調用,如果調用方想捕捉異步調用的最終結果,可以注冊一個實現TarsAbstractCallback接口的實現類,對tars調用的異常,超時和成功事件進行處理。
Tars Java的客戶端樁模塊的遠程代理對象是采用JDK原生Proxy方法。如下文的源碼所示,ObjectProxy實現了java.lang.reflect.InvocationHandler的接口方法,該接口是JDK自帶的代理接口。
代理實現
public final class ObjectProxy<T> implements ServantProxy, InvocationHandler { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); InvokeContext context = this.protocolInvoker.createContext(proxy, method, args); try { if ("toString".equals(methodName) && parameterTypes.length == 0) { return this.toString(); } else if //***** 省略代碼 ***** } else { // 在負載均衡器選取一個遠程調用類,進行應用層協議的封裝,最后調用TCP傳輸層進行發送。 Invoker invoker = this.loadBalancer.select(context); return invoker.invoke(context); } } catch (Throwable var8) { // ***** 省略代碼 ***** } } }
當然生成上述遠程服務代理類,涉及到輔助類,Tars Java采用ServantProxyFactory來生成上述的ObjectProxy,并存儲ObjectProxy對象到Map結構,便于調用方二次使用時直接復用已存在的遠程服務代理對象。
具體相關邏輯如源碼所示,ObjectProxyFactory是生成ObjectProxy的輔助工廠類,和ServantProxyFactory不同,其本身不緩存生成的代理對象。
class ServantProxyFactory { private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap(); // ***** 省略代碼 ***** public <T> Object getServantProxy(Class<T> clazz, String objName, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) { Object proxy = this.cache.get(objName); if (proxy == null) { this.lock.lock(); // 加鎖,保證只生成一個遠程服務代理對象。 try { proxy = this.cache.get(objName); if (proxy == null) { // 創建實現JDK的java.lang.reflect.InvocationHandler接口的對象 ObjectProxy<T> objectProxy = this.communicator.getObjectProxyFactory().getObjectProxy(clazz, objName, servantProxyConfig, loadBalance, protocolInvoker); // 使用JDK的java.lang.reflect.Proxy來生成實際的代理對象 this.cache.putIfAbsent(objName, this.createProxy(clazz, objectProxy)); proxy = this.cache.get(objName); } } finally { this.lock.unlock(); } } return proxy; } /** 使用JDK自帶的Proxy.newProxyInstance生成代理對象 */ private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clazz, ServantProxy.class}, objectProxy); } // ***** 省略代碼 ***** }
從以上的源碼中,可以看到createProxy使用了JDK的Proxy.newProxyInstance方法來生成遠程服務代理對象。
作為一個RPC遠程框架,在分布式系統中,調用遠程服務,涉及到如何路由的問題,也就是如何從多個遠程服務節點中選擇一個服務節點進行調用,當然Tars Java支持直連特定節點的方式調用遠程服務,如上文的2.2 使用范例所介紹。
如圖下圖所示,ClientA某個時刻的一次調用使用了Service3節點進行遠程服務調用,而ClientB某個時刻的一次調用采用Service2節點。Tars Java提供多種負載均衡算法實現類,其中有采用RR輪訓算法的RoundRobinLoadBalance,一致性哈希算法的ConsistentHashLoadBalance和普通哈希算法的HashLoadBalance。
(客戶端按特定路由規則調用遠程服務)
如下述源碼所示,如果要自定義負載均衡器來定義遠程調用的路由規則,那么需要實現com.qq.tars.rpc.common.LoadBalance接口,其中LoadBalance.select()方法負責按照路由規則,選取對應的Invoker對象,然后進行遠程調用,具體邏輯見源碼代理實現。由于遠程服務節點可能發生變更,比如上下線遠程服務節點,需要刷新本地負載均衡器的路由信息,那么此信息更新的邏輯在LoadBalance.refresh()方法里實現。
負載均衡接口
public interface LoadBalance<T> { /** 根據負載均衡策略,挑選invoker */ Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException; /** 通知invoker列表的更新 */ void refresh(Collection<Invoker<T>> invokers); }
Tars Java的IO模式采用的JDK的NIO的Selector模式。這里以TCP協議來描述網絡處理,如下述源碼所示,Reactor是一個線程,其中的run()方法中,調用了selector.select()方法,意思是如果除非此時網絡產生一個事件,否則將一直線程阻塞下去。
假如此時出現一個網絡事件,那么此時線程將會被喚醒,執行后續代碼,其中一個代碼是dispatcheEvent(key),也就是將進行事件的分發。
其中將根據對應條件,調用acceptor.handleConnectEvent(key)方法來處理客戶端連接成功事件,或acceptor.handleAcceptEvent(key)方法來處理服務器接受連接成功事件,或調用acceptor.handleReadEvent(key)方法從Socket里讀取數據,或acceptor.handleWriteEvent(key)方法來寫數據到Socket 。
Reactor事件處理
public final class Reactor extends Thread { protected volatile Selector selector = null; private Acceptor acceptor = null; //***** 省略代碼 ***** public void run() { try { while (!Thread.interrupted()) { // 阻塞直到有網絡事件發生。 selector.select(); //***** 省略代碼 ***** while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (!key.isValid()) continue; try { //***** 省略代碼 ***** // 分發傳輸層協議TCP或UDP網絡事件 dispatchEvent(key); //***** 省略代碼 ***** } } //***** 省略代碼 ***** } //***** 省略代碼 ***** private void dispatchEvent(final SelectionKey key) throws IOException { if (key.isConnectable()) { acceptor.handleConnectEvent(key); } else if (key.isAcceptable()) { acceptor.handleAcceptEvent(key); } else if (key.isReadable()) { acceptor.handleReadEvent(key); } else if (key.isValid() && key.isWritable()) { acceptor.handleWriteEvent(key); } } }
網絡處理采用Reactor事件驅動模式,Tars定義一個Reactor對象對應一個Selector對象,針對每個遠程服務(整體服務集群,非單個節點程序)默認創建2個Reactor對象進行處理。
上圖中的處理讀IO事件(Read Event)實現和寫IO事件(Write Event)的線程池是在Communicator初始化的時候配置的。具體邏輯如源碼所示,其中線程池參數配置由CommunicatorConfig的corePoolSize, maxPoolSize, keepAliveTime等參數決定。
讀寫事件線程池初始化
private void initCommunicator(CommunicatorConfig config) throws CommunicatorConfigException { //***** 省略代碼 ***** this.threadPoolExecutor = ClientPoolManager.getClientThreadPoolExecutor(config); //***** 省略代碼 ***** } public class ClientPoolManager { public static ThreadPoolExecutor getClientThreadPoolExecutor(CommunicatorConfig communicatorConfig) { //***** 省略代碼 ***** clientThreadPoolMap.put(communicatorConfig, createThreadPool(communicatorConfig)); //***** 省略代碼 ***** return clientPoolExecutor; } private static ThreadPoolExecutor createThreadPool(CommunicatorConfig communicatorConfig) { int corePoolSize = communicatorConfig.getCorePoolSize(); int maxPoolSize = communicatorConfig.getMaxPoolSize(); int keepAliveTime = communicatorConfig.getKeepAliveTime(); int queueSize = communicatorConfig.getQueueSize(); TaskQueue taskqueue = new TaskQueue(queueSize); String namePrefix = "tars-client-executor-"; TaskThreadPoolExecutor executor = new TaskThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, taskqueue, new TaskThreadFactory(namePrefix)); taskqueue.setParent(executor); return executor; } }
調用代理類的方法,那么會進入實現InvocationHandler接口的ObjectProxy中的invoke方法。
下圖描述了遠程服務調用的流程情況。這里著重講幾個點,一個是如何寫數據到網絡IO。第二個是Tars Java通過什么方式進行同步或者異步調用,底層采用了什么技術。
如圖(底層代碼寫IO過程)所示,ServantClient將調用底層網絡寫操作,在invokeWithSync方法中,取得ServantClient自身成員變量TCPSession,調用TCPSession.write()方法,如圖(底層代碼寫IO過程)和以下源碼( 讀寫事件線程池初始化)所示,先獲取Encode進行請求內容編碼成IoBuffer對象,最后將IoBuffer的java.nio.ByteBuffer內容放入TCPSession的queue成員變量中,然后調用key.selector().wakeup(),喚醒Reactor中run()方法中的Selector.select(),執行后續的寫操作。
具體Reactor邏輯見上文2.5 網絡模型內容,如果Reactor檢查條件發現可以寫IO的話也就是key.isWritable()為true,那么最終會循環從TCPSession.queue中取出ByteBuffer對象,調用SocketChannel.write(byteBuffer)執行實際的寫網絡Socket操作,代碼邏輯見源碼中的doWrite()方法。
讀寫事件線程池初始化
public class TCPSession extends Session { public void write(Request request) throws IOException { try { IoBuffer buffer = selectorManager.getProtocolFactory().getEncoder().encodeRequest(request, this); write(buffer); //***** 省略代碼 ***** } protected void write(IoBuffer buffer) throws IOException { //***** 省略代碼 ***** if (!this.queue.offer(buffer.buf())) { throw new IOException("The session queue is full. [ queue size:" + queue.size() + " ]"); } if (key != null) { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); key.selector().wakeup(); } } protected synchronized int doWrite() throws IOException { int writeBytes = 0; while (true) { ByteBuffer wBuf = queue.peek(); //***** 省略代碼 ***** int bytesWritten = ((SocketChannel) channel).write(wBuf); //***** 省略代碼 ***** return writeBytes; } }
對于同步方法調用,如圖(遠程調用流程)和源碼(ServantClient的同步調用)所示,ServantClient調用底層網絡寫操作,在invokeWithSync方法中創建一個Ticket對象,Ticket顧名思義就是票的意思,這張票唯一標識本次網絡調用情況。
ServantClient的同步調用
public class ServantClient { public <T extends ServantResponse> T invokeWithSync(ServantRequest request) throws IOException { //***** 省略代碼 ***** ticket = TicketManager.createTicket(request, session, this.syncTimeout); Session current = session; current.write(request); if (!ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)) { //***** 省略代碼 ***** response = ticket.response(); //***** 省略代碼 ***** return response; //***** 省略代碼 ***** return response; } }
如代碼所示,在執行完session.write()操作后,緊接著執行ticket.await()方法,該方法線程等待直到遠程服務回復返回結果到客戶端,ticket.await()被喚醒后,將執行后續操作,最終invokeWithSync方法返回response對象。其中Ticket的等待喚醒功能內部采用java.util.concurrent.CountDownLatch來實現。
對于異步方法調用,將會執行ServantClient.invokeWithAsync方法,也會創建一個Ticket,并且執行Session.write()操作,雖然不會調用ticket.await(),但是在Reactor接收到遠程回復時,首先會先解析Tars協議頭得到Response對象,然后將Response對象放入如圖(Tars-Java的網絡事件處理模型)所示的IO讀寫線程池中進行進一步處理,如下述源碼(異步回調事件處理)所示,最終會調用WorkThread.run()方法,在run()方法里執行ticket.notifyResponse(resp),該方法里面會執行類似上述代碼2.1中的實現TarsAbstractCallback接口的調用成功回調的方法。
異步回調事件處理
public final class WorkThread implements Runnable { public void run() { try { //***** 省略代碼 ***** Ticket<Response> ticket = TicketManager.getTicket(resp.getTicketNumber()); //***** 省略代碼 ***** ticket.notifyResponse(resp); ticket.countDown(); TicketManager.removeTicket(ticket.getTicketNumber()); } //***** 省略代碼 ***** } }
如下述源碼所示,TicketManager會有一個定時任務輪訓檢查所有的調用是否超時,如果(currentTime - t.startTime) > t.timeout條件成立,那么會調用t.expired()告知回調對象,本次調用超時。
調用超時事件處理
public class TicketManager { //***** 省略代碼 ***** static { executor.scheduleAtFixedRate(new Runnable() { long currentTime = -1; public void run() { Collection<Ticket<?>> values = tickets.values(); currentTime = System.currentTimeMillis(); for (Ticket<?> t : values) { if ((currentTime - t.startTime) > t.timeout) { removeTicket(t.getTicketNumber()); t.expired(); } } } }, 500, 500, TimeUnit.MILLISECONDS); } }
代碼的調用一般都是層層遞歸調用,代碼的調用深度和廣度都很大,通過調試代碼的方式一步步學習源碼的方式,更加容易理解源碼的含義和設計理念。
Tars與其他RPC框架,并沒有什么本質區別,通過類比其他框架的設計理念,可以更加深入理解Tars Java設計理念。
到此,關于“RPC框架和Tars-Java客戶端介紹”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。