Java可以使用Apache Kafka來實現消費消息。
首先,你需要設置Kafka的消費者配置。以下是一個示例:
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
public class ConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value());
}
}
}
}
上述代碼中,我們創建了一個Kafka消費者,并使用給定的配置訂閱了"my-topic"主題。然后,我們使用poll()
方法循環地從Kafka服務器拉取新的消息記錄,并對每個消息記錄進行處理。
你可以將以上代碼和其他邏輯結合起來,根據需要處理和消費消息。