Spaces:
Running
Running
File size: 7,248 Bytes
30c32c8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
const Timer = require('../util/timer');
/**
 * This class uses the token bucket algorithm to control a queue of tasks.
 *
 * The bucket holds at most `maxTokens` tokens and is refilled at `refillRate` tokens per second.
 * Every task has a cost in tokens. Tasks run in the order they were queued; whenever the bucket cannot
 * afford the task at the head of the queue, the queue waits (using a timer) until it can.
 */
class TaskQueue {
    /**
     * Creates an instance of TaskQueue.
     * To allow bursts, set `maxTokens` to several times the average task cost.
     * To prevent bursts, set `maxTokens` to the cost of the largest tasks.
     * Note that tasks with a cost greater than `maxTokens` will be rejected.
     *
     * @param {number} maxTokens - the maximum number of tokens in the bucket (burst size).
     * @param {number} refillRate - the number of tokens to be added per second (sustain rate).
     * @param {object} [options] - optional settings for the new task queue instance.
     * @param {number} [options.startingTokens] - the number of tokens the bucket starts with
     *     (default: `maxTokens`).
     * @param {number} [options.maxTotalCost] - reject a task if total queue cost would pass this limit
     *     (default: no limit).
     * @memberof TaskQueue
     */
    constructor (maxTokens, refillRate, options = {}) {
        // Go through Object.prototype so that option bags without a prototype (`Object.create(null)`) or with
        // their own `hasOwnProperty` key still work.
        const hasOption = key => Object.prototype.hasOwnProperty.call(options, key);

        this._maxTokens = maxTokens;
        this._refillRate = refillRate;
        this._pendingTaskRecords = [];
        this._tokenCount = hasOption('startingTokens') ? options.startingTokens : maxTokens;
        this._maxTotalCost = hasOption('maxTotalCost') ? options.maxTotalCost : Infinity;
        this._timer = new Timer();
        this._timer.start();
        // Handle of the pending "try again later" timeout, or `null` when no retry is scheduled.
        this._timeout = null;
        this._lastUpdateTime = this._timer.timeElapsed();
        // Bound once so the same function can be handed to the timer as a callback.
        this._runTasks = this._runTasks.bind(this);
    }

    /**
     * Get the number of queued tasks which have not yet started.
     *
     * @type {number}
     * @readonly
     * @memberof TaskQueue
     */
    get length () {
        return this._pendingTaskRecords.length;
    }

    /**
     * Wait until the token bucket is full enough, then run the provided task.
     *
     * The returned promise rejects (and the task is never queued) when:
     * - adding the task would push the total cost of queued tasks past `maxTotalCost`; in that case the
     *   rejection reason is the string `'Maximum total cost exceeded'`, or
     * - `cost` is greater than `maxTokens`, since the bucket could never afford it; in that case the
     *   rejection reason is an `Error`.
     * It also rejects if the task is canceled before it starts or if the task itself throws.
     *
     * @param {Function} task - the task to run.
     * @param {number} [cost=1] - the number of tokens this task consumes from the bucket.
     * @returns {Promise} - a promise for the task's return value.
     * @memberof TaskQueue
     */
    do (task, cost = 1) {
        if (this._maxTotalCost < Infinity) {
            const currentTotalCost = this._pendingTaskRecords.reduce((t, r) => t + r.cost, 0);
            if (currentTotalCost + cost > this._maxTotalCost) {
                // NOTE: the reason is a plain string; kept as-is because callers may compare against it.
                return Promise.reject('Maximum total cost exceeded');
            }
        }

        // A task the bucket can never afford is rejected right away instead of being queued. Throwing later
        // from `_runTasks()` would leave this promise pending forever and could stall the rest of the queue.
        if (cost > this._maxTokens) {
            return Promise.reject(
                new Error(`Task cost ${cost} is greater than bucket limit ${this._maxTokens}`)
            );
        }

        const newRecord = {
            cost
        };
        newRecord.promise = new Promise((resolve, reject) => {
            // Used by `_runTasks()` to fail a record with a specific error.
            newRecord.reject = reject;
            newRecord.cancel = () => {
                reject(new Error('Task canceled'));
            };
            // The caller, `_runTasks()`, is responsible for cost-checking and spending tokens.
            newRecord.wrappedTask = () => {
                try {
                    resolve(task());
                } catch (e) {
                    reject(e);
                }
            };
        });
        this._pendingTaskRecords.push(newRecord);

        // If the queue has been idle we need to prime the pump
        if (this._pendingTaskRecords.length === 1) {
            this._runTasks();
        }

        return newRecord.promise;
    }

