您好,登錄后才能下訂單哦!
在使用kafka high-level的consumer,使用多線程消費數據時報錯,簡單分析一下原因下載 ,ConsumerIterator取不到消息時會阻塞,并且將內部狀態置為FAILED,當其他線程訪問時就會拋出異常。
Java代碼
def hasNext(): Boolean = {
if(state == FAILED) //處于FAILED狀態時,另外線程訪問會直接異常
throw new IllegalStateException("Iterator is in failed state")
state match {
case DONE => false
case READY => true
case _ => maybeComputeNext()
}
}
def maybeComputeNext(): Boolean = {
state = FAILED //重置了狀態
nextItem = Some(makeNext())
if(state == DONE) {
false
} else {
state = READY
true
}
}
下載
protected def makeNext(): MessageAndMetadata[K, V] = {
var currentDataChunk: FetchedDataChunk = null
// if we don't have an iterator, get one
var localCurrent = current.get()
if(localCurrent == null || !localCurrent.hasNext) {
if (consumerTimeoutMs < 0)
currentDataChunk = channel.take //channel是BlockingQueue這里會阻塞
else {
currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
if (currentDataChunk == null) {
// reset state to make the iterator re-iterable
resetState()
throw new ConsumerTimeoutException
}
}
//省略部分代碼
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。