mgbam's picture
Update app.py
ff9180e verified
raw
history blame
7.2 kB
"""
CryptoSentinel AI β€” High-performance FastAPI application.
This is the main entry point that orchestrates the entire application.
- Integrates the asynchronous PriceFetcher for live market data.
- Integrates the asynchronous SentimentAnalyzer for real-time analysis.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import json
from contextlib import asynccontextmanager
import httpx
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, constr
# ====================================================================
# FIX APPLIED HERE
# ====================================================================
# Use relative imports because price_fetcher and sentiment are in the same 'app' directory.
from .price_fetcher import PriceFetcher
from .sentiment import SentimentAnalyzer
# ====================================================================
# --- Pydantic Model for API Input Validation ---
class SentimentRequest(BaseModel):
"""Ensures the text for sentiment analysis is a non-empty string."""
text: constr(strip_whitespace=True, min_length=1)
# --- Application Lifespan for Resource Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Manages application startup and shutdown events using the modern
lifespan context manager.
"""
# On startup:
async with httpx.AsyncClient() as client:
# Instantiate and store our services in the application state.
# This makes them accessible in any request handler via `request.app.state`.
app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
app.state.sentiment_analyzer = SentimentAnalyzer(client=client)
app.state.request_counter = 0 # Simple counter for unique SSE event IDs
# Create a cancellable background task for continuous price updates.
price_update_task = asyncio.create_task(
run_periodic_updates(app.state.price_fetcher, interval_seconds=10)
)
print("πŸš€ CryptoSentinel AI started successfully.")
yield # The application is now running and ready to accept requests.
# On shutdown:
print("⏳ Shutting down background tasks...")
price_update_task.cancel()
try:
await price_update_task
except asyncio.CancelledError:
print("Price update task cancelled successfully.")
print("βœ… Shutdown complete.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
"""A robust asyncio background task that periodically updates prices."""
while True:
await fetcher.update_prices_async()
await asyncio.sleep(interval_seconds)
# --- FastAPI App Initialization ---
app = FastAPI(title="CryptoSentinel AI", lifespan=lifespan)
# Assuming your project structure is /app/templates
templates = Jinja2Templates(directory="app/templates")
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
"""Serves the main interactive dashboard from `index.html`."""
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
"""Returns an HTML fragment with the latest cached crypto prices for HTMX."""
price_fetcher: PriceFetcher = request.app.state.price_fetcher
prices = price_fetcher.get_current_prices()
html_fragment = ""
for coin, price in prices.items():
# Format the price nicely, handling the initial '--' state
price_str = f"${price:,.2f}" if isinstance(price, (int, float)) else price
html_fragment += f"<div><strong>{coin.capitalize()}:</strong> {price_str}</div>"
return HTMLResponse(content=html_fragment)
@app.post("/api/sentiment")
async def analyze_sentiment(
payload: SentimentRequest,
request: Request,
background_tasks: BackgroundTasks
):
"""
Validates and queues a text for sentiment analysis. The heavy lifting is
done in the background to ensure the API responds instantly.
"""
analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
request.app.state.request_counter += 1
request_id = request.app.state.request_counter
# The actual API call to Hugging Face will run after this response is sent.
background_tasks.add_task(analyzer.compute_and_publish, payload.text, request_id)
return HTMLResponse(content="<small>Queued for analysis...</small>")
@app.get("/api/sentiment/stream")
async def sentiment_stream(request: Request):
"""
Establishes a Server-Sent Events (SSE) connection. It efficiently pushes
new sentiment results as HTML fragments to the client as they become available.
"""
analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
async def event_generator():
# Clear the initial "waiting..." message on the client.
# hx-swap-oob="innerHTML" swaps this div out-of-band without affecting the target.
yield f"event: sentiment_update\ndata: <div id='sentiment-results' hx-swap-oob='innerHTML'></div>\n\n"
# Listen for new results from the analyzer's internal queue.
async for result_payload in analyzer.stream_results():
try:
result = result_payload['result']
label = str(result.get('label', 'NEUTRAL')).lower()
score = result.get('score', 0.0) * 100
text = result_payload['text']
# Dynamically build the HTML fragment to be sent to the client.
html_fragment = f"""
<div>
<blockquote>{text}</blockquote>
<p>
<strong>Result:</strong>
<span class="sentiment-{label}">{label.upper()}</span>
(Confidence: {score:.1f}%)
</p>
</div>
"""
# First, process the string to remove newlines. This avoids a
# backslash in the f-string expression, fixing the SyntaxError.
data_payload = html_fragment.replace('\n', '')
# Then, use the clean variable in the f-string to build the message.
sse_message = f"event: sentiment_update\ndata: {data_payload}\n\n"
yield sse_message
except (KeyError, TypeError):
continue # Ignore malformed payloads
return StreamingResponse(event_generator(), media_type="text/event-stream")
# --- Main execution block for local development ---
if __name__ == "__main__":
import uvicorn
# This run command assumes you are running `python app.py` from within the /app directory
# or `python -m app.app` from the parent directory.
uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)