Spaces:
Runtime error
Runtime error
| const log = require('../util/log'); | |
| /** | |
| * @typedef {object} DispatchCallMessage - a message to the dispatch system representing a service method call | |
| * @property {*} responseId - send a response message with this response ID. See {@link DispatchResponseMessage} | |
| * @property {string} service - the name of the service to be called | |
| * @property {string} method - the name of the method to be called | |
| * @property {Array|undefined} args - the arguments to be passed to the method | |
| */ | |
| /** | |
| * @typedef {object} DispatchResponseMessage - a message to the dispatch system representing the results of a call | |
| * @property {*} responseId - a copy of the response ID from the call which generated this response | |
| * @property {*|undefined} error - if this is truthy, then it contains results from a failed call (such as an exception) | |
| * @property {*|undefined} result - if error is not truthy, then this contains the return value of the call (if any) | |
| */ | |
| /** | |
| * @typedef {DispatchCallMessage|DispatchResponseMessage} DispatchMessage | |
| * Any message to the dispatch system. | |
| */ | |
| /** | |
| * The SharedDispatch class is responsible for dispatch features shared by | |
| * {@link CentralDispatch} and {@link WorkerDispatch}. | |
| */ | |
class SharedDispatch {
    constructor () {
        /**
         * List of callback registrations for promises waiting for a response from a call to a service on another
         * worker. A callback registration is an array of [resolve,reject] Promise functions.
         * Entries are indexed by response ID and deleted once settled, so this array is sparse.
         * Calls to local services don't enter this list.
         * @type {Array.<Function[]>}
         */
        this.callbacks = [];

        /**
         * The next response ID to be used.
         * @type {int}
         */
        this.nextResponseId = 0;
    }

    /**
     * Call a particular method on a particular service, regardless of whether that service is provided locally or on
     * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
     * algorithm. This is a convenience wrapper around {@link transferCall} with no transfer list.
     * @example
     *      dispatcher.call('vm', 'setData', 'cat', 42);
     *      // this finds the worker for the 'vm' service, then on that worker calls:
     *      vm.setData('cat', 42);
     * @param {string} service - the name of the service.
     * @param {string} method - the name of the method.
     * @param {*} [args] - the arguments to be copied to the method, if any.
     * @returns {Promise} - a promise for the return value of the service method.
     */
    call (service, method, ...args) {
        return this.transferCall(service, method, null, ...args);
    }

    /**
     * Call a particular method on a particular service, regardless of whether that service is provided locally or on
     * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
     * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
     * transferred to the worker, and they should not be used after this call.
     * Any error thrown while looking up the provider or invoking a local method is reported as a rejected promise
     * rather than a synchronous exception.
     * @example
     *      dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
     *      // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
     *      vm.setData('cat', myArrayBuffer);
     * @param {string} service - the name of the service.
     * @param {string} method - the name of the method.
     * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
     * @param {*} [args] - the arguments to be copied to the method, if any.
     * @returns {Promise} - a promise for the return value of the service method.
     */
    transferCall (service, method, transfer, ...args) {
        try {
            const {provider, isRemote} = this._getServiceProvider(service);
            if (provider) {
                if (isRemote) {
                    return this._remoteTransferCall(provider, service, method, transfer, ...args);
                }

                // Local service: invoke directly. `transfer` is irrelevant because nothing is cloned.
                const result = provider[method].apply(provider, args);
                return Promise.resolve(result);
            }
            return Promise.reject(new Error(`Service not found: ${service}`));
        } catch (e) {
            return Promise.reject(e);
        }
    }

    /**
     * Check if a particular service lives on another worker.
     * @param {string} service - the service to check.
     * @returns {boolean} - true if the service is remote (calls must cross a Worker boundary), false otherwise.
     * @private
     */
    _isRemoteService (service) {
        return this._getServiceProvider(service).isRemote;
    }

    /**
     * Like {@link call}, but force the call to be posted through a particular communication channel.
     * @param {object} provider - send the call through this object's `postMessage` function.
     * @param {string} service - the name of the service.
     * @param {string} method - the name of the method.
     * @param {*} [args] - the arguments to be copied to the method, if any.
     * @returns {Promise} - a promise for the return value of the service method.
     */
    _remoteCall (provider, service, method, ...args) {
        return this._remoteTransferCall(provider, service, method, null, ...args);
    }

