您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關如何在PHP中使用rabbitmq操作類實現一個生產者和消費者功能,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
注意事項:
1、accept.php消費者代碼需要在命令行執行
2、'username'=>'asdf'
,'password'=>'123456'
改成自己的帳號和密碼
RabbitMQCommand.php操作類代碼
<?php /* * amqp協議操作類,可以訪問rabbitMQ * 需先安裝php_amqp擴展 */ class RabbitMQCommand{ public $configs = array(); //交換機名稱 public $exchange_name = ''; //隊列名稱 public $queue_name = ''; //路由名稱 public $route_key = ''; /* * 持久化,默認True */ public $durable = True; /* * 自動刪除 * exchange is deleted when all queues have finished using it * queue is deleted when last consumer unsubscribes * */ public $autodelete = False; /* * 鏡像 * 鏡像隊列,打開后消息會在節點之間復制,有master和slave的概念 */ public $mirror = False; private $_conn = Null; private $_exchange = Null; private $_channel = Null; private $_queue = Null; /* * @configs array('host'=>$host,'port'=>5672,'username'=>$username,'password'=>$password,'vhost'=>'/') */ public function __construct($configs = array(), $exchange_name = '', $queue_name = '', $route_key = '') { $this->setConfigs($configs); $this->exchange_name = $exchange_name; $this->queue_name = $queue_name; $this->route_key = $route_key; } private function setConfigs($configs) { if (!is_array($configs)) { throw new Exception('configs is not array'); } if (!($configs['host'] && $configs['port'] && $configs['username'] && $configs['password'])) { throw new Exception('configs is empty'); } if (empty($configs['vhost'])) { $configs['vhost'] = '/'; } $configs['login'] = $configs['username']; unset($configs['username']); $this->configs = $configs; } /* * 設置是否持久化,默認為True */ public function setDurable($durable) { $this->durable = $durable; } /* * 設置是否自動刪除 */ public function setAutoDelete($autodelete) { $this->autodelete = $autodelete; } /* * 設置是否鏡像 */ public function setMirror($mirror) { $this->mirror = $mirror; } /* * 打開amqp連接 */ private function open() { if (!$this->_conn) { try { $this->_conn = new AMQPConnection($this->configs); $this->_conn->connect(); $this->initConnection(); } catch (AMQPConnectionException $ex) { throw new Exception('cannot connection rabbitmq',500); } } } /* * rabbitmq連接不變 * 重置交換機,隊列,路由等配置 */ public function reset($exchange_name, $queue_name, $route_key) { $this->exchange_name = $exchange_name; $this->queue_name = $queue_name; $this->route_key = $route_key; $this->initConnection(); } /* * 初始化rabbit連接的相關配置 */ private function initConnection() { if (empty($this->exchange_name) || empty($this->queue_name) || empty($this->route_key)) { throw new Exception('rabbitmq exchange_name or queue_name or route_key is empty',500); } $this->_channel = new AMQPChannel($this->_conn); $this->_exchange = new AMQPExchange($this->_channel); $this->_exchange->setName($this->exchange_name); $this->_exchange->setType(AMQP_EX_TYPE_DIRECT); if ($this->durable) $this->_exchange->setFlags(AMQP_DURABLE); if ($this->autodelete) $this->_exchange->setFlags(AMQP_AUTODELETE); $this->_exchange->declare(); $this->_queue = new AMQPQueue($this->_channel); $this->_queue->setName($this->queue_name); if ($this->durable) $this->_queue->setFlags(AMQP_DURABLE); if ($this->autodelete) $this->_queue->setFlags(AMQP_AUTODELETE); if ($this->mirror) $this->_queue->setArgument('x-ha-policy', 'all'); $this->_queue->declare(); $this->_queue->bind($this->exchange_name, $this->route_key); } public function close() { if ($this->_conn) { $this->_conn->disconnect(); } } public function __sleep() { $this->close(); return array_keys(get_object_vars($this)); } public function __destruct() { $this->close(); } /* * 生產者發送消息 */ public function send($msg) { $this->open(); if(is_array($msg)){ $msg = json_encode($msg); }else{ $msg = trim(strval($msg)); } return $this->_exchange->publish($msg, $this->route_key); } /* * 消費者 * $fun_name = array($classobj,$function) or function name string * $autoack 是否自動應答 * * function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //處理消息 $queue->ack($envelope->getDeliveryTag());//手動應答 } */ public function run($fun_name, $autoack = True){ $this->open(); if (!$fun_name || !$this->_queue) return False; while(True){ if ($autoack) $this->_queue->consume($fun_name, AMQP_AUTOACK); else $this->_queue->consume($fun_name); } } }
send.php生產者代碼
<?php set_time_limit(0); include_once('RabbitMQCommand.php'); $configs = array('host'=>'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/'); $exchange_name = 'class-e-1'; $queue_name = 'class-q-1'; $route_key = 'class-r-1'; $ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key); for($i=0;$i<=100;$i++){ $ra->send(date('Y-m-d H:i:s',time())); } exit();
accept.php消費者代碼
<?php error_reporting(0); include_once('RabbitMQCommand.php'); $configs = array('host'=>'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/'); $exchange_name = 'class-e-1'; $queue_name = 'class-q-1'; $route_key = 'class-r-1'; $ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key); class A{ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); $envelopeID = $envelope->getDeliveryTag(); $pid = posix_getpid(); file_put_contents("log{$pid}.log", $msg.'|'.$envelopeID.''."\r\n",FILE_APPEND); $queue->ack($envelopeID); } } $a = new A(); $s = $ra->run(array($a,'processMessage'),false);
看完上述內容,你們對如何在PHP中使用rabbitmq操作類實現一個生產者和消費者功能有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。