91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

elasticsearch節點的transport請求發送怎么處理

發布時間:2022-04-21 17:19:20 來源:億速云 閱讀:171 作者:zzz 欄目:開發技術

這篇文章主要介紹“elasticsearch節點的transport請求發送怎么處理”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“elasticsearch節點的transport請求發送怎么處理”文章能幫助大家解決問題。

transport請求的發送和處理過程

前一篇分析對nettytransport的啟動及連接,本篇主要分析transport請求的發送和處理過程。

cluster中各個節點之間需要相互發送很多信息,如master檢測其它節點是否存在,node節點定期檢測master節點是否存儲,cluster狀態的發布及搜索數據請求等等。為了保證信息傳輸,elasticsearch定義了一個19字節長度的信息頭HEADER_SIZE = 2 + 4 + 8 + 1 + 4,以'E','S'開頭,接著是4字節int信息長度,然后是8字節long型信息id,接著是一個字節的status,最后是4字節int型version。

所有的節點間的信息都是以這19個字節開始。同時elasticsearch對于節點間的所有action都定義 了名字,如對master的周期檢測action,internal:discovery/zen/fd/master_ping,每個action對應著相應的messagehandler。接下來會進行詳分析。

request的發送過程

代碼在nettytransport中如下所示:

public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
        //參數說明:node發送的目的節點,requestId請求id,action action名稱,request請求,options包括以下幾種操作 RECOVERY,BULK,REG,STATE,PING;
     Channel targetChannel = nodeChannel(node, options);//獲取對應節點的channel,channel在連接節點時初始化完成(請參考上一篇)
        if (compress) {
            options.withCompress(true);
        }
        byte status = 0;
     //設置status 包括以下幾種STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2;
    status = TransportStatus.setRequest(status); 
     ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);//初始寫出流
        boolean addedReleaseListener = false;
        try {
            bStream.skip(NettyHeader.HEADER_SIZE);//留出message header的位置
            StreamOutput stream = bStream;
            // only compress if asked, and, the request is not bytes, since then only
            // the header part is compressed, and the "body" can't be extracted as compressed
            if (options.compress() && (!(request instanceof BytesTransportRequest))) {
                status = TransportStatus.setCompress(status);
                stream = CompressorFactory.defaultCompressor().streamOutput(stream);
            }
            stream = new HandlesStreamOutput(stream);
            // we pick the smallest of the 2, to support both backward and forward compatibility
            // note, this is the only place we need to do this, since from here on, we use the serialized version
            // as the version to use also when the node receiving this request will send the response with
            Version version = Version.smallest(this.version, node.version());
            stream.setVersion(version);
            stream.writeString(transportServiceAdapter.action(action, version));
            ReleasableBytesReference bytes;
            ChannelBuffer buffer;
            // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
            // that create paged channel buffers, but its tricky to know when to do it (where this option is
            // more explicit).
            if (request instanceof BytesTransportRequest) {
                BytesTransportRequest bRequest = (BytesTransportRequest) request;
                assert node.version().equals(bRequest.version());
                bRequest.writeThin(stream);
                stream.close();
                bytes = bStream.bytes();
                ChannelBuffer headerBuffer = bytes.toChannelBuffer();
                ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
                buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
            } else {
                request.writeTo(stream);
                stream.close();
                bytes = bStream.bytes();
                buffer = bytes.toChannelBuffer();
            }
            NettyHeader.writeHeader(buffer, requestId, status, version);//寫信息頭
            ChannelFuture future = targetChannel.write(buffer);//寫buffer同時獲取future,發送信息發生在這里
            ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
            future.addListener(listener);//添加listener
            addedReleaseListener = true;
            transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
        } finally {
            if (!addedReleaseListener) {
                Releasables.close(bStream.bytes());
            }
        }
    }

以上就是request的發送過程,獲取目標node的channel封裝請求寫入信息頭,然后發送并使用listener監聽,這里transportRequest是一個抽象類,它繼承了TransportMessage同時實現了streamable接口。cluster中對它的實現非常多,各個功能都有相應的request,這里就不一一列舉,后面的代碼分析中會時常涉及。

request的接受過程

