91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么進行kafka的異步模式分析

發布時間:2021-12-15 09:30:12 來源:億速云 閱讀:213 作者:柒染 欄目:大數據

本篇文章為大家展示了怎么進行kafka的異步模式分析,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

啥是異步模式

kafka的生產者可以選擇使用異步方式發送數據,所謂異步方式,就是我們調用 send() 方法,并指定一個回調函數, 服務器在返回響應時調用該函數。

kafka在客戶端里暴露了兩個send方法,我們可以自己選擇同步或者異步模式。我們來看一個kafka的生產者發送示例,有個直觀的感受。這個示例是一個同步的模式。

ProducerRecord<String, String> record = new ProducerRecord<>(“Kafka”, “Kafka_Products”, “測試”);//Topic Key Value
try{
Future future = producer.send(record);
future.get();//獲取執行結果
} catch(Exception e) {
e.printStackTrace();
}
 

我們從源碼層面來繼續看下。

首先kafka定義了一個接口,

怎么進行kafka的異步模式分析

然后KafkaProducer實現了這兩個方法,我們看下異步方法的實現邏輯。

怎么進行kafka的異步模式分析

可以看到最終是調用doSend方法,調用的時候傳入一個回調。這個回調就是監聽方法的執行結果的。

 

異步模式也會阻塞的

很多人會認為,既然是異步模式,不管結果是成功還是失敗,肯定方法調用會馬上返回的。那我只能告訴你,不好意思,不一定是這樣。我自己就曾經踩過這個坑。

我們當時有個業務流程需要在執行完成后發送kakfa消息給某個業務方,為了盡量減少影響我這個主流程的執行時間,采用了異步方式發送kafka消息。在使用中,因為配錯了kafka的TOPIC信息,發現流程阻塞發送消息這里長達6秒(kafka默認的發送超時時間)。

究竟為啥異步方式還會阻塞呢?我們繼續看源碼。

怎么進行kafka的異步模式分析

不管是同步模式還是異步模式,最終都會調用到doSend方法,注意看上圖中的waitOnMetadata方法,我上面說的阻塞的情況就是阻塞在這個方法里。那我們繼續看這個方法。

怎么進行kafka的異步模式分析

通過代碼中的注釋我們大概能了解這個方法的功能,不過我這里還是要解釋下。(防止有人看不懂英文,哈哈)

waitOnMetadata獲取當前的集群元數據信息,如果緩存有,并且分區沒有超過指定分區范圍則緩存返回,否則觸發更新,等待新的metadata。這個等待的操作在下面這行代碼:

metadata.awaitUpdate(version, remainingWaitMs);
 

然后就繼續跟嘍,

怎么進行kafka的異步模式分析

這個方法很好理解,就是一直在等一個條件,這個條件達到了就返回,否則一直等待超時退出。而這個條件就是當前的版本號要大于上個版本號。

那么誰來更新版本號呢?就是我們前面提到的sender線程。當我們的topic配置錯誤的時候導致metadata一直無法更新,然后一直等到超時。

上述內容就是怎么進行kafka的異步模式分析,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

武清区| 轮台县| 甘德县| 鹤山市| 武冈市| 肥乡县| 高要市| 灵丘县| 大冶市| 曲松县| 仪征市| 随州市| 祁连县| 佳木斯市| 璧山县| 运城市| 宁国市| 天全县| 耿马| 洞头县| 镇赉县| 汉中市| 榆中县| 滦南县| 城步| 威宁| 波密县| 正定县| 台北县| 乐业县| 仪征市| 名山县| 溧水县| 左贡县| 新绛县| 新津县| 平遥县| 新邵县| 沾益县| 平阳县| 周宁县|