Kafka可以通過設置consumer的offset來讀取指定位置的消息。在創建consumer實例時,可以通過指定partition和offset來設置consumer的起始位置。具體步驟如下:
auto.offset.reset
屬性為none
,禁止consumer自動重置offset。這樣可以確保consumer從指定的offset開始讀取消息。Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "none");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
assign()
方法將consumer分配到指定的partition,并設置起始offset。TopicPartition partition = new TopicPartition("test-topic", 0);
consumer.assign(Collections.singletonList(partition));
consumer.seek(partition, 10); // 從offset為10的位置開始讀取消息
poll()
方法來獲取消息了。ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
通過以上步驟,就可以在Kafka中讀取指定位置的消息。