"""
CryptoSentinel AI — High-performance FastAPI application.

This is the main entry point that orchestrates the entire application.
- Integrates the asynchronous PriceFetcher for live market data.
- Integrates the asynchronous SentimentAnalyzer for real-time analysis.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import html
import json
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, constr

# Relative imports: this module has to be loaded as part of its package
# (e.g. `uvicorn app.app:app`), not executed as a standalone script.
from .price_fetcher import PriceFetcher
from .sentiment import SentimentAnalyzer

# Both CR and LF terminate a line in the SSE wire format, so both must be
# removed from anything placed on a single `data:` line.
_SSE_LINE_BREAKS = str.maketrans("", "", "\r\n")


# --- Pydantic Model for API Input Validation ---
class SentimentRequest(BaseModel):
    """Ensures the text for sentiment analysis is a non-empty string."""

    # Surrounding whitespace is stripped before the length check, so a
    # whitespace-only submission is rejected.
    text: constr(strip_whitespace=True, min_length=1)


# --- Application Lifespan for Resource Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manage application startup and shutdown via the lifespan context manager.

    Startup creates one shared ``httpx.AsyncClient``, stores the
    ``PriceFetcher``, the ``SentimentAnalyzer`` and a request counter on
    ``app.state``, and starts the periodic price-update task. Shutdown
    cancels that task and waits for it to finish before the HTTP client is
    closed.
    """
    # On startup:
    async with httpx.AsyncClient() as client:
        # Instantiate and store our services in the application state.
        app.state.price_fetcher = PriceFetcher(
            client=client, coins=["bitcoin", "ethereum", "dogecoin"]
        )
        app.state.sentiment_analyzer = SentimentAnalyzer(client=client)
        app.state.request_counter = 0

        # Create a cancellable background task for continuous price updates.
        price_update_task = asyncio.create_task(
            run_periodic_updates(app.state.price_fetcher, interval_seconds=10)
        )
        print("🚀 CryptoSentinel AI started successfully.")

        try:
            yield
        finally:
            # On shutdown: runs even if an error is thrown into the context
            # manager, so the background task never outlives the client.
            print("⏳ Shutting down background tasks...")
            price_update_task.cancel()
            try:
                await price_update_task
            except asyncio.CancelledError:
                print("Price update task cancelled successfully.")
    print("✅ Shutdown complete.")


async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """
    Refresh prices forever, pausing ``interval_seconds`` between attempts.

    A failed refresh is reported and retried on the next cycle instead of
    ending the loop; otherwise a single error would stop all further price
    updates for the lifetime of the process. Cancellation is not caught by
    ``except Exception`` and therefore still stops the loop.
    """
    while True:
        try:
            await fetcher.update_prices_async()
        except Exception as exc:  # boundary of a long-lived background task
            print(f"⚠️ Price update failed; retrying in {interval_seconds}s: {exc!r}")
        await asyncio.sleep(interval_seconds)


# --- FastAPI App Initialization ---
app = FastAPI(title="CryptoSentinel AI", lifespan=lifespan)

# This path assumes the app is run from the root directory.
templates = Jinja2Templates(directory="app/templates")


# --- Rendering helpers ---
def _render_prices(prices) -> str:
    """Render one text row per coin from a mapping of coin name to price."""
    # NOTE(review): the reviewed copy of this file shows only line breaks
    # around each row; any HTML wrapper appears to have been stripped.
    # Confirm the intended markup against the frontend.
    rows = []
    for coin, price in prices.items():
        # Numeric prices get currency formatting; anything else is shown as-is.
        price_str = f"${price:,.2f}" if isinstance(price, (int, float)) else price
        rows.append(f"\n{coin.capitalize()}: {price_str}\n")
    return "".join(rows)


def _render_sentiment(result_payload) -> str:
    """
    Render the fragment for one analysis result.

    Reads ``result_payload['result']`` (``label`` and ``score`` keys, with
    defaults) and ``result_payload['text']``. Raises ``KeyError``,
    ``TypeError``, ``AttributeError`` or ``ValueError`` when the payload does
    not have that shape.
    """
    result = result_payload['result']
    label = str(result.get('label', 'NEUTRAL')).lower()
    score = result.get('score', 0.0) * 100
    # The text is user-submitted and the label comes from the analyzer;
    # both are escaped because the fragment is consumed as HTML.
    text = html.escape(str(result_payload['text']))
    shown_label = html.escape(label.upper())
    # NOTE(review): as in the price rows, surrounding markup appears to have
    # been stripped from the reviewed copy — confirm the intended markup.
    return f"\n{text}\n\nResult: {shown_label} (Confidence: {score:.1f}%)\n\n"


def _sse_message(event: str, data: str) -> str:
    """Frame ``data`` as a single-line server-sent event named ``event``."""
    single_line = data.translate(_SSE_LINE_BREAKS)
    return f"event: {event}\ndata: {single_line}\n\n"


# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
    """Serve the dashboard page rendered from ``index.html``."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
    """Return the current prices held by the price fetcher as a fragment."""
    price_fetcher: PriceFetcher = request.app.state.price_fetcher
    prices = price_fetcher.get_current_prices()
    return HTMLResponse(content=_render_prices(prices))


@app.post("/api/sentiment")
async def analyze_sentiment(
    payload: SentimentRequest, request: Request, background_tasks: BackgroundTasks
):
    """
    Queue the submitted text for sentiment analysis.

    The analysis is scheduled as a background task tagged with the next
    request id; the response only acknowledges that it was queued.
    """
    analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
    request.app.state.request_counter += 1
    request_id = request.app.state.request_counter
    background_tasks.add_task(analyzer.compute_and_publish, payload.text, request_id)
    return HTMLResponse(content="Queued for analysis...")


@app.get("/api/sentiment/stream")
async def sentiment_stream(request: Request):
    """Stream analysis results as ``sentiment_update`` server-sent events."""
    analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer

    async def event_generator():
        # Initial event with an empty data field, sent as soon as a client
        # connects.
        # NOTE(review): the payload of this first event looks stripped in the
        # reviewed copy — confirm what it originally carried.
        yield "event: sentiment_update\ndata:\n\n\n"
        async for result_payload in analyzer.stream_results():
            try:
                fragment = _render_sentiment(result_payload)
            except (KeyError, TypeError, AttributeError, ValueError):
                # Skip a malformed payload rather than ending the stream.
                continue
            yield _sse_message("sentiment_update", fragment)

    return StreamingResponse(event_generator(), media_type="text/event-stream")


# This block is now mostly for IDEs, the primary run method is the uvicorn command.
if __name__ == "__main__":
    # Note: Running this file directly (`python app/app.py`) will fail due to relative imports.
    # Use the command: `uvicorn app.app:app --reload` from the project root.
    print("To run this application, use the command from the root directory:")
    print("uvicorn app.app:app --host 0.0.0.0 --port 7860 --reload")