# ap3 / app.py
# Reality123b's picture
# Update app.py
# 2145ed0 verified
# raw
# history blame
# 3.5 kB
import asyncio
import json
import queue
import random
import threading
import time

import httpx
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import TextIteratorStreamer, TextStreamer, pipeline
class ModelInput(BaseModel):
prompt: str
max_new_tokens: int = 128
app = FastAPI()
# Initialize generator once
generator = pipeline(
"text-generation",
model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
device="cpu"
)
# Shared knowledge graph, just a dict (in-memory)
knowledge_graph = {}
# --- Autonomous knowledge updater --- #
async def update_knowledge_graph_periodically():
while True:
try:
# Pick a random query (here: hardcoded or you can improve)
queries = ["latest tech startup news", "AI breakthroughs", "funding trends 2025"]
import random
query = random.choice(queries)
# Use DuckDuckGo Instant Answer API (free, no API key)
async with httpx.AsyncClient() as client:
resp = await client.get(
"https://api.duckduckgo.com/",
params={"q": query, "format": "json", "no_redirect": "1", "no_html": "1"}
)
data = resp.json()
# Extract some useful info (abstract text)
abstract = data.get("AbstractText", "")
related_topics = data.get("RelatedTopics", [])
# Save/update knowledge graph (super basic example)
knowledge_graph[query] = {
"abstract": abstract,
"related_topics": related_topics,
"timestamp": time.time()
}
print(f"Knowledge graph updated for query: {query}")
except Exception as e:
print(f"Error updating knowledge graph: {e}")
await asyncio.sleep(60) # wait 1 minute before next update
# Kick off background task on startup
@app.on_event("startup")
async def startup_event():
asyncio.create_task(update_knowledge_graph_periodically())
# --- Streaming generation endpoint --- #
@app.post("/generate/stream")
async def generate_stream(input: ModelInput):
prompt = input.prompt
max_new_tokens = input.max_new_tokens
q = queue.Queue()
def run_generation():
try:
streamer = TextStreamer(generator.tokenizer, skip_prompt=True)
# Monkey-patch streamer to push tokens to queue
def queue_token(token):
q.put(token)
streamer.put = queue_token
# Run generation with streamer attached
generator(
prompt,
max_new_tokens=max_new_tokens,
do_sample=False,
streamer=streamer
)
except Exception as e:
q.put(f"[ERROR] {e}")
finally:
q.put(None) # Sentinel to mark done
thread = threading.Thread(target=run_generation)
thread.start()
async def event_generator():
while True:
token = q.get()
if token is None:
break
yield token
return StreamingResponse(event_generator(), media_type="text/plain")
# Optional: Endpoint to query knowledge graph
@app.get("/knowledge")
async def get_knowledge():
return knowledge_graph
# Root
@app.get("/")
async def root():
return {"message": "Welcome to the Streaming Model API with live knowledge graph updater!"}