您好,登錄后才能下訂單哦!
Nodejs中怎么實現多線程,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
1 背景
需求中有以下場景
1 對稱解密、非對稱解密
2 壓縮、解壓
3 大量文件的增刪改查
4 處理大量的字符串,解析協議
上面的場景都是非常耗時間的,解密、壓縮、文件操作,nodejs使用了內置的線程池支持了異步。但是處理字符串和解析協議是單純消耗cpu的操作。而且nodejs對解密的支持似乎不是很好。我使用了純js的解密庫,所以無法在nodejs主線程里處理。尤其rsa解密,非常耗時間。
所以這時候就要探索解決方案,nodejs提供了多線程的能力。所以自然就選擇了這種方案。但是這只是初步的想法和方案。因為nodejs雖然提供了多線程能力,但是沒有提供一個應用層的線程池。所以如果我們單純地使用多線程,一個請求一個線程,這顯然不現實。我們不得不實現自己的線程池。本文分享的內容是這個線程池的實現。
線程池的設計涉及到很多方面,對于純cpu型的任務,線程數和cpu核數要相等才能達到最優的性能,否則過多的線程引起的上下文切換反而會導致性能下降。而對于io型的任務,更多的線程理論上是會更好,因為可以更早地給硬盤發出命令,磁盤會優化并持續地處理請求,想象一下,如果發出一個命令,硬盤處理一個,然后再發下一個命令,再處理一個,這樣顯然效率很低。當然,線程數也不是越多越好。線程過多會引起系統負載過高,過多上下文切換也會帶來性能的下降。下面看一下線程池的實現方案。
2 設計思路
首先根據配置創建多個線程(分為預創建和懶創建),然后對用戶暴露提交任務的接口,由調度中心負責接收任務,然后根據策略選擇處理該任務的線程。子線程一直在輪詢是否有任務需要處理。處理完通知調度中心。
下面看一下具體的實現
2.1 和用戶通信的數據結構
class UserWork extends EventEmitter { constructor({ workId, threadId }) { super(); this.workId = workId; this.threadId = threadId; workPool[workId] = this; } }
用戶提交任務的時候,調度中心返回一個UserWork對象。用戶可以使用該對象和調度中心通信。
2.2 調度中心的實現
調度中心的實現大致分為以下幾個邏輯。
2.2.1 初始化
constructor(options = {}) { this.options = options; // 線程池總任務數 this.totalWork = 0; // 子線程隊列 this.workerQueue = []; // 核心線程數 this.coreThreads = ~~options.coreThreads || config.CORE_THREADS; // 線程池最大線程數,如果不支持動態擴容則最大線程數等于核心線程數 this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads; // 工作線程處理任務的模式 this.sync = options.sync !== false; // 超過任務隊列長度時的處理策略 this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD; // 是否預創建子線程 this.preCreate = options.preCreate === true; this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME; this.pollIntervalTime = ~~options.pollIntervalTime || config.POLL_INTERVAL_TIME; this.maxWork = ~~options.maxWork || config.MAX_WORK; // 是否預創建線程池 this.preCreate && this.preCreateThreads(); }
從初始化代碼中我們看到線程池大致支持的能力。
鴻蒙官方戰略合作共建——HarmonyOS技術社區
核心線程數
最大線程數
過載時的處理策略,和過載的閾值
子線程空閑退出的時間和輪詢任務的時間
是否預創建線程池
是否支持動態擴容
核心線程數是任務數沒有達到閾值時的工作線程集合。是處理任務的主力軍。任務數達到閾值后,如果支持動態擴容(可配置)則會創建新的線程去處理更多的任務。一旦負載變低,線程空閑時間達到閾值則會自動退出。如果擴容的線程數達到閾值,還有新的任務到來,則根據丟棄策略進行相關的處理。
2.2.2 創建線程
newThread() { let { sync } = this; const worker = new Worker(workerPath, {workerData: { sync, maxIdleTime: this.maxIdleTime, pollIntervalTime: this.pollIntervalTime, }}); const node = { worker, // 該線程處理的任務數量 queueLength: 0, }; this.workerQueue.push(node); const threadId = worker.threadId; worker.on('exit', (status) => { // 異常退出則補充線程,正常退出則不補充 if (status) { this.newThread(); } this.totalWork -= node.queueLength; this.workerQueue = this.workerQueue.filter((worker) => { return worker.threadId !== threadId; }); }); // 和子線程通信 worker.on('message', (result) => { const { work, event, } = result; const { data, error, workId } = work; // 通過workId拿到對應的userWorker const userWorker = workPool[workId]; delete workPool[workId]; // 任務數減一 node.queueLength--; this.totalWork--; switch(event) { case 'done': // 通知用戶,任務完成 userWorker.emit('done', data); break; case 'error': // 通知用戶,任務出錯 if (EventEmitter.listenerCount(userWorker, 'error')) { userWorker.emit('error', error); } break; default: break; } }); worker.on('error', (...rest) => { console.log(...rest) }); return node; }
創建線程主要是調用nodejs提供的模塊進行創建。然后監聽子線程的退出和message、error事件。如果是異常退出則補充線程。調度中心維護了一個子線程的隊列。記錄了每個子線程(worker)的實例和任務數。
2.2.3 選擇執行任務的線程
selectThead() { let min = Number.MAX_SAFE_INTEGER; let i = 0; let index = 0; // 找出任務數最少的線程,把任務交給他 for (; i < this.workerQueue.length; i++) { const { queueLength } = this.workerQueue[i]; if (queueLength < min) { index = i; min = queueLength; } } return this.workerQueue[index]; }
選擇策略目前是選擇任務數最少的,本來還支持隨機和輪詢方式,但是貌似沒有什么場景和必要,就去掉了。
2.2.4 暴露提交任務的接口
submit(filename, options = {}) { return new Promise(async (resolve, reject) => { let thread; // 沒有線程則創建一個 if (this.workerQueue.length) { thread = this.selectThead(); // 任務隊列非空 if (thread.queueLength !== 0) { // 子線程個數還沒有達到核心線程數,則新建線程處理 if (this.workerQueue.length < this.coreThreads) { thread = this.newThread(); } else if (this.totalWork + 1 > this.maxWork){ // 總任務數已達到閾值,還沒有達到線程數閾值,則創建 if(this.workerQueue.length < this.maxThreads) { thread = this.newThread(); } else { // 處理溢出的任務 switch(this.discardPolicy) { case DISCARD_POLICY.ABORT: return reject(new Error('queue overflow')); case DISCARD_POLICY.CALLER_RUNS: const userWork = new UserWork({workId: this.generateWorkId(), threadId}); try { const asyncFunction = require(filename); if (!isAsyncFunction(asyncFunction)) { return reject(new Error('need export a async function')); } const result = await asyncFunction(options); resolve(userWork); setImmediate(() => { userWork.emit('done', result); }); } catch (error) { resolve(userWork); setImmediate(() => { userWork.emit('error', error); }); } return; case DISCARD_POLICY.DISCARD_OLDEST: thread.worker.postMessage({cmd: 'delete'}); break; case DISCARD_POLICY.DISCARD: return reject(new Error('discard')); case DISCARD_POLICY.NOT_DISCARD: break; default: break; } } } } } else { thread = this.newThread(); } // 生成一個任務id const workId = this.generateWorkId(); // 新建一個work,交給對應的子線程 const work = new Work({ workId, filename, options }); const userWork = new UserWork({workId, threadId: thread.worker.threadId}); thread.queueLength++; this.totalWork++; thread.worker.postMessage({cmd: 'add', work}); resolve(userWork); }) }
提交任務的函數比較復雜,提交一個任務的時候,調度中心會根據當前的負載情況和線程數,決定對一個任務做如何處理。如果可以處理,則把任務交給選中的子線程。最后給用戶返回一個UserWorker對象。
2.3調度中心和子線程的通信數據結構
class Work { constructor({workId, filename, options}) { // 任務id this.workId = workId; // 文件名 this.filename = filename; // 處理結果,由用戶代碼返回 this.data = null; // 執行出錯 this.error = null; // 執行時入參 this.options = options; } }
一個任務對應一個id,目前只支持文件的執行模式,后續會支持字符串。
2.4 子線程的實現
子線程的實現主要分為幾個部分
2.4.1 監聽調度中心分發的命令
parentPort.on('message', ({cmd, work}) => { switch(cmd) { case 'delete': return queue.shift(); case 'add': return queue.push(work); } });
2.4.2 輪詢是否有任務需要處理
function poll() { const now = Date.now(); if (now - lastWorkTime > maxIdleTime && !queue.length) { process.exit(0); } setTimeout(async () => { // 處理任務 poll(); } }, pollIntervalTime); } // 輪詢判斷是否有任務 poll();
不斷輪詢是否有任務需要處理,如果沒有并且空閑時間達到閾值則退出。
2.4.3 處理任務
處理任務模式分為同步和異步
while(queue.length) { const work = queue.shift(); try { const { filename, options } = work; const asyncFunction = require(filename); if (!isAsyncFunction(asyncFunction)) { return; } lastWorkTime = now; const result = await asyncFunction(options); work.data = result; parentPort.postMessage({event: 'done', work}); } catch (error) { work.error = error.toString(); parentPort.postMessage({event: 'error', work}); } }
用戶需要導出一個async函數,使用這種方案主要是為了執行時可以給用戶傳入參數。并且實現同步。處理完后通知調度中心。下面是異步處理方式,子線程不需要同步等待用戶的代碼結果。
const arr = []; while(queue.length) { const work = queue.shift(); try { const { filename } = work; const asyncFunction = require(filename); if (!isAsyncFunction(asyncFunction)) { return; } arr.push({asyncFunction, work}); } catch (error) { work.error = error.toString(); parentPort.postMessage({event: 'error', work}); } } arr.map(async ({asyncFunction, work}) => { try { const { options } = work; lastWorkTime = now; const result = await asyncFunction(options); work.data = result; parentPort.postMessage({event: 'done', work}); } catch (e) { work.error = error.toString(); parentPort.postMessage({event: 'done', work}); } })
最后還有一些配置和定制化的功能。
module.exports = { // 最大的線程數 MAX_THREADS: 50, // 線程池最大任務數 MAX_WORK: Infinity, // 默認核心線程數 CORE_THREADS: 10, // 最大空閑時間 MAX_IDLE_TIME: 10 * 60 * 1000, // 子線程輪詢時間 POLL_INTERVAL_TIME: 10, }; // 丟棄策略 const DISCARD_POLICY = { // 報錯 ABORT: 1, // 在主線程里執行 CALLER_RUNS: 2, // 丟棄最老的的任務 DISCARD_OLDEST: 3, // 丟棄 DISCARD: 4, // 不丟棄 NOT_DISCARD: 5, };
支持多個類型的線程池
class AsyncThreadPool extends ThreadPool { constructor(options) { super({...options, sync: false}); } } class SyncThreadPool extends ThreadPool { constructor(options) { super({...options, sync: true}); } } // cpu型任務的線程池,線程數和cpu核數一樣,不支持動態擴容 class CPUThreadPool extends ThreadPool { constructor(options) { super({...options, coreThreads: cores, expansion: false}); } } // 線程池只有一個線程,類似消息隊列 class SingleThreadPool extends ThreadPool { constructor(options) { super({...options, coreThreads: 1, expansion: false }); } } // 線程數固定的線程池,不支持動態擴容線程 class FixedThreadPool extends ThreadPool { constructor(options) { super({ ...options, expansion: false }); } }
這就是線程池的實現,有很多細節還需要思考。下面是一個性能測試的例子。
3 測試
const { MAX } = require('./constants'); module.exports = async function() { let ret = 0; let i = 0; while(i++ < MAX) { ret++; Buffer.from(String(Math.random())).toString('base64'); } return ret; }
在服務器以單線程和多線程的方式執行以上代碼,下面是MAX為10000和100000時,使用CPUThreadPool類型線程池的性能對比(具體代碼參考https://github.com/theanarkh/nodejs-threadpool)。
10000
單線程 [ 358.35, 490.93, 705.23, 982.6, 1155.72 ]
多線程 [ 379.3, 230.35, 315.52, 429.4, 496.04 ]
100000
單線程 [ 2485.5, 4454.63, 6894.5, 9173.16, 11011.16 ]
多線程 [ 1791.75, 2787.15, 3275.08, 4093.39, 3674.91 ]
關于Nodejs中怎么實現多線程問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。