mgbam's picture
Create app/main.py
448d7a9 verified
raw
history blame
2.26 kB
"""
CryptoSentinel AI – FastAPI entry‑point
"""
import os
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from apscheduler.schedulers.background import BackgroundScheduler
from dotenv import load_dotenv
from price_fetcher import fetch_prices, CURRENT_PRICES
from sentiment import analyze_sentiment, SentimentCache
from pathlib import Path
import json
import asyncio
load_dotenv()
app = FastAPI(title="CryptoSentinel AI")
templ_dir = Path(__file__).parent / "templates"
app.mount("/static", StaticFiles(directory=Path(__file__).parent / "static"), name="static")
scheduler = BackgroundScheduler(daemon=True)
scheduler.add_job(fetch_prices, "interval", seconds=10) # refresh price cache
scheduler.start()
@app.on_event("shutdown")
def shutdown_event():
scheduler.shutdown(wait=False)
# ---------- ROUTES -----------------------------------------------------------
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
from fastapi.templating import Jinja2Templates
templates = Jinja2Templates(directory=str(templ_dir))
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/prices")
async def prices():
"""JSON endpoint for latest cached prices."""
return JSONResponse(CURRENT_PRICES)
@app.post("/sentiment")
async def sentiment(request: Request, background_tasks: BackgroundTasks):
body = await request.json()
text = body.get("text", "")
# Run expensive inference in background to keep latency low
background_tasks.add_task(SentimentCache.compute, text)
return {"status": "processing"}
@app.get("/sentiment/stream")
async def sentiment_stream():
"""Server‑sent events stream – pushes sentiment results."""
async def event_generator():
last_id = 0
while True:
if SentimentCache.latest_id != last_id:
last_id = SentimentCache.latest_id
data = json.dumps(SentimentCache.latest_result)
yield f"id:{last_id}\ndata:{data}\n\n"
await asyncio.sleep(1)
return StreamingResponse(event_generator(), media_type="text/event-stream")