Spaces:
Sleeping
Sleeping
; | |
var _a; | |
Object.defineProperty(exports, "__esModule", { value: true }); | |
exports.SessionAwareAdapter = exports.Adapter = void 0; | |
const events_1 = require("events"); | |
const yeast_1 = require("./contrib/yeast"); | |
const WebSocket = require("ws"); | |
// True when the loaded "ws" module exposes a `Sender.frame()` function, in
// which case `_encode()` can build the WebSocket frame once per broadcast
// instead of once per recipient.
const canPreComputeFrame = WebSocket != null &&
    WebSocket.Sender != null &&
    typeof WebSocket.Sender.frame === "function";
/**
 * In-memory adapter.
 *
 * Tracks which socket ids belong to which rooms (and the reverse mapping) and
 * delivers broadcast packets to the matching sockets of its namespace.
 *
 * Emits "create-room", "join-room", "leave-room" and "delete-room" events as
 * the room membership changes.
 */
class Adapter extends events_1.EventEmitter {
    /**
     * In-memory adapter constructor.
     *
     * @param {Namespace} nsp
     */
    constructor(nsp) {
        super();
        this.nsp = nsp;
        // room -> set of socket ids currently in that room
        this.rooms = new Map();
        // socket id -> set of rooms the socket has joined
        this.sids = new Map();
        this.encoder = nsp.server.encoder;
    }
    /**
     * To be overridden
     */
    init() { }
    /**
     * To be overridden
     */
    close() { }
    /**
     * Returns the number of Socket.IO servers in the cluster
     *
     * @return {Promise<number>} always resolves to 1 for this adapter
     * @public
     */
    serverCount() {
        return Promise.resolve(1);
    }
    /**
     * Adds a socket to a list of room.
     *
     * Emits "create-room" for each room that did not exist yet and
     * "join-room" for each room the socket was not already a member of.
     *
     * @param {SocketId} id the socket id
     * @param {Set<Room>} rooms a set of rooms
     * @public
     */
    addAll(id, rooms) {
        if (!this.sids.has(id)) {
            this.sids.set(id, new Set());
        }
        for (const room of rooms) {
            this.sids.get(id).add(room);
            if (!this.rooms.has(room)) {
                this.rooms.set(room, new Set());
                this.emit("create-room", room);
            }
            if (!this.rooms.get(room).has(id)) {
                this.rooms.get(room).add(id);
                this.emit("join-room", room, id);
            }
        }
    }
    /**
     * Removes a socket from a room.
     *
     * @param {SocketId} id the socket id
     * @param {Room} room the room name
     */
    del(id, room) {
        if (this.sids.has(id)) {
            this.sids.get(id).delete(room);
        }
        this._del(room, id);
    }
    /**
     * Removes a socket id from the `rooms` map only (the `sids` map is left to
     * the caller). Emits "leave-room" when the id was a member, and
     * "delete-room" when the room ends up empty and is dropped.
     *
     * @param {Room} room the room name
     * @param {SocketId} id the socket id
     */
    _del(room, id) {
        const _room = this.rooms.get(room);
        if (_room != null) {
            const deleted = _room.delete(id);
            if (deleted) {
                this.emit("leave-room", room, id);
            }
            if (_room.size === 0 && this.rooms.delete(room)) {
                this.emit("delete-room", room);
            }
        }
    }
    /**
     * Removes a socket from all rooms it's joined.
     *
     * @param {SocketId} id the socket id
     */
    delAll(id) {
        if (!this.sids.has(id)) {
            return;
        }
        for (const room of this.sids.get(id)) {
            this._del(room, id);
        }
        this.sids.delete(id);
    }
    /**
     * Broadcasts a packet.
     *
     * Options:
     *  - `flags` {Object} flags for this packet (`volatile` and `compress` are read)
     *  - `except` {Set<Room>} rooms whose members should be excluded
     *  - `rooms` {Set<Room>} rooms to broadcast to (empty or missing: every socket)
     *
     * @param {Object} packet the packet object
     * @param {Object} opts the options
     * @public
     */
    broadcast(packet, opts) {
        const flags = opts.flags || {};
        const packetOpts = {
            preEncoded: true,
            volatile: flags.volatile,
            compress: flags.compress,
        };
        packet.nsp = this.nsp.name;
        // encode once, then hand the same encoded payload to every recipient
        const encodedPackets = this._encode(packet, packetOpts);
        this.apply(opts, (socket) => {
            if (typeof socket.notifyOutgoingListeners === "function") {
                socket.notifyOutgoingListeners(packet);
            }
            socket.client.writeToEngine(encodedPackets, packetOpts);
        });
    }
    /**
     * Broadcasts a packet and expects multiple acknowledgements.
     *
     * Options:
     *  - `flags` {Object} flags for this packet (`volatile` and `compress` are read)
     *  - `except` {Set<Room>} rooms whose members should be excluded
     *  - `rooms` {Set<Room>} rooms to broadcast to (empty or missing: every socket)
     *
     * @param {Object} packet the packet object
     * @param {Object} opts the options
     * @param clientCountCallback - called once with the number of sockets the packet was written to
     * @param ack - the callback that will be called for each client response
     *
     * @public
     */
    broadcastWithAck(packet, opts, clientCountCallback, ack) {
        const flags = opts.flags || {};
        const packetOpts = {
            preEncoded: true,
            volatile: flags.volatile,
            compress: flags.compress,
        };
        packet.nsp = this.nsp.name;
        // we can use the same id for each packet, since the _ids counter is common (no duplicate)
        packet.id = this.nsp._ids++;
        const encodedPackets = this._encode(packet, packetOpts);
        let clientCount = 0;
        this.apply(opts, (socket) => {
            // track the total number of acknowledgements that are expected
            clientCount++;
            // call the ack callback for each client response
            socket.acks.set(packet.id, ack);
            if (typeof socket.notifyOutgoingListeners === "function") {
                socket.notifyOutgoingListeners(packet);
            }
            socket.client.writeToEngine(encodedPackets, packetOpts);
        });
        clientCountCallback(clientCount);
    }
    /**
     * Encodes a packet with the server encoder.
     *
     * When the "ws" module allows it and the packet encodes to a single
     * string, the WebSocket frame is also computed here and stored in
     * `packetOpts.wsPreEncodedFrame` (so `packetOpts` is mutated).
     *
     * @param {Object} packet the packet object
     * @param {Object} packetOpts the write options, possibly extended with `wsPreEncodedFrame`
     * @return the encoded packets returned by the encoder
     */
    _encode(packet, packetOpts) {
        const encodedPackets = this.encoder.encode(packet);
        if (canPreComputeFrame &&
            encodedPackets.length === 1 &&
            typeof encodedPackets[0] === "string") {
            // "4" being the "message" packet type in the Engine.IO protocol
            const data = Buffer.from("4" + encodedPackets[0]);
            // see https://github.com/websockets/ws/issues/617#issuecomment-283002469
            packetOpts.wsPreEncodedFrame = WebSocket.Sender.frame(data, {
                readOnly: false,
                mask: false,
                rsv1: false,
                opcode: 1,
                fin: true,
            });
        }
        return encodedPackets;
    }
    /**
     * Gets a list of sockets by sid.
     *
     * @param {Set<Room>} rooms the explicit set of rooms to check (empty or missing: every socket)
     * @return {Promise<Set<SocketId>>} the ids of the matching sockets
     */
    sockets(rooms) {
        const sids = new Set();
        this.apply({ rooms }, (socket) => {
            sids.add(socket.id);
        });
        return Promise.resolve(sids);
    }
    /**
     * Gets the list of rooms a given socket has joined.
     *
     * @param {SocketId} id the socket id
     * @return {Set<Room>|undefined} the rooms, or undefined when the id is unknown
     */
    socketRooms(id) {
        return this.sids.get(id);
    }
    /**
     * Returns the matching socket instances
     *
     * @param opts - the filters to apply
     */
    fetchSockets(opts) {
        const sockets = [];
        this.apply(opts, (socket) => {
            sockets.push(socket);
        });
        return Promise.resolve(sockets);
    }
    /**
     * Makes the matching socket instances join the specified rooms
     *
     * @param opts - the filters to apply
     * @param rooms - the rooms to join
     */
    addSockets(opts, rooms) {
        this.apply(opts, (socket) => {
            socket.join(rooms);
        });
    }
    /**
     * Makes the matching socket instances leave the specified rooms
     *
     * @param opts - the filters to apply
     * @param rooms - the rooms to leave
     */
    delSockets(opts, rooms) {
        this.apply(opts, (socket) => {
            rooms.forEach((room) => socket.leave(room));
        });
    }
    /**
     * Makes the matching socket instances disconnect
     *
     * @param opts - the filters to apply
     * @param close - whether to close the underlying connection
     */
    disconnectSockets(opts, close) {
        this.apply(opts, (socket) => {
            socket.disconnect(close);
        });
    }
    /**
     * Calls `callback` once for each socket of the namespace matching the
     * filters.
     *
     * With a non-empty `opts.rooms`, the members of those rooms are visited
     * (each socket at most once). With an empty or missing `opts.rooms`,
     * every known socket id is visited. Members of the `opts.except` rooms are
     * always skipped, as are ids with no socket in `nsp.sockets`.
     *
     * @param opts - the filters to apply (`rooms`, `except`)
     * @param callback - invoked with each matching socket
     */
    apply(opts, callback) {
        const rooms = opts.rooms;
        const except = this.computeExceptSids(opts.except);
        // a missing `rooms` is handled like an empty set (no room filter),
        // in the same way a missing `except` means "exclude nobody"
        if (rooms && rooms.size) {
            // remembers the ids already handled, since a socket may be in several rooms
            const ids = new Set();
            for (const room of rooms) {
                if (!this.rooms.has(room))
                    continue;
                for (const id of this.rooms.get(room)) {
                    if (ids.has(id) || except.has(id))
                        continue;
                    const socket = this.nsp.sockets.get(id);
                    if (socket) {
                        callback(socket);
                        ids.add(id);
                    }
                }
            }
        }
        else {
            for (const [id] of this.sids) {
                if (except.has(id))
                    continue;
                const socket = this.nsp.sockets.get(id);
                if (socket)
                    callback(socket);
            }
        }
    }
    /**
     * Expands a set of rooms into the set of socket ids that are in at least
     * one of them.
     *
     * @param {Set<Room>} [exceptRooms] the rooms to exclude
     * @return {Set<SocketId>} the ids to skip (empty when no rooms are given)
     */
    computeExceptSids(exceptRooms) {
        const exceptSids = new Set();
        if (exceptRooms && exceptRooms.size > 0) {
            for (const room of exceptRooms) {
                if (this.rooms.has(room)) {
                    this.rooms.get(room).forEach((sid) => exceptSids.add(sid));
                }
            }
        }
        return exceptSids;
    }
    /**
     * Send a packet to the other Socket.IO servers in the cluster
     *
     * Not supported by this adapter: only logs a warning.
     *
     * @param packet - an array of arguments, which may include an acknowledgement callback at the end
     */
    serverSideEmit(packet) {
        console.warn("this adapter does not support the serverSideEmit() functionality");
    }
    /**
     * Save the client session in order to restore it upon reconnection.
     *
     * No-op in this adapter.
     */
    persistSession(session) { }
    /**
     * Restore the session and find the packets that were missed by the client.
     *
     * This adapter stores no session and always returns null.
     *
     * @param pid
     * @param offset
     */
    restoreSession(pid, offset) {
        return null;
    }
}
exports.Adapter = Adapter; | |
/**
 * Adapter that additionally keeps, in memory, the sessions of disconnected
 * clients and the packets that were broadcast, so that `restoreSession()` can
 * return the packets a client missed while it was away.
 *
 * Sessions and packets older than `maxDisconnectionDuration` are purged by a
 * periodic timer, which is stopped by `close()`.
 */
