您好,登錄后才能下訂單哦!
這篇文章主要介紹了zk工廠方法如何實現NIOServerCnxnFactory,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
NIOServerCnxnFactory類
內部類
AbstractSelectThread
AcceptThread
SelectorThread
屬性
ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT | 10s session過期時間 |
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS | selector 線程數 |
ZOOKEEPER_NIO_NUM_WORKER_THREADS | worker 線程數 |
directBuffer | buffer用來線程間數據交互 |
ipMap | 限制ip上連接數 |
cnxnExpiryQueue | 連接失效時間分桶隊列 |
workerPool | WorkerService worker執行服務 |
acceptThread | 接收新連接,simple round-robin 分配到選擇線程 |
selectorThreads | |
方法
停止接收
private void pauseAccept(long millisecs) { acceptKey.interestOps(0); try { selector.select(millisecs); } catch (IOException e) { // ignore } finally { acceptKey.interestOps(SelectionKey.OP_ACCEPT); } } private boolean doAccept() { boolean accepted = false; SocketChannel sc = null; try { sc = acceptSocket.accept(); accepted = true; InetAddress ia = sc.socket().getInetAddress(); int cnxncount = getClientCnxnCount(ia); if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) { throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns); } LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress()); sc.configureBlocking(false); // Round-robin assign this connection to a selector thread if (!selectorIterator.hasNext()) { selectorIterator = selectorThreads.iterator(); } SelectorThread selectorThread = selectorIterator.next(); if (!selectorThread.addAcceptedConnection(sc)) { throw new IOException("Unable to add connection to selector queue" + (stopped ? " (shutdown in progress)" : "")); } acceptErrorLogger.flush(); } catch (IOException e) { // accept, maxClientCnxns, configureBlocking ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1); acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage()); fastCloseSock(sc); } return accepted; } private void processAcceptedConnections() { SocketChannel accepted; while (!stopped && (accepted = acceptedQueue.poll()) != null) { SelectionKey key = null; try { key = accepted.register(selector, SelectionKey.OP_READ); NIOServerCnxn cnxn = createConnection(accepted, key, this); key.attach(cnxn); addCnxn(cnxn); } catch (IOException e) { // register, createConnection cleanupSelectionKey(key); fastCloseSock(accepted); } } } configure 獲取客戶端連接數 private int getClientCnxnCount(InetAddress cl) { Set<NIOServerCnxn> s = ipMap.get(cl); if (s == null) { return 0; } return s.size(); } 創建連接 protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) throws IOException { return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread); } 創建連接 private void addCnxn(NIOServerCnxn cnxn) throws IOException { InetAddress addr = cnxn.getSocketAddress(); if (addr == null) { throw new IOException("Socket of " + cnxn + " has been closed"); } Set<NIOServerCnxn> set = ipMap.get(addr); if (set == null) { // in general we will see 1 connection from each // host, setting the initial cap to 2 allows us // to minimize mem usage in the common case // of 1 entry -- we need to set the initial cap // to 2 to avoid rehash when the first entry is added // Construct a ConcurrentHashSet using a ConcurrentHashMap set = Collections.newSetFromMap(new ConcurrentHashMap<NIOServerCnxn, Boolean>(2)); // Put the new set in the map, but only if another thread // hasn't beaten us to it Set<NIOServerCnxn> existingSet = ipMap.putIfAbsent(addr, set); if (existingSet != null) { set = existingSet; } } set.add(cnxn); cnxns.add(cnxn); touchCnxn(cnxn); } 思考: 為什么單機和集群模式啟動不一樣 單機可以直接從日志,快照恢復數據 集群根據角色劃分,涉及到數據同步
感謝你能夠認真閱讀完這篇文章,希望小編分享的“zk工廠方法如何實現NIOServerCnxnFactory”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。