91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何理解zk-client通信層ClientCnxnSocket

發布時間:2021-10-18 10:31:21 來源:億速云 閱讀:230 作者:柒染 欄目:大數據

這篇文章將為大家詳細講解有關如何理解zk-client通信層ClientCnxnSocket,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

ClientCnxnSocket抽象類結構 定義了底層Socket通信接口,默認實現是ClientCnxnSocketNIO

readConnectResult 讀取server的connnect的response

readLength 方法 讀取buffer長度并給incomingBuffer分配對應大小空間

ClientCnxnSocketNIO 實現

findSendablePacket函數 從outgoingQueue中讀取發送的packet

doIO函數 處理讀寫

doTransport函數

如果連接就緒,調用sendThread連接操作

若讀寫就緒,調用doIO函數

ClientCnxnSocket

屬性

如何理解zk-client通信層ClientCnxnSocket

ClientCnxnSocketNIO子類

屬性

//nio中的selector
private final Selector selector = Selector.open();
/**
 * nio中的selectionKey
 */
private SelectionKey sockKey;
private SocketAddress localSocketAddress;
private SocketAddress remoteSocketAddress;
方法 
@Override
void connect(InetSocketAddress addr) throws IOException {
    SocketChannel sock = createSock();
    try {
        registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        sock.close();
        throw e;
    }
//已經連接,但是沒有收到resposne
    initialized = false;

    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}

client和server主要交互函數
@Override
void doTransport(
    int waitTimeOut,
    Queue<Packet> pendingQueue,
    ClientCnxn cnxn) throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    // Everything below and until we get back to the select is
    // non blocking, so time is effectively a constant. That is
    // Why we just have to do this once, here
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                updateSocketAddresses();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            doIO(pendingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
            enableWrite();
        }
    }
    selected.clear();
}
主要分為讀寫兩種
讀:沒有初始化完成初始化
讀取len再改incomingbuffer分配對應空間
讀取對應response
寫: 找到可以發送的packet
如果packet的bytebuffer沒有創建,那就進行屬性添加
bytebuffer寫入socketChannel
把Packet從outgingQueue中取出,放入pendingQueue中
/**
 * @throws InterruptedException
 * @throws IOException
 */
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                                           + Long.toHexString(sessionId)
                                           + ", likely server has closed socket");
        }
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                recvCount.getAndIncrement();
                readLength();
            } else if (!initialized) {
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    if (sockKey.isWritable()) {
        Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());

        if (p != null) {
            updateLastSend();
            // If we already started writing p, p.bb will already exist
            if (p.bb == null) {
                if ((p.requestHeader != null)
                    && (p.requestHeader.getType() != OpCode.ping)
                    && (p.requestHeader.getType() != OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                }
                p.createBB();
            }
            sock.write(p.bb);
            if (!p.bb.hasRemaining()) {
                sentCount.getAndIncrement();
                outgoingQueue.removeFirstOccurrence(p);
                if (p.requestHeader != null
                    && p.requestHeader.getType() != OpCode.ping
                    && p.requestHeader.getType() != OpCode.auth) {
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
            }
        }
        if (outgoingQueue.isEmpty()) {
            // No more packets to send: turn off write interest flag.
            // Will be turned on later by a later call to enableWrite(),
            // from within ZooKeeperSaslClient (if client is configured
            // to attempt SASL authentication), or in either doIO() or
            // in doTransport() if not.
            disableWrite();
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
            // On initial connection, write the complete connect request
            // packet, but then disable further writes until after
            // receiving a successful connection response.  If the
            // session is expired, then the server sends the expiration
            // response and immediately closes its end of the socket.  If
            // the client is simultaneously writing on its end, then the
            // TCP stack may choose to abort with RST, in which case the
            // client would never receive the session expired event.  See
            // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
            disableWrite();
        } else {
            // Just in case
            enableWrite();
        }
    }
}


查詢待發送隊列中可以發送的packet
private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue, boolean tunneledAuthInProgres) {
   //沒有要發送的,返回null
     if (outgoingQueue.isEmpty()) {
        return null;
    }
    // If we've already starting sending the first packet, we better finish
    if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) {
        //取隊列第一個進行發送
        return outgoingQueue.getFirst();
    }
    // Since client's authentication with server is in progress,
    // send only the null-header packet queued by primeConnection().
    // This packet must be sent so that the SASL authentication process
    // can proceed, but all other packets should wait until
    // SASL authentication completes.
    //有正在認證處理,發送空請求頭包給服務端
    Iterator<Packet> iter = outgoingQueue.iterator();
    while (iter.hasNext()) {
        Packet p = iter.next();
        if (p.requestHeader == null) {
            // We've found the priming-packet. Move it to the beginning of the queue.
            iter.remove();
            outgoingQueue.addFirst(p);
            return p;
        } else {
            // Non-priming packet: defer it until later, leaving it in the queue
            // until authentication completes.
            LOG.debug("deferring non-priming packet {} until SASL authentation completes.", p);
        }
    }
    return null;
}

發送接收packet圖

如何理解zk-client通信層ClientCnxnSocket

關于如何理解zk-client通信層ClientCnxnSocket就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

玛多县| 邢台市| 应城市| 武夷山市| 西吉县| 绥阳县| 江安县| 平果县| 垦利县| 定远县| 南涧| 汨罗市| 抚远县| 客服| 东源县| 开鲁县| 彭山县| 枣强县| 庐江县| 尼玛县| 沙河市| 龙海市| 松潘县| 祁连县| 木兰县| 诸暨市| 南宁市| 开平市| 兴业县| 仁寿县| 万盛区| 桐梓县| 徐闻县| 宜黄县| 喀喇沁旗| 彩票| 大竹县| 永修县| 华亭县| 长兴县| 三原县|