您好,登錄后才能下訂單哦!
延遲任務應用場景
場景一:物聯網系統經常會遇到向終端下發命令,如果命令一段時間沒有應答,就需要設置成超時。
場景二:訂單下單之后30分鐘后,如果用戶沒有付錢,則系統自動取消訂單。
實現方案
定時任務輪詢數據庫,看是否有產生新任務,如果產生則消費任務
pcntl_alarm為進程設置一個鬧鐘信號
swoole的異步高精度定時器:swoole_time_tick(類似javascript的setInterval)和swoole_time_after(相當于javascript的setTimeout)
rabbitmq延遲任務
以上四種方案,如果生產環境有使用到swoole建議使用第三種方案。此篇文章重點講述第四種方案實現
Rabbitmq延遲隊列實現
RabbitMQ沒有直接去實現延遲隊列這個功能。而是需要通過消息的TTL和死信Exchange這兩者的組合來實現。
消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現延遲任務的關鍵。
可以通過設置消息的expiration字段或者隊列x-message-ttl屬性來設置時間,兩者是一樣的效果。下面例子是通過隊列的ttl實現死信
$queue = new AMQPQueue($channel); $queue->setName($params['queueName']?:''); $queue->setFlags(AMQP_DURABLE); $queue->setArguments(array( 'x-dead-letter-exchange' => 'delay_exchange', 'x-dead-letter-routing-key' => 'delay_route', 'x-message-ttl' => 60000, )); $queue->declareQueue();
當上面的消息扔到該隊列中后,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中并不會被刪除和釋放,它會被統計到隊列的消息數中去。單靠死信還不能實現延遲任務,還要靠Dead Letter Exchange。
Exchage的概念在這里就不在贅述,可以從這里進行了解。一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應很多隊列。
1. 一個消息被Consumer拒收了,并且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
2. 上面的消息的TTL到了,消息過期了。
3. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。
Dead Letter Exchange其實就是一種普通的exchange,和創建其他exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。
示例
生產者:
<?php header('Content-Type:text/html;charset=utf8;'); $params = array( 'exchangeName' => 'test_cache_exchange', 'queueName' => 'test_cache_queue', 'routeKey' => 'test_cache_route', ); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'rabbitmq', 'password' => 'rabbitmq', 'vhost' => '/' ); //var_dump(extension_loaded('amqp')); 判斷是否加載amqp擴展 //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 記錄日志 echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 記錄日志 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setFlags(AMQP_DURABLE);//持久化 $exchange->setName($params['exchangeName']?:''); $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $exchange->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($channel); $queue->setName($params['queueName']?:''); $queue->setFlags(AMQP_DURABLE); $queue->setArguments(array( 'x-dead-letter-exchange' => 'delay_exchange', 'x-dead-letter-routing-key' => 'delay_route', 'x-message-ttl' => 60000, )); $queue->declareQueue(); //綁定 $queue->bind($params['exchangeName'], $params['routeKey']); } catch(Exception $e) { } //$num = mt_rand(100, 500); $num = 1; //生成消息 $exchange->publish("this is test message..", $params['routeKey'], AMQP_MANDATORY, array('delivery_mode'=>2));
消費者:
<?php header('Content-Type:text/html;charset=utf8;'); $params = array( 'exchangeName' => 'delay_exchange', 'queueName' => 'delay_queue', 'routeKey' => 'delay_route', ); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'rabbitmq', 'password' => 'rabbitmq', 'vhost' => '/' ); //var_dump(extension_loaded('amqp')); //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 記錄日志 echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 記錄日志 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setFlags(AMQP_DURABLE);//聲明一個已存在的交換器的,如果不存在將拋出異常,這個一般用在consume端 $exchange->setName($params['exchangeName']?:''); $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $exchange->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($channel); $queue->setName($params['queueName']?:''); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); //綁定 $queue->bind($params['exchangeName'], $params['routeKey']); } catch(Exception $e) { echo $e->getMessage(); exit(); } function callback(AMQPEnvelope $message) { global $queue; if ($message) { $body = $message->getBody(); echo $body . PHP_EOL; $queue->ack($message->getDeliveryTag()); } else { echo 'no message' . PHP_EOL; } } //$queue->consume('callback'); 第一種消費方式,但是會阻塞,程序一直會卡在此處 //第二種消費方式,非阻塞 $start = time(); while(true) { $message = $queue->get(); if(!empty($message)) { echo $message->getBody(); $queue->ack($message->getDeliveryTag()); //應答,代表該消息已經消費 $end = time(); echo '<br>' . ($end - $start); exit(); } else { //echo 'message not found' . PHP_EOL; } }
這個示例注意要跟上一篇博文示例作對比rabbitmq以及php amqp擴展使用,最關鍵的點就是在生產者那里
$queue->setArguments(array( 'x-dead-letter-exchange' => 'delay_exchange', 'x-dead-letter-routing-key' => 'delay_route', 'x-message-ttl' => 60000, ));
詳細過程:
首先由正常隊列(test_cache_queue)和正常exchange(test_cache_exchange),兩者相綁定。
該正常隊列設置了死信路由(delay_exchange)和死信路由key以及TTL,生產者生產消息到正常隊列和正常路由上.
當正常隊列設置TTL時間一到,那延遲消息就會自動發布到死信路由
消費者通過死信路由(delay_exchange)和死信隊列(delay_queue)來消費
參考文章:
https://www.cnblogs.com/haoxinyue/p/6613706.html
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。