This article analyzes how the Hadoop RPC server side is initialized and how an RPC call is handled, walking through the NameNode as a concrete example.
As mentioned above, the example here relies mainly on the NameNode's remote service, so let's start with the relevant code:
public class NameNode implements NameNodeStatusMXBean {
    public static void main(String argv[]) throws Exception {
        NameNode namenode = createNameNode(argv, null);
    }

    protected NameNode(Configuration conf, NamenodeRole role) throws IOException {
        initialize(conf);
    }

    protected void initialize(Configuration conf) throws IOException {
        rpcServer = createRpcServer(conf);
        startCommonServices(conf);   // quite important: starts the RPC services
    }

    protected NameNodeRpcServer createRpcServer(Configuration conf) throws IOException {
        return new NameNodeRpcServer(conf, this);
    }
}
When we run the Hadoop start-up command in a Linux terminal, what ultimately gets invoked is NameNode's main method, so that is where we begin tracing the code. The method is simple: it calls the NameNode constructor to create a NameNode instance, which in turn runs the initialization method initialize. This method is the part we care about most, because all initialization work, including the RPC service, happens inside it. As far as RPC is concerned, it executes two relevant methods, createRpcServer and startCommonServices. The first does exactly what its name says, so we won't dwell on it; the second starts the NameNode's RPC service, and its code is shown later. As the code above shows, the rpcServer functionality lives in the class NameNodeRpcServer, so let's look at the relevant code in that class:
class NameNodeRpcServer implements NamenodeProtocols {
    public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException {
        RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);

        ClientNamenodeProtocolServerSideTranslatorPB clientProtocolServerTranslator =
            new ClientNamenodeProtocolServerSideTranslatorPB(this);
        BlockingService clientNNPbService = ClientNamenodeProtocol
            .newReflectiveBlockingService(clientProtocolServerTranslator);

        InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);   // fs.defaultFS
        String bindHost = nn.getRpcServerBindHost(conf);
        if (bindHost == null) {
            bindHost = rpcAddr.getHostName();
        }
        LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());

        this.clientRpcServer = new RPC.Builder(conf)
            .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
            .setInstance(clientNNPbService)
            .setBindAddress(bindHost)
            .setPort(rpcAddr.getPort())
            .setNumHandlers(handlerCount)
            .setVerbose(false)
            .setSecretManager(namesystem.getDelegationTokenSecretManager())
            .build();

        // Add all the RPC protocols that the namenode implements
        DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, clientRpcServer);
        DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, clientRpcServer);
        DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, clientRpcServer);
    }
}
The most important thing the NameNodeRpcServer constructor does is instantiate clientRpcServer. The point I most want to make here is that the NameNode declares that it implements three protocols: ClientProtocol, DatanodeProtocol and NamenodeProtocol, and on the server side those protocols are essentially realized by translator classes such as ClientNamenodeProtocolServerSideTranslatorPB. Note in particular that when ClientNamenodeProtocolServerSideTranslatorPB is instantiated it receives one argument, and that argument is the NameNodeRpcServer instance itself. Look at the code:
public ClientNamenodeProtocolServerSideTranslatorPB(ClientProtocol server) throws IOException {
    this.server = server;
}

@Override
public GetBlockLocationsResponseProto getBlockLocations(
        RpcController controller, GetBlockLocationsRequestProto req) throws ServiceException {
    try {
        LocatedBlocks b = server.getBlockLocations(
            req.getSrc(), req.getOffset(), req.getLength());
        Builder builder = GetBlockLocationsResponseProto.newBuilder();
        if (b != null) {
            builder.setLocations(PBHelper.convert(b)).build();
        }
        return builder.build();
    } catch (IOException e) {
        throw new ServiceException(e);
    }
}
The getBlockLocations method above illustrates this point: the translator unwraps the protobuf request, delegates to the ClientProtocol implementation it was handed in its constructor, and wraps the result back into a protobuf response.
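To make the translator pattern concrete, here is a minimal, self-contained sketch of the same idea. Every type in it (SimpleProtocol, GetLengthRequest, GetLengthResponse, SimpleProtocolServerSideTranslator) is a hypothetical stand-in invented for illustration; the real code works with protobuf-generated messages and ClientProtocol, but the shape is the same: unwrap the wire request, delegate to the implementation passed into the constructor, wrap the result.

// Minimal sketch of the server-side "translator" pattern; all names are hypothetical.
import java.io.IOException;

public class TranslatorSketch {

    // Domain-level protocol, analogous to ClientProtocol.
    interface SimpleProtocol {
        long getLength(String path) throws IOException;
    }

    // Wire-level request/response, analogous to the *RequestProto / *ResponseProto messages.
    record GetLengthRequest(String src) {}
    record GetLengthResponse(long length) {}

    // Server-side translator: holds the real implementation and converts between
    // wire-level and domain-level types, the way ClientNamenodeProtocolServerSideTranslatorPB
    // holds the NameNodeRpcServer instance.
    static class SimpleProtocolServerSideTranslator {
        private final SimpleProtocol server;

        SimpleProtocolServerSideTranslator(SimpleProtocol server) {
            this.server = server;
        }

        GetLengthResponse getLength(GetLengthRequest req) throws IOException {
            long len = server.getLength(req.src());   // delegate to the implementation
            return new GetLengthResponse(len);        // wrap the result for the wire
        }
    }

    public static void main(String[] args) throws IOException {
        // A toy implementation standing in for NameNodeRpcServer.
        SimpleProtocol impl = path -> path.length() * 1024L;
        SimpleProtocolServerSideTranslator translator =
                new SimpleProtocolServerSideTranslator(impl);
        GetLengthResponse resp = translator.getLength(new GetLengthRequest("/user/data"));
        System.out.println("length = " + resp.length());
    }
}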
Now let's go back to the startCommonServices method executed in NameNode's initialize. This method starts the threads under clientRpcServer, namely the listener, the handlers and the responder. See the code:
public class NameNode implements NameNodeStatusMXBean {
    private void startCommonServices(Configuration conf) throws IOException {
        rpcServer.start();
    }
}

class NameNodeRpcServer implements NamenodeProtocols {
    void start() {
        clientRpcServer.start();
        if (serviceRpcServer != null) {
            serviceRpcServer.start();
        }
    }
}

public abstract class Server {
    public synchronized void start() {
        responder.start();
        listener.start();
        handlers = new Handler[handlerCount];
        for (int i = 0; i < handlerCount; i++) {
            handlers[i] = new Handler(i);
            handlers[i].start();
        }
    }
}
That is the end of the RPC-related code in the startup path.
Now let's look at how an RPC call is actually served. First, the key structure of Server:
public abstract class Server {
    private Listener listener = null;
    private Responder responder = null;
    private Handler[] handlers = null;

    private class Responder extends Thread { }
    private class Listener extends Thread { }
    private class Handler extends Thread { }
}
At initialization time, all the threads under listener, responder and handlers are started.
The listener thread runs a socket service dedicated to accepting client requests, the handler threads process the individual requests, and the responder writes the results back. The details are in the code below:
public abstract class Server {
    private Listener listener = null;
    private Responder responder = null;
    private Handler[] handlers = null;

    private class Listener extends Thread {
        public Listener() throws IOException {
            address = new InetSocketAddress(bindAddress, port);
            // Create a new server socket and set to non blocking mode
            acceptChannel = ServerSocketChannel.open();
            acceptChannel.configureBlocking(false);
            // Bind the server socket to the local host and port
            bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
            port = acceptChannel.socket().getLocalPort();   // Could be an ephemeral port
            // create a selector;
            selector = Selector.open();
            readers = new Reader[readThreads];
            for (int i = 0; i < readThreads; i++) {
                Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port);
                readers[i] = reader;
                reader.start();
            }
            // Register accepts on the server socket with the selector.
            acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
            this.setName("IPC Server listener on " + port);
            this.setDaemon(true);
        }

        public void run() {
            while (running) {
                doAccept(key);
            }
        }

        void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
            Reader reader = getReader();
            Connection c = connectionManager.register(channel);
            key.attach(c);   // so closeCurrentConnection can get the object
            reader.addConnection(c);
        }

        private class Reader extends Thread {
            public void run() {
                doRunLoop();
            }

            private synchronized void doRunLoop() {
                while (running) {
                    Connection conn = pendingConnections.take();
                    conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
                }
                readSelector.select();
                doRead(key);
            }

            void doRead(SelectionKey key) throws InterruptedException {
                Connection c = (Connection) key.attachment();
                count = c.readAndProcess();
            }
        }
    }

    public class Connection {
        public int readAndProcess() {
            processOneRpc(data.array());
        }

        private void processOneRpc(byte[] buf) {
            processRpcRequest(header, dis);
        }

        private void processRpcRequest(RpcRequestHeaderProto header, DataInputStream dis)
                throws WrappedRpcServerException, InterruptedException {
            Call call = new Call(header.getCallId(), header.getRetryCount(), rpcRequest, this,
                ProtoUtil.convert(header.getRpcKind()), header.getClientId().toByteArray());
            callQueue.put(call);
        }
    }

    private class Handler extends Thread {
        public void run() {
            final Call call = callQueue.take();
            value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp);
            setupResponse(buf, call, returnStatus, detailedErr, value, errorClass, error);
            responder.doRespond(call);
        }
    }

    private class Responder extends Thread {
        void doRespond(Call call) throws IOException {
            processResponse(call.connection.responseQueue, true);
        }

        private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler)
                throws IOException {
            int numBytes = channelWrite(channel, call.rpcResponse);
            done = true;
        }
    }
}
This gives a fairly complete picture of an RPC call as seen by the Server. It starts in the Listener constructor, which spawns several Reader threads. When the listener accepts an incoming request, a reader takes over reading data from the connection; what the reader actually invokes is the connection's readAndProcess method, which eventually puts a Call object onto the RPC server's callQueue. A Handler thread then takes the current call off the queue and processes it through the Server's call method. There is a subtlety here that only becomes apparent if you trace the code carefully: the server instance is no longer org.apache.hadoop.ipc.Server but a Protobuf-oriented subclass, org.apache.hadoop.ipc.RPC.Server, and its call method is overridden, as shown below:
@Override
public Writable call(RPC.RpcKind rpcKind, String protocol, Writable rpcRequest, long receiveTime)
        throws Exception {
    return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime);
}
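getRpcInvoker(rpcKind) simply looks up the invoker registered for the engine kind carried by the request. As a rough illustration of that dispatch-by-kind idea, here is a self-contained sketch; RpcKind, RpcInvoker and the registry below are simplified, hypothetical stand-ins rather than the actual fields of org.apache.hadoop.ipc.Server.

// Sketch of dispatch-by-kind; all types here are simplified stand-ins.
import java.util.EnumMap;
import java.util.Map;

public class RpcKindRegistrySketch {

    enum RpcKind { RPC_BUILTIN, RPC_WRITABLE, RPC_PROTOCOL_BUFFER }

    interface RpcInvoker {
        String call(String protocol, String request) throws Exception;
    }

    private final Map<RpcKind, RpcInvoker> invokers = new EnumMap<>(RpcKind.class);

    void registerInvoker(RpcKind kind, RpcInvoker invoker) {
        invokers.put(kind, invoker);
    }

    // Mirrors the overridden call(): look up the invoker for this kind and delegate.
    String call(RpcKind kind, String protocol, String request) throws Exception {
        RpcInvoker invoker = invokers.get(kind);
        if (invoker == null) {
            throw new IllegalStateException("No invoker registered for " + kind);
        }
        return invoker.call(protocol, request);
    }

    public static void main(String[] args) throws Exception {
        RpcKindRegistrySketch server = new RpcKindRegistrySketch();
        server.registerInvoker(RpcKind.RPC_PROTOCOL_BUFFER,
                (protocol, request) -> "handled " + request + " for " + protocol);
        System.out.println(server.call(RpcKind.RPC_PROTOCOL_BUFFER,
                "ClientNamenodeProtocolPB", "getBlockLocations"));
    }
}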
Trace one level further and we are almost at the bottom:
public class ProtobufRpcEngine implements RpcEngine {
    public static class Server extends RPC.Server {
        static class ProtoBufRpcInvoker implements RpcInvoker {
            public Writable call(RPC.Server server, String protocol, Writable writableRequest,
                    long receiveTime) throws Exception {
                ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName, clientVersion);
                BlockingService service = (BlockingService) protocolImpl.protocolImpl;
                result = service.callBlockingMethod(methodDescriptor, null, param);
                return new RpcResponseWrapper(result);
            }
        }
    }
}
This is exactly where Hadoop RPC meets protobuf. One more point to add: the protobuf implementation object (protocolImpl) was already prepared when NameNodeRpcServer was initialized, and making sense of ProtoBufRpcInvoker's call method really does require going back to that initialization. Digging into this layer also exposes something more fundamental: follow an operation through and you land in ClientNamenodeProtocolServerSideTranslatorPB, then in NameNodeRpcServer, then in FSNamesystem, and you discover that the server-side file-system operations ultimately come from FSNamesystem.
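To illustrate that last hop, here is a tiny, self-contained sketch of the kind of dispatch callBlockingMethod stands for. The classes below are hypothetical stand-ins: the real generated BlockingService resolves a protobuf MethodDescriptor rather than using Java reflection, but the shape of the hop is the same — a wire request comes in, the named translator method is invoked, and its result is wrapped as the response.

// Sketch of name-based dispatch onto a translator object; all names are hypothetical.
import java.lang.reflect.Method;

public class ReflectiveDispatchSketch {

    // Stand-in for the translator (e.g. ClientNamenodeProtocolServerSideTranslatorPB).
    public static class EchoTranslator {
        public String echo(String message) {
            return "echo: " + message;   // would normally delegate to NameNodeRpcServer
        }
    }

    // Stand-in for callBlockingMethod(methodDescriptor, controller, param).
    static Object callBlockingMethod(Object serviceImpl, String methodName, Object param)
            throws Exception {
        Method m = serviceImpl.getClass().getMethod(methodName, param.getClass());
        return m.invoke(serviceImpl, param);
    }

    public static void main(String[] args) throws Exception {
        Object result = callBlockingMethod(new EchoTranslator(), "echo", "hello rpc");
        System.out.println(result);   // prints "echo: hello rpc"
    }
}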
Back in the Handler's run method: once call returns, it is the Responder's turn to deal with the return value and write it back to the client.
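Pulling the pieces together, here is a condensed, self-contained sketch of the whole callQueue pipeline under some simplifying assumptions: Call, Responder, the queue size and the handler count below are hypothetical stand-ins for the corresponding members of org.apache.hadoop.ipc.Server, and the "reader" side is reduced to enqueuing a few already-decoded requests directly.

// Condensed sketch of the callQueue pipeline: reader enqueues, handlers process, responder writes.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

public class CallPipelineSketch {

    record Call(int callId, String request) {}

    static class Responder {
        void doRespond(Call call, String response) {
            // The real responder writes to the connection's socket channel.
            System.out.println("call #" + call.callId() + " -> " + response);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Call> callQueue = new ArrayBlockingQueue<>(100);
        Responder responder = new Responder();
        Function<Call, String> invoke = c -> "processed(" + c.request() + ")";

        // Handler threads: take a call off the queue, invoke it, hand it to the responder.
        int handlerCount = 2;
        for (int i = 0; i < handlerCount; i++) {
            Thread handler = new Thread(() -> {
                try {
                    while (true) {
                        Call call = callQueue.take();
                        responder.doRespond(call, invoke.apply(call));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Handler-" + i);
            handler.setDaemon(true);
            handler.start();
        }

        // "Reader" side: enqueue a few decoded requests, as Connection.processRpcRequest does.
        for (int id = 1; id <= 3; id++) {
            callQueue.put(new Call(id, "getBlockLocations:/file" + id));
        }
        Thread.sleep(500);   // let the daemon handlers drain the queue before main exits
    }
}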
That concludes this look at how the Hadoop RPC server side is initialized and how a call travels through it.