Spaces:
Running
Running
Update app/app.py
Browse files- app/app.py +12 -28
app/app.py
CHANGED
@@ -14,7 +14,9 @@ from contextlib import asynccontextmanager
|
|
14 |
from typing import Optional, Union
|
15 |
|
16 |
import httpx
|
17 |
-
|
|
|
|
|
18 |
from fastapi.responses import HTMLResponse, StreamingResponse
|
19 |
from fastapi.templating import Jinja2Templates
|
20 |
from pydantic import BaseModel, constr
|
@@ -24,12 +26,6 @@ from .price_fetcher import PriceFetcher
|
|
24 |
from .gemini_analyzer import GeminiAnalyzer
|
25 |
from newsapi import NewsApiClient
|
26 |
|
27 |
-
# --- Pydantic Model for API Input Validation ---
|
28 |
-
|
29 |
-
class SentimentRequest(BaseModel):
|
30 |
-
"""Ensures the text for sentiment analysis is a non-empty string."""
|
31 |
-
text: constr(strip_whitespace=True, min_length=1)
|
32 |
-
|
33 |
# --- Application Lifespan for Resource Management ---
|
34 |
|
35 |
@asynccontextmanager
|
@@ -47,7 +43,6 @@ async def lifespan(app: FastAPI):
|
|
47 |
# Create separate queues for the two real-time feeds
|
48 |
app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
|
49 |
app.state.news_queue: asyncio.Queue = asyncio.Queue()
|
50 |
-
app.state.request_counter = 0
|
51 |
|
52 |
# Create cancellable background tasks for periodic updates.
|
53 |
price_task = asyncio.create_task(
|
@@ -84,15 +79,13 @@ async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
|
|
84 |
q='bitcoin OR ethereum OR crypto OR blockchain',
|
85 |
language='en',
|
86 |
sort_by='publishedAt',
|
87 |
-
page_size=5
|
88 |
)
|
89 |
analyzer: GeminiAnalyzer = app.state.gemini_analyzer
|
90 |
for article in top_headlines.get('articles', []):
|
91 |
title = article.get('title')
|
92 |
if title and "[Removed]" not in title:
|
93 |
-
# Run the full Gemini analysis on each headline
|
94 |
analysis = await analyzer.analyze_text(title)
|
95 |
-
# Add the article URL to the payload for the frontend
|
96 |
analysis['url'] = article.get('url')
|
97 |
await app.state.news_queue.put(analysis)
|
98 |
except Exception as e:
|
@@ -106,21 +99,14 @@ app = FastAPI(title="CryptoSentinel Pro", lifespan=lifespan)
|
|
106 |
templates = Jinja2Templates(directory="templates")
|
107 |
|
108 |
# --- HTML Rendering Helper ---
|
109 |
-
|
110 |
def render_analysis_card(payload: dict, is_news: bool = False) -> str:
|
111 |
-
"""Renders a dictionary of analysis into a styled HTML card."""
|
112 |
s = payload
|
113 |
text_to_show = s.get('summary', 'Analysis failed or not available.')
|
114 |
-
|
115 |
-
# Make the summary a clickable link if it's a news item
|
116 |
if is_news:
|
117 |
url = s.get('url', '#')
|
118 |
text_to_show = f'<a href="{url}" target="_blank" rel="noopener noreferrer">{s.get("summary", "N/A")}</a>'
|
119 |
-
|
120 |
-
# Dynamically set CSS classes based on analysis results
|
121 |
impact_class = f"impact-{s.get('impact', 'low').lower()}"
|
122 |
sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"
|
123 |
-
|
124 |
return f"""
|
125 |
<div class="card {impact_class}">
|
126 |
<blockquote>{text_to_show}</blockquote>
|
@@ -139,12 +125,10 @@ def render_analysis_card(payload: dict, is_news: bool = False) -> str:
|
|
139 |
|
140 |
@app.get("/", response_class=HTMLResponse)
|
141 |
async def serve_dashboard(request: Request):
|
142 |
-
"""Serves the main interactive dashboard from `index.html`."""
|
143 |
return templates.TemplateResponse("index.html", {"request": request})
|
144 |
|
145 |
@app.get("/api/prices", response_class=HTMLResponse)
|
146 |
async def get_prices_fragment(request: Request):
|
147 |
-
"""Returns an HTML fragment with the latest cached crypto prices for HTMX."""
|
148 |
price_fetcher: PriceFetcher = request.app.state.price_fetcher
|
149 |
prices = price_fetcher.get_current_prices()
|
150 |
html_fragment = "".join(
|
@@ -154,13 +138,19 @@ async def get_prices_fragment(request: Request):
|
|
154 |
)
|
155 |
return HTMLResponse(content=html_fragment)
|
156 |
|
|
|
157 |
@app.post("/api/sentiment")
|
158 |
-
async def analyze_sentiment(
|
|
|
|
|
|
|
|
|
|
|
159 |
"""Queues user-submitted text for a full Gemini-powered analysis."""
|
160 |
analyzer: GeminiAnalyzer = request.app.state.gemini_analyzer
|
161 |
|
162 |
async def analysis_task_wrapper():
|
163 |
-
analysis_result = await analyzer.analyze_text(
|
164 |
await request.app.state.sentiment_queue.put(analysis_result)
|
165 |
|
166 |
background_tasks.add_task(analysis_task_wrapper)
|
@@ -168,30 +158,24 @@ async def analyze_sentiment(payload: SentimentRequest, request: Request, backgro
|
|
168 |
|
169 |
@app.get("/api/sentiment/stream")
|
170 |
async def sentiment_stream(request: Request):
|
171 |
-
"""SSE stream for results from manual sentiment analysis requests."""
|
172 |
queue: asyncio.Queue = request.app.state.sentiment_queue
|
173 |
async def event_generator():
|
174 |
while True:
|
175 |
payload = await queue.get()
|
176 |
html = render_analysis_card(payload)
|
177 |
-
# =================== FIX APPLIED HERE ===================
|
178 |
data_payload = html.replace('\n', '')
|
179 |
sse_message = f"event: sentiment_update\ndata: {data_payload}\n\n"
|
180 |
yield sse_message
|
181 |
-
# ========================================================
|
182 |
return StreamingResponse(event_generator(), media_type="text/event-stream")
|
183 |
|
184 |
@app.get("/api/news/stream")
|
185 |
async def news_stream(request: Request):
|
186 |
-
"""SSE stream for the automated Market Intelligence Feed."""
|
187 |
queue: asyncio.Queue = request.app.state.news_queue
|
188 |
async def event_generator():
|
189 |
while True:
|
190 |
payload = await queue.get()
|
191 |
html = render_analysis_card(payload, is_news=True)
|
192 |
-
# =================== FIX APPLIED HERE ===================
|
193 |
data_payload = html.replace('\n', '')
|
194 |
sse_message = f"event: news_update\ndata: {data_payload}\n\n"
|
195 |
yield sse_message
|
196 |
-
# ========================================================
|
197 |
return StreamingResponse(event_generator(), media_type="text/event-stream")
|
|
|
14 |
from typing import Optional, Union
|
15 |
|
16 |
import httpx
|
17 |
+
# =================== FIX APPLIED HERE (1 of 2) ===================
|
18 |
+
from fastapi import FastAPI, Request, BackgroundTasks, Form
|
19 |
+
# =================================================================
|
20 |
from fastapi.responses import HTMLResponse, StreamingResponse
|
21 |
from fastapi.templating import Jinja2Templates
|
22 |
from pydantic import BaseModel, constr
|
|
|
26 |
from .gemini_analyzer import GeminiAnalyzer
|
27 |
from newsapi import NewsApiClient
|
28 |
|
|
|
|
|
|
|
|
|
|
|
|
|
29 |
# --- Application Lifespan for Resource Management ---
|
30 |
|
31 |
@asynccontextmanager
|
|
|
43 |
# Create separate queues for the two real-time feeds
|
44 |
app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
|
45 |
app.state.news_queue: asyncio.Queue = asyncio.Queue()
|
|
|
46 |
|
47 |
# Create cancellable background tasks for periodic updates.
|
48 |
price_task = asyncio.create_task(
|
|
|
79 |
q='bitcoin OR ethereum OR crypto OR blockchain',
|
80 |
language='en',
|
81 |
sort_by='publishedAt',
|
82 |
+
page_size=5
|
83 |
)
|
84 |
analyzer: GeminiAnalyzer = app.state.gemini_analyzer
|
85 |
for article in top_headlines.get('articles', []):
|
86 |
title = article.get('title')
|
87 |
if title and "[Removed]" not in title:
|
|
|
88 |
analysis = await analyzer.analyze_text(title)
|
|
|
89 |
analysis['url'] = article.get('url')
|
90 |
await app.state.news_queue.put(analysis)
|
91 |
except Exception as e:
|
|
|
99 |
templates = Jinja2Templates(directory="templates")
|
100 |
|
101 |
# --- HTML Rendering Helper ---
|
|
|
102 |
def render_analysis_card(payload: dict, is_news: bool = False) -> str:
|
|
|
103 |
s = payload
|
104 |
text_to_show = s.get('summary', 'Analysis failed or not available.')
|
|
|
|
|
105 |
if is_news:
|
106 |
url = s.get('url', '#')
|
107 |
text_to_show = f'<a href="{url}" target="_blank" rel="noopener noreferrer">{s.get("summary", "N/A")}</a>'
|
|
|
|
|
108 |
impact_class = f"impact-{s.get('impact', 'low').lower()}"
|
109 |
sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"
|
|
|
110 |
return f"""
|
111 |
<div class="card {impact_class}">
|
112 |
<blockquote>{text_to_show}</blockquote>
|
|
|
125 |
|
126 |
@app.get("/", response_class=HTMLResponse)
|
127 |
async def serve_dashboard(request: Request):
|
|
|
128 |
return templates.TemplateResponse("index.html", {"request": request})
|
129 |
|
130 |
@app.get("/api/prices", response_class=HTMLResponse)
|
131 |
async def get_prices_fragment(request: Request):
|
|
|
132 |
price_fetcher: PriceFetcher = request.app.state.price_fetcher
|
133 |
prices = price_fetcher.get_current_prices()
|
134 |
html_fragment = "".join(
|
|
|
138 |
)
|
139 |
return HTMLResponse(content=html_fragment)
|
140 |
|
141 |
+
# =================== FIX APPLIED HERE (2 of 2) ===================
|
142 |
@app.post("/api/sentiment")
|
143 |
+
async def analyze_sentiment(
|
144 |
+
request: Request,
|
145 |
+
background_tasks: BackgroundTasks,
|
146 |
+
text: str = Form(...) # Tell FastAPI to expect a form field named 'text'
|
147 |
+
):
|
148 |
+
# =================================================================
|
149 |
"""Queues user-submitted text for a full Gemini-powered analysis."""
|
150 |
analyzer: GeminiAnalyzer = request.app.state.gemini_analyzer
|
151 |
|
152 |
async def analysis_task_wrapper():
|
153 |
+
analysis_result = await analyzer.analyze_text(text) # Use the 'text' variable directly
|
154 |
await request.app.state.sentiment_queue.put(analysis_result)
|
155 |
|
156 |
background_tasks.add_task(analysis_task_wrapper)
|
|
|
158 |
|
159 |
@app.get("/api/sentiment/stream")
|
160 |
async def sentiment_stream(request: Request):
|
|
|
161 |
queue: asyncio.Queue = request.app.state.sentiment_queue
|
162 |
async def event_generator():
|
163 |
while True:
|
164 |
payload = await queue.get()
|
165 |
html = render_analysis_card(payload)
|
|
|
166 |
data_payload = html.replace('\n', '')
|
167 |
sse_message = f"event: sentiment_update\ndata: {data_payload}\n\n"
|
168 |
yield sse_message
|
|
|
169 |
return StreamingResponse(event_generator(), media_type="text/event-stream")
|
170 |
|
171 |
@app.get("/api/news/stream")
|
172 |
async def news_stream(request: Request):
|
|
|
173 |
queue: asyncio.Queue = request.app.state.news_queue
|
174 |
async def event_generator():
|
175 |
while True:
|
176 |
payload = await queue.get()
|
177 |
html = render_analysis_card(payload, is_news=True)
|
|
|
178 |
data_payload = html.replace('\n', '')
|
179 |
sse_message = f"event: news_update\ndata: {data_payload}\n\n"
|
180 |
yield sse_message
|
|
|
181 |
return StreamingResponse(event_generator(), media_type="text/event-stream")
|