您好,登錄后才能下訂單哦!
今天小編給大家分享一下springboot+kafka中@KafkaListener動態指定多個topic怎么實現的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
本項目為springboot+kafak的整合項目,故其用了springboot中對kafak的消費注解@KafkaListener
首先,application.properties中配置用逗號隔開的多個topic。
方法:利用Spring的SpEl表達式,將topics 配置為:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”)
運行程序,console打印的效果如下:
因為只開了一條消費者線程,所以所有的topic和分區都分配給這條線程。
如果你想開多條消費者線程去消費這些topic,添加@KafkaListener注解的參數concurrency的值為自己想要的消費者個數即可(注意,消費者數要小于等于你開的所有topic的分區數總和)
運行程序,console打印的效果如下:
如何在程序運行的過程中,改變topic,消費者能夠消費修改后的topic?
ans: 經過嘗試,使用@KafkaListener注解實現不了此需求,在程序啟動的時候,程序就會根據@KafkaListener的注解信息初始化好消費者去消費指定好的topic。如果在程序運行的過程中,修改topic,不會讓此消費者修改消費者的配置再重新訂閱topic的。
不過我們可以有個折中的辦法,就是利用@KafkaListener的topicPattern參數來進行topic匹配。
不使用@KafkaListener,使用kafka原生客戶端依賴,手動初始化消費者,開啟消費者線程。
在消費者線程中,每次循環都從配置、數據庫或者其他配置源獲取最新的topic信息,與之前的topic比較,如果發生變化,重新訂閱topic或者初始化消費者。
加入kafka客戶端依賴(本次測試服務端kafka版本:2.12-2.4.0)
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
@Service @Slf4j public class KafkaConsumers implements InitializingBean { /** * 消費者 */ private static KafkaConsumer<String, String> consumer; /** * topic */ private List<String> topicList; public static String getNewTopic() { try { return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 初始化消費者(配置寫死是為了快速測試,請大家使用配置文件) * * @param topicList * @return */ public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) { //配置信息 Properties props = new Properties(); //kafka服務器地址 props.put("bootstrap.servers", "192.168.9.185:9092"); //必須指定消費者組 props.put("group.id", "haha"); //設置數據key和value的序列化處理類 props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); //創建消息者實例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //訂閱topic的消息 consumer.subscribe(topicList); return consumer; } /** * 開啟消費者線程 * 異常請自己根據需求自己處理 */ @Override public void afterPropertiesSet() { // 初始化topic topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) { consumer = getInitConsumer(topicList); // 開啟一個消費者線程 new Thread(() -> { while (true) { // 模擬從配置源中獲取最新的topic(字符串,逗號隔開) final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); // 如果topic發生變化 if (!topicList.equals(newTopic)) { log.info("topic 發生變化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList); // method one:重新訂閱topic: topicList = newTopic; consumer.subscribe(newTopic); // method two:關閉原來的消費者,重新初始化一個消費者 //consumer.close(); //topicList = newTopic; //consumer = getInitConsumer(newTopic); continue; } ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("key:" + record.key() + "" + ",value:" + record.value()); } } }).start(); } } }
說一下第72行代碼:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
上面這行代碼表示:在100ms內等待Kafka的broker返回數據.超市參數指定poll在多久之后可以返回,不管有沒有可用的數據都要返回。
在修改topic后,必須等到此次poll拉取的消息處理完,while(true)循環的時候檢測topic發生變化,才能重新訂閱topic.
poll()方法一次拉取得消息數默認為:500,如下圖,kafka客戶端源碼中設置的。
如果想自定義此配置,可在初始化消費者時加入
運行結果(測試的topic中都無數據)
注意:KafkaConsumer是線程不安全的,不要用一個KafkaConsumer實例開啟多個消費者,要開啟多個消費者,需要new 多個KafkaConsumer實例。
以上就是“springboot+kafka中@KafkaListener動態指定多個topic怎么實現”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。