import { AsyncResource } from 'async_hooks'; import { Worker } from 'worker_threads'; import { cpus } from 'os'; import { EventEmitter } from 'events'; import serializeJavascript from 'serialize-javascript'; import { freeWorker, taskInfo, workerPoolWorkerFlag } from './constants'; import type { WorkerCallback, WorkerContext, WorkerOutput, WorkerPoolOptions, WorkerPoolTask, WorkerWithTaskInfo } from './type'; class WorkerPoolTaskInfo extends AsyncResource { constructor(private callback: WorkerCallback) { super('WorkerPoolTaskInfo'); } done(err: Error | null, result: any) { this.runInAsyncScope(this.callback, null, err, result); this.emitDestroy(); } } export class WorkerPool extends EventEmitter { protected maxInstances: number; protected filePath: string; protected tasks: WorkerPoolTask[] = []; protected workers: WorkerWithTaskInfo[] = []; protected freeWorkers: WorkerWithTaskInfo[] = []; constructor(options: WorkerPoolOptions) { super(); this.maxInstances = options.maxWorkers || cpus().length; this.filePath = options.filePath; this.on(freeWorker, () => { if (this.tasks.length > 0) { const { context, cb } = this.tasks.shift()!; this.runTask(context, cb); } }); } get numWorkers(): number { return this.workers.length; } addAsync(context: WorkerContext): Promise { return new Promise((resolve, reject) => { this.runTask(context, (err, output) => { if (err) { reject(err); return; } if (!output) { reject(new Error('The output is empty')); return; } resolve(output); }); }); } close() { for (let i = 0; i < this.workers.length; i++) { const worker = this.workers[i]; worker.terminate(); } } private addNewWorker() { const worker: WorkerWithTaskInfo = new Worker(this.filePath, { workerData: workerPoolWorkerFlag }); worker.on('message', (result) => { worker[taskInfo]?.done(null, result); worker[taskInfo] = null; this.freeWorkers.push(worker); this.emit(freeWorker); }); worker.on('error', (err) => { if (worker[taskInfo]) { worker[taskInfo].done(err, null); } else { this.emit('error', err); } this.workers.splice(this.workers.indexOf(worker), 1); this.addNewWorker(); }); this.workers.push(worker); this.freeWorkers.push(worker); this.emit(freeWorker); } private runTask(context: WorkerContext, cb: WorkerCallback) { if (this.freeWorkers.length === 0) { this.tasks.push({ context, cb }); if (this.numWorkers < this.maxInstances) { this.addNewWorker(); } return; } const worker = this.freeWorkers.pop(); if (worker) { worker[taskInfo] = new WorkerPoolTaskInfo(cb); worker.postMessage({ code: context.code, options: serializeJavascript(context.options) }); } } }