Spaces:
Build error
Build error
File size: 10,719 Bytes
30c32c8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
const log = require('../util/log');
/**
* @typedef {object} DispatchCallMessage - a message to the dispatch system representing a service method call
* @property {*} responseId - send a response message with this response ID. See {@link DispatchResponseMessage}
* @property {string} service - the name of the service to be called
* @property {string} method - the name of the method to be called
* @property {Array|undefined} args - the arguments to be passed to the method
*/
/**
* @typedef {object} DispatchResponseMessage - a message to the dispatch system representing the results of a call
* @property {*} responseId - a copy of the response ID from the call which generated this response
* @property {*|undefined} error - if this is truthy, then it contains results from a failed call (such as an exception)
* @property {*|undefined} result - if error is not truthy, then this contains the return value of the call (if any)
*/
/**
* @typedef {DispatchCallMessage|DispatchResponseMessage} DispatchMessage
* Any message to the dispatch system.
*/
/**
* The SharedDispatch class is responsible for dispatch features shared by
* {@link CentralDispatch} and {@link WorkerDispatch}.
*/
class SharedDispatch {
    constructor () {
        /**
         * List of callback registrations for promises waiting for a response from a call to a service on another
         * worker. A callback registration is an array of [resolve,reject] Promise functions, stored at the index
         * equal to its response ID. Entries are deleted once the response is delivered or the send fails.
         * Calls to local services don't enter this list.
         * @type {Array.<Function[]>}
         */
        this.callbacks = [];

        /**
         * The next response ID to be used. Incremented every time a set of callbacks is stored.
         * @type {int}
         */
        this.nextResponseId = 0;
    }

    /**
     * Call a particular method on a particular service, regardless of whether that service is provided locally or on
     * a worker. This is {@link transferCall} with no transfer list: if the service is provided by a worker, every
     * item in `args` is copied using the Structured Clone algorithm.
     * @example
     *      dispatcher.call('vm', 'setData', 'cat', 42);
     *      // this finds the worker for the 'vm' service, then on that worker calls:
     *      vm.setData('cat', 42);
     * @param {string} service - the name of the service.
     * @param {string} method - the name of the method.
     * @param {*} [args] - the arguments to be copied to the method, if any.
     * @returns {Promise} - a promise for the return value of the service method.
     */
    call (service, method, ...args) {
        return this.transferCall(service, method, null, ...args);
    }

    /**
     * Call a particular method on a particular service, regardless of whether that service is provided locally or on
     * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
     * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
     * transferred to the worker, and they should not be used after this call.
     * This method never throws synchronously: lookup failures and exceptions thrown by a local service method are
     * reported through the returned promise.
     * @example
     *      dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
     *      // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
     *      vm.setData('cat', myArrayBuffer);
     * @param {string} service - the name of the service.
     * @param {string} method - the name of the method.
     * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
     * @param {*} [args] - the arguments to be copied to the method, if any.
     * @returns {Promise} - a promise for the return value of the service method. Rejects with
     *      `Service not found: <service>` if no provider is registered for the service.
     */
    transferCall (service, method, transfer, ...args) {
        try {
            const {provider, isRemote} = this._getServiceProvider(service);
            if (provider) {
                if (isRemote) {
                    return this._remoteTransferCall(provider, service, method, transfer, ...args);
                }

                // Local service: invoke directly. Promise.resolve() also adopts a promise returned by the method.
                const result = provider[method].apply(provider, args);
                return Promise.resolve(result);
            }
            return Promise.reject(new Error(`Service not found: ${service}`));
        } catch (e) {
            // Covers a throwing provider lookup, a missing method, and a throwing local method.
            return Promise.reject(e);
        }
    }

    /**
     * Check if a particular service lives on another worker.
     * @param {string} service - the service to check.
     * @returns {boolean} - true if the service is remote (calls must cross a Worker boundary), false otherwise.
     * @private
     */
    _isRemoteService (service) {
        return this._getServiceProvider(service).isRemote;
    }

    /**
     * Like {@link call}, but force the call to be posted through a particular communication channel.
     * @param {object} provider - send the call through this object's `postMessage` function.
     * @param {string} service - the name of the service.
     * @param {string} method - the name of the method.
     * @param {*} [args] - the arguments to be copied to the method, if any.
     * @returns {Promise} - a promise for the return value of the service method.
     */
    _remoteCall (provider, service, method, ...args) {
        return this._remoteTransferCall(provider, service, method, null, ...args);
    }

