Spaces:
Sleeping
Sleeping
File size: 5,587 Bytes
f9f0fec |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.serveFile = exports.restoreAdapter = exports.patchAdapter = void 0;
const socket_io_adapter_1 = require("socket.io-adapter");
const fs_1 = require("fs");
const debug_1 = __importDefault(require("debug"));
const debug = (0, debug_1.default)("socket.io:adapter-uws");
const SEPARATOR = "\x1f"; // see https://en.wikipedia.org/wiki/Delimiter#ASCII_delimited_text
const { addAll, del, broadcast } = socket_io_adapter_1.Adapter.prototype;
function patchAdapter(app /* : TemplatedApp */) {
socket_io_adapter_1.Adapter.prototype.addAll = function (id, rooms) {
const isNew = !this.sids.has(id);
addAll.call(this, id, rooms);
const socket = this.nsp.sockets.get(id);
if (!socket) {
return;
}
if (socket.conn.transport.name === "websocket") {
subscribe(this.nsp.name, socket, isNew, rooms);
return;
}
if (isNew) {
socket.conn.on("upgrade", () => {
const rooms = this.sids.get(id);
if (rooms) {
subscribe(this.nsp.name, socket, isNew, rooms);
}
});
}
};
socket_io_adapter_1.Adapter.prototype.del = function (id, room) {
del.call(this, id, room);
const socket = this.nsp.sockets.get(id);
if (socket && socket.conn.transport.name === "websocket") {
// @ts-ignore
const sessionId = socket.conn.id;
// @ts-ignore
const websocket = socket.conn.transport.socket;
const topic = `${this.nsp.name}${SEPARATOR}${room}`;
debug("unsubscribe connection %s from topic %s", sessionId, topic);
websocket.unsubscribe(topic);
}
};
socket_io_adapter_1.Adapter.prototype.broadcast = function (packet, opts) {
const useFastPublish = opts.rooms.size <= 1 && opts.except.size === 0;
if (!useFastPublish) {
broadcast.call(this, packet, opts);
return;
}
const flags = opts.flags || {};
const basePacketOpts = {
preEncoded: true,
volatile: flags.volatile,
compress: flags.compress,
};
packet.nsp = this.nsp.name;
const encodedPackets = this.encoder.encode(packet);
const topic = opts.rooms.size === 0
? this.nsp.name
: `${this.nsp.name}${SEPARATOR}${opts.rooms.keys().next().value}`;
debug("fast publish to %s", topic);
// fast publish for clients connected with WebSocket
encodedPackets.forEach((encodedPacket) => {
const isBinary = typeof encodedPacket !== "string";
// "4" being the message type in the Engine.IO protocol, see https://github.com/socketio/engine.io-protocol
app.publish(topic, isBinary ? encodedPacket : "4" + encodedPacket, isBinary);
});
this.apply(opts, (socket) => {
if (socket.conn.transport.name !== "websocket") {
// classic publish for clients connected with HTTP long-polling
socket.client.writeToEngine(encodedPackets, basePacketOpts);
}
});
};
}
exports.patchAdapter = patchAdapter;
function subscribe(namespaceName, socket, isNew, rooms) {
// @ts-ignore
const sessionId = socket.conn.id;
// @ts-ignore
const websocket = socket.conn.transport.socket;
if (isNew) {
debug("subscribe connection %s to topic %s", sessionId, namespaceName);
websocket.subscribe(namespaceName);
}
rooms.forEach((room) => {
const topic = `${namespaceName}${SEPARATOR}${room}`; // '#' can be used as wildcard
debug("subscribe connection %s to topic %s", sessionId, topic);
websocket.subscribe(topic);
});
}
function restoreAdapter() {
socket_io_adapter_1.Adapter.prototype.addAll = addAll;
socket_io_adapter_1.Adapter.prototype.del = del;
socket_io_adapter_1.Adapter.prototype.broadcast = broadcast;
}
exports.restoreAdapter = restoreAdapter;
const toArrayBuffer = (buffer) => {
const { buffer: arrayBuffer, byteOffset, byteLength } = buffer;
return arrayBuffer.slice(byteOffset, byteOffset + byteLength);
};
// imported from https://github.com/kolodziejczak-sz/uwebsocket-serve
function serveFile(res /* : HttpResponse */, filepath) {
const { size } = (0, fs_1.statSync)(filepath);
const readStream = (0, fs_1.createReadStream)(filepath);
const destroyReadStream = () => !readStream.destroyed && readStream.destroy();
const onError = (error) => {
destroyReadStream();
throw error;
};
const onDataChunk = (chunk) => {
const arrayBufferChunk = toArrayBuffer(chunk);
const lastOffset = res.getWriteOffset();
const [ok, done] = res.tryEnd(arrayBufferChunk, size);
if (!done && !ok) {
readStream.pause();
res.onWritable((offset) => {
const [ok, done] = res.tryEnd(arrayBufferChunk.slice(offset - lastOffset), size);
if (!done && ok) {
readStream.resume();
}
return ok;
});
}
};
res.onAborted(destroyReadStream);
readStream
.on("data", onDataChunk)
.on("error", onError)
.on("end", destroyReadStream);
}
exports.serveFile = serveFile;
|