This article walks through the multi-process model in Node.js with a source-level analysis of the Cluster module. It is shared here as a practical reference; hopefully you will get something out of it.
Node.js provides the Cluster module to address the problem described above. With it, a developer can build a cluster by forking child processes, making full use of the CPU resources of the machine or container, and the module also lets multiple child processes listen on the same port.
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}
Let's start with const cluster = require('cluster'). This line imports Node's Cluster module, but internally the Master process and the Worker process load different files, as the following code shows:
'use strict';

const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';

module.exports = require(`internal/cluster/${childOrPrimary}`);
Loading different files means the two kinds of process behave differently at runtime, for example:
// internal/cluster/master.js
cluster.isWorker = false;
cluster.isMaster = true;

// internal/cluster/child.js
cluster.isWorker = true;
cluster.isMaster = false;
This is why the variables exported by the Cluster module can tell the two process types apart. Next, let's walk through the details from the Master side and then the Worker side.
In the example above the Master process does not do much: it simply forks one child process per CPU. Let's dig into the source to see roughly what happens; the relevant explanations are in the code comments.
// lib/internal/cluster/master.js

// Initialize cluster
const cluster = new EventEmitter();
// Map from listen-address key to the corresponding server
const handles = new SafeMap();

// Initialization
cluster.isWorker = false;
cluster.isMaster = true;
cluster.workers = {};
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.

// Auto-incrementing worker id
let ids = 0;

// Attach the fork method to cluster
cluster.fork = function(env) {
  // Initialize cluster.settings
  cluster.setupMaster();
  // Generate an id for the forked child, unique within this cluster
  const id = ++ids;
  // Create the child process
  const workerProcess = createWorkerProcess(id, env);
  // Create the corresponding Worker instance
  const worker = new Worker({ id: id, process: workerProcess });

  // Some worker event listeners omitted...

  // Listen for internal messages and hand them to onmessage
  worker.process.on('internalMessage', internal(worker, onmessage));
  // Emit the cluster 'fork' event
  process.nextTick(emitForkNT, worker);
  // Keep the worker instance in cluster.workers
  cluster.workers[worker.id] = worker;
  // Return the worker
  return worker;
};

// Create the child process
function createWorkerProcess(id, env) {
  // Build an env object from the master's env, the env passed to cluster.fork,
  // and the NODE_UNIQUE_ID variable
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // Execution arguments
  const execArgv = [...cluster.settings.execArgv];

  // Debug-mode related logic omitted...

  // Create the child process with child_process.fork and return it;
  // at this point the child process instance is complete
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

// Internal message handler
function onmessage(message, handle) {
  const worker = this;

  if (message.act === 'online')
    online(worker);
  // When a child sends a queryServer message, run queryServer to create the server
  else if (message.act === 'queryServer')
    queryServer(worker, message);
  else if (message.act === 'listening')
    listening(worker, message);
  else if (message.act === 'exitedAfterDisconnect')
    exitedAfterDisconnect(worker, message);
  else if (message.act === 'close')
    close(worker, message);
}

// Look up or create the server
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  // Build the key for the address this child wants to listen on
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  // Check whether a server for this address has already been created
  let handle = handles.get(key);

  // If there is no such server yet, create one
  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === 'string' &&
        process.platform !== 'win32') {

      address = path.relative(process.cwd(), address);

      if (message.address.length < address.length)
        address = message.address;
    }

    // How master and children balance connections; defaults to round-robin
    let constructor = RoundRobinHandle;
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6') {
      constructor = SharedHandle;
    }

    // Pass the address info to the constructor to create the listening handle
    handle = new constructor(key, address, message);
    // Cache the listening handle
    handles.set(key, handle);
  }

  // Attach custom data to the server so it can be passed through to the worker
  // once the server emits 'listening'
  if (!handle.data)
    handle.data = message.data;

  // Register the callback that notifies the child after the server emits 'listening'
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}
// lib/internal/cluster/round_robin_handle.js

// Constructor. Arguments: the server's key, the ip address (for http(s)),
// and the listen options
function RoundRobinHandle(key, address, { port, fd, flags }) {
  // Initialize the handle
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);

  // Listen on a file descriptor; not discussed here
  if (fd >= 0)
    this.server.listen({ fd });
  // Listen on ip:port
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  // Listen on a UNIX socket; not discussed here
  } else
    this.server.listen(address);  // UNIX socket path.

  // Register the callback for the server's 'listening' event
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

// Add a worker; once the server emits 'listening', call the callback passed in from master.js
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  if (this.server === null)
    return done();  // Still busy binding.

  this.server.once('listening', done);
  this.server.once('error', (err) => {
    send(err.errno, null);
  });
};

// Remove a worker so the round-robin no longer assigns connections to it
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  for (const handle of this.handles) {
    handle.close();
  }
  this.handles = [];

  this.handle.close();
  this.handle = null;
  return true;
};

// Round-robin scheduling
RoundRobinHandle.prototype.distribute = function(err, handle) {
  ArrayPrototypePush(this.handles, handle);
  const [ workerEntry ] = this.free; // this.free is a SafeMap

  if (ArrayIsArray(workerEntry)) {
    const { 0: workerId, 1: worker } = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};

// Hand a connection handle over to a worker
RoundRobinHandle.prototype.handoff = function(worker) {
  if (!this.all.has(worker.id)) {
    return;  // Worker is closing (or has closed) the server.
  }

  const handle = ArrayPrototypeShift(this.handles);

  if (handle === undefined) {
    this.free.set(worker.id, worker);  // Add to ready queue again.
    return;
  }

  // Send a newconn message to this worker
  const message = { act: 'newconn', key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};
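Before moving on, the distribution is easy to observe from the outside. The following sketch (the worker count and port are arbitrary choices for the demo) logs which worker serves each incoming connection; on non-Windows platforms, where SCHED_RR is the default, the workers should take turns:

const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  // Two workers keep the alternation easy to read in the logs.
  for (let i = 0; i < 2; i++) cluster.fork();
} else {
  http.createServer((req, res) => {
    // Each connection reaching this worker was handed off by the master's
    // RoundRobinHandle via a 'newconn' message.
    console.log(`connection handled by worker ${cluster.worker.id} (pid ${process.pid})`);
    res.end('ok\n');
  }).listen(8000);
}

Hitting the server repeatedly with curl localhost:8000 should show worker 1 and worker 2 alternating. Note that distribution happens per connection, not per request, so requests reusing a keep-alive connection stay on the same worker.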
In every child process we create an HTTP Server and call its listen function to listen on port 8000. The HTTP Server inherits from Net Server through the prototype chain, so this listen function is the one defined on Net Server's prototype. Concretely:
// lib/_http_server.js
function Server(options, requestListener) {
  ....
}

ObjectSetPrototypeOf(Server.prototype, net.Server.prototype);
ObjectSetPrototypeOf(Server, net.Server);
// lib/net.js
Server.prototype.listen = function(...args) {

  // For brevity, some argument normalization and other listen variants are omitted.
  // In this branch, listenInCluster is called to actually listen on the port.
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    validatePort(options.port, 'options.port');
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }

  // omitted...
};

// Cluster-aware listen
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require('cluster');

  // If this is the master (in a single process cluster.isMaster defaults to true),
  // listen directly and return
  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  // In a child process, the listen-address info is passed to cluster._getServer
  // to obtain a faux handle
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  // Net server callback: once the faux handle has been obtained,
  // call _listen2, i.e. setupListenHandle
  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse master's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

// Set up the listening handle
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // As the English comment says: if there is no listening handle yet, create and bind one;
  // if a handle already exists, skip this step
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    debug('setupListenHandle: create a handle');

    let rval = null;

    // Handle-creation code omitted for brevity...

    this._handle = rval;
  }

  // Install onconnection on the (possibly faux) handle stored on this
  // to watch for incoming connections
  this._handle.onconnection = onconnection;
}
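One branch of listenInCluster deserves a quick illustration: if exclusive is true, the cluster.isMaster || exclusive condition holds even inside a worker, so the worker binds its own handle instead of asking the master for a shared one. A minimal sketch (the port is arbitrary; the second worker is expected to fail with EADDRINUSE precisely because the handle is not shared):

const cluster = require('cluster');
const net = require('net');

if (cluster.isMaster) {
  cluster.fork();
  cluster.fork();
} else {
  // exclusive: true bypasses cluster._getServer(), so this worker binds the port
  // directly; the second worker doing the same gets an EADDRINUSE error.
  const server = net.createServer((socket) => socket.end('hi\n'));
  server.on('error', (err) => console.log(`worker ${cluster.worker.id}:`, err.code));
  server.listen({ port: 9000, exclusive: true }, () => {
    console.log(`worker ${cluster.worker.id} bound the port itself`);
  });
}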
And, as mentioned at the start of this analysis, when the Cluster module is imported it checks whether NODE_UNIQUE_ID is present in the current process's env to decide whether this is a child process; if it is, the child.js file is loaded.
Tip: in IPC, if the message.cmd value being sent is prefixed with NODE, the receiver emits it as the internal event internalMessage instead of a regular message event.
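That convention is easy to verify with a tiny experiment outside of Cluster (the file names and cmd values below are made up for the demo; the NODE_ prefix is reserved for Node.js core, so real applications should not rely on it):

// parent.js
const { fork } = require('child_process');
const child = fork('./child.js');

child.send({ cmd: 'NODE_HELLO', payload: 1 }); // NODE_ prefix -> 'internalMessage' in the child
child.send({ cmd: 'HELLO', payload: 2 });      // no prefix    -> ordinary 'message' event

// child.js
process.on('message', (msg) => console.log('message:', msg));
process.on('internalMessage', (msg) => console.log('internalMessage:', msg));

With that in mind, here is what child.js in the Cluster module does on the worker side: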
// lib/internal/cluster/child.js

// Initialization
const cluster = new EventEmitter();
// Stores the generated faux handles
const handles = new SafeMap();
// Maps each listen address to its index
const indexes = new SafeMap();

cluster.isWorker = true;
cluster.isMaster = false;
cluster.worker = null;
cluster.Worker = Worker;

// Run when the child process starts, to initialize it; after it finishes, the
// NODE_UNIQUE_ID environment variable is deleted from env.
// See the initializeClusterIPC function in lib/internal/bootstrap/pre_execution.js
cluster._setupWorker = function() {
  // Initialize the worker instance
  const worker = new Worker({
    id: +process.env.NODE_UNIQUE_ID | 0,
    process: process,
    state: 'online'
  });

  cluster.worker = worker;

  // Handle disconnection
  process.once('disconnect', () => {
    worker.emit('disconnect');

    if (!worker.exitedAfterDisconnect) {
      // Unexpected disconnect, master exited, or some such nastiness, so
      // worker exits immediately.
      process.exit(0);
    }
  });

  // Listen for internal IPC messages
  process.on('internalMessage', internal(worker, onmessage));
  send({ act: 'online' });

  function onmessage(message, handle) {
    // For a new connection, run onconnection to pass the received handle to the
    // HTTP Server started inside this child process
    if (message.act === 'newconn')
      onconnection(message, handle);
    else if (message.act === 'disconnect')
      ReflectApply(_disconnect, worker, [true]);
  }
};

// Add the server-query function; it is executed when a net server listens on a port
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === 'string' &&
      process.platform !== 'win32')
    address = path.resolve(address);

  // Build the key for the address info
  const indexesKey = ArrayPrototypeJoin(
    [
      address,
      options.port,
      options.addressType,
      options.fd,
    ], ':');

  // Check whether this indexesKey is cached; if not, this is a new listen address,
  // and master.js will create a new net server for it
  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  // Record the mapping between indexesKey and index
  indexes.set(indexesKey, index);

  // Pass along the address info and index
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  // Send the queryServer message to the master
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    // Depending on how the balancing handle added this worker, run the matching
    // balancing code and then the cb callback.
    // Round-robin does not pass a handle; see RoundRobinHandle.prototype.add
    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const address = obj.address();
    message.act = 'listening';
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

// Create the faux handle and record its mapping
// Round-robin. Master distributes handles across workers.
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  }

  // Create the faux handle
  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  // Store the faux handle
  handles.set(key, handle);
  // Invoke the callback that the net module passed into cluster._getServer
  cb(0, handle);
}

// Handle a request
// Round-robin connection.
function onconnection(message, handle) {
  // Get the key of the faux handle
  const key = message.key;
  // Get the faux handle itself
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  // Invoke the connection handler that setupListenHandle in the net module
  // installed on this faux handle
  if (accepted)
    server.onconnection(0, handle);
}
At this point, all the pieces come together.
From the source walkthrough above we know that a Cluster creates the real Net Server in the Master process. When a Worker process creates an HTTP Server and listens, the listen-address info is passed into cluster._getServer, which creates a faux handle and sets it on the Worker's Net Server. During Worker initialization an IPC callback is registered; when the Master hands over a connection, that callback calls {faux handle}.onconnection, the function installed during Net Server setup in the child, passing in the received connection handle, and the request is served.
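To convince yourself that the worker really is backed by a faux handle rather than a real TCPWrap, you can poke at internal state; this is purely an experiment, relying on undocumented internals and the default round-robin policy, so treat the exact output as version-dependent:

const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  cluster.fork();
} else {
  const server = http.createServer((req, res) => res.end('ok\n'));
  server.listen(8000, () => {
    // Expected to print the faux handle built in rr(): close, listen, ref, unref,
    // getsockname, plus the onconnection callback installed by setupListenHandle.
    console.log(Object.keys(server._handle));
  });
}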
In the Master process we can listen for the Worker processes' error, disconnect and exit events and react to them, for example cleaning up exited workers and forking replacements, or we can use a ready-made npm package such as cfork.
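A minimal hand-rolled version of that supervision logic might look like the sketch below; the fields it checks (exitedAfterDisconnect, code, signal) are part of the public cluster API, but a production setup usually wants restart backoff and logging on top of this:

const cluster = require('cluster');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  for (let i = 0; i < numCPUs; i++) cluster.fork();

  cluster.on('exit', (worker, code, signal) => {
    // A graceful disconnect/kill initiated by us sets exitedAfterDisconnect,
    // so only refork on unexpected deaths.
    if (!worker.exitedAfterDisconnect) {
      console.log(`worker ${worker.process.pid} died (${signal || code}), reforking`);
      cluster.fork();
    }
  });
}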
The multi-process model of Egg.js adds another process type, the Agent process, which mainly takes care of work that is awkward to do in every Worker and reduces the number of long-lived connections. The relationships look like this:
+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |     agent ready     |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |                     |     worker ready   |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |                     |      Egg ready     |
     +----------------------------------------->|
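As a concrete (hedged) illustration of what the Agent is good for, the sketch below follows Egg's documented messenger conventions: the single Agent process holds one subscription or long-lived connection and broadcasts updates to every Worker, instead of each Worker holding its own connection. The event name and polling interval are made up for the example:

// agent.js — loaded once, in the Agent process
module.exports = (agent) => {
  agent.messenger.on('egg-ready', () => {
    // e.g. one subscription to a config center, held only by the Agent
    setInterval(() => {
      agent.messenger.sendToApp('config-changed', { ts: Date.now() });
    }, 10000);
  });
};

// app.js — loaded in every Worker process
module.exports = (app) => {
  app.messenger.on('config-changed', (data) => {
    app.logger.info('received config update from agent', data);
  });
};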
Inside the egg-cluster package, the cfork package is used to make sure a Worker process is automatically restarted after it dies.
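Its usage is straightforward; the sketch below roughly follows cfork's README (the paths and option values are placeholders):

const cfork = require('cfork');
const path = require('path');

cfork({
  exec: path.join(__dirname, 'worker.js'),   // the worker entry file
  count: require('os').cpus().length,        // how many workers to fork
  refork: true,                              // refork when a worker dies unexpectedly
})
.on('fork', (worker) => {
  console.log(`[cfork] new worker ${worker.process.pid}`);
})
.on('exit', (worker, code, signal) => {
  console.log(`[cfork] worker ${worker.process.pid} exited (${signal || code})`);
});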
In one of our Egg applications, the logging system does not use Egg's built-in logger; it uses an in-house logging library built on the log4js package. When it is used, the needed Loggers are attached to the Application object, so every Worker process creates its own Logger at initialization time. That means multiple processes write to the same log files, yet we never ran into the usual multi-process logging corruption problem.
Tracing the source shows that although log4js does provide a Cluster mode, the wrapper on top of it never enables it, so every Logger's appender simply opens a write stream with flag a. That alone did not explain the absence of corruption.
后來在 CNode 中找到了答案,在 unix 下使用 flag a
打開的可寫流對應的 libuv 文件池實現是 UV_FS_O_APPEND
,即 O_APPEND
,而 O_APPEND
本身在 man 手冊里就定義為原子操作,內核保證了對這個可寫流的并發寫是安全的不需要在應用層額外加鎖(除了在 NFS 類的文件系統上并發寫會造成文件信息丟失或者損壞),NFS 類的網絡掛載的文件系統主要是靠模擬掉底層的 api 來實現的類本地操作,顯然無法在競爭條件下完美還原這類的原子操作 api,所以如果你的日志要寫到類似 oss 云盤掛載本地的這種就不能這么干,多進程寫的話必須在應用層自己手動加鎖
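A small experiment makes this concrete (a sketch only; the path, worker count and line count are arbitrary). Several forked processes append to the same file through streams opened with flag a, the same way the log4js appender described above does; on a local filesystem the resulting file should contain only complete, uncorrupted lines:

const cluster = require('cluster');
const fs = require('fs');

const LOG_FILE = './append-test.log';

if (cluster.isMaster) {
  for (let i = 0; i < 4; i++) cluster.fork();
} else {
  // One write stream per process, opened with flag 'a' (O_APPEND on unix),
  // mirroring the appender setup described above.
  const stream = fs.createWriteStream(LOG_FILE, { flags: 'a' });
  for (let i = 0; i < 1000; i++) {
    stream.write(`pid=${process.pid} line=${i}\n`);
  }
  stream.end(() => process.exit(0));
}

Every line in append-test.log should be attributable to exactly one pid. Running the same experiment against an NFS-style mount is where lost or interleaved lines can appear, which is exactly the caveat above.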
That wraps up this analysis of the multi-process model in Node.js. Hopefully the content above is of some help; if you found the article useful, please share it so more people can see it.