您好,登錄后才能下訂單哦!
Nodejs中怎么實現一個線程池,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
nodejs雖然提供了線程的能力,但是很多時候,往往不能直接使用線程或者無限制地創建線程,比如我們有一個功能是cpu密集型的,如果一個請求就開一個線程,這很明顯不是最好的實踐,這時候,我們需要使用池化的技術,本文介紹在nodejs線程模塊的基礎上,如何設計和實現一個線程池庫(https://github.com/theanarkh/nodejs-threadpool或npm i nodejs-threadpool )。下面是線程池的總體架構。
設計一個線程池,在真正寫代碼之前,有很多設計需要考慮,大概如下:
1任務隊列的設計,一個隊列,多個線程互斥訪問,或者每個線程一個隊列,不需要互斥訪問。
2 線程退出的設計,可以由主線程檢測空閑線程,然后使子線程退出。或者子線程退出,通知主線程。空閑不一定是沒有任務就退出,可以設計空閑時間達到閾值后退出,因為創建線程是有時間開銷的。
3 任務數的設計,每個線程可以有個任務數,還可以增加一個總任務數,即全部線程任務數加起來
4 選擇線程的設計,選擇任務數最少的線程。
5 線程類型的設計,可以區分核心線程和預備線程,任務少的時候,核心線程處理就行。任務多也創建預備線程幫忙處理。
6 線程池類型的設計,cpu密集型的,線程數等于核數,否則自定義線程數就行。
7 支持任務的取消和超時機制,防止一個任務時間過長或者死循環。
本文介紹的線程池具體設計思想如下(參考java):
1 主線程維護一個隊列,子線程的任務由子線程負責分發,不需要互斥訪問,子線程也不需要維護自己的隊列。
2 線程退出的設計,主線程負責檢查子線程空閑時間是否達到閾值,是則使子線程退出。
3 任務數的設計,主線程負責管理任務個數并應有相應的策略。
4 選擇線程的設計,選擇任務數最少的線程。
5 線程類型的設計,區分核心線程和預備線程,任務少的時候,核心線程處理就行。任務多也創建預備線程幫忙處理。
6 線程池類型的設計,cpu密集型的,線程數等于核數,否則自定義線程數就行。
7 支持任務的取消和超時機制,超時或者取消的時候,主線程判斷任務是待執行還是正在執行,如果是待執行則從任務隊列中刪除,如果是正在執行則殺死對應的子線程。下面我們看一下具體的設計。
1 主線程和子線程通信的數據結構
// 任務類,一個任務對應一個id class Work { constructor({workId, filename, options}) { // 任務id this.workId = workId; // 任務邏輯,字符串或者js文件路徑 this.filename = filename; // 任務返回的結果 this.data = null; // 任務返回的錯誤 this.error = null; // 執行任務時傳入的參數,用戶定義 this.options = options; } }
主線程給子線程分派一個任務的時候,就給子線程發送一個Work對象。在nodejs中線程間通信需要經過序列化和反序列化,所以通信的數據結構包括的信息不能過多。
2 子線程處理任務邏輯
const { parentPort } = require('worker_threads'); const vm = require('vm'); const { isFunction, isJSFile } = require('./utils'); // 監聽主線程提交過來的任務 parentPort.on('message', async (work) => { try { const { filename, options } = work; let aFunction; if (isJSFile(filename)) { aFunction = require(filename); } else { aFunction = vm.runInThisContext(`(${filename})`); } if (!isFunction(aFunction)) { throw new Error('work type error: js file or string'); } work.data = await aFunction(options); parentPort.postMessage({event: 'done', work}); } catch (error) { work.error = error.toString(); parentPort.postMessage({event: 'error', work}); } }); process.on('uncaughtException', (...rest) => { console.error(...rest); }); process.on('unhandledRejection', (...rest) => { console.error(...rest); });
子線程的邏輯比較簡單,就是監聽主線程分派過來的任務,然后執行任務,執行完之后通知主線程。任務支持js文件和字符串代碼的形式。需要返回一個Promise或者async函數。用于用于通知主線程任務已經完成。
3 線程池和業務的通信
// 提供給用戶側的接口 class UserWork extends EventEmitter { constructor({ workId }) { super(); // 任務id this.workId = workId; // 支持超時取消任務 this.timer = null; // 任務狀態 this.state = WORK_STATE.PENDDING; } // 超時后取消任務 setTimeout(timeout) { this.timer = setTimeout(() => { this.timer && this.cancel() && this.emit('timeout'); }, ~~timeout); } // 取消之前設置的定時器 clearTimeout() { clearTimeout(this.timer); this.timer = null; } // 直接取消任務,如果執行完了就不能取消了,this.terminate是動態設置的 cancel() { if (this.state === WORK_STATE.END || this.state === WORK_STATE.CANCELED) { return false; } else { this.terminate(); return true; } } // 修改任務狀態 setState(state) { this.state = state; } }
業務提交一個任務給線程池的時候,線程池會返回一個UserWork類,業務側通過UserWork類和線程池通信。
4 管理子線程的數據結構
// 管理子線程的數據結構 class Thread { constructor({ worker }) { // nodejs的Worker對象,nodejs的worker_threads模塊的Worker this.worker = worker; // 線程狀態 this.state = THREAD_STATE.IDLE; // 上次工作的時間 this.lastWorkTime = Date.now(); } // 修改線程狀態 setState(state) { this.state = state; } // 修改線程最后工作時間 setLastWorkTime(time) { this.lastWorkTime = time; } }
線程池中維護了多個子線程,Thread類用于管理子線程的信息。
5 線程池 線程池的實現是核心,我們分為幾個部分講。
5.1 支持的配置
constructor(options = {}) { this.options = options; // 子線程隊列 this.workerQueue = []; // 核心線程數 this.coreThreads = ~~options.coreThreads || config.CORE_THREADS; // 線程池最大線程數,如果不支持動態擴容則最大線程數等于核心線程數 this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads; // 超過任務隊列長度時的處理策略 this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD; // 是否預創建子線程 this.preCreate = options.preCreate === true; // 線程最大空閑時間,達到后自動退出 this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME; // 是否預創建線程池 this.preCreate && this.preCreateThreads(); // 保存線程池中任務對應的UserWork this.workPool = {}; // 線程池中當前可用的任務id,每次有新任務時自增1 this.workId = 0; // 線程池中的任務隊列 this.queue = []; // 線程池總任務數 this.totalWork = 0; // 支持的最大任務數 this.maxWork = ~~options.maxWork || config.MAX_WORK; // 處理任務的超時時間,全局配置 this.timeout = ~~options.timeout; this.pollIdle(); }
上面的代碼列出了線程池所支持的能力。
5.2 創建線程
newThread() { const worker = new Worker(workerPath); const thread = new Thread({worker}); this.workerQueue.push(thread); const threadId = worker.threadId; worker.on('exit', () => { // 找到該線程對應的數據結構,然后刪除該線程的數據結構 const position = this.workerQueue.findIndex(({worker}) => { return worker.threadId === threadId; }); const exitedThread = this.workerQueue.splice(position, 1); // 退出時狀態是BUSY說明還在處理任務(非正常退出) this.totalWork -= exitedThread.state === THREAD_STATE.BUSY ? 1 : 0; }); // 和子線程通信 worker.on('message', (result) => { const { work, event, } = result; const { data, error, workId } = work; // 通過workId拿到對應的userWork const userWork = this.workPool[workId]; // 不存在說明任務被取消了 if (!userWork) { return; } // 修改線程池數據結構 this.endWork(userWork); // 修改線程數據結構 thread.setLastWorkTime(Date.now()); // 還有任務則通知子線程處理,否則修改子線程狀態為空閑 if (this.queue.length) { // 從任務隊列拿到一個任務交給子線程 this.submitWorkToThread(thread, this.queue.shift()); } else { thread.setState(THREAD_STATE.IDLE); } switch(event) { case 'done': // 通知用戶,任務完成 userWork.emit('done', data); break; case 'error': // 通知用戶,任務出錯 if (EventEmitter.listenerCount(userWork, 'error')) { userWork.emit('error', error); } break; default: break; } }); worker.on('error', (...rest) => { console.error(...rest); }); return thread; }
創建線程,并保持線程對應的數據結構、退出、通信管理、任務分派。子線程執行完任務后,會通知線程池,主線程通知用戶。
5.3 選擇線程
selectThead() { // 找出空閑的線程,把任務交給他 for (let i = 0; i < this.workerQueue.length; i++) { if (this.workerQueue[i].state === THREAD_STATE.IDLE) { return this.workerQueue[i]; } } // 沒有空閑的則隨機選擇一個 return this.workerQueue[~~(Math.random() * this.workerQueue.length)]; }
當用戶給線程池提交一個任務時,線程池會選擇一個空閑的線程處理該任務。如果沒有可用線程則任務插入待處理隊列等待處理。
5.4 提交任務
// 給線程池提交一個任務 submit(filename, options = {}) { return new Promise(async (resolve, reject) => { let thread; // 沒有線程則創建一個 if (this.workerQueue.length) { thread = this.selectThead(); // 該線程還有任務需要處理 if (thread.state === THREAD_STATE.BUSY) { // 子線程個數還沒有達到核心線程數,則新建線程處理 if (this.workerQueue.length < this.coreThreads) { thread = this.newThread(); } else if (this.totalWork + 1 > this.maxWork){ // 總任務數已達到閾值,還沒有達到線程數閾值,則創建 if(this.workerQueue.length < this.maxThreads) { thread = this.newThread(); } else { // 處理溢出的任務 switch(this.discardPolicy) { case DISCARD_POLICY.ABORT: return reject(new Error('queue overflow')); case DISCARD_POLICY.CALLER_RUN: const workId = this.generateWorkId(); const userWork = new UserWork({workId}); userWork.setState(WORK_STATE.RUNNING); userWork.terminate = () => { userWork.setState(WORK_STATE.CANCELED); }; this.timeout && userWork.setTimeout(this.timeout); resolve(userWork); try { let aFunction; if (isJSFile(filename)) { aFunction = require(filename); } else { aFunction = vm.runInThisContext(`(${filename})`); } if (!isFunction(aFunction)) { throw new Error('work type error: js file or string'); } const result = await aFunction(options); // 延遲通知,讓用戶有機會取消或者注冊事件 setImmediate(() => { if (userWork.state !== WORK_STATE.CANCELED) { userWork.setState(WORK_STATE.END); userWork.emit('done', result); } }); } catch (error) { setImmediate(() => { if (userWork.state !== WORK_STATE.CANCELED) { userWork.setState(WORK_STATE.END); userWork.emit('error', error.toString()); } }); } return; case DISCARD_POLICY.OLDEST_DISCARD: const work = this.queue.shift(); // maxWork為1時,work會為空 if (work && this.workPool[work.workId]) { this.cancelWork(this.workPool[work.workId]); } else { return reject(new Error('no work can be discarded')); } break; case DISCARD_POLICY.DISCARD: return reject(new Error('discard')); case DISCARD_POLICY.NOT_DISCARD: break; default: break; } } } } } else { thread = this.newThread(); } // 生成一個任務id const workId = this.generateWorkId(); // 新建一個UserWork const userWork = new UserWork({workId}); this.timeout && userWork.setTimeout(this.timeout); // 新建一個work const work = new Work({ workId, filename, options }); // 修改線程池數據結構,把UserWork和Work關聯起來 this.addWork(userWork); // 選中的線程正在處理任務,則先緩存到任務隊列 if (thread.state === THREAD_STATE.BUSY) { this.queue.push(work); userWork.terminate = () => { this.cancelWork(userWork); this.queue = this.queue.filter((node) => { return node.workId !== work.workId; }); } } else { this.submitWorkToThread(thread, work); } resolve(userWork); }) } submitWorkToThread(thread, work) { const userWork = this.workPool[work.workId]; userWork.setState(WORK_STATE.RUNNING); // 否則交給線程處理,并修改狀態和記錄該線程當前處理的任務id thread.setState(THREAD_STATE.BUSY); thread.worker.postMessage(work); userWork.terminate = () => { this.cancelWork(userWork); thread.setState(THREAD_STATE.DEAD); thread.worker.terminate(); } } addWork(userWork) { userWork.setState(WORK_STATE.PENDDING); this.workPool[userWork.workId] = userWork; this.totalWork++; } endWork(userWork) { delete this.workPool[userWork.workId]; this.totalWork--; userWork.setState(WORK_STATE.END); userWork.clearTimeout(); } cancelWork(userWork) { delete this.workPool[userWork.workId]; this.totalWork--; userWork.setState(WORK_STATE.CANCELED); userWork.emit('cancel'); }
提交任務是線程池暴露給用戶側的接口,主要處理的邏輯包括,根據當前的策略判斷是否需要新建線程、選擇線程處理任務、排隊任務等,如果任務數達到閾值,則根據丟棄策略處理該任務。
5.5 空閑處理
pollIdle() { setTimeout(() => { for (let i = 0; i < this.workerQueue.length; i++) { const node = this.workerQueue[i]; if (node.state === THREAD_STATE.IDLE && Date.now() - node.lastWorkTime > this.maxIdleTime) { node.worker.terminate(); } } this.pollIdle(); }, 1000); }
當子線程空閑時間達到閾值后,主線程會殺死子線程,避免浪費系統資源。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。