File size: 7,248 Bytes
30c32c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
const Timer = require('../util/timer');

/**
 * This class uses the token bucket algorithm to control a queue of tasks.
 */
class TaskQueue {
    /**
     * Creates an instance of TaskQueue.
     * To allow bursts, set `maxTokens` to several times the average task cost.
     * To prevent bursts, set `maxTokens` to the cost of the largest tasks.
     * Note that tasks with a cost greater than `maxTokens` will be rejected: the promise returned by `do()` for
     * such a task rejects with an `Error` when the task reaches the front of the queue.
     *
     * @param {number} maxTokens - the maximum number of tokens in the bucket (burst size).
     * @param {number} refillRate - the number of tokens to be added per second (sustain rate).
     * @param {object} options - optional settings for the new task queue instance.
     * @property {number} startingTokens - the number of tokens the bucket starts with (default: `maxTokens`).
     * @property {number} maxTotalCost - reject a task if total queue cost would pass this limit (default: no limit).
     * @memberof TaskQueue
     */
    constructor (maxTokens, refillRate, options = {}) {
        // Tolerate `null` and prototype-less option bags. Only own properties count as "explicitly provided",
        // so an inherited `startingTokens` / `maxTotalCost` never overrides the defaults.
        const settings = options || {};
        const hasOption = name => Object.prototype.hasOwnProperty.call(settings, name);

        this._maxTokens = maxTokens;
        this._refillRate = refillRate;
        this._pendingTaskRecords = [];
        this._tokenCount = hasOption('startingTokens') ? settings.startingTokens : maxTokens;
        this._maxTotalCost = hasOption('maxTotalCost') ? settings.maxTotalCost : Infinity;
        this._timer = new Timer();
        this._timer.start();
        // Handle of the pending "try again later" timeout, or `null` when none is scheduled.
        this._timeout = null;
        this._lastUpdateTime = this._timer.timeElapsed();

        // Bound once so the same function can be handed to the timer as a callback.
        this._runTasks = this._runTasks.bind(this);
    }

    /**
     * Get the number of queued tasks which have not yet started.
     *
     * @readonly
     * @memberof TaskQueue
     */
    get length () {
        return this._pendingTaskRecords.length;
    }

    /**
     * Wait until the token bucket is full enough, then run the provided task.
     * If the queue was idle and enough tokens are available, the task runs synchronously inside this call.
     *
     * The returned promise rejects when:
     * - accepting the task would push the total pending cost past `maxTotalCost` (rejects immediately with the
     *   string `'Maximum total cost exceeded'`),
     * - the task's cost is greater than `maxTokens` (rejects with an `Error` once the task reaches the front),
     * - the task is canceled through `cancel()` / `cancelAll()` (rejects with an `Error`), or
     * - the task itself throws (rejects with whatever was thrown).
     *
     * @param {Function} task - the task to run.
     * @param {number} [cost=1] - the number of tokens this task consumes from the bucket.
     * @returns {Promise} - a promise for the task's return value.
     * @memberof TaskQueue
     */
    do (task, cost = 1) {
        if (this._maxTotalCost < Infinity) {
            const currentTotalCost = this._pendingTaskRecords.reduce((total, record) => total + record.cost, 0);
            if (currentTotalCost + cost > this._maxTotalCost) {
                // The rejection value is a plain string; kept as-is so existing callers that compare it still work.
                return Promise.reject('Maximum total cost exceeded');
            }
        }
        const newRecord = {
            cost
        };
        newRecord.promise = new Promise((resolve, reject) => {
            newRecord.cancel = () => {
                reject(new Error('Task canceled'));
            };

            // Lets `_runTasks()` fail this task's promise (e.g. when the task can never be afforded).
            newRecord.reject = reject;

            // The caller, `_runTasks()`, is responsible for cost-checking and spending tokens.
            newRecord.wrappedTask = () => {
                try {
                    resolve(task());
                } catch (e) {
                    reject(e);
                }
            };
        });
        this._pendingTaskRecords.push(newRecord);

        // If the queue has been idle we need to prime the pump
        if (this._pendingTaskRecords.length === 1) {
            this._runTasks();
        }

        return newRecord.promise;
    }

