91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行kafka批量消費多消費者問題分析

發布時間:2021-12-15 09:29:26 來源:億速云 閱讀:741 作者:柒染 欄目:大數據

今天就跟大家聊聊有關如何進行kafka批量消費多消費者問題分析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

package com.llw.medical.bs.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Componentpublic class KafakaListener {@KafkaListener(id = "1", topics = {"topic2"})public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record =" + record);
            System.out.println("----------------- message =" + message);
        }
    }@KafkaListener(id = "2", topicPartitions =
            {@TopicPartition(topic = "topic1",
                    partitions = {"1", "2", "3"}// partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )
            })public void listen2(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record 1=" + record);
            System.out.println("------------------ message 1=" + message);
        }
    }//id = "4",    //id="4"    @KafkaListener( id= "4",groupId = "1",topics="topic1", /*topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )*//*            },*/ containerFactory = "kafkaBatchListener6")public void listen3(List<ConsumerRecord<?, ?>> records) {//, Acknowledgment ack        try {for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 4=" + record);//   System.out.println("------------------ message 4=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }//id="5"    @KafkaListener(id = "5",groupId = "1",topics="topic1", /*topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )            },*/ containerFactory = "kafkaBatchListener6")public void listen2(List<ConsumerRecord<?, ?>> records) {//, Acknowledgment ack        try {for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 6=" + record);//   System.out.println("------------------ message 6=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }//https://www.cnblogs.com/linjiqin/p/13171789.html    @KafkaListener(id = "6",groupId = "1",topics="topic1",/* topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )            }, */containerFactory = "kafkaBatchListener6")public void listen4(List<ConsumerRecord<?, ?>> records) {try {for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 3=" + record);//   System.out.println("------------------ message 6=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }

}

一個partition只能有一個消費者,如果多個消費者會是廣播模式,每個消費者都會有一條數據,kafka是一個發布和訂閱模式的主鍵,并不是隊列模式,

spring boot整合時,如果使用topicPartitions 注解參數指定partition會有消息重復消費的問題,最好使用topics注解,并指定groupId。

看完上述內容,你們對如何進行kafka批量消費多消費者問題分析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

贞丰县| 西乌珠穆沁旗| 茌平县| 思南县| 石家庄市| 儋州市| 栾城县| 隆回县| 新闻| 漳州市| 郑州市| 沂源县| 常熟市| 北宁市| 木里| 西林县| 海城市| 浪卡子县| 漯河市| 托里县| 潜山县| 屏边| 鱼台县| 启东市| 石首市| 万源市| 永昌县| 大田县| 潜江市| 连平县| 治县。| 舞阳县| 泰宁县| 松江区| 昌黎县| 延庆县| 镇远县| 安溪县| 丰台区| 普安县| 泰兴市|