Spaces:
Running
Running
| /* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^WebSocket$" }] */ | |
| ; | |
| const WebSocket = require('./websocket'); | |
| const { Duplex } = require('stream'); | |
/**
 * Emits the `'close'` event on a stream.
 *
 * The event is emitted synchronously and without arguments. It is kept as a
 * standalone function so it can be handed to `process.nextTick()` together
 * with the stream to notify.
 *
 * @param {Duplex} stream The stream.
 * @private
 */
function emitClose(stream) {
  stream.emit('close');
}
/**
 * The listener of the `'end'` event.
 *
 * Must be invoked with `this` bound to the stream. The stream is destroyed
 * only when it has not been destroyed already and its writable side has
 * finished; otherwise nothing happens.
 *
 * @private
 */
function duplexOnEnd() {
  if (!this.destroyed && this._writableState.finished) {
    this.destroy();
  }
}
/**
 * The listener of the `'error'` event.
 *
 * Must be invoked with `this` bound to the stream. The listener removes
 * itself, destroys the stream and, when no other `'error'` listener is left,
 * emits the error again so that it is not silently swallowed.
 *
 * @param {Error} err The error
 * @private
 */
function duplexOnError(err) {
  // Remove this listener first so that the count below only reflects the
  // listeners added by other code.
  this.removeListener('error', duplexOnError);
  this.destroy();

  if (this.listenerCount('error') === 0) {
    // Do not suppress the throwing behavior.
    this.emit('error', err);
  }
}
/**
 * Wraps a `WebSocket` in a duplex stream.
 *
 * Messages emitted by `ws` are pushed to the readable side of the stream and
 * chunks written to the stream are forwarded to `ws.send()`. The
 * `autoDestroy`, `emitClose`, `objectMode` and `writableObjectMode` options
 * are always forced to `false`, whatever `options` contains.
 *
 * @param {WebSocket} ws The `WebSocket` to wrap
 * @param {Object} [options] The options for the `Duplex` constructor
 * @return {Duplex} The duplex stream
 * @public
 */
function createWebSocketStream(ws, options) {
  // Whether `duplex._destroy()` should call `ws.terminate()`.
  let terminateOnDestroy = true;

  const duplex = new Duplex({
    ...options,
    autoDestroy: false,
    emitClose: false,
    objectMode: false,
    writableObjectMode: false
  });

  ws.on('message', function message(msg, isBinary) {
    // Non-binary messages are converted to strings only when the readable
    // side is in object mode; otherwise the message is pushed as received.
    const data =
      !isBinary && duplex._readableState.objectMode ? msg.toString() : msg;

    // Pause the websocket when the readable side asks for no more data.
    if (!duplex.push(data)) ws.pause();
  });

  ws.once('error', function error(err) {
    if (duplex.destroyed) return;

    // Prevent `ws.terminate()` from being called by `duplex._destroy()`.
    //
    // - If the `'error'` event is emitted before the `'open'` event, then
    //   `ws.terminate()` is a noop as no socket is assigned.
    // - Otherwise, the error is re-emitted by the listener of the `'error'`
    //   event of the `Receiver` object. The listener already closes the
    //   connection by calling `ws.close()`. This allows a close frame to be
    //   sent to the other peer. If `ws.terminate()` is called right after this,
    //   then the close frame might not be sent.
    terminateOnDestroy = false;
    duplex.destroy(err);
  });

  ws.once('close', function close() {
    if (duplex.destroyed) return;

    // Signal EOF on the readable side.
    duplex.push(null);
  });

  duplex._destroy = function (err, callback) {
    if (ws.readyState === ws.CLOSED) {
      callback(err);
      process.nextTick(emitClose, duplex);
      return;
    }

    // Ensures `callback` is invoked only once when the websocket emits both
    // `'error'` and `'close'`.
    let called = false;

    ws.once('error', function error(err) {
      called = true;
      callback(err);
    });

    ws.once('close', function close() {
      if (!called) callback(err);
      process.nextTick(emitClose, duplex);
    });

    if (terminateOnDestroy) ws.terminate();
  };

  duplex._final = function (callback) {
    if (ws.readyState === ws.CONNECTING) {
      // Retry once the connection is open.
      ws.once('open', function open() {
        duplex._final(callback);
      });
      return;
    }

    // If the value of the `_socket` property is `null` it means that `ws` is a
    // client websocket and the handshake failed. In fact, when this happens, a
    // socket is never assigned to the websocket. Wait for the `'error'` event
    // that will be emitted by the websocket.
    if (ws._socket === null) return;

    if (ws._socket._writableState.finished) {
      callback();
      if (duplex._readableState.endEmitted) duplex.destroy();
    } else {
      ws._socket.once('finish', function finish() {
        // `duplex` is not destroyed here because the `'end'` event will be
        // emitted on `duplex` after this `'finish'` event. The EOF signaling
        // `null` chunk is, in fact, pushed when the websocket emits `'close'`.
        callback();
      });
      ws.close();
    }
  };

  duplex._read = function () {
    // Resume a websocket that was paused by the `'message'` listener.
    if (ws.isPaused) ws.resume();
  };

  duplex._write = function (chunk, encoding, callback) {
    if (ws.readyState === ws.CONNECTING) {
      // Retry the write once the connection is open.
      ws.once('open', function open() {
        duplex._write(chunk, encoding, callback);
      });
      return;
    }

    ws.send(chunk, callback);
  };

  duplex.on('end', duplexOnEnd);
  duplex.on('error', duplexOnError);
  return duplex;
}
// The module exports the stream factory itself.
module.exports = createWebSocketStream;