在Java中,你可以使用Apache Kafka的Java客戶端庫來從Kafka中讀取數據。下面是一個簡單的示例代碼:
首先,你需要在你的項目中添加Kafka的Java客戶端庫的依賴。你可以在你的構建工具(如Maven或Gradle)的配置文件中添加以下依賴:
<!-- Kafka client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
然后,你可以使用以下代碼從Kafka中讀取數據:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// Kafka集群的地址
String bootstrapServers = "localhost:9092";
// 消費者組的ID
String groupId = "my-group";
// 要消費的主題
String topic = "my-topic";
// 配置消費者的屬性
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers);
properties.put("group.id", groupId);
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
// 創建消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 訂閱主題
consumer.subscribe(Collections.singletonList(topic));
// 無限循環從Kafka中讀取消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
}
}
以上代碼創建了一個消費者并訂閱了一個主題。然后,通過調用consumer.poll(1000)
來從Kafka中拉取數據。在這個例子中,我們只是簡單地將接收到的消息打印到控制臺上。
請確保替換bootstrapServers
、groupId
和topic
為你要連接的Kafka集群的地址、你的消費者組的ID和你要消費的主題。
希望對你有幫助!