91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

@KafkaListener怎么使用

發布時間:2023-02-25 11:58:43 來源:億速云 閱讀:167 作者:iii 欄目:開發技術

這篇文章主要介紹“@KafkaListener怎么使用”,在日常操作中,相信很多人在@KafkaListener怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”@KafkaListener怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

@KafkaListener 如何使用

@KafkaListener怎么使用

@KafkaListener怎么使用

spring-kafka使用基于@KafkaListener注解,@KafkaListener使用方式如下

@KafkaListener(topics = "topic1")
public void   kafkaListen(List<ConsumerRecord<xxx, xxx>> records) {
    ...
}

@KafkaListener怎么使用

在注解內指定topic名稱,當對應的topic內有新的消息時,testListen方法會被調用,參數就是topic內新的消息。這個過程是異步進行的。

@KafkaListener工作流程主要有以下幾步:

解析;解析@KafkaListener注解。
注冊;解析后的數據注冊到spring-kafka。
監聽;開始監聽topic變更。
調用;調用注解標識的方法,將監聽到的數據作為參數傳入。
下面我們一步一步分析

解析

@KafkaListener注解由KafkaListenerAnnotationBeanPostProcessor類解析,后者實現了BeanPostProcessor接口,這個接口如下

public interface BeanPostProcessor {

    Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;

    Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}

接口內部有2個方法,分別在bean初始化前后被調用。

KafkaListenerAnnotationBeanPostProcessor內會在postProcessAfterInitialization方法內解析@KafkaListener注解。

注冊
解析步驟里,我們可以獲取到所有含有@KafkaListener注解的類,之后這些類的相關信息會被注冊到 KafkaListenerEndpointRegistry內,包括注解所在的方法,當前的bean等。KafkaListenerEndpointRegistry這個類內部會維護多個Listener Container,每一個@KafkaListener都會對應一個Listener Container。并且每個Container對應一個線程。

監聽
注冊完成之后,每個Listener Container會開始工作,會新啟一個新的線程,初始化KafkaConsumer,監聽topic變更等。

調用
監聽到數據之后,container會組織消息的格式,隨后調用解析得到的@KafkaListener注解標識的方法,將組織后的消息作為參數傳入方法,執行用戶邏輯。

@KafkaListener和@KafkaListners

@KafkaListeners是@KafkaListener的Container Annotation,這也是jdk8的新特性之一,注解可以重復標注。

@KafkaListeners({@KafkaListener(topics="topic1"), @KafkaListener(topics="topic2")})
public void listen(ConsumerRecord<Integer, String> msg) {}
 
等同于
 
@KafkaListener(topics="topic1")
@KafkaListener(topics="topic2")
public void listen(ConsumerRecord<Integer, String> msg) {}

擴展:kafka的消費者分區分配策略

kafka有三種分區分配策略

1. RoundRobin

2. Range

3. Sticky

1. RoundRobin

(1)把所有topic的分區partition放入一個隊列中,按照name的hashcode進行排序;

(2)把consumer放在一個循環隊列,按照name的hashcode進行排序;

(3)循環遍歷consumer,從partition隊列pop出一個partition,分配給當前consumer;以此類推,取下一個consumer,繼續從partition隊列pop出來分配給當前consumer;直到partition隊列中的元素被分配完;

2. Range

(1)假設topicA有4個分區,topicB有5個分區,topicC有6個分區;一共有3個consumer;

(2)遍歷3個topic的分區集合,先取topicA的分區集合,然后準備依次給3個consumer分配分區;對于第1個consumer,所分配的分區數量根據以下公式:假設消費者數量為N,當前主題剩下的分區數量為M,則當前消費者應該分配的分區數量 = M%N==0? M/N +1 : M/N ;按照公式,3個消費者應該分配的分區數量依次為:2/1/1,即topicA-partition-0/1分配給consumer-0,topicA-partition-2分配給consumer-1,topicA-partition-3分配給consumer-2;

(3)按照上述規則按序把topicB和topicC的分區分配給3個consumer;依次為:2/2/1,2/2/2;

3. Sticky

kafka在0.11版本引入了Sticky分區分配策略,它的兩個主要目的是:

1. 分區的分配要盡可能的均勻,分配給消費者者的主題分區數最多相差一個;

2. 分區的分配盡可能的與上次分配的保持相同;

當兩者發生沖突時,第一個目標優先于第二個目標;

粘性分區是由Kafka從0.11x版本開始引入的分配策略,首先會盡量均衡的分配分區到消費者上面,在出現同一消費組內消費者出現問題的時候,會盡量保持原來的分配的分區不變;

Sticky分區初始分配分區的方法與Range相似,但是不同;拿7個分區3個消費者為例,消費者消費的分區依舊是3/2/2,但是不同與Range的是Range分區是排好序的,但是Sticky分區是隨機的。

到此,關于“@KafkaListener怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

林口县| 康乐县| 德昌县| 兴隆县| 农安县| 龙里县| 忻州市| 木兰县| 枣阳市| 朝阳市| 桐乡市| 郑州市| 讷河市| 蚌埠市| 丰县| 宝应县| 东至县| 平南县| 桂林市| 九江县| 赣榆县| 沙坪坝区| 五寨县| 龙山县| 青浦区| 磐石市| 墨竹工卡县| 开阳县| 合作市| 宣化县| 盱眙县| 犍为县| 碌曲县| 丰原市| 都昌县| 郧西县| 兴仁县| 平罗县| 措美县| 庆城县| 大关县|