request發送只是transport的一部分功能,有發送就要有接收,這樣transport的功能才完整。接下來就是對接收過程的分析。上一篇中簡單介紹過netty的使用,message的處理是通過MessageHandler處理,因此nettyTransport的信息處理邏輯都在MessageChannelHandler的messageReceived()方法中,代碼如下所示:

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Transports.assertTransportThread();
        Object m = e.getMessage();
        if (!(m instanceof ChannelBuffer)) {//非buffer之間返回
            ctx.sendUpstream(e);
            return;
        }
     //解析message頭
        ChannelBuffer buffer = (ChannelBuffer) m;
        int size = buffer.getInt(buffer.readerIndex() - 4);
        transportServiceAdapter.received(size + 6);
        // we have additional bytes to read, outside of the header
        boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0;
        int markedReaderIndex = buffer.readerIndex();
        int expectedIndexReader = markedReaderIndex + size;
        // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
        // buffer, or in the cumlation buffer, which is cleaned each time
        StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
      //讀取信息頭中的幾個重要元數據
        long requestId = buffer.readLong();
        byte status = buffer.readByte();
        Version version = Version.fromId(buffer.readInt());
        StreamInput wrappedStream;
      …………
        if (TransportStatus.isRequest(status)) {//處理請求
            String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
            if (buffer.readerIndex() != expectedIndexReader) {
                if (buffer.readerIndex() < expectedIndexReader) {
                    logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
                } else {
                    logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
                }
                buffer.readerIndex(expectedIndexReader);
            }
        } else {//處理響應
            TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
            // ignore if its null, the adapter logs it
            if (handler != null) {
                if (TransportStatus.isError(status)) {
                    handlerResponseError(wrappedStream, handler);
                } else {
                    handleResponse(ctx.getChannel(), wrappedStream, handler);
                }
            } else {
                // if its null, skip those bytes
                buffer.readerIndex(markedReaderIndex + size);
            }
          …………
        wrappedStream.close();
    }

以上就是信息處理邏輯,這個方法基礎自netty的SimpleChannelUpstreamHandler類。作為MessageHandler會在client和server啟動時加入到handler鏈中,在信息到達后netty會自動調用handler鏈依次處理。這是netty的內容,就不詳細說明,請參考netty文檔。

request和response是如何被處理

request的處理

代碼如下所示:

protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
        final String action = buffer.readString();//讀出action的名字
        transportServiceAdapter.onRequestReceived(requestId, action);
        final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
        try {
            final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);//獲取處理該信息的handler
            if (handler == null) {
                throw new ActionNotFoundTransportException(action);
            }
            final TransportRequest request = handler.newInstance();
            request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
            request.readFrom(buffer);
            if (handler.executor() == ThreadPool.Names.SAME) {
                //noinspection unchecked
                handler.messageReceived(request, transportChannel);//使用該handler處理信息。
            } else {
                threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
            }
        } catch (Throwable e) {
            try {
                transportChannel.sendResponse(e);
            } catch (IOException e1) {
                logger.warn("Failed to send error message back to client for action [" + action + "]", e);
                logger.warn("Actual Exception", e1);
            }
        }
        return action;
    }

幾個關鍵部分在代碼中進行了標注。這里仍舊不能看到請求是如何處理的。因為cluster中的請求各種各樣,如ping,discovery,index等等,因此不可能使用同一種處理方式。因此request最終又被提交給handler處理。每個功能請求都實現了自己的handler,當請求被提交給handler時會做對應的處理。這里再說一下transportServiceAdapter,消息的處理都是通過它適配轉發完成。request的完整處理流程是:messageReceived()方法收到信息判斷是request會將其轉發到transportServiceAdapter的handler方法,handler方法查找對應的requesthandler,使用將信息轉發給該handler進行處理。這里就不舉例說明,在后面的discover分析中我們會看到發現,ping等請求的處理過程。

response的處理過程

response通過handleResponse方法進行處理,代碼如下:

protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
        final TransportResponse response = handler.newInstance();
        response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
        response.remoteAddress();
        try {
            response.readFrom(buffer);
        } catch (Throwable e) {
            handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
            return;
        }
        try {
            if (handler.executor() == ThreadPool.Names.SAME) {
                //noinspection unchecked
                handler.handleResponse(response);//轉發給對應的handler
            } else {
                threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
            }
        } catch (Throwable e) {
            handleException(handler, new ResponseHandlerFailureTransportException(e));
        }
    }

response的處理過程跟request很類似。每個request都會對應一個handler和一個response的處理handler,會在時候的時候注冊到transportService中。請求到達時根據action名稱獲取到handler處理request,根據requestId獲取對應的response handler進行響應。

關于“elasticsearch節點的transport請求發送怎么處理”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

株洲县| 铁力市| 敦煌市| 郸城县| 高陵县| 榕江县| 沙坪坝区| 锦屏县| 南昌县| 鸡东县| 苏尼特左旗| 涟水县| 江城| 庆元县| 青神县| 嫩江县| 衡阳市| 永德县| 富锦市| 开阳县| 房产| 分宜县| 临颍县| 南澳县| 韩城市| 榕江县| 张掖市| 锦屏县| 扶风县| 郸城县| 休宁县| 准格尔旗| 同江市| 孝义市| 托克逊县| 前郭尔| 潮安县| 兴和县| 财经| 株洲市| 黄大仙区|