What do the core fields of Kafka's network engine look like, and how are they initialized? This post walks through the relevant source code to answer exactly that, hopefully giving anyone puzzling over the question a simpler way in.
Kafka's network layer uses a multi-threaded, multi-Selector design, much the same idea as in RMNT (Redis is a bit of a special case; we won't stress that point again).
The core class is SocketServer. It contains one Acceptor per endpoint for accepting new connections, and each Acceptor owns several Processor threads. Each Processor has its own private Selector, used for reading requests from its connections and writing the responses back.
Because each Processor has its own Selector, a given socket can be confined to a single Processor for its entire lifetime. And it is not just reading requests that this Processor's Selector handles; writing the responses back is its job too. In one sentence: "I'll see you through to the end!" :) It makes sense if you think about it: the same socket being handled by Selector A one moment and Selector B the next would be chaos; stability wins.
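To make that ownership model concrete, here is a minimal sketch in Scala of the pattern (this is illustrative code, not Kafka's actual Processor; the class and queue names are made up): one thread owns one java.nio Selector, and every socket handed to it stays with it, for reads and writes alike, until it closes.

import java.nio.channels.{SelectionKey, Selector, SocketChannel}
import java.util.concurrent.ConcurrentLinkedQueue

// Sketch: each MiniProcessor owns a private Selector; a socket, once
// assigned, is read from AND written to only by this thread.
class MiniProcessor extends Runnable {
  private val selector = Selector.open()
  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

  // Called by an acceptor thread to hand over a freshly accepted socket.
  def accept(channel: SocketChannel): Unit = {
    newConnections.add(channel)
    selector.wakeup() // interrupt select() so the channel gets registered promptly
  }

  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      // Register sockets handed over since the last iteration.
      var ch = newConnections.poll()
      while (ch != null) {
        ch.configureBlocking(false)
        ch.register(selector, SelectionKey.OP_READ)
        ch = newConnections.poll()
      }
      selector.select(300) // both reads and writes are served right here
      val it = selector.selectedKeys().iterator()
      while (it.hasNext) {
        val key = it.next(); it.remove()
        if (key.isReadable) { /* read request bytes */ }
        if (key.isWritable) { /* write response bytes */ }
      }
    }
  }
}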
Behind each Acceptor's Processors there is also a pool of Handler threads, commonly known in the industry as the business-logic thread pool.
So if you interview somewhere and get asked how Netty handles slow business logic, and you don't say "hand it off to a dedicated business thread pool", trust me, the interviewer will quietly write you off :) Whatever you do, don't claim that Netty runs business logic on the IO threads; that answer alone will lose you points, and the Netty portion of the interview is basically over.
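For reference, this is the standard Netty idiom the interviewer is fishing for (a generic sketch, not Kafka code; Kafka does not use Netty here, and handleRequest below is a hypothetical stand-in for real business logic). A handler registered with a separate EventExecutorGroup runs on that group's threads, so slow work never stalls the IO event loop:

import io.netty.channel.{ChannelHandlerContext, ChannelInitializer, SimpleChannelInboundHandler}
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.string.{StringDecoder, StringEncoder}
import io.netty.util.concurrent.DefaultEventExecutorGroup

// A separate executor group for slow business logic (16 threads here).
val businessGroup = new DefaultEventExecutorGroup(16)

def handleRequest(msg: String): String = msg.toUpperCase // stand-in for real work

// Plugged into a ServerBootstrap via .childHandler(initializer):
val initializer = new ChannelInitializer[SocketChannel] {
  override def initChannel(ch: SocketChannel): Unit = {
    val p = ch.pipeline()
    p.addLast(new StringDecoder(), new StringEncoder()) // codecs stay on the IO event loop
    // Handlers added with an EventExecutorGroup run on ITS threads,
    // so a slow handleRequest() never blocks the IO event loop.
    p.addLast(businessGroup, "business", new SimpleChannelInboundHandler[String] {
      override def channelRead0(ctx: ChannelHandlerContext, msg: String): Unit = {
        val reply = handleRequest(msg) // potentially slow business call
        ctx.writeAndFlush(reply)
      }
    })
  }
}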
---Note: the results produced by the business thread pool have to flow back to the IO threads, that is, the Processor group.
The two kinds of threads communicate through a RequestChannel; the sketch below shows the handler side of that handoff.
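Conceptually, the loop on the handler side looks like this (a simplified sketch; the real loop lives in kafka.server.KafkaRequestHandler, and handle below stands in for KafkaApis.handle):

// 1. An IO thread parses a request and puts it on the shared request queue.
// 2. A handler thread takes it and runs the business logic.
// 3. The response goes back on the queue of the SAME Processor that read
//    the request, and that Processor's Selector is woken up.
def run(): Unit = {
  while (true) {
    // blocks up to 300 ms waiting for a request parsed by some Processor
    val req = requestChannel.receiveRequest(300)
    if (req != null) {
      handle(req) // business logic; eventually calls requestChannel.sendResponse(...)
    }
  }
}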
In Thrift, the handoff is done by wrapping the FrameBuffer in a Runnable:

protected Runnable getRunnable(FrameBuffer frameBuffer) {
    return new Invocation(frameBuffer);
}

A FrameBuffer encapsulates one logically complete request, in the same sense that an HttpRequest represents one complete HTTP request. The technical background in both cases: TCP is a byte-stream protocol, so the framework has to do its own framing to recover whole requests. Enough said!
Core fields of SocketServer
Source location:

find ./ -name SocketServer.*
./core/src/main/scala/kafka/network/SocketServer.scala
1) endpoints:
The collection of EndPoints. A server typically has several NICs and can be configured with several IPs, so Kafka can listen on multiple ports at the same time.
An EndPoint defines the host, port, network protocol, and so on.
Each EndPoint corresponds to exactly one Acceptor object.
This is somewhat like ActiveMQ: it supports multiple wire protocols and opens one TCP listening port per protocol, so a single ActiveMQ process actually occupies quite a few listening ports.
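As a concrete example, here is what a dual-listener setup looks like in server.properties (sample values, not this cluster's actual config); each entry becomes one EndPoint and therefore one Acceptor:

# Two listeners -> two EndPoints -> two Acceptors, each with its own
# group of num.network.threads Processors.
listeners=PLAINTEXT://192.168.1.10:9092,SSL://192.168.1.10:9093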
2) numProcessorThreads & totalProcessorThreads
numProcessorThreads is the number of Processor threads per endpoint.
And the second one? Since there can be multiple endpoints, it is simply (number of endpoints) * numProcessorThreads.
3) maxQueuedRequests: the maximum number of requests that can be buffered.
Think about it: how many can Thrift buffer at most? :)
Concretely, a Processor thread reads from its sockets, assembles complete requests, and stuffs them into this queue, which buffers them for the handler threads.
4) maxConnectionsPerIp: the maximum number of connections a single IP may open.
Normally you would not expect a limit here (surely we want clients to connect?), but it protects the broker from one misbehaving host monopolizing all the connections.
5) maxConnectionsPerIpOverrides: per-IP exceptions to that limit; see the sample settings below.
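In server.properties these two fields correspond to the following keys (sample values, not from this walkthrough's config):

# hard cap for any single client IP
max.connections.per.ip=1000
# comma-separated host:count exceptions, e.g. for a trusted proxy
max.connections.per.ip.overrides=10.0.0.5:2000,127.0.0.1:5000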
6) requestChannel: the queue(s).
The channel carrying Kafka's logically complete requests, in the way an HTTP request body maps onto an HttpRequest.
For Thrift, the counterpart is the FrameBuffer wrapped in an Invocation, as quoted earlier.
No further explanation needed!
7) acceptors:
The collection of Acceptor objects, one per EndPoint. Self-explanatory!
8) processors:
The collection of IO (Processor) threads. Self-explanatory!
===With the core fields covered, let's look at how SocketServer is initialized===
First, as usual, set up a debugging environment on Linux.
1) Start the Kafka server.
2) Inspect the launch command (the classpath listing was truncated in the original capture):

/root/myAllFiles/jdk1.8.0_111/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties -cp /root/leveldb_0.9:/root/leveldb_0.9/*::/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/aopalliance-repackaged-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/argparse4j-0.7.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/commons-lang3-3.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-api-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-file-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-json-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-runtime-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-transforms-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/guava-20.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-api-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-locator-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-utils-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-core-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-databind-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-base-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-json-provider-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-module-jaxb-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javassist-3.21.0-GA.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.annotation-api-1.2.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.servlet-api-3.1.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-client-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-common-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-core-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-guava-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-media-jaxb-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-server-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-http-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-io-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-security-9.2.15.v20160210.jar:

3) Build the jdb command from it, swapping java for jdb (and -cp for -classpath) and appending the main class plus its config argument:

jdb -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties -classpath /root/leveldb_0.9:/root/leveldb_0.9/*::/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/aopalliance-repackaged-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/argparse4j-0.7.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/commons-lang3-3.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-api-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-file-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-json-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-runtime-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-transforms-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/guava-20.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-api-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-locator-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-utils-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-core-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-databind-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-base-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-json-provider-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-module-jaxb-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javassist-3.21.0-GA.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.annotation-api-1.2.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.servlet-api-3.1.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-client-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-common-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-core-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-guava-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-media-jaxb-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-server-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-http-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-io-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-security-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/libs/*: kafka.Kafka config/server.properties
If step 3 throws errors, work through the messages and fix the command yourself :)
Good. With it running under jdb, our goal is a big-picture view of how SocketServer executes; the implementation of each component will be covered in detail later.
At startup, SocketServer iterates over the EndPoints and starts the corresponding threads for each :)
Install the Scala plugin; see http://www.cnblogs.com/xiyuan2016/p/6626825.html and http://scala-ide.org/download/prev-stable.html
------Let's try some debugging to warm up------
First, set a breakpoint:
stop in kafka.network.SocketServer:54
Then debugging can begin; if necessary, just add the source folders yourself.
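Once inside jdb, a handful of standard commands go a long way (all of these are stock jdb commands; the print expression is only an illustration and assumes the field is visible in the current frame):

run                              # launch kafka.Kafka with the arguments given above
where                            # dump the current thread's stack after the breakpoint hits
print config.numNetworkThreads   # evaluate an expression in the current frame
next                             # step over one source line
step                             # step into a call
cont                             # resume until the next breakpoint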
================Now for the real debugging session==================
Tip: for how the configuration-file properties map onto these fields, see kafka.server.KafkaConfig.scala
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider)
  extends Logging with KafkaMetricsGroup {

  // just one endpoint by default
  private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
  // 3 in our config file; val NumNetworkThreadsProp = "num.network.threads"
  private val numProcessorThreads = config.numNetworkThreads
  // default 500; val QueuedMaxRequestsProp = "queued.max.requests"
  private val maxQueuedRequests = config.queuedMaxRequests
  // 3 * 1 here
  private val totalProcessorThreads = numProcessorThreads * endpoints.size
  // default Int.MaxValue -> 2147483647; val MaxConnectionsPerIpProp = "max.connections.per.ip"
  private val maxConnectionsPerIp = config.maxConnectionsPerIp
  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides

  this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
Next the RequestChannel object is initialized:
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
Step into it:
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil
  // the channel into the business (handler) thread pool
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  // the return channels from the business pool, one per Processor
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  // initialization: a LinkedBlockingQueue for each Processor
  for (i <- 0 until numProcessors)
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
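Right below those fields sit the methods that drive the two kinds of queues (reproduced from the 0.11 sources in slightly simplified form, so take the exact bodies with a grain of salt). Note the asymmetry: the request queue is a bounded ArrayBlockingQueue, so sendRequest blocks when the handlers fall behind, which is exactly the back-pressure that queued.max.requests buys you, while each response queue is unbounded:

  /** Called by a Processor: blocks if the bounded queue is full (back-pressure). */
  def sendRequest(request: RequestChannel.Request): Unit =
    requestQueue.put(request)

  /** Called by a handler thread: waits up to `timeout` ms for a request. */
  def receiveRequest(timeout: Long): RequestChannel.Request =
    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

  /** Called by a handler thread: route the response back to the Processor
    * that owns the connection, then notify it via the listeners. */
  def sendResponse(response: RequestChannel.Response): Unit = {
    responseQueues(response.processor).put(response)
    for (onResponse <- responseListeners)
      onResponse(response.processor)
  }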
Back in SocketServer.scala:
  // the IO thread pool, to be filled in during startup
  private val processors = new Array[Processor](totalProcessorThreads)
  // one Acceptor per EndPoint
  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
  private var connectionQuotas: ConnectionQuotas = _
  // register the processor threads for notification of responses
  requestChannel.addResponseListener(id => processors(id).wakeup())

That is: whenever a business response becomes ready, the Selector of the IO thread that owns the connection has to be woken up.
Next, the startup method executes; this is the core:
Step completed: "thread=main", kafka.network.SocketServer.startup(), line=74 bci=0
Let's see what it does:
/**
 * Start the socket server
 */
def startup() {
  this.synchronized {
    // the per-IP connection quotas discussed above
    connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

    // TCP parameters; 102400 in our config file
    // val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
    val sendBufferSize = config.socketSendBufferBytes
    // val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
    val recvBufferSize = config.socketReceiveBufferBytes
    val brokerId = config.brokerId // self-explanatory

    var processorBeginIndex = 0
    config.listeners.foreach { endpoint =>
      val listenerName = endpoint.listenerName
      val securityProtocol = endpoint.securityProtocol
      // each endpoint starts this many Processor threads
      val processorEndIndex = processorBeginIndex + numProcessorThreads

      for (i <- processorBeginIndex until processorEndIndex)
        processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol) // create the Processor threads

      // create the Acceptor thread, handing it its slice of Processors
      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
        processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
      acceptors.put(endpoint, acceptor)
      Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
      acceptor.awaitStartup()

      processorBeginIndex = processorEndIndex
    }
  }
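The index arithmetic is worth a second look. With, say, two endpoints and num.network.threads=3 (hypothetical numbers), the flat processors array is carved into contiguous slices, one per endpoint. A standalone sketch of the same arithmetic:

// Standalone illustration of the slicing in startup(), hypothetical values:
val numProcessorThreads = 3
val endpointNames = Seq("PLAINTEXT", "SSL") // 2 endpoints
val processors = new Array[String](numProcessorThreads * endpointNames.size)

var begin = 0
for (name <- endpointNames) {
  val end = begin + numProcessorThreads
  for (i <- begin until end)
    processors(i) = s"$name-processor-$i"
  // this endpoint's Acceptor would receive processors.slice(begin, end)
  println(s"$name owns processors $begin to ${end - 1}")
  begin = end
}
// prints: PLAINTEXT owns processors 0 to 2
//         SSL owns processors 3 to 5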
Shutdown, by contrast, is simple:
/**
 * Shutdown the socket server
 */
def shutdown() = {
  info("Shutting down")
  this.synchronized {
    acceptors.values.foreach(_.shutdown) // stop the Acceptors
    processors.foreach(_.shutdown)       // stop the Processors
  }
  info("Shutdown completed")
}
AbstractServerThread
Look at the following two declarations:
/**
 * Thread that accepts and configures new connections. There is one of these per endpoint.
 */
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

/**
 * Thread that processes all requests from a single connection. There are N of these running in parallel
 * each of which has its own selector
 */
private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
We can see that Acceptor and Processor both extend the class AbstractServerThread:
/**
 * A base class with some helper variables and methods
 */
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
It is an abstract class implementing the Runnable interface, and it provides the concrete startup/shutdown machinery for both the Acceptor and the Processor threads!
Tip: pause here and recall how Netty structures this; familiar, isn't it?
/**
 * A base class with some helper variables and methods
 */
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {

  private val startupLatch = new CountDownLatch(1) // signals that startup has finished

  // `shutdown()` is invoked before `startupComplete` and `shutdownComplete` if an exception is thrown in the constructor
  // (e.g. if the address is already in use). We want `shutdown` to proceed in such cases, so we first assign an open
  // latch and then replace it in `startupComplete()`.
  @volatile private var shutdownLatch = new CountDownLatch(0) // signals that shutdown has finished

  private val alive = new AtomicBoolean(true) // is this thread still alive?

  def wakeup(): Unit

  /**
   * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
   */
  def shutdown(): Unit = {
    alive.set(false)
    wakeup()
    shutdownLatch.await()
  }

  /**
   * Wait for the thread to completely start up
   */
  def awaitStartup(): Unit = startupLatch.await

  /**
   * Record that the thread startup is complete
   */
  protected def startupComplete(): Unit = {
    // Replace the open latch with a closed one
    shutdownLatch = new CountDownLatch(1)
    startupLatch.countDown()
  }

  /**
   * Record that the thread shutdown is complete
   */
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()

  /**
   * Is the server still running?
   */
  protected def isRunning: Boolean = alive.get

  /**
   * Close the connection identified by `connectionId` and decrement the connection count.
   */
  def close(selector: KSelector, connectionId: String): Unit = { // close the socket and update the per-IP count
    val channel = selector.channel(connectionId)
    if (channel != null) {
      debug(s"Closing selector connection $connectionId")
      val address = channel.socketAddress
      if (address != null)
        connectionQuotas.dec(address)
      selector.close(connectionId)
    }
  }

  /**
   * Close `channel` and decrement the connection count.
   */
  def close(channel: SocketChannel): Unit = {
    if (channel != null) {
      debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
      connectionQuotas.dec(channel.socket.getInetAddress)
      swallowError(channel.socket().close())
      swallowError(channel.close())
    }
  }
}
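Every subclass's run() method is expected to cooperate with these latches in a fixed pattern. Here is a minimal sketch of that contract (modelled on what Acceptor.run actually does, but stripped down to the life-cycle skeleton):

// Sketch of the run() contract an AbstractServerThread subclass follows:
override def run(): Unit = {
  startupComplete() // releases awaitStartup(), so the caller of startup() can proceed
  try {
    while (isRunning) {
      // select / accept / read / write ...
    }
  } finally {
    shutdownComplete() // releases the latch that shutdown() is blocked on
  }
}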
That wraps up the core fields and initialization of Kafka's network engine. Hopefully the walkthrough above helps; the individual components (Acceptor, Processor, RequestChannel) get their own detailed treatment later.