import type {
  ConsumerSensorDriver,
  ConnectionStatus,
  SensorFrame,
  SensorStream,
  FrameCallback,
  StreamUpdateCallback,
  StatusChangeCallback,
  UnsubscribeFn,
} from "../types/index.js";

/** Configuration for {@link WebRTCConsumer}. */
export interface WebRTCConsumerConfig {
  type: "webrtc-consumer";
  /** WebSocket signaling endpoint, e.g. `ws://host:port/signaling`. */
  signalingUrl: string;
  /** Optional stream identifier copied into every packet header. */
  streamId?: string;
}

/** Shape of the JSON messages this driver reacts to on the signaling socket. */
type IncomingSignal = {
  type?: string;
  sdp?: string;
  candidate?: RTCIceCandidateInit & { end?: boolean };
};

/**
 * Consumer driver that pushes sensor frames to a remote peer over a WebRTC
 * data channel. Offer/answer and trickle-ICE are exchanged as JSON messages
 * over a WebSocket signaling connection.
 *
 * Wire format of one packet:
 * `[uint32 little-endian header length][UTF-8 JSON header][raw frame bytes]`.
 */
export class WebRTCConsumer implements ConsumerSensorDriver {
  readonly type = "consumer" as const;
  readonly id: string;
  readonly name = "WebRTC Consumer";

  private config: WebRTCConsumerConfig;
  private _status: ConnectionStatus = { isConnected: false };
  private pc: RTCPeerConnection | null = null;
  private dc: RTCDataChannel | null = null;
  private signaling?: WebSocket;

  private frameSentCallbacks: FrameCallback[] = [];
  private streamUpdateCallbacks: StreamUpdateCallback[] = [];
  private statusCallbacks: StatusChangeCallback[] = [];
  private activeStreams = new Map<string, SensorStream>();

  private sendQueue: SensorFrame[] = [];
  /** True while a flush loop is running; prevents overlapping (re-entrant) flushes. */
  private isSending = false;
  /** Stop handing frames to the data channel once this many bytes are buffered (4 MB). */
  private readonly BUFFER_WATERMARK = 4 * 1024 * 1024;

  constructor(config: WebRTCConsumerConfig) {
    this.config = config;
    this.id = `webrtc-consumer-${Date.now()}`;
  }

  /** Most recent connection status. */
  get status(): ConnectionStatus {
    return this._status;
  }

  /**
   * Opens the signaling socket, creates the peer connection and data channel,
   * and sends the SDP offer. Resolves once the offer has been sent; the status
   * flips to connected later, when the data channel opens.
   *
   * Any connection left over from an earlier call is released first, and all
   * resources are released again if setup fails.
   *
   * @throws whatever the signaling socket or peer connection setup rejects with.
   */
  async connect(): Promise<void> {
    this.releaseResources();
    try {
      const signaling = new WebSocket(this.config.signalingUrl);
      this.signaling = signaling;
      await new Promise<void>((resolve, reject) => {
        signaling.onopen = () => resolve();
        signaling.onerror = reject;
      });

      const pc = new RTCPeerConnection({
        iceServers: [{ urls: "stun:stun.l.google.com:19302" }],
      });
      this.pc = pc;

      // Unordered, no retransmits: late frames are dropped rather than delayed.
      const dc = pc.createDataChannel("video", { ordered: false, maxRetransmits: 0 });
      this.dc = dc;
      dc.binaryType = "arraybuffer";
      // Resume a flush that stopped at the watermark once the buffer has drained.
      dc.bufferedAmountLowThreshold = this.BUFFER_WATERMARK / 2;
      dc.onbufferedamountlow = () => this.flushInBackground();
      dc.onopen = () => {
        this._status = { isConnected: true, lastConnected: new Date() };
        this.notifyStatus();
        this.flushInBackground();
      };
      dc.onclose = () => {
        this._status = { isConnected: false, error: "DC closed" };
        this.notifyStatus();
      };

      // Trickle-ICE: forward each local candidate as soon as it is gathered.
      pc.onicecandidate = (ev) => {
        if (signaling.readyState !== WebSocket.OPEN) return;
        // A null candidate means gathering finished; send an end-of-candidates marker.
        const candidate = ev.candidate ? ev.candidate.toJSON() : { end: true };
        signaling.send(JSON.stringify({ type: "ice", candidate }));
      };

      signaling.onmessage = (ev) => {
        void this.handleSignalingMessage(pc, ev.data);
      };

      // Create the offer immediately; candidates follow via trickle-ICE.
      const offer = await pc.createOffer();
      await pc.setLocalDescription(offer);
      signaling.send(JSON.stringify({ type: "offer", sdp: offer.sdp }));
    } catch (err: unknown) {
      this.releaseResources();
      throw err;
    }
  }

