Spaces:
Runtime error
Runtime error
| const { | |
| Readable, | |
| Duplex, | |
| PassThrough | |
| } = require('stream') | |
| const { | |
| InvalidArgumentError, | |
| InvalidReturnValueError, | |
| RequestAbortedError | |
| } = require('../core/errors') | |
| const util = require('../core/util') | |
| const { AsyncResource } = require('async_hooks') | |
| const { addSignal, removeSignal } = require('./abort-signal') | |
| const assert = require('assert') | |
// Private symbol key under which PipelineRequest and PipelineResponse store
// their "resume" callback, keeping it off the public stream property namespace.
const kResume = Symbol('resume')
/**
 * Readable whose data is pushed in from outside rather than produced by
 * `_read()`.
 *
 * A producer that hits backpressure can park a callback under `kResume`; the
 * next `_read()` (or destruction of the stream) invokes it once.
 */
class PipelineRequest extends Readable {
  constructor () {
    super({ autoDestroy: true })

    // Parked producer callback; null while nobody is waiting for demand.
    this[kResume] = null
  }

  /**
   * Invoked when the consumer wants more data. Releases the parked callback,
   * if there is one.
   */
  _read () {
    const { [kResume]: resume } = this

    if (resume) {
      // Clear the slot before invoking it, so a callback parked during
      // resume() is not wiped afterwards.
      this[kResume] = null
      resume()
    }
  }

  /**
   * @param {Error|null} err
   * @param {(err?: Error|null) => void} callback
   */
  _destroy (err, callback) {
    // Release any parked callback so the producer is not left waiting on a
    // stream that will never ask for data again.
    this._read()

    callback(err)
  }
}
/**
 * Readable whose data is pushed in from outside. Every time the consumer asks
 * for more data, the `resume` callback supplied at construction is invoked.
 */
class PipelineResponse extends Readable {
  /**
   * @param {() => void} resume Called on every `_read()`.
   */
  constructor (resume) {
    super({ autoDestroy: true })

    this[kResume] = resume
  }

  _read () {
    this[kResume]()
  }

  /**
   * @param {Error|null} err
   * @param {(err?: Error|null) => void} callback
   */
  _destroy (err, callback) {
    // Destroyed without an explicit error before 'end' was emitted: report
    // the teardown as an aborted request instead of a silent close.
    if (!err && !this._readableState.endEmitted) {
      err = new RequestAbortedError()
    }

    callback(err)
  }
}
/**
 * Dispatch handler backing `pipeline()`. It connects three streams:
 *
 *  - `req`:  a PipelineRequest, used by `pipeline()` as the request body;
 *  - `res`:  a PipelineResponse that receives the response chunks (`onData`)
 *            and is handed to the user's handler as `body`;
 *  - `ret`:  the Duplex returned to the caller. Writes to it are pushed into
 *            `req`; its readable side is fed from the stream returned by the
 *            user's handler (stored as `this.body`).
 */
class PipelineHandler extends AsyncResource {
  /**
   * @param {object} opts Options; `signal`, `method`, `opaque`, `onInfo`,
   *   `responseHeaders` and `objectMode` are read here.
   * @param {(data: object) => object} handler Called once with
   *   `{ statusCode, headers, opaque, body, context }`; must return an object
   *   with an `on` method (a Readable).
   * @throws {InvalidArgumentError} If `opts` is not an object, `handler` is not
   *   a function, `signal` is neither an EventEmitter nor an EventTarget,
   *   `method` is `'CONNECT'`, or `onInfo` is set but not a function.
   */
  constructor (opts, handler) {
    if (!opts || typeof opts !== 'object') {
      throw new InvalidArgumentError('invalid opts')
    }

    if (typeof handler !== 'function') {
      throw new InvalidArgumentError('invalid handler')
    }

    const { signal, method, opaque, onInfo, responseHeaders } = opts

    if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
      throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
    }

    if (method === 'CONNECT') {
      throw new InvalidArgumentError('invalid method')
    }

    if (onInfo && typeof onInfo !== 'function') {
      throw new InvalidArgumentError('invalid onInfo callback')
    }

    super('UNDICI_PIPELINE')

    this.opaque = opaque || null
    this.responseHeaders = responseHeaders || null
    this.handler = handler
    this.abort = null
    this.context = null
    this.onInfo = onInfo || null
    // Set in onHeaders() to the stream returned by the user's handler. It is
    // initialised here so the read/destroy callbacks below always see a
    // defined field.
    this.body = null

    // The request stream gets a no-op error listener so that destroying it
    // with an error never surfaces as an unhandled 'error' event.
    this.req = new PipelineRequest().on('error', util.nop)

