Spaces:
Running
Running
| """ | |
| CryptoSentinel AI β FastAPI app entry point (no /static mount) | |
| - Fetches live prices from CoinGecko | |
| - Provides real-time sentiment analysis via SSE | |
| - Compatible with Hugging Face Spaces | |
| """ | |
| import json | |
| import asyncio | |
| from pathlib import Path | |
| from fastapi import FastAPI, Request, BackgroundTasks | |
| from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse | |
| from fastapi.templating import Jinja2Templates | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| from price_fetcher import fetch_prices, CURRENT_PRICES | |
| from sentiment import SentimentCache | |
| # ββββββ Setup βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| BASE_DIR = Path(__file__).parent | |
| templates = Jinja2Templates(directory=str(BASE_DIR / "templates")) | |
| app = FastAPI(title="CryptoSentinel AI (CDN-only static)") | |
| # Start background job to refresh prices every 10s | |
| scheduler = BackgroundScheduler(daemon=True) | |
| scheduler.add_job(fetch_prices, trigger="interval", seconds=10) | |
| scheduler.start() | |
| def shutdown(): | |
| scheduler.shutdown(wait=False) | |
| def warmup_model(): | |
| # Preload the sentiment model once on startup | |
| SentimentCache.compute("The market is pumping π") | |
| # ββββββ Routes ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def index(request: Request): | |
| """ | |
| Renders index.html which now should reference HTMX via CDN: | |
| <script src="https://unpkg.com/[email protected]"></script> | |
| """ | |
| return templates.TemplateResponse("index.html", {"request": request}) | |
| async def prices(): | |
| """Return the latest cached crypto prices.""" | |
| return CURRENT_PRICES | |
| async def sentiment(request: Request, background_tasks: BackgroundTasks): | |
| """ | |
| Queue sentiment analysis for the given text. | |
| Frontend will pick up results via SSE. | |
| """ | |
| body = await request.json() | |
| background_tasks.add_task(SentimentCache.compute, body.get("text", "")) | |
| return {"status": "queued"} | |
| async def sentiment_stream(): | |
| """ | |
| Server-Sent Events endpoint that pushes new sentiment results | |
| as they become available in SentimentCache.latest_result. | |
| """ | |
| async def event_generator(): | |
| last_id = 0 | |
| while True: | |
| if SentimentCache.latest_id != last_id: | |
| last_id = SentimentCache.latest_id | |
| payload = json.dumps(SentimentCache.latest_result) | |
| yield f"id:{last_id}\ndata:{payload}\n\n" | |
| await asyncio.sleep(1) | |
| return StreamingResponse(event_generator(), media_type="text/event-stream") | |