Spaces:
Sleeping
Sleeping
""" | |
CryptoSentinel AI — High-performance FastAPI application.
This is the main entry point that orchestrates the entire application. | |
- Integrates the asynchronous PriceFetcher for live market data. | |
- Integrates the asynchronous SentimentAnalyzer for real-time analysis. | |
- Serves the interactive frontend and provides all necessary API endpoints. | |
""" | |
import asyncio
import html
import json
import logging
from contextlib import asynccontextmanager
from pathlib import Path

import httpx
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, constr
# ==================================================================== | |
# FIX APPLIED HERE | |
# ==================================================================== | |
# Use relative imports because price_fetcher and sentiment are in the same 'app' directory. | |
from .price_fetcher import PriceFetcher | |
from .sentiment import SentimentAnalyzer | |
# ==================================================================== | |
# --- Pydantic Model for API Input Validation --- | |
# Constrained string: surrounding whitespace is stripped, and at least one
# character must remain afterwards.
_NonBlankText = constr(strip_whitespace=True, min_length=1)


class SentimentRequest(BaseModel):
    """Request body for sentiment analysis; `text` must be a non-empty string."""

    text: _NonBlankText
# --- Application Lifespan for Resource Management --- | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manage application startup and shutdown as an async context manager.

    Startup: opens one shared ``httpx.AsyncClient``, stores the price fetcher,
    the sentiment analyzer and a request counter on ``app.state``, and starts
    the periodic price-update task.

    Shutdown: cancels the price-update task and waits for it to finish while
    the HTTP client is still open; the client is closed last.
    """
    async with httpx.AsyncClient() as client:
        # Services live on `app.state` so request handlers can reach them via
        # `request.app.state`.
        app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
        app.state.sentiment_analyzer = SentimentAnalyzer(client=client)
        app.state.request_counter = 0  # Simple counter for unique SSE event IDs

        # Cancellable background task for continuous price updates.
        price_update_task = asyncio.create_task(
            run_periodic_updates(app.state.price_fetcher, interval_seconds=10)
        )
        # NOTE(review): the leading characters of the log strings below look
        # like mis-decoded emoji; confirm against the original file encoding.
        print("π CryptoSentinel AI started successfully.")
        try:
            yield  # The application is now running and ready to accept requests.
        finally:
            # Runs even if an exception is thrown into the generator, so the
            # background task never outlives the HTTP client it uses.
            print("β³ Shutting down background tasks...")
            price_update_task.cancel()
            try:
                await price_update_task
            except asyncio.CancelledError:
                print("Price update task cancelled successfully.")
            print("β Shutdown complete.")
async def run_periodic_updates(fetcher: "PriceFetcher", interval_seconds: int):
    """
    Refresh prices forever, pausing `interval_seconds` between attempts.

    A failed update is logged and retried on the next cycle instead of ending
    the task, so a single error does not stop price refreshes for the rest of
    the process lifetime. Cancellation is not swallowed: `CancelledError` is
    not an `Exception` subclass, so `task.cancel()` still stops the loop.

    Args:
        fetcher: Object exposing an awaitable `update_prices_async()`.
        interval_seconds: Delay between update attempts, in seconds.
    """
    log = logging.getLogger(__name__)
    while True:
        try:
            await fetcher.update_prices_async()
        except Exception:
            # Task-level boundary: record the failure and keep the loop alive.
            log.exception("Price update failed; retrying in %s seconds.", interval_seconds)
        await asyncio.sleep(interval_seconds)
# --- FastAPI App Initialization --- | |
app = FastAPI(title="CryptoSentinel AI", lifespan=lifespan)
# Resolve the templates folder relative to this file rather than the current
# working directory, so template lookup does not depend on where the server is
# launched from.
# NOTE(review): assumes this module sits in the `app` directory next to
# `templates/` (i.e. app/templates) — confirm against the project layout.
templates = Jinja2Templates(directory=str(Path(__file__).resolve().parent / "templates"))
# --- API Endpoints --- | |
async def serve_dashboard(request: Request):
    """Render `index.html`, the main interactive dashboard page."""
    # NOTE(review): no route decorator is visible on this handler in this
    # view — confirm it is registered on `app`.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
async def get_prices_fragment(request: Request):
    """
    Return an HTML fragment listing the latest cached crypto prices.

    One `<div>` is produced per coin returned by the price fetcher stored on
    `request.app.state`. Numeric prices are formatted as `$1,234.56`; any other
    value (such as a placeholder string) is shown as-is, HTML-escaped.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view — confirm it is registered on `app`.
    price_fetcher: PriceFetcher = request.app.state.price_fetcher
    prices = price_fetcher.get_current_prices()

    def render_row(coin, price):
        # Numbers get currency formatting; everything else is escaped text.
        if isinstance(price, (int, float)):
            price_str = f"${price:,.2f}"
        else:
            price_str = html.escape(str(price))
        return f"<div><strong>{html.escape(coin.capitalize())}:</strong> {price_str}</div>"

    html_fragment = "".join(render_row(coin, price) for coin, price in prices.items())
    return HTMLResponse(content=html_fragment)
