Spaces:
Sleeping
Sleeping
"""
CryptoSentinel AI — High-performance FastAPI application.
This is the main entry point that orchestrates the entire application.
- Integrates the asynchronous PriceFetcher for live market data.
- Integrates the asynchronous SentimentAnalyzer for real-time analysis.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import html
import json
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, constr

# Import our modular, asynchronous service classes
from app.price_fetcher import PriceFetcher
from app.sentiment import SentimentAnalyzer
# --- Pydantic Model for API Input Validation ---
class SentimentRequest(BaseModel):
    """Request body accepted by the sentiment-analysis endpoint.

    The single field, ``text``, is declared with a constrained string type so
    that surrounding whitespace is stripped and an empty value is rejected
    during validation.
    """

    # Constrained string: whitespace stripped, at least one character required.
    text: constr(min_length=1, strip_whitespace=True)
# --- Application Lifespan for Resource Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manage application startup and shutdown as an async context manager.

    Startup: opens one shared ``httpx.AsyncClient``, stores a ``PriceFetcher``,
    a ``SentimentAnalyzer`` and a request counter on ``app.state``, and starts
    the periodic price-update task.

    Shutdown: cancels the price-update task and waits for it to finish before
    the HTTP client is closed.

    Args:
        app: The FastAPI application whose ``state`` receives the services.

    Yields:
        None. Control returns to the framework while the application serves
        requests.
    """
    # On startup:
    async with httpx.AsyncClient() as client:
        # Instantiate and store our services in the application state.
        # This makes them accessible in any request handler via `request.app.state`.
        app.state.price_fetcher = PriceFetcher(
            client=client,
            coins=["bitcoin", "ethereum", "dogecoin"],
        )
        app.state.sentiment_analyzer = SentimentAnalyzer(client=client)
        app.state.request_counter = 0  # Simple counter for unique SSE event IDs

        # Create a cancellable background task for continuous price updates.
        price_update_task = asyncio.create_task(
            run_periodic_updates(app.state.price_fetcher, interval_seconds=10)
        )
        print("🚀 CryptoSentinel AI started successfully.")
        try:
            yield  # The application is now running and ready to accept requests.
        finally:
            # On shutdown. Running this in `finally` guarantees the task is
            # stopped even when an exception is thrown into the generator.
            print("⏳ Shutting down background tasks...")
            price_update_task.cancel()
            try:
                await price_update_task
            except asyncio.CancelledError:
                print("Price update task cancelled successfully.")
            except Exception as exc:
                # The task had already died with an error before cancellation;
                # report it instead of aborting the rest of the shutdown.
                print(f"Price update task had failed: {exc!r}")
            print("✅ Shutdown complete.")
async def run_periodic_updates(fetcher: "PriceFetcher", interval_seconds: int) -> None:
    """Refresh prices forever, pausing ``interval_seconds`` between attempts.

    A failure in a single update is reported and the loop carries on, so one
    transient error does not stop all future updates. Cancellation is never
    swallowed: cancelling the task ends the loop.

    Args:
        fetcher: Object exposing an awaitable ``update_prices_async()``.
        interval_seconds: Delay between consecutive update attempts.

    Raises:
        asyncio.CancelledError: When the surrounding task is cancelled.
    """
    while True:
        try:
            await fetcher.update_prices_async()
        except asyncio.CancelledError:
            # Explicit re-raise keeps cancellation working on interpreters
            # where CancelledError still derives from Exception.
            raise
        except Exception as exc:
            print(f"Price update failed; retrying in {interval_seconds}s: {exc!r}")
        await asyncio.sleep(interval_seconds)
