Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
export interface StreamChunk { | |
type: "chunk" | "done" | "error"; | |
content?: string; | |
error?: string; | |
} | |
export class StreamReader { | |
private decoder = new TextDecoder(); | |
private buffer = ""; | |
constructor(private response: Response) { | |
if (!response.body) { | |
throw new Error("Response has no body"); | |
} | |
} | |
async *read(): AsyncGenerator<StreamChunk, void, unknown> { | |
const reader = this.response.body!.getReader(); | |
try { | |
while (true) { | |
const { done, value } = await reader.read(); | |
if (done) break; | |
this.buffer += this.decoder.decode(value, { stream: true }); | |
const lines = this.buffer.split("\n"); | |
this.buffer = lines.pop() || ""; | |
for (const line of lines) { | |
if (line.startsWith("data: ")) { | |
const data = line.slice(6).trim(); | |
if (!data) continue; | |
try { | |
const parsed = JSON.parse(data) as StreamChunk; | |
yield parsed; | |
if (parsed.type === "done") return; | |
} catch { | |
// Ignore malformed JSON | |
} | |
} | |
} | |
} | |
} finally { | |
reader.releaseLock(); | |
} | |
} | |
static async fromFetch(url: string, options?: RequestInit): Promise<StreamReader> { | |
const response = await fetch(url, options); | |
if (!response.ok) { | |
const error = await response.json(); | |
throw new Error(error.error || "Request failed"); | |
} | |
return new StreamReader(response); | |
} | |
} | |
export class StreamWriter { | |
private encoder = new TextEncoder(); | |
private controller?: ReadableStreamDefaultController<Uint8Array>; | |
public readonly stream: ReadableStream<Uint8Array>; | |
constructor() { | |
this.stream = new ReadableStream({ | |
start: controller => { | |
this.controller = controller; | |
}, | |
}); | |
} | |
write(chunk: StreamChunk): void { | |
if (!this.controller) { | |
return; | |
} | |
try { | |
const data = JSON.stringify(chunk); | |
this.controller.enqueue(this.encoder.encode(`data: ${data}\n\n`)); | |
} catch { | |
// Controller might be closed | |
} | |
} | |
writeChunk(content: string): void { | |
this.write({ type: "chunk", content }); | |
} | |
writeError(error: string): void { | |
this.write({ type: "error", error }); | |
} | |
end(): void { | |
if (!this.controller) return; | |
try { | |
this.write({ type: "done" }); | |
this.controller.close(); | |
} catch { | |
// Controller might already be closed | |
} | |
this.controller = undefined; | |
} | |
error(error: Error): void { | |
if (!this.controller) return; | |
try { | |
this.writeError(error.message); | |
this.controller.close(); | |
} catch { | |
// Controller might already be closed | |
} | |
this.controller = undefined; | |
} | |
createResponse(): Response { | |
return new Response(this.stream, { | |
headers: { | |
"Content-Type": "text/event-stream", | |
"Cache-Control": "no-cache", | |
"Connection": "keep-alive", | |
}, | |
}); | |
} | |
} | |
export async function streamFromAsyncIterable<T>( | |
iterable: AsyncIterable<T>, | |
transform: (item: T) => StreamChunk, | |
): Promise<ReadableStream<Uint8Array>> { | |
const writer = new StreamWriter(); | |
(async () => { | |
try { | |
for await (const item of iterable) { | |
writer.write(transform(item)); | |
} | |
writer.end(); | |
} catch (error) { | |
writer.error(error instanceof Error ? error : new Error(String(error))); | |
} | |
})(); | |
return writer.stream; | |
} | |