在Kafka中,可以使用生產者的 send
方法來發送消息,可以批量發送消息的方式有以下幾種:
send
方法時,將多條消息封裝成一個 ProducerRecord
的列表,然后使用 send
方法一次性發送。示例代碼如下:List<ProducerRecord<String, String>> records = new ArrayList<>();
records.add(new ProducerRecord<>("topic", "key1", "value1"));
records.add(new ProducerRecord<>("topic", "key2", "value2"));
records.add(new ProducerRecord<>("topic", "key3", "value3"));
for (ProducerRecord<String, String> record : records) {
producer.send(record).get(); // 同步發送
}
send
方法時,同樣將多條消息封裝成一個 ProducerRecord
的列表,然后使用 send
方法一次性發送。示例代碼如下:List<ProducerRecord<String, String>> records = new ArrayList<>();
records.add(new ProducerRecord<>("topic", "key1", "value1"));
records.add(new ProducerRecord<>("topic", "key2", "value2"));
records.add(new ProducerRecord<>("topic", "key3", "value3"));
for (ProducerRecord<String, String> record : records) {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Message sent successfully: " + metadata);
}
}
});
}
以上是兩種常見的批量發送消息的方式,可以根據實際需求選擇合適的方式。