import pTimeout from 'p-timeout'; const normalizeEmitter = emitter => { const addListener = emitter.addEventListener || emitter.on || emitter.addListener; const removeListener = emitter.removeEventListener || emitter.off || emitter.removeListener; if (!addListener || !removeListener) { throw new TypeError('Emitter is not compatible'); } return { addListener: addListener.bind(emitter), removeListener: removeListener.bind(emitter), }; }; export function pEventMultiple(emitter, event, options) { let cancel; const returnValue = new Promise((resolve, reject) => { options = { rejectionEvents: ['error'], multiArgs: false, resolveImmediately: false, ...options, }; if (!(options.count >= 0 && (options.count === Number.POSITIVE_INFINITY || Number.isInteger(options.count)))) { throw new TypeError('The `count` option should be at least 0 or more'); } options.signal?.throwIfAborted(); // Allow multiple events const events = [event].flat(); const items = []; const {addListener, removeListener} = normalizeEmitter(emitter); const onItem = (...arguments_) => { const value = options.multiArgs ? arguments_ : arguments_[0]; // eslint-disable-next-line unicorn/no-array-callback-reference if (options.filter && !options.filter(value)) { return; } items.push(value); if (options.count === items.length) { cancel(); resolve(items); } }; const rejectHandler = error => { cancel(); reject(error); }; cancel = () => { for (const event of events) { removeListener(event, onItem); } for (const rejectionEvent of options.rejectionEvents) { removeListener(rejectionEvent, rejectHandler); } }; for (const event of events) { addListener(event, onItem); } for (const rejectionEvent of options.rejectionEvents) { addListener(rejectionEvent, rejectHandler); } if (options.signal) { options.signal.addEventListener('abort', () => { rejectHandler(options.signal.reason); }, {once: true}); } if (options.resolveImmediately) { resolve(items); } }); returnValue.cancel = cancel; if (typeof options.timeout === 'number') { const timeout = pTimeout(returnValue, {milliseconds: options.timeout}); timeout.cancel = cancel; return timeout; } return returnValue; } export function pEvent(emitter, event, options) { if (typeof options === 'function') { options = {filter: options}; } options = { ...options, count: 1, resolveImmediately: false, }; const arrayPromise = pEventMultiple(emitter, event, options); const promise = arrayPromise.then(array => array[0]); promise.cancel = arrayPromise.cancel; return promise; } export function pEventIterator(emitter, event, options) { if (typeof options === 'function') { options = {filter: options}; } // Allow multiple events const events = [event].flat(); options = { rejectionEvents: ['error'], resolutionEvents: [], limit: Number.POSITIVE_INFINITY, multiArgs: false, ...options, }; const {limit} = options; const isValidLimit = limit >= 0 && (limit === Number.POSITIVE_INFINITY || Number.isInteger(limit)); if (!isValidLimit) { throw new TypeError('The `limit` option should be a non-negative integer or Infinity'); } options.signal?.throwIfAborted(); if (limit === 0) { // Return an empty async iterator to avoid any further cost return { [Symbol.asyncIterator]() { return this; }, async next() { return { done: true, value: undefined, }; }, }; } const {addListener, removeListener} = normalizeEmitter(emitter); let isDone = false; let error; let hasPendingError = false; const nextQueue = []; const valueQueue = []; let eventCount = 0; let isLimitReached = false; const valueHandler = (...arguments_) => { eventCount++; isLimitReached = eventCount === limit; const value = options.multiArgs ? arguments_ : arguments_[0]; if (nextQueue.length > 0) { const {resolve} = nextQueue.shift(); resolve({done: false, value}); if (isLimitReached) { cancel(); } return; } valueQueue.push(value); if (isLimitReached) { cancel(); } }; const cancel = () => { isDone = true; for (const event of events) { removeListener(event, valueHandler); } for (const rejectionEvent of options.rejectionEvents) { removeListener(rejectionEvent, rejectHandler); } for (const resolutionEvent of options.resolutionEvents) { removeListener(resolutionEvent, resolveHandler); } while (nextQueue.length > 0) { const {resolve} = nextQueue.shift(); resolve({done: true, value: undefined}); } }; const rejectHandler = (...arguments_) => { error = options.multiArgs ? arguments_ : arguments_[0]; if (nextQueue.length > 0) { const {reject} = nextQueue.shift(); reject(error); } else { hasPendingError = true; } cancel(); }; const resolveHandler = (...arguments_) => { const value = options.multiArgs ? arguments_ : arguments_[0]; // eslint-disable-next-line unicorn/no-array-callback-reference if (options.filter && !options.filter(value)) { cancel(); return; } if (nextQueue.length > 0) { const {resolve} = nextQueue.shift(); resolve({done: true, value}); } else { valueQueue.push(value); } cancel(); }; for (const event of events) { addListener(event, valueHandler); } for (const rejectionEvent of options.rejectionEvents) { addListener(rejectionEvent, rejectHandler); } for (const resolutionEvent of options.resolutionEvents) { addListener(resolutionEvent, resolveHandler); } if (options.signal) { options.signal.addEventListener('abort', () => { rejectHandler(options.signal.reason); }, {once: true}); } return { [Symbol.asyncIterator]() { return this; }, async next() { if (valueQueue.length > 0) { const value = valueQueue.shift(); return { done: isDone && valueQueue.length === 0 && !isLimitReached, value, }; } if (hasPendingError) { hasPendingError = false; throw error; } if (isDone) { return { done: true, value: undefined, }; } return new Promise((resolve, reject) => { nextQueue.push({resolve, reject}); }); }, async return(value) { cancel(); return { done: isDone, value, }; }, }; } export {TimeoutError} from 'p-timeout';