要確認消息已被正確處理,您可以使用 PHP RdKafka 擴展的 ack
方法
<?php
// 創建消費者
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'myGroup');
$conf->set('bootstrap.servers', 'localhost:9092');
$consumer = new \RdKafka\KafkaConsumer($conf);
// 訂閱主題
$consumer->subscribe(['myTopic']);
while (true) {
// 拉取消息
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "Reached end of partition event\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
case RD_KAFKA_RESP_ERR__PARTITION_NOT_FOUND:
echo "Partition not found\n";
break;
case RD_KAFKA_RESP_ERR__UNKNOWN:
throw new \Exception($message->errstr(), $message->err);
default:
if ($message->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
// 消息已處理,確認
$consumer->ack($message);
echo "Message consumed and acknowledged\n";
} else {
throw new \Exception($message->errstr(), $message->err);
}
break;
}
}
在這個示例中,我們創建了一個 Kafka 消費者,訂閱了名為 “myTopic” 的主題。然后,我們進入一個無限循環,不斷從 Kafka 拉取消息。當成功拉取到消息時($message->err == RD_KAFKA_RESP_ERR_NO_ERROR
),我們調用 ack
方法來確認消息已被正確處理。如果發生錯誤,我們將拋出異常。