from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import pipeline, TextIteratorStreamer
import asyncio
import random
import threading
import time
import httpx

class ModelInput(BaseModel):
    prompt: str
    max_new_tokens: int = 128

app = FastAPI()
# Initialize the generator once at import time
generator = pipeline(
    "text-generation",
    model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
    device="cpu",
)

# Shared knowledge graph, just a dict (in-memory)
knowledge_graph = {}
# --- Autonomous knowledge updater --- #
async def update_knowledge_graph_periodically():
    while True:
        try:
            # Pick a random query (hardcoded for now; easy to swap for something smarter)
            queries = ["latest tech startup news", "AI breakthroughs", "funding trends 2025"]
            query = random.choice(queries)
            # Use the DuckDuckGo Instant Answer API (free, no API key required)
            async with httpx.AsyncClient() as client:
                resp = await client.get(
                    "https://api.duckduckgo.com/",
                    params={"q": query, "format": "json", "no_redirect": "1", "no_html": "1"},
                )
                data = resp.json()
            # Extract the useful bits: abstract text and related topics
            abstract = data.get("AbstractText", "")
            related_topics = data.get("RelatedTopics", [])
            # Save/update the knowledge graph entry (deliberately basic)
            knowledge_graph[query] = {
                "abstract": abstract,
                "related_topics": related_topics,
                "timestamp": time.time(),
            }
            print(f"Knowledge graph updated for query: {query}")
        except Exception as e:
            print(f"Error updating knowledge graph: {e}")
        await asyncio.sleep(60)  # wait 1 minute before the next update
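
# A typical Instant Answer payload, abridged (shape assumed from the fields
# read above, not verified against the full API schema):
#   {"AbstractText": "...", "RelatedTopics": [{"Text": "...", "FirstURL": "..."}, ...]}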

# Kick off the background task on startup
@app.on_event("startup")
async def startup_event():
    asyncio.create_task(update_knowledge_graph_periodically())

# --- Streaming generation endpoint --- #
@app.post("/generate/stream")
async def generate_stream(input: ModelInput):
    prompt = input.prompt
    max_new_tokens = input.max_new_tokens
    # TextIteratorStreamer decodes tokens as they are generated and yields text
    # chunks when iterated. (TextStreamer.put receives raw token-ID tensors, so
    # monkey-patching it onto a queue would stream tensors, not text.)
    streamer = TextIteratorStreamer(generator.tokenizer, skip_prompt=True)

    def run_generation():
        try:
            # Run generation in a worker thread; the streamer is fed as tokens arrive
            generator(
                prompt,
                max_new_tokens=max_new_tokens,
                do_sample=False,
                streamer=streamer,
            )
        except Exception as e:
            print(f"[ERROR] generation failed: {e}")
            streamer.end()  # push the stop signal so the consumer can finish

    thread = threading.Thread(target=run_generation)
    thread.start()

    async def event_generator():
        # Pull decoded chunks off the streamer without blocking the event loop
        iterator = iter(streamer)
        while True:
            chunk = await asyncio.to_thread(next, iterator, None)
            if chunk is None:
                break
            yield chunk

    return StreamingResponse(event_generator(), media_type="text/plain")
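
# A minimal client sketch for consuming the stream (assumes the server listens
# on localhost:8000; uses httpx, which is already imported above):
#
#   with httpx.stream(
#       "POST",
#       "http://localhost:8000/generate/stream",
#       json={"prompt": "Hello", "max_new_tokens": 32},
#       timeout=None,
#   ) as r:
#       for chunk in r.iter_text():
#           print(chunk, end="", flush=True)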

# Optional: endpoint to query the knowledge graph
@app.get("/knowledge")
async def get_knowledge():
    return knowledge_graph

# Root
@app.get("/")
async def root():
    return {"message": "Welcome to the Streaming Model API with live knowledge graph updater!"}