要實現通過C語言向Kafka傳輸數據,可以使用librdkafka這個開源的C語言庫。下面是一個簡單的示例代碼,演示如何使用librdkafka來向Kafka發送消息:
#include <librdkafka/rdkafka.h>
#include <stdio.h>
#include <string.h>
int main(int argc, char *argv[]) {
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
rd_kafka_conf_t *conf;
char errstr[512];
// 創建Kafka配置對象
conf = rd_kafka_conf_new();
// 設置Kafka配置項,例如bootstrap.servers(Kafka集群的地址)
if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "Error configuring Kafka: %s\n", errstr);
return 1;
}
// 創建Kafka生產者對象
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr, "Error creating Kafka producer: %s\n", errstr);
return 1;
}
// 創建Kafka主題對象
rkt = rd_kafka_topic_new(rk, "test_topic", NULL);
// 發送消息到Kafka
char *message = "Hello, Kafka!";
if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, message, strlen(message), NULL, 0, NULL) == -1) {
fprintf(stderr, "Error producing message: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
return 1;
}
// 等待消息發送完成
rd_kafka_flush(rk, 10*1000);
// 清理資源
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
return 0;
}
以上代碼示例中,我們通過librdkafka庫創建了一個Kafka生產者對象,并向名為"test_topic"的主題發送了一條消息"Hello, Kafka!"。在實際使用中,你可以根據自己的需求配置更多的Kafka參數,并發送不同的消息內容。