91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

kafka手動提交偏移量怎么實現

小億
162
2023-11-28 17:11:46
欄目: 大數據

Kafka 提供了兩種方式來手動提交偏移量:

  1. 使用 commitSync() 方法同步提交偏移量:
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 關閉自動提交

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 處理消息
        }
        consumer.commitSync(); // 手動提交偏移量
    }
} finally {
    consumer.close();
}
  1. 使用 commitAsync() 方法異步提交偏移量:
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 關閉自動提交

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 處理消息
        }
        consumer.commitAsync(); // 異步提交偏移量
    }
} finally {
    consumer.close();
}

在這兩種方式中,commitSync() 方法會一直阻塞直到偏移量提交成功或發生錯誤。而 commitAsync() 方法則會在提交請求發送后立即返回,不會等待確認。如果發生錯誤,可以在 commitAsync() 方法的回調函數中處理。

0
灵寿县| 杨浦区| 沙坪坝区| 丹棱县| 阿拉尔市| 册亨县| 收藏| 米脂县| 台湾省| 佛山市| 荃湾区| 南部县| 钦州市| 舟山市| 普陀区| 张家川| 科技| 万盛区| 玛纳斯县| 绩溪县| 水城县| 军事| 徐汇区| 罗平县| 泾源县| 四平市| 吉水县| 易门县| 绵竹市| 黔江区| 来安县| 凤冈县| 探索| 东乡族自治县| 微博| 台江县| 柘城县| 肇州县| 灵寿县| 楚雄市| 句容市|