Kafka 提供了兩種方式來手動提交偏移量:
import org.apache.kafka.clients.consumer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 關閉自動提交
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 處理消息
}
consumer.commitSync(); // 手動提交偏移量
}
} finally {
consumer.close();
}
import org.apache.kafka.clients.consumer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 關閉自動提交
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 處理消息
}
consumer.commitAsync(); // 異步提交偏移量
}
} finally {
consumer.close();
}
在這兩種方式中,commitSync() 方法會一直阻塞直到偏移量提交成功或發生錯誤。而 commitAsync() 方法則會在提交請求發送后立即返回,不會等待確認。如果發生錯誤,可以在 commitAsync() 方法的回調函數中處理。