Spaces:
Build error
Build error
const log = require('../util/log'); | |
/** | |
* @typedef {object} DispatchCallMessage - a message to the dispatch system representing a service method call | |
* @property {*} responseId - send a response message with this response ID. See {@link DispatchResponseMessage} | |
* @property {string} service - the name of the service to be called | |
* @property {string} method - the name of the method to be called | |
* @property {Array|undefined} args - the arguments to be passed to the method | |
*/ | |
/** | |
* @typedef {object} DispatchResponseMessage - a message to the dispatch system representing the results of a call | |
* @property {*} responseId - a copy of the response ID from the call which generated this response | |
* @property {*|undefined} error - if this is truthy, then it contains results from a failed call (such as an exception) | |
* @property {*|undefined} result - if error is not truthy, then this contains the return value of the call (if any) | |
*/ | |
/** | |
* @typedef {DispatchCallMessage|DispatchResponseMessage} DispatchMessage | |
* Any message to the dispatch system. | |
*/ | |
/** | |
* The SharedDispatch class is responsible for dispatch features shared by | |
* {@link CentralDispatch} and {@link WorkerDispatch}. | |
*/ | |
class SharedDispatch { | |
constructor () { | |
/** | |
* List of callback registrations for promises waiting for a response from a call to a service on another | |
* worker. A callback registration is an array of [resolve,reject] Promise functions. | |
* Calls to local services don't enter this list. | |
* @type {Array.<Function[]>} | |
*/ | |
this.callbacks = []; | |
/** | |
* The next response ID to be used. | |
* @type {int} | |
*/ | |
this.nextResponseId = 0; | |
} | |
/** | |
* Call a particular method on a particular service, regardless of whether that service is provided locally or on | |
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone | |
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be | |
* transferred to the worker, and they should not be used after this call. | |
* @example | |
* dispatcher.call('vm', 'setData', 'cat', 42); | |
* // this finds the worker for the 'vm' service, then on that worker calls: | |
* vm.setData('cat', 42); | |
* @param {string} service - the name of the service. | |
* @param {string} method - the name of the method. | |
* @param {*} [args] - the arguments to be copied to the method, if any. | |
* @returns {Promise} - a promise for the return value of the service method. | |
*/ | |
call (service, method, ...args) { | |
return this.transferCall(service, method, null, ...args); | |
} | |
/** | |
* Call a particular method on a particular service, regardless of whether that service is provided locally or on | |
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone | |
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be | |
* transferred to the worker, and they should not be used after this call. | |
* @example | |
* dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer); | |
* // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls: | |
* vm.setData('cat', myArrayBuffer); | |
* @param {string} service - the name of the service. | |
* @param {string} method - the name of the method. | |
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful. | |
* @param {*} [args] - the arguments to be copied to the method, if any. | |
* @returns {Promise} - a promise for the return value of the service method. | |
*/ | |
transferCall (service, method, transfer, ...args) { | |
try { | |
const {provider, isRemote} = this._getServiceProvider(service); | |
if (provider) { | |
if (isRemote) { | |
return this._remoteTransferCall(provider, service, method, transfer, ...args); | |
} | |
const result = provider[method].apply(provider, args); | |
return Promise.resolve(result); | |
} | |
return Promise.reject(new Error(`Service not found: ${service}`)); | |
} catch (e) { | |
return Promise.reject(e); | |
} | |
} | |
/** | |
* Check if a particular service lives on another worker. | |
* @param {string} service - the service to check. | |
* @returns {boolean} - true if the service is remote (calls must cross a Worker boundary), false otherwise. | |
* @private | |
*/ | |
_isRemoteService (service) { | |
return this._getServiceProvider(service).isRemote; | |
} | |
/** | |
* Like {@link call}, but force the call to be posted through a particular communication channel. | |
* @param {object} provider - send the call through this object's `postMessage` function. | |
* @param {string} service - the name of the service. | |
* @param {string} method - the name of the method. | |
* @param {*} [args] - the arguments to be copied to the method, if any. | |
* @returns {Promise} - a promise for the return value of the service method. | |
*/ | |
_remoteCall (provider, service, method, ...args) { | |
return this._remoteTransferCall(provider, service, method, null, ...args); | |
} | |
/** | |
* Like {@link transferCall}, but force the call to be posted through a particular communication channel. | |
* @param {object} provider - send the call through this object's `postMessage` function. | |
* @param {string} service - the name of the service. | |
* @param {string} method - the name of the method. | |
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful. | |
* @param {*} [args] - the arguments to be copied to the method, if any. | |
* @returns {Promise} - a promise for the return value of the service method. | |
*/ | |
_remoteTransferCall (provider, service, method, transfer, ...args) { | |
return new Promise((resolve, reject) => { | |
const responseId = this._storeCallbacks(resolve, reject); | |
/** @TODO: remove this hack! this is just here so we don't try to send `util` to a worker */ | |
// tw: upstream's logic is broken | |
// Args is actually a 3 length list of [args, util, real block info] | |
// We only want to send args. The others will throw errors when they try to be cloned | |
if ((args.length > 0) && (typeof args[args.length - 1].func === 'function')) { | |
args.pop(); | |
args.pop(); | |
} | |
if (transfer) { | |
provider.postMessage({service, method, responseId, args}, transfer); | |
} else { | |
provider.postMessage({service, method, responseId, args}); | |
} | |
}); | |
} | |
/** | |
* Store callback functions pending a response message. | |
* @param {Function} resolve - function to call if the service method returns. | |
* @param {Function} reject - function to call if the service method throws. | |
* @returns {*} - a unique response ID for this set of callbacks. See {@link _deliverResponse}. | |
* @protected | |
*/ | |
_storeCallbacks (resolve, reject) { | |
const responseId = this.nextResponseId++; | |
this.callbacks[responseId] = [resolve, reject]; | |
return responseId; | |
} | |
/** | |
* Deliver call response from a worker. This should only be called as the result of a message from a worker. | |
* @param {int} responseId - the response ID of the callback set to call. | |
* @param {DispatchResponseMessage} message - the message containing the response value(s). | |
* @protected | |
*/ | |
_deliverResponse (responseId, message) { | |
try { | |
const [resolve, reject] = this.callbacks[responseId]; | |
delete this.callbacks[responseId]; | |
if (message.error) { | |
reject(message.error); | |
} else { | |
resolve(message.result); | |
} | |
} catch (e) { | |
log.error(`Dispatch callback failed: ${e}`); | |
} | |
} | |
/** | |
* Handle a message event received from a connected worker. | |
* @param {Worker} worker - the worker which sent the message, or the global object if running in a worker. | |
* @param {MessageEvent} event - the message event to be handled. | |
* @protected | |
*/ | |
_onMessage (worker, event) { | |
/** @type {DispatchMessage} */ | |
const message = event.data; | |
message.args = message.args || []; | |
let promise; | |
if (message.service) { | |
if (message.service === 'dispatch') { | |
promise = this._onDispatchMessage(worker, message); | |
} else { | |
promise = this.call(message.service, message.method, ...message.args); | |
} | |
} else if (typeof message.responseId === 'undefined') { | |
log.error(`Dispatch caught malformed message from a worker: ${JSON.stringify(event)}`); | |
} else { | |
this._deliverResponse(message.responseId, message); | |
} | |
if (promise) { | |
if (typeof message.responseId === 'undefined') { | |
log.error(`Dispatch message missing required response ID: ${JSON.stringify(event)}`); | |
} else { | |
promise.then( | |
result => worker.postMessage({responseId: message.responseId, result}), | |
error => worker.postMessage({responseId: message.responseId, error: `${error}`}) | |
); | |
} | |
} | |
} | |
/** | |
* Fetch the service provider object for a particular service name. | |
* @abstract | |
* @param {string} service - the name of the service to look up | |
* @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found | |
* @protected | |
*/ | |
_getServiceProvider (service) { | |
throw new Error(`Could not get provider for ${service}: _getServiceProvider not implemented`); | |
} | |
/** | |
* Handle a call message sent to the dispatch service itself | |
* @abstract | |
* @param {Worker} worker - the worker which sent the message. | |
* @param {DispatchCallMessage} message - the message to be handled. | |
* @returns {Promise|undefined} - a promise for the results of this operation, if appropriate | |
* @private | |
*/ | |
_onDispatchMessage (worker, message) { | |
throw new Error(`Unimplemented dispatch message handler cannot handle ${message.method} method`); | |
} | |
} | |
module.exports = SharedDispatch; | |