您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關PHP模擬supervisor進程管理的示例的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
模擬supervisor進程管理DEMO(簡易實現)
截圖:
在圖中自己實現了一個
Copy
子進程的功能。如果用在AMQP增減消費者時,我覺得應該會很有用。
1、在主進程循環內啟動子進程執行命令
2、在web輸入 127.0.0.1:7865 獲取子進程狀態
3、socket接收請求消息,并且執行相應操作,返回web頁面
4、回收子進程,防止稱為僵尸進程
不足:無法持續監聽錯誤頁面。由于socket得到的響應是通過include
函數加載的,所以在加載的頁面內不能出現tail -f
命令,否則stream就會掉入了死循環了~。我想應該有方案解決(寫了socket+多進程模式,模仿fpm在接收到請求之后就啟動一個子進程去處理的模式,但是執行有問題。因此將代碼貼出來希望得到大家的指點)。
延伸:由于對進程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消費者進程管理服務。(2)模擬crontab定時服務。
代碼實現的過程中,有很多的細節是值得學習的。
1、在while()循環中,啟用了stream的非阻塞模式。所以不能在循環中使用sleep(1)
,而是用stream_select($read, $write, $except, 1)
讓stream內部阻塞。
關于阻塞非阻塞模式,可以參閱這里
2、能夠執行外部程序的函數很多,但是都稍有不同。這里采用的是proc_open
,是一個很強大的函數。在這之前我曾用pcntl_exec
執行過外部程序,但是需要先pcntl_fork
。而用其他的如exec
,shell_exec
無法對子進程進行管理。
3、重啟或停止等操作子進程時,只是先更改主進程中該子進程在內存中的的狀態,并不是真正的對子進程操作。在統一處init()
處理子進程。如此才能防止因為子進程啟動時的上下文導致的一些怪異的現象。
由于代碼過多,所以如果你對我的方案有更好的建議可以在github這里看。
主進程代碼:Process.php
<?php require_once __DIR__ . '/Consumer.php';require_once __DIR__ . '/StreamConnection.php';require_once __DIR__ . '/Http.php';class Process{ /** * 待啟動的消費者數組 */ protected $consumers = array(); protected $childPids = array(); const PPID_FILE = __DIR__ . '/process'; protected $serializerConsumer; public function __construct() { $this->consumers = $this->getConsumers(); } // 這里是個DEMO,實際可以用讀取配置文件的方式。 public function getConsumers() { $consumer = new Consumer([ 'program' => 'test', 'command' => '/usr/bin/php test.php', 'directory' => __DIR__, 'logfile' => __DIR__ . '/test.log', 'uniqid' => uniqid(), 'auto_restart' => false, ]); return [ $consumer->uniqid => $consumer, ]; } public function run() { if (empty($this->consumers)) { // consumer empty return; } if ($this->_notifyMaster()) { // master alive return; } $pid = pcntl_fork(); if ($pid < 0) { exit; } elseif ($pid > 0) { exit; } if (!posix_setsid()) { exit; } $stream = new StreamConnection('tcp://0.0.0.0:7865'); @cli_set_process_title('AMQP Master Process'); // 將主進程ID寫入文件 file_put_contents(self::PPID_FILE, getmypid()); // master進程繼續 while (true) { $this->init(); pcntl_signal_dispatch(); $this->waitpid(); // 如果子進程被全部回收,則主進程退出 // if (empty($this->childPids)) { // $stream->close($stream->getSocket()); // break; // } $stream->accept(function ($uniqid, $action) { $this->handle($uniqid, $action); return $this->display(); }); } } protected function init() { foreach ($this->consumers as &$c) { switch ($c->state) { case Consumer::RUNNING: case Consumer::STOP: break; case Consumer::NOMINAL: case Consumer::STARTING: $this->fork($c); break; case Consumer::STOPING: if ($c->pid && posix_kill($c->pid, SIGTERM)) { $this->reset($c, Consumer::STOP); } break; case Consumer::RESTART: if (empty($c->pid)) { $this->fork($c); break; } if (posix_kill($c->pid, SIGTERM)) { $this->reset($c, Consumer::STOP); $this->fork($c); } break; default: break; } } } protected function reset(Consumer $c, $state) { $c->pid = ''; $c->uptime = ''; $c->state = $state; $c->process = null; } protected function waitpid() { foreach ($this->childPids as $uniqid => $pid) { $result = pcntl_waitpid($pid, $status, WNOHANG); if ($result == $pid || $result == -1) { unset($this->childPids[$uniqid]); $c = &$this->consumers[$uniqid]; $state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP; $this->reset($c, $state); } } } /** * 父進程存活情況下,只會通知父進程信息,否則可能產生多個守護進程 */ private function _notifyMaster() { $ppid = file_get_contents(self::PPID_FILE ); $isAlive = $this->checkProcessAlive($ppid); if (!$isAlive) return false; return true; } public function checkProcessAlive($pid) { if (empty($pid)) return false; $pidinfo = `ps co pid {$pid} | xargs`; $pidinfo = trim($pidinfo); $pattern = "/.*?PID.*?(\d+).*?/"; preg_match($pattern, $pidinfo, $matches); return empty($matches) ? false : ($matches[1] == $pid ? true : false); } /** * fork一個新的子進程 */ protected function fork(Consumer $c) { $descriptorspec = [2 => ['file', $c->logfile, 'a'],]; $process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory); if ($process) { $ret = proc_get_status($process); if ($ret['running']) { $c->state = Consumer::RUNNING; $c->pid = $ret['pid']; $c->process = $process; $c->uptime = date('m-d H:i'); $this->childPids[$c->uniqid] = $ret['pid']; } else { $c->state = Consumer::EXITED; proc_close($process); } } else { $c->state = Consumer::ERROR; } return $c; } public function display() { $location = 'http://127.0.0.1:7865'; $basePath = Http::$basePath; $scriptName = isset($_SERVER['SCRIPT_NAME']) && !empty($_SERVER['SCRIPT_NAME']) && $_SERVER['SCRIPT_NAME'] != '/' ? $_SERVER['SCRIPT_NAME'] : '/index.php'; if ($scriptName == '/index.html') { return Http::status_301($location); } $sourcePath = $basePath . $scriptName; if (!is_file($sourcePath)) { return Http::status_404(); } ob_start(); include $sourcePath; $response = ob_get_contents(); ob_clean(); return Http::status_200($response); } public function handle($uniqid, $action) { if (!empty($uniqid) && !isset($this->consumers[$uniqid])) { return; } switch ($action) { case 'refresh': break; case 'restartall': $this->killall(true); break; case 'stopall': $this->killall(); break; case 'stop': $c = &$this->consumers[$uniqid]; if ($c->state != Consumer::RUNNING) break; $c->state = Consumer::STOPING; break; case 'start': $c = &$this->consumers[$uniqid]; if ($c->state == Consumer::RUNNING) break; $c->state = Consumer::STARTING; break; case 'restart': $c = &$this->consumers[$uniqid]; $c->state = Consumer::RESTART; break; case 'copy': $c = $this->consumers[$uniqid]; $newC = clone $c; $newC->uniqid = uniqid('C'); $newC->state = Consumer::NOMINAL; $newC->pid = ''; $this->consumers[$newC->uniqid] = $newC; break; default: break; } } protected function killall($restart = false) { foreach ($this->consumers as &$c) { $c->state = $restart ? Consumer::RESTART : Consumer::STOPING; } }}$cli = new Process();$cli->run();
Consumer消費者對象
<?php require_once __DIR__ . '/BaseObject.php';class Consumer extends BaseObject{ /** 開啟多少個消費者 */ public $numprocs = 1; /** 當前配置的唯一標志 */ public $program; /** 執行的命令 */ public $command; /** 當前工作的目錄 */ public $directory; /** 通過 $qos $queueName $duplicate 生成的 $queue */ public $queue; /** 程序執行日志記錄 */ public $logfile = ''; /** 消費進程的唯一ID */ public $uniqid; /** 進程IDpid */ public $pid; /** 進程狀態 */ public $state = self::NOMINAL; /** 自啟動 */ public $auto_restart = false; public $process; /** 啟動時間 */ public $uptime; const RUNNING = 'running'; const STOP = 'stoped'; const NOMINAL = 'nominal'; const RESTART = 'restart'; const STOPING = 'stoping'; const STARTING = 'stating'; const ERROR = 'error'; const BLOCKED = 'blocked'; const EXITED = 'exited'; const FATEL = 'fatel';}
stream相關代碼:StreamConnection.php
<?php class StreamConnection{ protected $socket; protected $timeout = 2; //s protected $client; public function __construct($host) { $this->socket = $this->connect($host); } public function connect($host) { $socket = stream_socket_server($host, $errno, $errstr); if (!$socket) { exit('stream error'); } stream_set_timeout($socket, $this->timeout); stream_set_chunk_size($socket, 1024); stream_set_blocking($socket, false); $this->client = [$socket]; return $socket; } public function accept(Closure $callback) { $read = $this->client; if (stream_select($read, $write, $except, 1) < 1) return; if (in_array($this->socket, $read)) { $cs = stream_socket_accept($this->socket); $this->client[] = $cs; } foreach ($read as $s) { if ($s == $this->socket) continue; $header = fread($s, 1024); if (empty($header)) { $index = array_search($s, $this->client); if ($index) unset($this->client[$index]); $this->close($s); continue; } Http::parse_http($header); $uniqid = isset($_GET['uniqid']) ? $_GET['uniqid'] : ''; $action = isset($_GET['action']) ? $_GET['action'] : ''; $response = $callback($uniqid, $action); $this->write($s, $response); $index = array_search($s, $this->client); if ($index) unset($this->client[$index]); $this->close($s); } } public function write($socket, $response) { $ret = fwrite($socket, $response, strlen($response)); } public function close($socket) { $flag = fclose($socket); } public function getSocket() { return $this->socket; }}
Http響應代碼:Http.php
<?php class Http{ public static $basePath = __DIR__ . '/views'; public static $max_age = 120; //秒 /* * 函數: parse_http * 描述: 解析http協議 */ public static function parse_http($http) { // 初始化 $_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES = array(); $GLOBALS['HTTP_RAW_POST_DATA'] = ''; // 需要設置的變量名 $_SERVER = array( 'QUERY_STRING' => '', 'REQUEST_METHOD' => '', 'REQUEST_URI' => '', 'SERVER_PROTOCOL' => '', 'SERVER_SOFTWARE' => '', 'SERVER_NAME' => '', 'HTTP_HOST' => '', 'HTTP_USER_AGENT' => '', 'HTTP_ACCEPT' => '', 'HTTP_ACCEPT_LANGUAGE' => '', 'HTTP_ACCEPT_ENCODING' => '', 'HTTP_COOKIE' => '', 'HTTP_CONNECTION' => '', 'REMOTE_ADDR' => '', 'REMOTE_PORT' => '0', 'SCRIPT_NAME' => '', 'HTTP_REFERER' => '', 'CONTENT_TYPE' => '', 'HTTP_IF_NONE_MATCH' => '', ); // 將header分割成數組 list($http_header, $http_body) = explode("\r\n\r\n", $http, 2); $header_data = explode("\r\n", $http_header); list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]); unset($header_data[0]); foreach ($header_data as $content) { // \r\n\r\n if (empty($content)) { continue; } list($key, $value) = explode(':', $content, 2); $key = strtolower($key); $value = trim($value); switch ($key) { case 'host': $_SERVER['HTTP_HOST'] = $value; $tmp = explode(':', $value); $_SERVER['SERVER_NAME'] = $tmp[0]; if (isset($tmp[1])) { $_SERVER['SERVER_PORT'] = $tmp[1]; } break; case 'cookie': $_SERVER['HTTP_COOKIE'] = $value; parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE); break; case 'user-agent': $_SERVER['HTTP_USER_AGENT'] = $value; break; case 'accept': $_SERVER['HTTP_ACCEPT'] = $value; break; case 'accept-language': $_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value; break; case 'accept-encoding': $_SERVER['HTTP_ACCEPT_ENCODING'] = $value; break; case 'connection': $_SERVER['HTTP_CONNECTION'] = $value; break; case 'referer': $_SERVER['HTTP_REFERER'] = $value; break; case 'if-modified-since': $_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value; break; case 'if-none-match': $_SERVER['HTTP_IF_NONE_MATCH'] = $value; break; case 'content-type': if (!preg_match('/boundary="?(\S+)"?/', $value, $match)) { $_SERVER['CONTENT_TYPE'] = $value; } else { $_SERVER['CONTENT_TYPE'] = 'multipart/form-data'; $http_post_boundary = '--' . $match[1]; } break; } } // script_name $_SERVER['SCRIPT_NAME'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH); // QUERY_STRING $_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY); if ($_SERVER['QUERY_STRING']) { // $GET parse_str($_SERVER['QUERY_STRING'], $_GET); } else { $_SERVER['QUERY_STRING'] = ''; } // REQUEST $_REQUEST = array_merge($_GET, $_POST); return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_COOKIE, 'server' => $_SERVER, 'files' => $_FILES); } public static function status_404() { return <<<EOFHTTP/1.1 404 OK content-type: text/htmlEOF; } public static function status_301($location) { return <<<EOFHTTP/1.1 301 Moved Permanently Content-Length: 0 Content-Type: text/plain Location: $locationCache-Control: no-cacheEOF; } public static function status_304() { return <<<EOFHTTP/1.1 304 Not Modified Content-Length: 0EOF; } public static function status_200($response) { $contentType = $_SERVER['CONTENT_TYPE']; $length = strlen($response); $header = ''; if ($contentType) $header = 'Cache-Control: max-age=180'; return <<<EOFHTTP/1.1 200 OK Content-Type: $contentTypeContent-Length: $length$header$responseEOF; }}
待執行的腳本:test.php
<?php while(true) { file_put_contents(__DIR__ . '/test.log', date('Y-m-d H:i:s')); sleep(1);}
在當前目錄下的視圖頁面:
|- Process.php
|- Http.php
|- StreamConnection.php
|- Consumer.php
|- BaseObject.php
|- views/
感謝各位的閱讀!關于“PHP模擬supervisor進程管理的示例”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。