  /**
   * Closes the data channel, peer connection and signaling socket, then
   * reports a plain disconnected status (no error) to status listeners.
   */
  async disconnect(): Promise<void> {
    this.releaseResources();
    this._status = { isConnected: false };
    this.notifyStatus();
  }

  // ConsumerSensorDriver impl

  /**
   * Queues one frame and flushes the queue.
   *
   * @throws Error("DataChannel not open") when the data channel is missing or not open.
   */
  async sendFrame(frame: SensorFrame): Promise<void> {
    if (!this.dc || this.dc.readyState !== "open") {
      throw new Error("DataChannel not open");
    }
    this.sendQueue.push(frame);
    await this.flushQueue();
  }

  /**
   * Queues several frames and flushes the queue. Unlike {@link sendFrame} this
   * does not throw when the channel is closed: the frames stay queued and are
   * sent when the data channel opens.
   */
  async sendFrames(frames: SensorFrame[]): Promise<void> {
    this.sendQueue.push(...frames);
    await this.flushQueue();
  }

  /** Registers `stream` as active and notifies stream listeners. */
  async startOutputStream(stream: SensorStream): Promise<void> {
    this.activeStreams.set(stream.id, stream);
    this.notifyStream(stream);
  }

  /** Marks the stream inactive, unregisters it and notifies stream listeners. Unknown ids are ignored. */
  async stopOutputStream(streamId: string): Promise<void> {
    const stream = this.activeStreams.get(streamId);
    if (!stream) return;
    stream.active = false;
    this.activeStreams.delete(streamId);
    this.notifyStream(stream);
  }

  /** Snapshot of the currently registered output streams. */
  getActiveOutputStreams(): SensorStream[] {
    return Array.from(this.activeStreams.values());
  }

  // Listener registration; each method returns a function that removes the listener.

  /** Subscribes to "frame handed to the data channel" notifications. */
  onFrameSent(cb: FrameCallback): UnsubscribeFn {
    this.frameSentCallbacks.push(cb);
    return () => this.pull(this.frameSentCallbacks, cb);
  }

  /** Subscribes to stream start/stop notifications. */
  onStreamUpdate(cb: StreamUpdateCallback): UnsubscribeFn {
    this.streamUpdateCallbacks.push(cb);
    return () => this.pull(this.streamUpdateCallbacks, cb);
  }

  /** Subscribes to connection status changes. */
  onStatusChange(cb: StatusChangeCallback): UnsubscribeFn {
    this.statusCallbacks.push(cb);
    return () => this.pull(this.statusCallbacks, cb);
  }

  // helpers

  /** Applies one signaling message (answer or remote ICE candidate) to `pc`; failures are logged, not thrown. */
  private async handleSignalingMessage(pc: RTCPeerConnection, raw: unknown): Promise<void> {
    try {
      if (typeof raw !== "string") {
        throw new TypeError("expected a text signaling message");
      }
      const msg: IncomingSignal = JSON.parse(raw);
      if (msg.type === "answer") {
        await pc.setRemoteDescription({ type: "answer", sdp: msg.sdp });
      } else if (msg.type === "ice") {
        // End-of-candidates marker: remote ICE gathering is complete.
        if (msg.candidate?.end) return;
        await pc.addIceCandidate(msg.candidate);
      }
    } catch (err: unknown) {
      console.error("WebRTCConsumer: failed to handle signaling message", err);
    }
  }

