|
from fastapi import FastAPI, Request, HTTPException |
|
from fastapi.responses import StreamingResponse |
|
from pydantic import BaseModel |
|
from transformers import pipeline, TextStreamer |
|
import asyncio |
|
import queue |
|
import threading |
|
import time |
|
import httpx |
|
import json |
|
|
|
class ModelInput(BaseModel):
    """Request body for POST /generate/stream."""

    # Text prompt fed verbatim to the text-generation pipeline.
    prompt: str

    # Upper bound on tokens generated beyond the prompt.
    max_new_tokens: int = 128
|
|
|
app = FastAPI()


# Text-generation pipeline, loaded once at import time (downloads the model
# on first run). Pinned to CPU; generation latency will be significant for
# a 1.5B-parameter model — acceptable for a demo service.
generator = pipeline(

    "text-generation",

    model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",

    device="cpu"

)


# In-memory cache mapping search query -> {abstract, related_topics, timestamp},
# refreshed by the background task started on app startup. Not persisted;
# lost on restart. NOTE(review): mutated from a single asyncio task and read
# by request handlers — fine under one event loop, not multi-process safe.
knowledge_graph = {}
|
|
|
|
|
async def update_knowledge_graph_periodically():
    """Background refresher for the in-memory knowledge graph.

    Every 60 seconds: pick one of a fixed set of queries at random, query
    the DuckDuckGo Instant Answer API, and store the abstract / related
    topics under that query in ``knowledge_graph``. Runs forever; any
    per-iteration error is logged and the loop continues (best-effort).
    """
    # Hoisted out of the loop — previously re-imported on every iteration.
    import random

    # Fixed query rotation; loop-invariant, so built once.
    queries = ["latest tech startup news", "AI breakthroughs", "funding trends 2025"]

    while True:
        try:
            query = random.choice(queries)

            # Explicit timeout: without one, a hung request would stall
            # this refresh loop indefinitely.
            async with httpx.AsyncClient(timeout=10.0) as client:
                resp = await client.get(
                    "https://api.duckduckgo.com/",
                    params={"q": query, "format": "json", "no_redirect": "1", "no_html": "1"}
                )
                # Don't cache error responses as if they were data.
                resp.raise_for_status()
                data = resp.json()

            knowledge_graph[query] = {
                "abstract": data.get("AbstractText", ""),
                "related_topics": data.get("RelatedTopics", []),
                "timestamp": time.time()
            }

            print(f"Knowledge graph updated for query: {query}")

        except Exception as e:
            # Best-effort task: log and keep the loop alive.
            print(f"Error updating knowledge graph: {e}")

        await asyncio.sleep(60)
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Launch the background knowledge-graph refresher when the app starts.

    The task reference is kept on ``app.state``: the event loop holds only a
    weak reference to tasks, so a task created via ``asyncio.create_task``
    with no strong reference can be garbage-collected before it finishes.
    """
    app.state.kg_updater_task = asyncio.create_task(update_knowledge_graph_periodically())
|
|
|
|
|
@app.post("/generate/stream")
async def generate_stream(input: ModelInput):
    """Stream generated text for ``input.prompt`` as a plain-text response.

    The (blocking) transformers pipeline runs in a worker thread; decoded
    text chunks are handed to the async response through a thread-safe
    queue. A ``None`` sentinel marks end-of-stream. Errors during
    generation are streamed as a final ``[ERROR] ...`` chunk.
    """
    prompt = input.prompt
    max_new_tokens = input.max_new_tokens

    q = queue.Queue()

    def run_generation():
        # Runs in a plain thread so the blocking pipeline call doesn't
        # stall the event loop.
        try:
            streamer = TextStreamer(generator.tokenizer, skip_prompt=True)

            # Hook on_finalized_text, which receives *decoded text*.
            # (Overriding `put` is wrong: `put` is fed raw token-id
            # tensors, so tensors — not strings — would end up in the
            # HTTP stream.)
            def queue_text(text, stream_end=False):
                q.put(text)

            streamer.on_finalized_text = queue_text

            generator(
                prompt,
                max_new_tokens=max_new_tokens,
                do_sample=False,
                streamer=streamer
            )
        except Exception as e:
            q.put(f"[ERROR] {e}")
        finally:
            q.put(None)  # end-of-stream sentinel, always sent

    # daemon=True: an abandoned generation must not keep the process alive.
    thread = threading.Thread(target=run_generation, daemon=True)
    thread.start()

    async def event_generator():
        while True:
            # q.get() blocks; run it in a thread so the event loop (and
            # every other request) isn't frozen while waiting for tokens.
            token = await asyncio.to_thread(q.get)
            if token is None:
                break
            yield token

    return StreamingResponse(event_generator(), media_type="text/plain")
|
|
|
|
|
|
|
@app.get("/knowledge")
async def get_knowledge():
    """Return the current contents of the in-memory knowledge graph."""
    return knowledge_graph
|
|
|
|
|
|
|
@app.get("/")
async def root():
    """Landing endpoint; doubles as a trivial liveness check."""
    return {
        "message": "Welcome to the Streaming Model API with live knowledge graph updater!"
    }
|
|