使用PHP的rdkafka擴展庫來消費消息的步驟如下:
composer require edenhill/php-rdkafka
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$conf->set('metadata.broker.list', 'localhost:9092');
$consumer = new RdKafka\Consumer($conf);
$consumer->subscribe(['myTopic']);
while (true) {
$message = $consumer->consume(120 * 1000); // 120秒超時
if ($message->err) {
echo "Error: {$message->errstr()}\n";
continue;
}
echo "Received message: {$message->payload}\n";
}
處理消費到的消息,可以根據業務需求進行處理。
最后,記得在結束時關閉消費者實例:
$consumer->close();
以上就是使用PHP的rdkafka擴展庫來消費消息的基本步驟,可以根據實際情況進行適當的調整和優化。