    this.ret = new Duplex({
      readableObjectMode: opts.objectMode,
      autoDestroy: true,
      read: () => {
        // Demand on the returned stream: let the handler's body flow again.
        const { body } = this

        if (body && body.resume) {
          body.resume()
        }
      },
      write: (chunk, encoding, callback) => {
        const { req } = this

        if (req.push(chunk, encoding) || req._readableState.destroyed) {
          callback()
        } else {
          // Backpressure: park the write callback until req is read again
          // (or destroyed).
          req[kResume] = callback
        }
      },
      destroy: (err, callback) => {
        const { body, req, res, ret, abort } = this

        // Destroyed without an error before the readable side emitted 'end':
        // treat it as an aborted request.
        if (!err && !ret._readableState.endEmitted) {
          err = new RequestAbortedError()
        }

        if (abort && err) {
          abort()
        }

        util.destroy(body, err)
        util.destroy(req, err)
        util.destroy(res, err)

        removeSignal(this)

        callback(err)
      }
    }).on('prefinish', () => {
      const { req } = this

      // Node < 15 does not call _final in same tick.
      req.push(null)
    })

    this.res = null

    addSignal(this, signal)
  }

  /**
   * @param {Function} abort Stored and called when `ret` is destroyed with an error.
   * @param {*} context Stored and later passed through to the user's handler.
   * @throws {RequestAbortedError} If the returned stream was already destroyed.
   */
  onConnect (abort, context) {
    const { ret, res } = this

    // A response stream already exists, so this would be a second attempt.
    assert(!res, 'pipeline cannot be retried')

    if (ret.destroyed) {
      throw new RequestAbortedError()
    }

    this.abort = abort
    this.context = context
  }

  /**
   * Handles response headers. Status codes below 200 are only reported to
   * `onInfo` (when set). Otherwise the user's handler is invoked and the
   * stream it returns is piped into the readable side of `ret`.
   *
   * @param {number} statusCode
   * @param {*} rawHeaders Passed to `util.parseRawHeaders` when
   *   `responseHeaders === 'raw'`, otherwise to `util.parseHeaders`.
   * @param {() => void} resume Invoked whenever the response stream is read.
   * @throws {InvalidReturnValueError} If the handler does not return an
   *   object with an `on` method. Errors thrown by the handler are rethrown.
   */
  onHeaders (statusCode, rawHeaders, resume) {
    const { opaque, handler, context } = this

    if (statusCode < 200) {
      if (this.onInfo) {
        const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
        this.onInfo({ statusCode, headers })
      }
      return
    }

    this.res = new PipelineResponse(resume)

    let body
    try {
      // The handler is single-use; drop the reference before calling it.
      this.handler = null
      const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
      body = this.runInAsyncScope(handler, null, {
        statusCode,
        headers,
        opaque,
        body: this.res,
        context
      })
    } catch (err) {
      // The handler failed, so nobody may be listening for errors on res;
      // guard it before it gets destroyed with this error.
      this.res.on('error', util.nop)
      throw err
    }

    if (!body || typeof body.on !== 'function') {
      // Same precaution as in the catch branch: the handler returned nothing
      // usable, so res may have no error listener when it is torn down.
      this.res.on('error', util.nop)
      throw new InvalidReturnValueError('expected Readable')
    }

    body
      .on('data', (chunk) => {
        const { ret, body } = this

        // Propagate backpressure from the returned stream to the body.
        if (!ret.push(chunk) && body.pause) {
          body.pause()
        }
      })
      .on('error', (err) => {
        const { ret } = this

        util.destroy(ret, err)
      })
      .on('end', () => {
        const { ret } = this

        ret.push(null)
      })
      .on('close', () => {
        const { ret } = this

        // The body closed without ending: report an abort on the returned stream.
        if (!ret._readableState.ended) {
          util.destroy(ret, new RequestAbortedError())
        }
      })

    this.body = body
  }

  /**
   * @param {*} chunk Response body chunk.
   * @returns {boolean} The result of pushing the chunk into the response stream.
   */
  onData (chunk) {
    const { res } = this
    return res.push(chunk)
  }

  /**
   * Ends the response stream.
   *
   * @param {*} trailers Unused.
   */
  onComplete (trailers) {
    const { res } = this
    res.push(null)
  }

  /**
   * @param {Error} err Destroys the returned stream with this error.
   */
  onError (err) {
    const { ret } = this
    this.handler = null
    util.destroy(ret, err)
  }
}
/**
 * Starts a pipelined request and returns a Duplex for it.
 *
 * Must be invoked with `this` set to an object exposing
 * `dispatch(opts, handler)`. The request is dispatched with `opts` plus a
 * `body` taken from the handler's `req` stream.
 *
 * @param {object} opts Options, validated by the PipelineHandler constructor.
 * @param {Function} handler User handler, forwarded to PipelineHandler.
 * @returns {object} The handler's `ret` stream on success. If constructing
 *   the handler or dispatching throws, a PassThrough already destroyed with
 *   that error is returned instead.
 */
function pipeline (opts, handler) {
  try {
    const pipelineHandler = new PipelineHandler(opts, handler)
    this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
    return pipelineHandler.ret
  } catch (err) {
    // Never throw synchronously: hand back a stream that carries the error.
    return new PassThrough().destroy(err)
  }
}
| module.exports = pipeline | |