We know that Kafka communicates over TCP. Unlike much other middleware, it does not use Netty as its TCP server; instead it implements its own networking on top of Java NIO. Let's first look at the network architecture of the Kafka client.
This article focuses on the Network layer. The Network layer has two important classes: Selector and KafkaChannel. They are roughly analogous to java.nio.channels.Selector and Channel in Java NIO.
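For comparison, the canonical Java NIO client pattern that Kafka's Selector wraps looks roughly like this (a generic sketch, not Kafka code; the address and port are made up):

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Generic non-blocking NIO client loop; Kafka's Selector/KafkaChannel wrap
// this pattern and add per-connection state, staging and metrics on top.
public class PlainNioClient {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);
        ch.connect(new InetSocketAddress("localhost", 9092)); // hypothetical address
        ch.register(selector, SelectionKey.OP_CONNECT);

        while (true) {
            selector.select(300);
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isConnectable() && ch.finishConnect())
                    key.interestOps(SelectionKey.OP_READ);
                if (key.isValid() && key.isReadable()) {
                    // read from (SocketChannel) key.channel() ...
                }
                if (key.isValid() && key.isWritable()) {
                    // write to (SocketChannel) key.channel() ...
                }
            }
            selector.selectedKeys().clear();
        }
    }
}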
Selector's key fields are as follows:
// the underlying JDK NIO selector
java.nio.channels.Selector nioSelector;
// all connections tracked by this Selector
Map<String, KafkaChannel> channels;
// requests that have been fully sent
List<Send> completedSends;
// requests that have been fully received
List<NetworkReceive> completedReceives;
// received requests staged per channel, not yet visible to the upper layer
Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
// as a client: connections for which connect() to the remote end returned true
Set<SelectionKey> immediatelyConnectedKeys;
// connections that have finished connecting
List<String> connected;
// maximum size of a single receive
int maxReceiveSize;
From the network layer's point of view, Kafka divides into a client side (producer and consumer; a broker acting as a follower is also a client) and a server side (broker). This article analyzes how the client establishes connections and sends and receives data. The server relies on the same Selector and KafkaChannel for network transport; at the Network layer the two sides differ very little.
When the Kafka client starts, it calls Selector#connect (hereafter, unless noted otherwise, Selector means org.apache.kafka.common.network.Selector) to establish connections.
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);
    // create a SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    // switch to non-blocking mode
    socketChannel.configureBlocking(false);
    // create the socket and set its options
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        // SocketChannel#connect initiates the TCP connection to the remote end.
        // Because the channel is non-blocking, the connection is not necessarily
        // established (i.e. the 3-way handshake finished) when the method returns:
        // it returns true if the connection is already established, false otherwise.
        // When server and client run on the same machine, it may well return true.
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    // register interest in the CONNECT event
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    KafkaChannel channel;
    try {
        // build a KafkaChannel
        channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    } catch (Exception e) {
        ...
    }
    // attach the KafkaChannel to the SelectionKey
    key.attach(channel);
    // store it in the map; id names the remote server
    this.channels.put(id, channel);
    // connected == true means this connection will never fire a CONNECT event,
    // so it has to be handled separately
    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        // record it in a dedicated set
        immediatelyConnectedKeys.add(key);
        // stop listening for CONNECT events on this key
        key.interestOps(0);
    }
}
The flow here is close to the standard NIO flow. The one case worth calling out is socketChannel#connect returning true, which the method's Javadoc describes:
* <p> If this channel is in non-blocking mode then an invocation of this
* method initiates a non-blocking connection operation.  If the connection
* is established immediately, as can happen with a local connection, then
* this method returns <tt>true</tt>.  Otherwise this method returns
* <tt>false</tt> and the connection operation must later be completed by
* invoking the {@link #finishConnect finishConnect} method.
That is, in non-blocking mode a local connection may be established right away, in which case the method returns true and no CONNECT event will fire for it later. Kafka therefore records these special connections in a dedicated set, immediatelyConnectedKeys, and handles them separately in the following steps.
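The behavior is easy to reproduce. Below is a minimal, self-contained sketch (the host and port are assumptions; point it at any locally listening server) showing that a non-blocking connect to a local address may return true immediately:

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class ImmediateConnectDemo {
    public static void main(String[] args) throws Exception {
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);
        // Connecting to a server on the same machine (port 9092 is an assumption;
        // use any locally listening port). Over loopback, the handshake may
        // complete within this call, so connect() can return true.
        boolean connected = ch.connect(new InetSocketAddress("localhost", 9092));
        System.out.println("connected immediately: " + connected);
        if (!connected) {
            // the normal non-blocking path: poll until finishConnect() succeeds
            while (!ch.finishConnect())
                Thread.sleep(10);
        }
        ch.close();
    }
}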
Afterwards the client calls poll to wait for and handle network events:
public void poll(long timeout) throws IOException {
    ...
    // select() is a thin wrapper around java.nio.channels.Selector#select
    int readyKeys = select(timeout);
    ...
    // if any events are ready, or immediatelyConnectedKeys is non-empty
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // handle the ready events; the 2nd argument is false
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        // handle immediatelyConnectedKeys; the 2nd argument is true
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    addToCompletedReceives();
    ...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // iterate over the set
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // remove the current element, or the next poll would process it again
        iterator.remove();
        // get the KafkaChannel created in connect()
        KafkaChannel channel = channel(key);
        ...
        try {
            // the key comes from immediatelyConnectedKeys, or a CONNECT event fired
            if (isImmediatelyConnected || key.isConnectable()) {
                // finishConnect also registers interest in READ events
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    ...
                } else
                    continue;
            }
            // SSL connections need some extra steps
            if (channel.isConnected() && !channel.ready())
                channel.prepare();
            // READ event
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }
            // WRITE event
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            // the connection is no longer valid
            if (!key.isValid())
                close(channel, true);
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, true);
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
Because connections in immediatelyConnectedKeys never trigger a CONNECT event, poll calls finishConnect separately on those channels. In plaintext mode this lands in PlaintextTransportLayer#finishConnect, implemented as follows:
public boolean finishConnect() throws IOException {
    // returns true once the connection is fully established
    boolean connected = socketChannel.finishConnect();
    if (connected)
        // stop listening for CONNECT, start listening for READ
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}
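The interestOps expression is plain bit arithmetic: clear the OP_CONNECT bit, then set the OP_READ bit. A quick worked example using the SelectionKey constants:

// OP_READ = 1 << 0 = 1, OP_CONNECT = 1 << 3 = 8
int ops = SelectionKey.OP_CONNECT;     // 0b1000
ops = ops & ~SelectionKey.OP_CONNECT;  // 0b0000  (CONNECT cleared)
ops = ops | SelectionKey.OP_READ;      // 0b0001  (READ set)
// equivalent to the one-liner in finishConnect:
// key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);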
More detail on immediatelyConnectedKeys can be found here.
Kafka sends data in two steps:
1. Call Selector#send to store the data to be sent in the corresponding KafkaChannel. This method performs no actual network I/O.
// Selector#send
public void send(Send send) {
    String connectionId = send.destination();
    // if the connection is being closed, record the send in failedSends
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        KafkaChannel channel = channelOrFail(connectionId, false);
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

// KafkaChannel#setSend
public void setSend(Send send) {
    // fail if a previous send has not yet gone out
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    // store the send
    this.send = send;
    // register interest in WRITE events
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
2. Call Selector#poll. Step 1 registered interest in WRITE events on the channel, so once the channel becomes writable, pollSelectionKeys actually sends the data.
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // iterate over the set
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // remove the current element, or the next poll would process it again
        iterator.remove();
        // get the KafkaChannel created in connect()
        KafkaChannel channel = channel(key);
        ...
        try {
            ...
            // WRITE event
            if (channel.ready() && key.isWritable()) {
                // the actual network write
                Send send = channel.write();
                // one Send may take several writes to go out; a non-null
                // return value means this Send has been fully written
                if (send != null) {
                    // completedSends holds the fully written sends
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            ...
        } catch (Exception e) {
            ...
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
When the channel is writable, KafkaChannel#write is invoked, and that is where the real network I/O happens:
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    // eventually calls SocketChannel#write for the actual write
    send.writeTo(transportLayer);
    if (send.completed())
        // the send is fully written, so stop listening for WRITE events
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
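To make the "one Send may take several writes" point concrete, here is a minimal sketch of a ByteBuffer-backed Send (a simplified stand-in for Kafka's ByteBufferSend, not the real class):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

// Simplified stand-in for a ByteBuffer-backed Send; Kafka's real
// ByteBufferSend is more involved (size prefix, multiple buffers).
class SimpleSend {
    private final String destination;
    private final ByteBuffer buffer;

    SimpleSend(String destination, ByteBuffer buffer) {
        this.destination = destination;
        this.buffer = buffer;
    }

    String destination() { return destination; }

    // completed only once every byte has been written
    boolean completed() { return !buffer.hasRemaining(); }

    // Each call writes as much as the socket accepts right now; the kernel
    // send buffer may be full, so one Send can need several writeTo calls.
    long writeTo(GatheringByteChannel channel) throws IOException {
        return channel.write(buffer);
    }
}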
If the remote end sends data over, the next call to poll processes what was received:
public void poll(long timeout) throws IOException {
    ...
    // select() is a thin wrapper around java.nio.channels.Selector#select
    int readyKeys = select(timeout);
    ...
    // if any events are ready, or immediatelyConnectedKeys is non-empty
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // handle the ready events; the 2nd argument is false
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        // handle immediatelyConnectedKeys; the 2nd argument is true
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    addToCompletedReceives();
    ...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // iterate over the set
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // remove the current element, or the next poll would process it again
        iterator.remove();
        // get the KafkaChannel created in connect()
        KafkaChannel channel = channel(key);
        ...
        try {
            ...
            // READ event
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                // read() pulls data off the network, but one call may read only
                // part of a request; it returns non-null only once a complete
                // request has been read
                while ((networkReceive = channel.read()) != null)
                    // stash the complete request in stagedReceives
                    addToStagedReceives(channel, networkReceive);
            }
            ...
        } catch (Exception e) {
            ...
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}
The addToCompletedReceives method, called later in poll, then processes this collection:
private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            // on the client side isMute always returns false; the server side
            // relies on it to keep requests ordered
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}

private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    // move each channel's first NetworkReceive into completedReceives
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}
After data is read, it is first placed in the stagedReceives collection; addToCompletedReceives then moves, for each channel, at most one NetworkReceive (if there is one) from stagedReceives into completedReceives.
There are two reasons for this design:
1. For SSL connections the bytes on the wire are encrypted, so the exact number of bytes the next read needs cannot be determined precisely; the only option is to read as much as possible, which may pull in more bytes than the current request requires. If the channel then has no further data to read, those extra bytes would otherwise never be processed.
2. Kafka must guarantee that requests on a channel are processed in the order they were sent. For each channel, each poll therefore exposes at most one request to the upper layer; only after that request has been handled are the others processed. On the server side, after each poll the channel is muted, i.e. no more data is read from it, and it is unmuted only once processing completes, after which reads on the socket resume. On the client side the same property is enforced through InFlightRequests#canSendMore, sketched below.
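As a rough sketch of that client-side check (a paraphrase of InFlightRequests#canSendMore; the real class lives in org.apache.kafka.clients and the field and type names here are approximations):

// Paraphrased sketch, not the verbatim Kafka source.
public boolean canSendMore(String node) {
    Deque<InFlightRequest> queue = requests.get(node);
    // a new request may go out only if the newest in-flight request on this
    // connection has been fully written, and the in-flight limit isn't reached
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().send.completed()
                && queue.size() < this.maxInFlightRequestsPerConnection);
}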
The comment in the source explaining this logic reads:
/* In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
 * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
 * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
 * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
 * application buffer size. This means we might be reading additional bytes than the requested size.
 * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
 * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
 * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
 * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
 * and pop response and add to the completedReceives.
 * Atmost one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that
 * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added
 * by SocketServer to the request queue may be processed by different request handler threads, requests on each
 * channel must be processed one-at-a-time to guarantee ordering.
 */
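Putting the pieces together, the calling code drives the Selector in a loop roughly like the following (a simplified sketch; in Kafka this driver role belongs to NetworkClient#poll, and the node id, address and buffer sizes below are made up):

Selector selector = ...; // org.apache.kafka.common.network.Selector
selector.connect("node-1", new InetSocketAddress("broker-host", 9092),
                 64 * 1024, 64 * 1024); // hypothetical node id, host, buffer sizes

while (running) {
    // queue at most one pending Send per channel (setSend throws otherwise)
    if (nextRequest != null)
        selector.send(nextRequest);

    // all actual network I/O happens inside poll
    selector.poll(300);

    // sends that were fully written during this poll
    for (Send send : selector.completedSends())
        handleCompletedSend(send);

    // at most one complete response per channel per poll
    for (NetworkReceive receive : selector.completedReceives())
        handleResponse(receive);
}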
This article analyzed the implementation of Kafka's network layer. When reading the Kafka source, things get confusing without a clear picture of this layer, for example the mechanism that keeps requests and responses in order, or the fact that the actual network I/O happens not in the send method but in poll.