Spaces:
Running
Running
| /* eslint-disable no-var */ | |
| var reusify = require('reusify') | |
/**
 * Create a callback-style, in-memory work queue with bounded concurrency.
 *
 * Waiting tasks are kept in a singly linked list of pooled Task holders
 * (obtained through `reusify`), so enqueueing does not allocate once the
 * pool is warm.
 *
 * Hooks on the returned object that callers may overwrite:
 *  - `saturated()` runs when a task has to wait and the backlog was empty.
 *  - `empty()` runs when the last waiting task is handed to the worker.
 *  - `drain()` runs when the running count drops to zero with nothing waiting.
 *
 * @param {*} [context] - `this` for the worker; may be omitted entirely,
 *   in which case the remaining arguments shift left.
 * @param {Function} worker - invoked as `worker.call(context, value, done)`.
 * @param {number} concurrency - maximum number of workers in flight.
 * @returns {Object} the queue handle (push, unshift, pause, resume, ...).
 * @throws {Error} when `concurrency` is lower than 1.
 */
function fastqueue (context, worker, concurrency) {
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  if (concurrency < 1) {
    // A concurrency of exactly 1 is valid, so the message must say so.
    throw new Error('fastqueue concurrency must be equal to or greater than 1')
  }

  var cache = reusify(Task)
  var queueHead = null
  var queueTail = null
  var _running = 0
  var errorHandler = null

  var self = {
    push: push,
    drain: noop,
    saturated: noop,
    pause: pause,
    paused: false,
    concurrency: concurrency,
    running: running,
    resume: resume,
    idle: idle,
    length: length,
    getQueue: getQueue,
    unshift: unshift,
    empty: noop,
    kill: kill,
    killAndDrain: killAndDrain,
    error: error
  }

  return self

  /** Number of workers currently in flight. */
  function running () {
    return _running
  }

  /** Stop handing waiting tasks to the worker; in-flight tasks still finish. */
  function pause () {
    self.paused = true
  }

  /** Number of tasks waiting in the backlog (walks the linked list). */
  function length () {
    var current = queueHead
    var counter = 0

    while (current) {
      current = current.next
      counter++
    }

    return counter
  }

  /** Snapshot of the waiting task values, head first. */
  function getQueue () {
    var current = queueHead
    var tasks = []

    while (current) {
      tasks.push(current.value)
      current = current.next
    }

    return tasks
  }

  /**
   * Leave the paused state and start as many waiting tasks as the
   * concurrency limit still allows.
   */
  function resume () {
    if (!self.paused) return
    self.paused = false

    if (queueHead === null) {
      // Nothing is waiting: go through release() once so that drain()
      // fires exactly once when no worker is in flight either.
      _running++
      release()
      return
    }

    // Only fill the *free* slots. Tasks that were already running when the
    // queue was paused still count against the limit. The paused check stops
    // the loop if a worker synchronously pauses the queue again.
    while (queueHead && !self.paused && _running < self.concurrency) {
      _running++
      release()
    }
  }

  /** True when nothing is running and nothing is waiting. */
  function idle () {
    return _running === 0 && self.length() === 0
  }

  /** Fetch a pooled Task and fully initialise it for `value`. */
  function acquire (value, done) {
    var task = cache.get()
    task.context = context
    task.release = release
    task.value = value
    task.callback = done || noop
    // Always overwrite: a recycled Task may still carry an older handler.
    task.errorHandler = errorHandler
    return task
  }

  /** True when a new task has to wait instead of starting immediately. */
  function mustWait () {
    return _running >= self.concurrency || self.paused
  }

  /**
   * Append a task to the end of the backlog, or run it right away when a
   * slot is free.
   *
   * @param {*} value - payload handed to the worker.
   * @param {Function} [done] - completion callback `(err, result)`.
   */
  function push (value, done) {
    var current = acquire(value, done)

    if (mustWait()) {
      if (queueTail) {
        queueTail.next = current
        queueTail = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Insert a task at the front of the backlog, or run it right away when a
   * slot is free.
   *
   * @param {*} value - payload handed to the worker.
   * @param {Function} [done] - completion callback `(err, result)`.
   */
  function unshift (value, done) {
    var current = acquire(value, done)

    if (mustWait()) {
      if (queueHead) {
        current.next = queueHead
        queueHead = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  /**
   * Called when a worker slot becomes free. Recycles the finished holder (if
   * any) and either starts the next waiting task in the same slot or gives
   * the slot back.
   *
   * @param {Task} [holder] - the finished task to return to the pool.
   */
  function release (holder) {
    if (holder) {
      cache.release(holder)
    }

    var next = queueHead

    if (next) {
      if (!self.paused) {
        if (queueTail === queueHead) {
          queueTail = null
        }
        queueHead = next.next
        next.next = null
        // The slot is reused by the next task, so _running is unchanged.
        worker.call(context, next.value, next.worked)
        if (queueTail === null) {
          self.empty()
        }
      } else {
        _running--
      }
    } else if (--_running === 0) {
      self.drain()
    }
  }

  /** Drop every waiting task and disable the drain hook. */
  function kill () {
    queueHead = null
    queueTail = null
    self.drain = noop
  }

  /** Drop every waiting task, fire the drain hook once, then disable it. */
  function killAndDrain () {
    queueHead = null
    queueTail = null
    self.drain()
    self.drain = noop
  }

  /**
   * Register a handler that is attached to every task enqueued afterwards.
   *
   * @param {Function} handler - invoked as `handler(err, value)`.
   */
  function error (handler) {
    errorHandler = handler
  }
}
/** Shared do-nothing function used as the default for hooks and callbacks. */
function noop () {
  // intentionally empty
}
/**
 * Reusable holder for one queued unit of work.
 *
 * Besides its data fields, every instance carries its own bound `worked`
 * function, which is what the queue passes to the worker as the completion
 * callback.
 */
function Task () {
  var task = this

  task.value = null
  task.callback = noop
  task.next = null
  task.release = noop
  task.context = null
  task.errorHandler = null

  /**
   * Completion callback given to the worker.
   *
   * Order of effects: clear value and callback on the holder, notify the
   * error handler (when one is attached), call the task callback with the
   * task context as `this`, and finally hand the holder to `release`.
   */
  task.worked = function worked (err, result) {
    // Capture before clearing so the holder is clean while user code runs.
    var done = task.callback
    var onError = task.errorHandler
    var payload = task.value

    task.value = null
    task.callback = noop

    if (onError) {
      onError(err, payload)
    }

    done.call(task.context, err, result)
    task.release(task)
  }
}
/**
 * Promise-based variant of `fastqueue`.
 *
 * The worker returns a promise (plain return values are accepted too) and
 * `push` / `unshift` return a promise for that task's outcome instead of
 * taking a callback.
 *
 * @param {*} [context] - `this` for the worker; may be omitted entirely,
 *   in which case the remaining arguments shift left.
 * @param {Function} worker - invoked as `worker.call(context, value)`.
 * @param {number} concurrency - forwarded unchanged to `fastqueue`.
 * @returns {Object} the queue from `fastqueue` with promise-returning
 *   `push` / `unshift` and an extra `drained()` method.
 */
function queueAsPromised (context, worker, concurrency) {
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  /** Adapt the promise-returning worker to the callback worker contract. */
  function asyncWrapper (arg, cb) {
    var outcome
    try {
      // Promise.resolve also lets a worker return a plain value.
      outcome = Promise.resolve(worker.call(this, arg))
    } catch (err) {
      // A synchronous throw must still complete the task; otherwise the
      // running slot is never given back and the queue stalls.
      cb(err)
      return
    }
    outcome.then(function (res) {
      cb(null, res)
    }, cb)
  }

  var queue = fastqueue(context, asyncWrapper, concurrency)

  var pushCb = queue.push
  var unshiftCb = queue.unshift

  queue.push = push
  queue.unshift = unshift
  queue.drained = drained

  return queue

  /** Enqueue through `method` and expose the task outcome as a promise. */
  function enqueue (method, value) {
    var p = new Promise(function (resolve, reject) {
      method(value, function (err, result) {
        if (err) {
          reject(err)
          return
        }
        resolve(result)
      })
    })

    // Let's fork the promise chain to
    // make the error bubble up to the user but
    // not lead to a unhandledRejection
    p.catch(noop)

    return p
  }

  /** Add `value` at the end of the queue; resolves with the worker result. */
  function push (value) {
    return enqueue(pushCb, value)
  }

  /** Add `value` at the front of the queue; resolves with the worker result. */
  function unshift (value) {
    return enqueue(unshiftCb, value)
  }

  /**
   * Promise that resolves once the queue has drained. Resolves immediately
   * when the queue is already idle, since no further drain event would come.
   */
  function drained () {
    if (queue.idle()) {
      return Promise.resolve()
    }

    var previousDrain = queue.drain

    return new Promise(function (resolve) {
      queue.drain = function onDrain () {
        // Put the previous hook back (unless someone replaced ours in the
        // meantime) so repeated drained() calls do not pile up wrappers.
        if (queue.drain === onDrain) {
          queue.drain = previousDrain
        }
        if (typeof previousDrain === 'function') {
          previousDrain()
        }
        resolve()
      }
    })
  }
}
// Public API: the callback-style factory is the module export itself, and
// the promise-based factory is attached to it as the `promise` property.
| module.exports = fastqueue | |
| module.exports.promise = queueAsPromised | |