class SessionAwareAdapter extends Adapter {
    /**
     * @param {Namespace} nsp the namespace; `nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration`
     *   is read as the retention period (compared against `Date.now()` values)
     */
    constructor(nsp) {
        super(nsp);
        this.nsp = nsp;
        // pid -> session (with the `disconnectedAt` timestamp set by persistSession)
        this.sessions = new Map();
        // broadcast packets in emission order: { id, opts, data, emittedAt }
        this.packets = [];
        this.maxDisconnectionDuration =
            nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration;
        // the handle is kept so that close() can stop the periodic cleanup
        this.sessionCleanupTimer = setInterval(() => {
            const threshold = Date.now() - this.maxDisconnectionDuration;
            this.sessions.forEach((session, sessionId) => {
                const hasExpired = session.disconnectedAt < threshold;
                if (hasExpired) {
                    this.sessions.delete(sessionId);
                }
            });
            // packets are appended in emission order: find the most recent
            // expired one and drop it together with everything before it
            for (let i = this.packets.length - 1; i >= 0; i--) {
                const hasExpired = this.packets[i].emittedAt < threshold;
                if (hasExpired) {
                    this.packets.splice(0, i + 1);
                    break;
                }
            }
        }, 60 * 1000);
        // prevents the timer from keeping the process alive
        this.sessionCleanupTimer.unref();
    }
    /**
     * Stops the periodic cleanup timer, then delegates to the parent class.
     * Calling it more than once is harmless.
     */
    close() {
        clearInterval(this.sessionCleanupTimer);
        return super.close();
    }
    /**
     * Stores the session under its `pid` and stamps it with the current time
     * (the `session` object itself is mutated).
     *
     * @param session - the session to save
     */
    persistSession(session) {
        session.disconnectedAt = Date.now();
        this.sessions.set(session.pid, session);
    }
    /**
     * Restore the session and find the packets that were missed by the client.
     *
     * @param pid - the id the session was stored under
     * @param offset - the id of the last packet the client has processed
     * @return null when the session is unknown or expired, or when the offset
     *   is not found among the stored packets; otherwise a Promise resolving
     *   to a copy of the session with an extra `missedPackets` array
     */
    restoreSession(pid, offset) {
        const session = this.sessions.get(pid);
        if (!session) {
            // the session may have expired
            return null;
        }
        const hasExpired = session.disconnectedAt + this.maxDisconnectionDuration < Date.now();
        if (hasExpired) {
            // the session has expired
            this.sessions.delete(pid);
            return null;
        }
        const index = this.packets.findIndex((packet) => packet.id === offset);
        if (index === -1) {
            // the offset may be too old
            return null;
        }
        // everything stored after the offset is a candidate; keep only the
        // packets that targeted this session's rooms
        const missedPackets = [];
        for (let i = index + 1; i < this.packets.length; i++) {
            const packet = this.packets[i];
            if (shouldIncludePacket(session.rooms, packet.opts)) {
                missedPackets.push(packet.data);
            }
        }
        return Promise.resolve(Object.assign({}, session, { missedPackets }));
    }
    /**
     * Broadcasts a packet, first recording it when it is an event packet
     * (type 2) without acknowledgement id and without `volatile` flag.
     *
     * For recorded packets a fresh id is appended to `packet.data` (the
     * caller's array is mutated) before the packet is stored and sent.
     *
     * @param {Object} packet the packet object
     * @param {Object} opts the options
     */
    broadcast(packet, opts) {
        const isEventPacket = packet.type === 2;
        // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and
        // restored on another server upon reconnection
        const withoutAcknowledgement = packet.id === undefined;
        const flags = opts.flags;
        const notVolatile = flags == null || flags.volatile === undefined;
        if (isEventPacket && withoutAcknowledgement && notVolatile) {
            const id = (0, yeast_1.yeast)();
            // the offset is stored at the end of the data array, so the client knows the ID of the last packet it has
            // processed (and the format is backward-compatible)
            packet.data.push(id);
            this.packets.push({
                id,
                opts,
                data: packet.data,
                emittedAt: Date.now(),
            });
        }
        super.broadcast(packet, opts);
    }
}
exports.SessionAwareAdapter = SessionAwareAdapter; | |
/**
 * Tells whether a stored packet was addressed to a session, given the rooms
 * the session was in.
 *
 * The packet is included when it targeted no specific room or at least one of
 * the session's rooms, and none of the session's rooms is in the `except` set.
 * A missing `rooms` or `except` set is handled like an empty one, matching
 * the way the adapter accepts broadcast options without `except`.
 *
 * @param {Room[]} sessionRooms the rooms of the session (an array)
 * @param {Object} opts the broadcast options stored with the packet (`rooms`, `except`)
 * @return {boolean} whether the packet must be sent again to the session
 */
function shouldIncludePacket(sessionRooms, opts) {
    const rooms = opts.rooms;
    const except = opts.except;
    const included = rooms == null ||
        rooms.size === 0 ||
        sessionRooms.some((room) => rooms.has(room));
    const notExcluded = except == null || sessionRooms.every((room) => !except.has(room));
    return included && notExcluded;
}