您好,登錄后才能下訂單哦!
本篇文章為大家展示了RabbitMQ怎么在php中使用,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
什么是隊列
消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回。消息使用者再從MQ中取消息進行邏輯處理。對于消耗較大的請求,可以立馬返回處理結果。減少服務器壓力。為各個子系統之間解耦和異步處理。
rabbitmq的整體結構
rmq簡單來說就是一個(生產/消費)的模型結構。消息生產者把數據丟到隊列中,消息消費者從隊列中取出數據進行邏輯處理。
那么如何確保,生產者添加的數據,能夠到達指定的隊列中呢?
rmq(消息隊列)主要提供了三個概念(中間件?)來確保消息的分發。Exchange(交換機)、RoutingKey(路由)、Queue(隊列)。從上面的圖也可以看出來。 處理消息的接收、分發,主要在Broker模塊中。
Exchange 所有生產消息的入口都是到交換機這里。exchange通過進來的路由(RoutingKey),去和已binding的規則進行匹配,找到指定的隊列。
RoutingKey 我的理解,這里相當于一把鑰匙。而binding的操作相當于一把鎖頭。
Queue 消息的存放區域,等到消費者來取。
Binding Exchange和Queue之間的一個綁定。
從這些概念來看,影響規則的主要是依賴Exchange。那rmq提供了哪些類型,都有什么特點呢?
exchange類型
RabbitMQ提供了四種Exchange類型
direct
fanout
topic
header
header類型在實際使用中較少,所以在這里就不進行說明。
Direct Exchange
direct 的規則比較簡單。在發布消息前,需要把exchange和queue做一個綁定。 如果發布消息的時候,RoutingKey 和綁定的值(key)一致。則將消息投遞到該隊列中。如果不存在對應的隊列,則消息會被丟棄。 (這時候訪問rmq管理web時。可以看到消息進來,但是隊列中沒有值)
Fanout Exchange
fanout 類型則更簡單一些。 只要exchange和隊列做了綁定。發布的消息都會到隊列中去。
Topic Exchange
相對來說 topic類型要復雜一些。 和direct類型相比。topic相當于模糊匹配,而direct為全等。類似mysql中 ‘like’關鍵詞。
針對direct 類型寫一個實例
實例分兩部分 生產者、消費者(回調函數)
因為我的代碼,對mq的部分做了封裝,懶得拆分出來。 所以我只貼業務代碼和封裝的核心方法。
生產者代碼
$mqModel = new Rabbitmq(); // 初始化(rmq連接操作) $newResult = ['tom','bill','jack']; if ($mqModel) { $mqRoute = 'push_data_to_crm_routing'; // 路由 $mqExchange = 'push_data_to_crm_exchange'; // 交換機 $mqQuery = 'push_data_to_crm_queue'; // 隊列 // 建立連接,設置交換機,設置隊列 $mqModel->setChannel()->setExchange($mqExchange,AMQP_EX_TYPE_DIRECT,AMQP_DURABLE)->setQueue($mqQuery,AMQP_DURABLE,$mqExchange,$mqRoute); foreach ($newResult as $k => $v){ $push_data = $v; $mqModel->publishMessage($push_data,$mqRoute); // 消息推送 } }
消費者代碼
$mqModel = new Rabbitmq(); // $mqRoute = 'push_data_to_crm_routing'; 消費者用不上路由,因為不需要指定。 只要想取隊列,消費即可。 $mqExchange = 'push_data_to_crm_exchange'; $mqQuery = 'push_data_to_crm_queue'; $mqModel->setChannel()->setExchange($mqExchange,'', AMQP_PASSIVE)->setQueue($mqQuery, AMQP_PASSIVE); $zmq->consume(function($msg){ var_dump($msg); return true; });
封裝類中的核心方法
//設置交換機 public function setExchange($changeName = '', $changeType = '', $flags = false) { $errorMsg = ''; try{ if(!$this->channel){ throw new \AMQPQueueException("Error channel on method setExchange", 1); } $this->exchange = new \AMQPExchange($this->channel); if($changeName){ $this->changeName = $changeName; // 交換機名稱 $this->exchange->setName($changeName); // 設置名稱 $changeType = $changeType ? $changeType : AMQP_EX_TYPE_DIRECT; // 交換機類型 }else{ $this->changeName = ''; } if($changeType){ $this->changeType = $changeType; $this->exchange->settype($changeType); // 設置交換機類型 }else{ $this->changeType = ''; } if($flags){ $this->exchange->setFlags($flags); //交換機標志 } if($changeType || $flags){ $this->exchange->declareExchange(); // 創建 } } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new Exception($errorMsg, 1); } return $this; } // 設置隊列 public function setQueue($queueName = '', $flags = '', $exchange_name = '', $routing_key = '', $arguments=[] ){ $errorMsg = ''; try{ if(!$this->channel){ throw new \AMQPQueueException("Error channel on method setQueue", 1); } $this->queue = new \AMQPQueue($this->channel); if(!$queueName){ return false; } $this->queueName = $queueName; // 隊列名稱 $this->queue->setName($queueName); if($flags){ $this->queue->setFlags($flags); // 隊列標志。與消息持久化有關。 這篇文字不涉及這一塊的說明 } if(is_array($arguments) && !empty($arguments)){ $this->queue->setArguments($arguments); // 參數配置 } $this->queue->declareQueue(); // 創建一個隊列 $exchange_name = $exchange_name === false ? '' : ($exchange_name === true || !$exchange_name ? $this->changeName : $exchange_name); $routing_key = $routing_key ? $routing_key : $this->queueName; if($exchange_name && $routing_key ){ $this->queue->bind($exchange_name, $routing_key); // 交換機和隊列的綁定操作 } } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new Exception($errorMsg, 1); } return $this; } // 發布消息 public function publishMessage($message = '', $routing_key = '', $flags = AMQP_NOPARAM, $attributes = []){ if(!$message){ return false; } $routing_key = $routing_key ? $routing_key : $this->queueName; // 發布消息,帶有路由key。如果需要,則會用于關聯。 $this->exchange->publish($message, $routing_key, $flags, $attributes); return true; } // 消費 public function consume($callback = null, $qos = 0, $isAct = true){ if($qos){ $this->channel->qos(0, $qos); } $errorMsg = ''; try{ if(!$this->queue){ throw new \AMQPQueueException("Error queue on method consume", 1); } $this->callBackFnc = $callback; $this->isAct = $isAct; $callback = function($envelope, $queue){ if(is_callable($this->callBackFnc)){ call_user_func($this->callBackFnc, $envelope->getBody()); if($this->isAct){ $queue->ack($envelope->getDeliveryTag()); }else{ $queue->nack($envelope->getDeliveryTag()); } } }; $this->queue->consume($callback); } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new \Exception($errorMsg, 1); } }
因為封裝代碼里寫了很多 try catch 所以看起來特別亂。 還有部分兼容的邏輯。 看起來不舒服,就先刪掉再看吧。
執行結果
先跑一遍生產者代碼,這里可以用瀏覽器直接訪問。 執行完了之后。 到rabbitmq 的web管理頁面中查看。 發現消息已經正常添加到隊列中。(web管理頁面可查詢別的文章開啟)
這時候再執行消費者代碼。 消費者代碼需要在cli下執行。因為消費者為輪詢等待,是死循環,無法在瀏覽器下執行。
上述內容就是RabbitMQ怎么在php中使用,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。