    /**
     * Cancel one pending task, rejecting its promise.
     *
     * @param {Promise} taskPromise - the promise returned by `do()`.
     * @returns {boolean} - true if the task was found, or false otherwise.
     * @memberof TaskQueue
     */
    cancel (taskPromise) {
        const taskIndex = this._pendingTaskRecords.findIndex(r => r.promise === taskPromise);
        if (taskIndex === -1) {
            return false;
        }

        const [taskRecord] = this._pendingTaskRecords.splice(taskIndex, 1);
        taskRecord.cancel();
        // Any scheduled retry was sized for the old head of the queue. If that head was just removed,
        // re-evaluate immediately: the new head may already be affordable.
        if (taskIndex === 0 && this._pendingTaskRecords.length > 0) {
            this._runTasks();
        }
        return true;
    }

    /**
     * Cancel all pending tasks, rejecting all their promises.
     *
     * @memberof TaskQueue
     */
    cancelAll () {
        if (this._timeout !== null) {
            this._timer.clearTimeout(this._timeout);
            this._timeout = null;
        }
        // Swap in a fresh array before rejecting so the queue is already empty while the cancels run.
        const oldTasks = this._pendingTaskRecords;
        this._pendingTaskRecords = [];
        oldTasks.forEach(r => r.cancel());
    }

    /**
     * Shorthand for calling _refill() then _spend(cost).
     *
     * @see {@link TaskQueue#_refill}
     * @see {@link TaskQueue#_spend}
     * @param {number} cost - the number of tokens to try to spend.
     * @returns {boolean} true if we had enough tokens; false otherwise.
     * @memberof TaskQueue
     */
    _refillAndSpend (cost) {
        this._refill();
        return this._spend(cost);
    }

    /**
     * Refill the token bucket based on the amount of time since the last refill.
     * The token count never exceeds `maxTokens`.
     *
     * @memberof TaskQueue
     */
    _refill () {
        const now = this._timer.timeElapsed();
        const timeSinceRefill = now - this._lastUpdateTime;
        if (timeSinceRefill <= 0) return;

        this._lastUpdateTime = now;
        // `refillRate` is per second; the division by 1000 treats the elapsed time as milliseconds.
        this._tokenCount += timeSinceRefill * this._refillRate / 1000;
        this._tokenCount = Math.min(this._tokenCount, this._maxTokens);
    }

    /**
     * If we can "afford" the given cost, subtract that many tokens and return true.
     * Otherwise, return false.
     *
     * @param {number} cost - the number of tokens to try to spend.
     * @returns {boolean} true if we had enough tokens; false otherwise.
     * @memberof TaskQueue
     */
    _spend (cost) {
        if (cost <= this._tokenCount) {
            this._tokenCount -= cost;
            return true;
        }
        return false;
    }

    /**
     * Loop until the task queue is empty, running each task and spending tokens to do so.
     * Any time the bucket can't afford the next task, delay asynchronously until it can.
     *
     * This method does not throw for an unaffordable task: a record whose cost exceeds `maxTokens` has its
     * promise rejected and the loop moves on to the next record.
     *
     * @memberof TaskQueue
     */
    _runTasks () {
        // Drop any scheduled retry; this call supersedes it.
        if (this._timeout !== null) {
            this._timer.clearTimeout(this._timeout);
            this._timeout = null;
        }

        for (;;) {
            const nextRecord = this._pendingTaskRecords.shift();
            if (!nextRecord) {
                // We ran out of work. Go idle until someone adds another task to the queue.
                return;
            }

            if (nextRecord.cost > this._maxTokens) {
                // `do()` normally filters these out. If one gets here anyway, fail just that task and keep
                // going: throwing would lose the record and leave the remaining tasks stuck.
                nextRecord.reject(
                    new Error(`Task cost ${nextRecord.cost} is greater than bucket limit ${this._maxTokens}`)
                );
                continue;
            }

            // Refill before each task in case the time it took for the last task to run was enough to afford
            // the next.
            if (this._refillAndSpend(nextRecord.cost)) {
                nextRecord.wrappedTask();
            } else {
                // We can't currently afford this task. Put it back and wait until we can and try again.
                this._pendingTaskRecords.unshift(nextRecord);
                const tokensNeeded = Math.max(nextRecord.cost - this._tokenCount, 0);
                const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate);
                this._timeout = this._timer.setTimeout(this._runTasks, estimatedWait);
                return;
            }
        }
    }
}
module.exports = TaskQueue;
|