Reality123b committed
Commit 2145ed0 · verified · 1 Parent(s): 5879220

Update app.py

Files changed (1)
  1. app.py +104 -27
app.py CHANGED
@@ -1,43 +1,120 @@
- from fastapi import FastAPI, HTTPException
  from pydantic import BaseModel
  from transformers import pipeline, TextStreamer
- import torch

  class ModelInput(BaseModel):
      prompt: str
-     max_new_tokens: int = 128000

  app = FastAPI()

- # Initialize text generation pipeline
  generator = pipeline(
      "text-generation",
      model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
-     device="cpu"  # Use CPU (change to device=0 for GPU)
  )

- # Create text streamer
- streamer = TextStreamer(generator.tokenizer, skip_prompt=True)
-
- def generate_response(prompt: str, max_new_tokens: int = 64000):
-     try:
-         messages = [{"role": "user", "content": prompt}]
-         output = generator(messages, max_new_tokens=max_new_tokens, do_sample=False, streamer=streamer)
-         return output[0]["generated_text"][-1]["content"]
-     except Exception as e:
-         raise ValueError(f"Error generating response: {e}")
-
- @app.post("/generate")
- async def generate_text(input: ModelInput):
-     try:
-         response = generate_response(
-             prompt=input.prompt,
-             max_new_tokens=input.max_new_tokens
-         )
-         return {"generated_text": response}
-     except Exception as e:
-         raise HTTPException(status_code=500, detail=str(e))

  @app.get("/")
  async def root():
-     return {"message": "Welcome to the Streaming Model API!"}
+ from fastapi import FastAPI, HTTPException
+ from fastapi.responses import StreamingResponse
  from pydantic import BaseModel
  from transformers import pipeline, TextStreamer
+ import asyncio
+ import queue
+ import random
+ import threading
+ import time
+ import httpx

  class ModelInput(BaseModel):
      prompt: str
+     max_new_tokens: int = 128

  app = FastAPI()

+ # Initialize the generation pipeline once at import time
  generator = pipeline(
      "text-generation",
      model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
+     device="cpu"
  )

+ # Shared in-memory knowledge graph (a plain dict)
+ knowledge_graph = {}

+ # --- Autonomous knowledge updater --- #
+ async def update_knowledge_graph_periodically():
+     while True:
+         try:
+             # Pick a random query (hardcoded for now; could be made smarter)
+             queries = ["latest tech startup news", "AI breakthroughs", "funding trends 2025"]
+             query = random.choice(queries)
+
+             # Use the DuckDuckGo Instant Answer API (free, no API key required)
+             async with httpx.AsyncClient() as client:
+                 resp = await client.get(
+                     "https://api.duckduckgo.com/",
+                     params={"q": query, "format": "json", "no_redirect": "1", "no_html": "1"}
+                 )
+                 data = resp.json()
+
+             # Extract the useful bits (abstract text and related topics)
+             abstract = data.get("AbstractText", "")
+             related_topics = data.get("RelatedTopics", [])
+
+             # Save/update the knowledge graph (deliberately basic)
+             knowledge_graph[query] = {
+                 "abstract": abstract,
+                 "related_topics": related_topics,
+                 "timestamp": time.time()
+             }
+
+             print(f"Knowledge graph updated for query: {query}")
+
+         except Exception as e:
+             print(f"Error updating knowledge graph: {e}")
+
+         await asyncio.sleep(60)  # wait one minute before the next update
+
+ # Kick off the background task on startup
+ @app.on_event("startup")
+ async def startup_event():
+     asyncio.create_task(update_knowledge_graph_periodically())
+
+ # --- Streaming generation endpoint --- #
+ @app.post("/generate/stream")
+ async def generate_stream(input: ModelInput):
+     prompt = input.prompt
+     max_new_tokens = input.max_new_tokens
+
+     q = queue.Queue()
+
+     def run_generation():
+         try:
+             streamer = TextStreamer(generator.tokenizer, skip_prompt=True)
+
+             # Route decoded text into the queue instead of stdout.
+             # TextStreamer.put() receives raw token ids, so hook
+             # on_finalized_text, which receives the decoded text chunks.
+             streamer.on_finalized_text = lambda text, stream_end=False: q.put(text)
+
+             # Run generation with the streamer attached
+             generator(
+                 prompt,
+                 max_new_tokens=max_new_tokens,
+                 do_sample=False,
+                 streamer=streamer
+             )
+         except Exception as e:
+             q.put(f"[ERROR] {e}")
+         finally:
+             q.put(None)  # sentinel to mark completion
+
+     thread = threading.Thread(target=run_generation)
+     thread.start()
+
+     async def event_generator():
+         while True:
+             # q.get() blocks, so run it off the event loop
+             token = await asyncio.to_thread(q.get)
+             if token is None:
+                 break
+             yield token
+
+     return StreamingResponse(event_generator(), media_type="text/plain")
+
+
+ # Optional: endpoint to inspect the knowledge graph
+ @app.get("/knowledge")
+ async def get_knowledge():
+     return knowledge_graph
+
+
+ # Root
  @app.get("/")
  async def root():
+     return {"message": "Welcome to the Streaming Model API with live knowledge graph updater!"}