    /**
     * Like {@link transferCall}, but force the call to be posted through a particular communication channel.
     * The returned promise settles when {@link _deliverResponse} is called with the response ID included in the
     * posted message. If the message cannot be posted at all, the pending callbacks are discarded and the promise
     * rejects with the error raised while sending.
     * @param {object} provider - send the call through this object's `postMessage` function.
     * @param {string} service - the name of the service.
     * @param {string} method - the name of the method.
     * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
     * @param {*} [args] - the arguments to be copied to the method, if any.
     * @returns {Promise} - a promise for the return value of the service method.
     */
    _remoteTransferCall (provider, service, method, transfer, ...args) {
        return new Promise((resolve, reject) => {
            const responseId = this._storeCallbacks(resolve, reject);
            try {
                /** @TODO: remove this hack! this is just here so we don't try to send `util` to a worker */
                // tw: upstream's logic is broken
                // Args is actually a 3 length list of [args, util, real block info]
                // We only want to send args. The others will throw errors when they try to be cloned
                // The last argument may legitimately be null or undefined, so check it before reading `.func`.
                const lastArg = args[args.length - 1];
                const hasBlockInfo = lastArg !== null &&
                    typeof lastArg !== 'undefined' &&
                    typeof lastArg.func === 'function';
                if (hasBlockInfo) {
                    args.pop(); // real block info
                    args.pop(); // util
                }

                if (transfer) {
                    provider.postMessage({service, method, responseId, args}, transfer);
                } else {
                    provider.postMessage({service, method, responseId, args});
                }
            } catch (e) {
                // Nothing was sent, so no response will ever arrive for this ID: drop the registration
                // instead of leaking it, then report the failure to the caller.
                delete this.callbacks[responseId];
                reject(e);
            }
        });
    }

    /**
     * Store callback functions pending a response message.
     * @param {Function} resolve - function to call if the service method returns.
     * @param {Function} reject - function to call if the service method throws.
     * @returns {*} - a unique response ID for this set of callbacks. See {@link _deliverResponse}.
     * @protected
     */
    _storeCallbacks (resolve, reject) {
        const responseId = this.nextResponseId++;
        this.callbacks[responseId] = [resolve, reject];
        return responseId;
    }

    /**
     * Deliver call response from a worker. This should only be called as the result of a message from a worker.
     * The callbacks registered under `responseId` are removed and then exactly one of them is invoked. Any failure
     * (including an unknown response ID) is logged rather than thrown.
     * @param {int} responseId - the response ID of the callback set to call.
     * @param {DispatchResponseMessage} message - the message containing the response value(s).
     * @protected
     */
    _deliverResponse (responseId, message) {
        try {
            // Destructuring throws if there is no registration for this ID; the catch below logs it.
            const [resolve, reject] = this.callbacks[responseId];
            delete this.callbacks[responseId];
            if (message.error) {
                reject(message.error);
            } else {
                resolve(message.result);
            }
        } catch (e) {
            log.error(`Dispatch callback failed: ${e}`);
        }
    }

    /**
     * Handle a message event received from a connected worker.
     * A message with a `service` is a call: it is routed to the dispatch service itself or to the named service,
     * and the outcome is posted back to `worker` under the message's response ID. A message without a `service`
     * but with a `responseId` is a response to an earlier call and is delivered to the waiting callbacks.
     * @param {Worker} worker - the worker which sent the message, or the global object if running in a worker.
     * @param {MessageEvent} event - the message event to be handled.
     * @protected
     */
    _onMessage (worker, event) {
        /** @type {DispatchMessage} */
        const message = event.data;
        message.args = message.args || [];
        let promise;
        if (message.service) {
            if (message.service === 'dispatch') {
                promise = this._onDispatchMessage(worker, message);
            } else {
                promise = this.call(message.service, message.method, ...message.args);
            }
        } else if (typeof message.responseId === 'undefined') {
            log.error(`Dispatch caught malformed message from a worker: ${JSON.stringify(event)}`);
        } else {
            this._deliverResponse(message.responseId, message);
        }
        if (promise) {
            if (typeof message.responseId === 'undefined') {
                log.error(`Dispatch message missing required response ID: ${JSON.stringify(event)}`);
            } else {
                // Errors are sent as strings rather than as the original error object.
                promise.then(
                    result => worker.postMessage({responseId: message.responseId, result}),
                    error => worker.postMessage({responseId: message.responseId, error: `${error}`})
                );
            }
        }
    }

    /**
     * Fetch the service provider object for a particular service name.
     * This base implementation always throws; subclasses are expected to override it.
     * @abstract
     * @param {string} service - the name of the service to look up
     * @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
     * @protected
     */
    _getServiceProvider (service) {
        throw new Error(`Could not get provider for ${service}: _getServiceProvider not implemented`);
    }

    /**
     * Handle a call message sent to the dispatch service itself.
     * This base implementation always throws; subclasses are expected to override it.
     * @abstract
     * @param {Worker} worker - the worker which sent the message.
     * @param {DispatchCallMessage} message - the message to be handled.
     * @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
     * @private
     */
    _onDispatchMessage (worker, message) {
        throw new Error(`Unimplemented dispatch message handler cannot handle ${message.method} method`);
    }
}
module.exports = SharedDispatch;
|