File size: 3,499 Bytes
2145ed0
 
6e0397b
3003014
2145ed0
 
 
 
 
 
6e0397b
 
 
2145ed0
6e0397b
 
 
2145ed0
3003014
 
5879220
2145ed0
3003014
6e0397b
2145ed0
 
6e0397b
2145ed0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6e0397b
 
2145ed0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import asyncio
import json
import queue
import random
import threading
import time

import httpx
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import pipeline, TextStreamer

class ModelInput(BaseModel):
    """Request body schema for the /generate/stream endpoint."""

    prompt: str  # text prompt fed to the text-generation pipeline
    max_new_tokens: int = 128  # cap on the number of newly generated tokens

app = FastAPI()

# Initialize generator once at import time (heavy model download/load);
# CPU-only inference, shared by every request.
generator = pipeline(
    "text-generation",
    model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
    device="cpu"
)

# Shared knowledge graph, just a dict (in-memory).
# Written by the background updater task and read by /knowledge.
# NOTE(review): no locking — appears safe only because both writer and
# readers run on the single event loop; confirm if threads ever touch it.
knowledge_graph = {}

# --- Autonomous knowledge updater --- #
async def update_knowledge_graph_periodically():
    """Background task: once a minute, look up a rotating query via the
    DuckDuckGo Instant Answer API and cache the result in
    ``knowledge_graph`` keyed by the query string.

    Runs forever. Individual failures are logged and skipped so that a
    single bad request never kills the loop.
    """
    # Candidate queries are fixed; ``random`` is imported at module level
    # (the original re-imported it on every iteration).
    queries = ["latest tech startup news", "AI breakthroughs", "funding trends 2025"]

    while True:
        try:
            query = random.choice(queries)

            # DuckDuckGo Instant Answer API (free, no API key).
            # Explicit timeout: without one, a hung request would stall
            # this loop indefinitely and updates would silently stop.
            async with httpx.AsyncClient(timeout=10.0) as client:
                resp = await client.get(
                    "https://api.duckduckgo.com/",
                    params={"q": query, "format": "json", "no_redirect": "1", "no_html": "1"}
                )
                # Fail fast on HTTP errors rather than parsing an error page.
                resp.raise_for_status()
                data = resp.json()

            # Save/update knowledge graph (super basic example).
            knowledge_graph[query] = {
                "abstract": data.get("AbstractText", ""),
                "related_topics": data.get("RelatedTopics", []),
                "timestamp": time.time(),
            }

            print(f"Knowledge graph updated for query: {query}")

        except Exception as e:
            # Broad catch is deliberate: this is a best-effort daemon loop.
            print(f"Error updating knowledge graph: {e}")

        await asyncio.sleep(60)  # wait 1 minute before next update

# Kick off background task on startup
@app.on_event("startup")
async def startup_event():
    """Start the periodic knowledge-graph updater when the app boots.

    The task handle is stored on ``app.state`` because ``asyncio`` holds
    only a weak reference to tasks created with ``create_task`` — a
    discarded handle can be garbage-collected mid-flight, silently
    killing the updater.
    """
    # NOTE(review): @app.on_event is deprecated in recent FastAPI in
    # favour of lifespan handlers; kept here for interface compatibility.
    app.state.knowledge_updater_task = asyncio.create_task(
        update_knowledge_graph_periodically()
    )

# --- Streaming generation endpoint --- #
@app.post("/generate/stream")
async def generate_stream(input: ModelInput):
    """Stream generated text for ``input.prompt`` chunk by chunk.

    The transformers pipeline is blocking, so generation runs in a worker
    thread; decoded text pieces cross back to the event loop through a
    thread-safe queue and are yielded as a plain-text streaming response.
    """
    prompt = input.prompt
    max_new_tokens = input.max_new_tokens

    q = queue.Queue()

    def run_generation():
        try:
            streamer = TextStreamer(generator.tokenizer, skip_prompt=True)

            # Hook the point where TextStreamer emits *decoded text*.
            # (Overriding ``put`` — as the original did — would enqueue
            # raw token-id tensors, since ``put`` receives undecoded ids;
            # decoding happens before ``on_finalized_text`` is called.)
            def queue_text(text, stream_end=False):
                q.put(text)

            streamer.on_finalized_text = queue_text

            # Run generation with streamer attached.
            generator(
                prompt,
                max_new_tokens=max_new_tokens,
                do_sample=False,
                streamer=streamer
            )
        except Exception as e:
            q.put(f"[ERROR] {e}")
        finally:
            q.put(None)  # Sentinel to mark done

    # Daemon thread: don't block interpreter shutdown on a stuck generation.
    threading.Thread(target=run_generation, daemon=True).start()

    async def event_generator():
        while True:
            # Blocking Queue.get must never run directly on the event
            # loop thread — it would stall every other request while
            # waiting for the next token. Hand it off to a thread.
            token = await asyncio.to_thread(q.get)
            if token is None:
                break
            yield token

    return StreamingResponse(event_generator(), media_type="text/plain")


# Optional: Endpoint to query knowledge graph
@app.get("/knowledge")
async def get_knowledge():
    """Return the current in-memory knowledge graph as JSON.

    Maps each query string to its cached abstract, related topics, and
    the timestamp of the last update (see the background updater task).
    """
    return knowledge_graph


# Root
@app.get("/")
async def root():
    """Landing endpoint: returns a static welcome payload."""
    payload = {
        "message": "Welcome to the Streaming Model API with live knowledge graph updater!"
    }
    return payload