# --- FastAPI App Initialization ---
# The application object; `lifespan` supplies its startup/shutdown handling.
app = FastAPI(
    title="CryptoSentinel AI",
    lifespan=lifespan,
)
# Template loader used by the handlers; templates are read from app/templates.
templates = Jinja2Templates(directory="app/templates")
# --- API Endpoints ---
# NOTE(review): no route decorator is visible above this handler in this view
# of the file — confirm how it is registered on `app`.
async def serve_dashboard(request: Request):
    """Render `index.html`, the main interactive dashboard page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
# NOTE(review): no route decorator is visible above this handler in this view
# of the file — confirm how it is registered on `app`.
async def get_prices_fragment(request: Request):
    """Return an HTML fragment listing the latest cached crypto prices for HTMX.

    Reads the current prices from the ``PriceFetcher`` stored on
    ``request.app.state`` and renders one ``<div>`` per coin. Numeric prices
    are formatted as dollars with thousands separators and two decimals; any
    other value is shown as its string form.

    Args:
        request: The incoming request, used to reach the application state.

    Returns:
        An ``HTMLResponse`` whose body is the concatenated ``<div>`` rows
        (empty when there are no prices).
    """
    price_fetcher: PriceFetcher = request.app.state.price_fetcher
    prices = price_fetcher.get_current_prices()

    rows = []
    for coin, price in prices.items():
        if isinstance(price, (int, float)):
            price_str = f"${price:,.2f}"
        else:
            # Non-numeric values (e.g. a placeholder) are displayed as-is.
            price_str = str(price)
        # Escape both parts so nothing from the price cache is read as markup.
        rows.append(
            f"<div><strong>{html.escape(coin.capitalize())}:</strong> "
            f"{html.escape(price_str)}</div>"
        )
    return HTMLResponse(content="".join(rows))
# NOTE(review): no route decorator is visible above this handler in this view
# of the file — confirm how it is registered on `app`.
async def analyze_sentiment(
    payload: SentimentRequest,
    request: Request,
    background_tasks: BackgroundTasks,
):
    """
    Queue a validated text for sentiment analysis and reply immediately.

    The analysis itself is handed to ``background_tasks`` instead of being
    awaited here, so this handler only assigns a request id and returns a
    short acknowledgement fragment.
    """
    state = request.app.state
    analyzer: SentimentAnalyzer = state.sentiment_analyzer

    # Take the next value of the shared counter as this request's id.
    state.request_counter += 1
    request_id = state.request_counter

    # Scheduled work: the analyzer computes and publishes the result later.
    background_tasks.add_task(
        analyzer.compute_and_publish,
        payload.text,
        request_id,
    )
    return HTMLResponse(content="<small>Queued for analysis...</small>")
def _format_sentiment_event(result_payload) -> str:
    """Render one analyzer payload as a single ``sentiment_update`` SSE message.

    Args:
        result_payload: Mapping with a ``'text'`` entry and a ``'result'``
            mapping that may carry ``'label'`` and ``'score'``.

    Returns:
        The complete SSE message: event name, one ``data:`` line holding the
        HTML fragment, and the terminating blank line.

    Raises:
        KeyError, TypeError, AttributeError, ValueError: When the payload is
            missing required keys or holds values of an unexpected type.
    """
    result = result_payload["result"]
    label = str(result.get("label", "NEUTRAL")).lower()
    score = result.get("score", 0.0) * 100
    text = result_payload["text"]

    # The text is user-supplied and the label comes from the analyzer result:
    # escape both before placing them in markup. Line breaks in the text become
    # spaces so words stay separated once the fragment is on one line.
    safe_text = html.escape(" ".join(str(text).splitlines()))
    safe_label = html.escape(label)
    safe_label_upper = html.escape(label.upper())

    fragment = (
        "<div>"
        f"<blockquote>{safe_text}</blockquote>"
        "<p>"
        "<strong>Result:</strong> "
        f'<span class="sentiment-{safe_label}">{safe_label_upper}</span> '
        f"(Confidence: {score:.1f}%)"
        "</p>"
        "</div>"
    )
    # An SSE data line must not contain a line terminator; drop any that remain.
    fragment = "".join(fragment.splitlines())
    return f"event: sentiment_update\ndata: {fragment}\n\n"


# NOTE(review): no route decorator is visible above this handler in this view
# of the file — confirm how it is registered on `app`.
async def sentiment_stream(request: Request):
    """
    Open a Server-Sent Events (SSE) stream of sentiment results.

    The stream first sends an empty out-of-band replacement for the
    ``sentiment-results`` element, then one ``sentiment_update`` event per
    result yielded by the analyzer's ``stream_results()``. Malformed payloads
    are skipped so a single bad result does not end the stream.

    Args:
        request: The incoming request, used to reach the application state.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer

    async def event_generator():
        # Clear the initial "waiting..." message on the client.
        # hx-swap-oob="innerHTML" swaps this div out-of-band without affecting the target.
        yield (
            "event: sentiment_update\n"
            "data: <div id='sentiment-results' hx-swap-oob='innerHTML'></div>\n\n"
        )
        # Listen for new results from the analyzer.
        async for result_payload in analyzer.stream_results():
            try:
                message = _format_sentiment_event(result_payload)
            except (KeyError, TypeError, AttributeError, ValueError):
                continue  # Ignore malformed payloads
            yield message

    return StreamingResponse(event_generator(), media_type="text/event-stream")
# --- Main execution block for local development ---
if __name__ == "__main__":
    import uvicorn

    # The "app:app" import string assumes this module is importable as `app`
    # (a file named 'app.py').
    # NOTE(review): the imports from `app.price_fetcher` suggest `app` may be a
    # package — confirm the import string matches the real layout.
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,
        reload=True,
    )