    /**
     * Like {@link transferCall}, but force the call to be posted through a particular communication channel.
     * If posting the message fails, the returned promise is rejected and the pending callback registration is
     * discarded so that it does not linger in `this.callbacks`.
     * @param {object} provider - send the call through this object's `postMessage` function.
     * @param {string} service - the name of the service.
     * @param {string} method - the name of the method.
     * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
     * @param {*} [args] - the arguments to be copied to the method, if any.
     * @returns {Promise} - a promise for the return value of the service method.
     */
    _remoteTransferCall (provider, service, method, transfer, ...args) {
        return new Promise((resolve, reject) => {
            /** @TODO: remove this hack! this is just here so we don't try to send `util` to a worker */
            // tw: upstream's logic is broken
            // Args is actually a 3 length list of [args, util, real block info]
            // We only want to send args. The others will throw errors when they try to be cloned
            // The last argument may legitimately be null or undefined, so guard before reading `.func`.
            const lastArg = args[args.length - 1];
            const hasBlockInfo = lastArg !== null &&
                typeof lastArg !== 'undefined' &&
                typeof lastArg.func === 'function';
            if (hasBlockInfo) {
                args.pop();
                args.pop();
            }

            const responseId = this._storeCallbacks(resolve, reject);
            try {
                if (transfer) {
                    provider.postMessage({service, method, responseId, args}, transfer);
                } else {
                    provider.postMessage({service, method, responseId, args});
                }
            } catch (e) {
                // The message never left, so no response will ever arrive: drop the registration and fail now.
                delete this.callbacks[responseId];
                reject(e);
            }
        });
    }

    /**
     * Store callback functions pending a response message.
     * @param {Function} resolve - function to call if the service method returns.
     * @param {Function} reject - function to call if the service method throws.
     * @returns {*} - a unique response ID for this set of callbacks. See {@link _deliverResponse}.
     * @protected
     */
    _storeCallbacks (resolve, reject) {
        const responseId = this.nextResponseId++;
        this.callbacks[responseId] = [resolve, reject];
        return responseId;
    }

    /**
     * Deliver call response from a worker. This should only be called as the result of a message from a worker.
     * The callback registration is removed before it is invoked, so each response ID is delivered at most once.
     * Failures (including an unknown response ID) are logged rather than thrown.
     * @param {int} responseId - the response ID of the callback set to call.
     * @param {DispatchResponseMessage} message - the message containing the response value(s).
     * @protected
     */
    _deliverResponse (responseId, message) {
        try {
            const [resolve, reject] = this.callbacks[responseId];
            delete this.callbacks[responseId];
            if (message.error) {
                reject(message.error);
            } else {
                resolve(message.result);
            }
        } catch (e) {
            log.error(`Dispatch callback failed: ${e}`);
        }
    }

    /**
     * Handle a message event received from a connected worker.
     * A message with a `service` is a call: it is routed to the dispatch service itself or to the named service,
     * and the outcome is posted back to `worker` tagged with the caller's response ID.
     * A message without a `service` but with a `responseId` is a response to one of our own pending calls.
     * @param {Worker} worker - the worker which sent the message, or the global object if running in a worker.
     * @param {MessageEvent} event - the message event to be handled.
     * @protected
     */
    _onMessage (worker, event) {
        /** @type {DispatchMessage} */
        const message = event.data;
        message.args = message.args || [];
        const responseId = message.responseId;

        let promise;
        if (message.service) {
            try {
                if (message.service === 'dispatch') {
                    promise = this._onDispatchMessage(worker, message);
                } else {
                    promise = this.call(message.service, message.method, ...message.args);
                }
            } catch (e) {
                // Report synchronous failures to the caller instead of leaving its promise pending forever.
                promise = Promise.reject(e);
            }
        } else if (typeof responseId === 'undefined') {
            log.error(`Dispatch caught malformed message from a worker: ${JSON.stringify(event)}`);
        } else {
            this._deliverResponse(responseId, message);
        }

        if (!promise) {
            return;
        }
        if (typeof responseId === 'undefined') {
            log.error(`Dispatch message missing required response ID: ${JSON.stringify(event)}`);
            return;
        }

        // Chain `.catch` after `.then` so that a failure to post the result (for example a return value which
        // cannot be cloned) is also reported to the caller as an error response.
        Promise.resolve(promise)
            .then(result => worker.postMessage({responseId, result}))
            .catch(error => worker.postMessage({responseId, error: `${error}`}));
    }

    /**
     * Fetch the service provider object for a particular service name.
     * This base implementation always throws; subclasses are expected to override it.
     * @abstract
     * @param {string} service - the name of the service to look up
     * @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
     * @protected
     */
    _getServiceProvider (service) {
        throw new Error(`Could not get provider for ${service}: _getServiceProvider not implemented`);
    }

    /**
     * Handle a call message sent to the dispatch service itself.
     * This base implementation always throws; subclasses are expected to override it.
     * @abstract
     * @param {Worker} worker - the worker which sent the message.
     * @param {DispatchCallMessage} message - the message to be handled.
     * @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
     * @private
     */
    _onDispatchMessage (worker, message) {
        throw new Error(`Unimplemented dispatch message handler cannot handle ${message.method} method`);
    }
}
| module.exports = SharedDispatch; | |