Spaces:
Running
Running
| ; | |
| const { Duplex } = require('stream'); | |
/**
 * Emits the `'close'` event on the given stream.
 *
 * Kept as a standalone function so that it can be passed, together with the
 * stream, to `process.nextTick()` without allocating a closure.
 *
 * @param {stream.Duplex} stream The stream on which to emit the event
 * @private
 */
function emitClose(stream) {
  stream.emit('close');
}
/**
 * Handles the `'end'` event of the duplex stream.
 *
 * Called with the stream as `this`. Destroys the stream once the readable
 * side has ended, but only if the writable side has already finished and the
 * stream has not been destroyed yet.
 *
 * @private
 */
function duplexOnEnd() {
  // Nothing to do if the stream is already torn down.
  if (this.destroyed) return;

  // The writable side is still in use; leave the stream alive.
  if (!this._writableState.finished) return;

  this.destroy();
}
/**
 * Handles the `'error'` event of the duplex stream.
 *
 * Called with the stream as `this`. The listener detaches itself, destroys
 * the stream, and, when no other `'error'` listener remains, emits the error
 * again.
 *
 * @param {Error} err The emitted error
 * @private
 */
function duplexOnError(err) {
  this.removeListener('error', duplexOnError);
  this.destroy();

  const hasOtherListeners = this.listenerCount('error') !== 0;
  if (hasOtherListeners) return;

  // Do not suppress the throwing behavior: with no listener left, emitting
  // the error again lets it surface as an unhandled `'error'` event.
  this.emit('error', err);
}
/**
 * Wraps a `WebSocket` in a duplex stream.
 *
 * Messages received by the WebSocket are pushed to the readable side and
 * chunks written to the stream are sent with `ws.send()`. Ending the writable
 * side closes the WebSocket and destroying the stream terminates it.
 *
 * @param {WebSocket} ws The `WebSocket` to wrap
 * @param {Object} [options] The options for the `Duplex` constructor. The
 *     `autoDestroy`, `emitClose`, `objectMode`, `readableObjectMode`, and
 *     `writableObjectMode` options are always overridden.
 * @return {stream.Duplex} The duplex stream
 * @public
 */
function createWebSocketStream(ws, options) {
  // Whether the socket may be resumed when the receiver drains. It is cleared
  // while the readable side of `duplex` is applying backpressure.
  let resumeOnReceiverDrain = true;

  function receiverOnDrain() {
    if (resumeOnReceiverDrain) ws._socket.resume();
  }

  // Replaces every `'drain'` listener of the receiver with `receiverOnDrain`
  // so that the socket is resumed only when `duplex` can accept more data.
  function takeOverReceiverDrain() {
    ws._receiver.removeAllListeners('drain');
    ws._receiver.on('drain', receiverOnDrain);
  }

  if (ws.readyState === ws.CONNECTING) {
    // Defer the listener swap until the connection is open.
    ws.once('open', function open() {
      takeOverReceiverDrain();
    });
  } else {
    takeOverReceiverDrain();
  }

  const duplex = new Duplex({
    ...options,
    autoDestroy: false,
    emitClose: false,
    objectMode: false,
    readableObjectMode: false,
    writableObjectMode: false
  });

  ws.on('message', function message(msg) {
    if (!duplex.push(msg)) {
      // The readable buffer is full: stop reading from the socket until
      // `duplex._read()` is called again.
      resumeOnReceiverDrain = false;
      ws._socket.pause();
    }
  });

  ws.once('error', function error(err) {
    duplex.destroy(err);
  });

  ws.once('close', function close() {
    if (duplex.destroyed) return;

    // Signal EOF on the readable side.
    duplex.push(null);
  });

  duplex._destroy = function (err, callback) {
    if (ws.readyState === ws.CLOSED) {
      callback(err);
      process.nextTick(emitClose, duplex);
      return;
    }

    // Complete the destruction only after the WebSocket is closed.
    ws.once('close', function close() {
      callback(err);
      process.nextTick(emitClose, duplex);
    });
    ws.terminate();
  };

  duplex._final = function (callback) {
    if (ws.readyState === ws.CONNECTING) {
      // Retry once the connection is open.
      ws.once('open', function open() {
        duplex._final(callback);
      });
      return;
    }

    // No socket is attached to the WebSocket (presumably a client whose
    // handshake failed, in which case a socket is never assigned), so there
    // is nothing to close. Do not dereference `null`; the `'error'` event of
    // the WebSocket is expected to destroy `duplex` via the listener above.
    if (ws._socket === null) return;

    if (ws._socket._writableState.finished) {
      if (duplex._readableState.endEmitted) duplex.destroy();
      callback();
    } else {
      ws._socket.once('finish', function finish() {
        // `duplex` is not destroyed here because the `'end'` event will be
        // emitted on `duplex` after this `'finish'` event. The EOF signaling
        // `null` chunk is, in fact, pushed when the WebSocket emits `'close'`.
        callback();
      });
      ws.close();
    }
  };

  duplex._read = function () {
    if (ws.readyState === ws.OPEN && !resumeOnReceiverDrain) {
      resumeOnReceiverDrain = true;

      // If the receiver still needs to drain, `receiverOnDrain()` resumes the
      // socket later; otherwise resume it right away.
      if (!ws._receiver._writableState.needDrain) ws._socket.resume();
    }
  };

  duplex._write = function (chunk, encoding, callback) {
    if (ws.readyState === ws.CONNECTING) {
      // Retry the write once the connection is open.
      ws.once('open', function open() {
        duplex._write(chunk, encoding, callback);
      });
      return;
    }

    ws.send(chunk, callback);
  };

  duplex.on('end', duplexOnEnd);
  duplex.on('error', duplexOnError);
  return duplex;
}
| module.exports = createWebSocketStream; | |