    /**
     * Cancel one pending task, rejecting its promise.
     * Tasks which have already started are no longer pending and cannot be found.
     *
     * @param {Promise} taskPromise - the promise returned by `do()`.
     * @returns {boolean} - true if the task was found, or false otherwise.
     * @memberof TaskQueue
     */
    cancel (taskPromise) {
        const taskIndex = this._pendingTaskRecords.findIndex(r => r.promise === taskPromise);
        if (taskIndex !== -1) {
            const [taskRecord] = this._pendingTaskRecords.splice(taskIndex, 1);
            taskRecord.cancel();
            // Any scheduled wait was computed for the old head of the queue, so when the head is removed
            // re-evaluate right away: the new head may be cheaper and affordable now.
            if (taskIndex === 0 && this._pendingTaskRecords.length > 0) {
                this._runTasks();
            }
            return true;
        }
        return false;
    }

    /**
     * Cancel all pending tasks, rejecting all their promises.
     *
     * @memberof TaskQueue
     */
    cancelAll () {
        if (this._timeout !== null) {
            this._timer.clearTimeout(this._timeout);
            this._timeout = null;
        }
        // Swap in the empty queue before rejecting, so the queue already reads as empty while rejections happen.
        const oldTasks = this._pendingTaskRecords;
        this._pendingTaskRecords = [];
        oldTasks.forEach(r => r.cancel());
    }

    /**
     * Shorthand for calling _refill() then _spend(cost).
     *
     * @see {@link TaskQueue#_refill}
     * @see {@link TaskQueue#_spend}
     * @param {number} cost - the number of tokens to try to spend.
     * @returns {boolean} true if we had enough tokens; false otherwise.
     * @memberof TaskQueue
     */
    _refillAndSpend (cost) {
        this._refill();
        return this._spend(cost);
    }

    /**
     * Refill the token bucket based on the amount of time since the last refill.
     * The timer's elapsed time is treated as milliseconds while `refillRate` is tokens per second.
     * The bucket never holds more than `maxTokens`.
     *
     * @memberof TaskQueue
     */
    _refill () {
        const now = this._timer.timeElapsed();
        const timeSinceRefill = now - this._lastUpdateTime;
        // Nothing to add if no time has passed (or the reported time went backwards).
        if (timeSinceRefill <= 0) return;

        this._lastUpdateTime = now;
        this._tokenCount += timeSinceRefill * this._refillRate / 1000;
        this._tokenCount = Math.min(this._tokenCount, this._maxTokens);
    }

    /**
     * If we can "afford" the given cost, subtract that many tokens and return true.
     * Otherwise, return false.
     *
     * @param {number} cost - the number of tokens to try to spend.
     * @returns {boolean} true if we had enough tokens; false otherwise.
     * @memberof TaskQueue
     */
    _spend (cost) {
        if (cost <= this._tokenCount) {
            this._tokenCount -= cost;
            return true;
        }
        return false;
    }

    /**
     * Loop until the task queue is empty, running each task and spending tokens to do so.
     * Any time the bucket can't afford the next task, delay asynchronously until it can.
     * A task whose cost is greater than `maxTokens` can never be afforded: its promise is rejected and the loop
     * moves on to the next task, so one oversized task cannot stall the rest of the queue.
     *
     * @memberof TaskQueue
     */
    _runTasks () {
        // Drop any scheduled retry: this call supersedes it.
        if (this._timeout !== null) {
            this._timer.clearTimeout(this._timeout);
            this._timeout = null;
        }
        for (;;) {
            const nextRecord = this._pendingTaskRecords.shift();
            if (!nextRecord) {
                // We ran out of work. Go idle until someone adds another task to the queue.
                return;
            }
            if (nextRecord.cost > this._maxTokens) {
                // Reject instead of throwing: this method also runs as a timer callback, where a throw would
                // leave the task's promise unsettled and the remaining tasks without anyone to run them.
                nextRecord.reject(
                    new Error(`Task cost ${nextRecord.cost} is greater than bucket limit ${this._maxTokens}`)
                );
                continue;
            }
            // Refill before each task in case the time it took for the last task to run was enough to afford the next.
            if (this._refillAndSpend(nextRecord.cost)) {
                nextRecord.wrappedTask();
            } else {
                // We can't currently afford this task. Put it back and wait until we can and try again.
                this._pendingTaskRecords.unshift(nextRecord);
                const tokensNeeded = Math.max(nextRecord.cost - this._tokenCount, 0);
                // Tokens arrive at `refillRate` per second; convert the shortfall into whole milliseconds.
                const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate);
                this._timeout = this._timer.setTimeout(this._runTasks, estimatedWait);
                return;
            }
        }
    }
}

// CommonJS export: the TaskQueue class is the only value this module exposes.
module.exports = TaskQueue;