mgbam's picture
Update app/app.py
e446a42 verified
raw
history blame
8.54 kB
"""
CryptoSentinel Pro β€” High-performance FastAPI application.
This is the main entry point that orchestrates the entire application.
- Integrates an asynchronous PriceFetcher for live market data.
- Integrates a sophisticated GeminiAnalyzer for deep text analysis.
- Implements an automated pipeline to fetch, analyze, and stream top crypto news.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import json
import os
from contextlib import asynccontextmanager
from typing import Optional, Union
import httpx
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, constr
# Import our modular, asynchronous service classes
from .price_fetcher import PriceFetcher
from .gemini_analyzer import GeminiAnalyzer
from newsapi import NewsApiClient
# --- Pydantic Model for API Input Validation ---
class SentimentRequest(BaseModel):
"""Ensures the text for sentiment analysis is a non-empty string."""
text: constr(strip_whitespace=True, min_length=1)
# --- Application Lifespan for Resource Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Manages application startup and shutdown events using the modern
lifespan context manager.
"""
async with httpx.AsyncClient() as client:
# Instantiate and store all services in the application state.
app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
app.state.gemini_analyzer = GeminiAnalyzer(client=client)
app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
# Create separate queues for the two real-time feeds
app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
app.state.news_queue: asyncio.Queue = asyncio.Queue()
app.state.request_counter = 0
# Create cancellable background tasks for periodic updates.
price_task = asyncio.create_task(
run_periodic_updates(app.state.price_fetcher, interval_seconds=30)
)
news_task = asyncio.create_task(
run_periodic_news_analysis(app, interval_seconds=900) # Run every 15 minutes
)
print("πŸš€ CryptoSentinel Pro started successfully.")
yield
print("⏳ Shutting down background tasks...")
price_task.cancel()
news_task.cancel()
try:
await asyncio.gather(price_task, news_task)
except asyncio.CancelledError:
print("Background tasks cancelled successfully.")
print("βœ… Shutdown complete.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
"""A robust asyncio background task that periodically updates prices."""
while True:
await fetcher.update_prices_async()
await asyncio.sleep(interval_seconds)
async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
"""Fetches, analyzes, and queues top crypto news periodically."""
while True:
print("πŸ“° Fetching latest crypto news for automated analysis...")
try:
top_headlines = app.state.news_api.get_everything(
q='bitcoin OR ethereum OR crypto OR blockchain',
language='en',
sort_by='publishedAt',
page_size=5 # Fetch the 5 most recent articles
)
analyzer: GeminiAnalyzer = app.state.gemini_analyzer
for article in top_headlines.get('articles', []):
title = article.get('title')
if title and "[Removed]" not in title:
# Run the full Gemini analysis on each headline
analysis = await analyzer.analyze_text(title)
# Add the article URL to the payload for the frontend
analysis['url'] = article.get('url')
await app.state.news_queue.put(analysis)
except Exception as e:
print(f"❌ Error during news fetching or analysis: {e}")
await asyncio.sleep(interval_seconds)
# --- FastAPI App Initialization ---
app = FastAPI(title="CryptoSentinel Pro", lifespan=lifespan)
templates = Jinja2Templates(directory="templates")
# --- HTML Rendering Helper ---
def render_analysis_card(payload: dict, is_news: bool = False) -> str:
"""Renders a dictionary of analysis into a styled HTML card."""
s = payload
text_to_show = s.get('summary', 'Analysis failed or not available.')
# Make the summary a clickable link if it's a news item
if is_news:
url = s.get('url', '#')
text_to_show = f'<a href="{url}" target="_blank" rel="noopener noreferrer">{s.get("summary", "N/A")}</a>'
# Dynamically set CSS classes based on analysis results
impact_class = f"impact-{s.get('impact', 'low').lower()}"
sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"
return f"""
<div class="card {impact_class}">
<blockquote>{text_to_show}</blockquote>
<div class="grid">
<div><strong>Sentiment:</strong> <span class="{sentiment_class}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span></div>
<div><strong>Impact:</strong> {s.get('impact')}</div>
</div>
<div class="grid">
<div><strong>Topic:</strong> {s.get('topic')}</div>
<div><strong>Entities:</strong> {', '.join(s.get('entities', []))}</div>
</div>
</div>
"""
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
"""Serves the main interactive dashboard from `index.html`."""
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
"""Returns an HTML fragment with the latest cached crypto prices for HTMX."""
price_fetcher: PriceFetcher = request.app.state.price_fetcher
prices = price_fetcher.get_current_prices()
html_fragment = "".join(
f"<div><strong>{coin.capitalize()}:</strong> ${price:,.2f}</div>" if isinstance(price, (int, float))
else f"<div><strong>{coin.capitalize()}:</strong> {price}</div>"
for coin, price in prices.items()
)
return HTMLResponse(content=html_fragment)
@app.post("/api/sentiment")
async def analyze_sentiment(payload: SentimentRequest, request: Request, background_tasks: BackgroundTasks):
"""Queues user-submitted text for a full Gemini-powered analysis."""
analyzer: GeminiAnalyzer = request.app.state.gemini_analyzer
async def analysis_task_wrapper():
analysis_result = await analyzer.analyze_text(payload.text)
await request.app.state.sentiment_queue.put(analysis_result)
background_tasks.add_task(analysis_task_wrapper)
return HTMLResponse(content="<small>βœ… Queued for deep analysis...</small>")
@app.get("/api/sentiment/stream")
async def sentiment_stream(request: Request):
"""SSE stream for results from manual sentiment analysis requests."""
queue: asyncio.Queue = request.app.state.sentiment_queue
async def event_generator():
while True:
payload = await queue.get()
html = render_analysis_card(payload)
# =================== FIX APPLIED HERE ===================
data_payload = html.replace('\n', '')
sse_message = f"event: sentiment_update\ndata: {data_payload}\n\n"
yield sse_message
# ========================================================
return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/api/news/stream")
async def news_stream(request: Request):
"""SSE stream for the automated Market Intelligence Feed."""
queue: asyncio.Queue = request.app.state.news_queue
async def event_generator():
while True:
payload = await queue.get()
html = render_analysis_card(payload, is_news=True)
# =================== FIX APPLIED HERE ===================
data_payload = html.replace('\n', '')
sse_message = f"event: news_update\ndata: {data_payload}\n\n"
yield sse_message
# ========================================================
return StreamingResponse(event_generator(), media_type="text/event-stream")