Kafka中的消費者可以通過獲取記錄的方式來獲取結果。以下是使用Java API獲取Kafka消費者結果的步驟:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received record: " + record.value());
}
}
在上述代碼中,poll()
方法用于獲取記錄,參數是最大等待時間。ConsumerRecords
對象包含了一批消費的記錄,您可以通過遍歷每個ConsumerRecord
對象來獲取結果。
注意:這里的示例代碼是一個簡化版本,實際使用中可能需要更多的配置和處理邏輯。