async def analyze_sentiment(
    payload: SentimentRequest,
    request: Request,
    background_tasks: BackgroundTasks,
):
    """
    Accept a validated text and queue it for sentiment analysis.

    The analysis itself is scheduled as a background task, so this handler
    only bumps the shared request counter, registers the job and returns a
    short acknowledgement fragment.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view — confirm it is registered on `app`.
    state = request.app.state
    analyzer: SentimentAnalyzer = state.sentiment_analyzer

    # The incremented counter value identifies this request.
    state.request_counter += 1
    job_id = state.request_counter

    # Background tasks run after the response has been sent.
    background_tasks.add_task(analyzer.compute_and_publish, payload.text, job_id)
    return HTMLResponse(content="<small>Queued for analysis...</small>")
def _render_sentiment_fragment(result_payload) -> str:
    """
    Build the single-line HTML fragment for one sentiment result.

    Reads `result_payload['result']` (its `label` and `score`) and
    `result_payload['text']`. All interpolated values are HTML-escaped, and
    every line terminator is removed so the fragment fits on one SSE `data:`
    line.

    Raises:
        KeyError, TypeError, AttributeError, ValueError: if the payload does
        not have the expected shape or the score is not numeric.
    """
    result = result_payload['result']
    raw_label = str(result.get('label', 'NEUTRAL')).lower()
    score = float(result.get('score', 0.0)) * 100
    # The analysed text is user-supplied; escape it before embedding in HTML.
    text = html.escape(str(result_payload['text']))
    css_label = html.escape(raw_label, quote=True)
    shown_label = html.escape(raw_label.upper())

    fragment = (
        "<div>"
        f"<blockquote>{text}</blockquote>"
        "<p>"
        "<strong>Result:</strong> "
        f'<span class="sentiment-{css_label}">{shown_label}</span> '
        f"(Confidence: {score:.1f}%)"
        "</p>"
        "</div>"
    )
    # SSE treats LF, CR and CRLF as line ends; drop them all, not just '\n'.
    return "".join(fragment.splitlines())


async def sentiment_stream(request: Request):
    """
    Open a Server-Sent Events (SSE) stream of sentiment results.

    The stream first sends an out-of-band swap that empties the
    `sentiment-results` element, then emits one `sentiment_update` event per
    result yielded by the analyzer's `stream_results()`. Malformed payloads
    are skipped without closing the stream.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view — confirm it is registered on `app`.
    analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer

    async def event_generator():
        # Clear the initial "waiting..." message on the client.
        yield "event: sentiment_update\ndata: <div id='sentiment-results' hx-swap-oob='innerHTML'></div>\n\n"
        async for result_payload in analyzer.stream_results():
            try:
                data_payload = _render_sentiment_fragment(result_payload)
            except (KeyError, TypeError, AttributeError, ValueError):
                continue  # Ignore malformed payloads
            yield f"event: sentiment_update\ndata: {data_payload}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
# --- Main execution block for local development --- | |
if __name__ == "__main__":
    # Local-development launcher: serves the "app:app" import target on all
    # interfaces, port 7860, with auto-reload enabled.
    # NOTE(review): this file uses relative imports (`from .price_fetcher ...`),
    # which need a package context — confirm which launch command is actually
    # supported (`python -m app.app` from the parent directory vs. running the
    # file directly).
    from uvicorn import run as serve

    serve("app:app", host="0.0.0.0", port=7860, reload=True)