  /**
   * Detaches all event handlers and closes the data channel, peer connection
   * and signaling socket. Handlers are removed first so that a deliberate
   * close is not later reported as a "DC closed" error.
   */
  private releaseResources(): void {
    const { dc, pc, signaling } = this;
    this.dc = null;
    this.pc = null;
    this.signaling = undefined;
    if (dc) {
      dc.onopen = null;
      dc.onclose = null;
      dc.onbufferedamountlow = null;
      dc.close();
    }
    if (pc) {
      pc.onicecandidate = null;
      pc.close();
    }
    if (signaling) {
      signaling.onopen = null;
      signaling.onerror = null;
      signaling.onmessage = null;
      signaling.close();
    }
  }

  /** Starts a flush from an event handler, where nobody can await it; errors are logged. */
  private flushInBackground(): void {
    this.flushQueue().catch((err: unknown) => {
      console.error("WebRTCConsumer: failed to flush send queue", err);
    });
  }

  /**
   * Sends queued frames in order while the data channel is open and its
   * buffered amount is below the watermark. ArrayBuffer frames are sent
   * synchronously; Blob frames are awaited one at a time so order is kept.
   * A frame whose encoding or send throws is dropped and the error propagates.
   */
  private async flushQueue(): Promise<void> {
    if (this.isSending) return; // the running loop will pick up newly queued frames
    this.isSending = true;
    try {
      while (this.sendQueue.length > 0) {
        const dc = this.dc;
        if (!dc || dc.readyState !== "open") return;
        if (dc.bufferedAmount >= this.BUFFER_WATERMARK) return; // resumed by onbufferedamountlow

        const frame = this.sendQueue.shift();
        if (frame === undefined) return;

        const built = this.frameToPacket(frame);
        let packet: ArrayBuffer;
        if (built instanceof Promise) {
          packet = await built;
          // The channel may have closed or been replaced while the Blob was read.
          if (this.dc !== dc || dc.readyState !== "open") {
            this.sendQueue.unshift(frame);
            return;
          }
        } else {
          packet = built;
        }

        dc.send(packet);
        this.frameSentCallbacks.forEach((cb) => cb(frame));
      }
    } finally {
      this.isSending = false;
    }
  }

  /**
   * Encodes a frame as `[header length][JSON header][payload]`.
   *
   * @returns the packet directly for binary payloads, or a Promise of it when
   * the payload is a Blob (whose bytes can only be read asynchronously).
   */
  private frameToPacket(frame: SensorFrame): ArrayBuffer | Promise<ArrayBuffer> {
    const headerJson = JSON.stringify({
      type: "video_frame",
      timestamp: frame.timestamp,
      frameType: frame.type,
      metadata: frame.metadata,
      streamId: this.config.streamId,
    });
    const headerBytes = new TextEncoder().encode(headerJson);

    const data: unknown = frame.data;
    if (typeof Blob !== "undefined" && data instanceof Blob) {
      return data.arrayBuffer().then((buf) => this.assemblePacket(headerBytes, new Uint8Array(buf)));
    }
    if (ArrayBuffer.isView(data)) {
      // Copy the view's raw bytes, honouring its offset and length.
      return this.assemblePacket(headerBytes, new Uint8Array(data.buffer, data.byteOffset, data.byteLength));
    }
    return this.assemblePacket(headerBytes, new Uint8Array(data as ArrayBuffer));
  }

  /** Concatenates the 4-byte little-endian header length, the header bytes and the payload bytes. */
  private assemblePacket(headerBytes: Uint8Array, payload: Uint8Array): ArrayBuffer {
    const prefixSize = 4;
    const out = new Uint8Array(prefixSize + headerBytes.length + payload.length);
    new DataView(out.buffer).setUint32(0, headerBytes.length, true);
    out.set(headerBytes, prefixSize);
    out.set(payload, prefixSize + headerBytes.length);
    return out.buffer;
  }

  /** Removes the first occurrence of `item` from `arr`, if present. */
  private pull<T>(arr: T[], item: T): void {
    const i = arr.indexOf(item);
    if (i >= 0) arr.splice(i, 1);
  }

  private notifyStream(s: SensorStream): void {
    this.streamUpdateCallbacks.forEach((c) => c(s));
  }

  private notifyStatus(): void {
    this.statusCallbacks.forEach((c) => c(this._status));
  }
}