mgbam commited on
Commit
0e87c05
Β·
verified Β·
1 Parent(s): fd31489

Update app/main.py

Browse files
Files changed (1) hide show
  1. app/main.py +113 -52
app/main.py CHANGED
@@ -1,81 +1,142 @@
1
  """
2
- CryptoSentinel AI β€” FastAPI app entry point (no /static mount)
3
- - Fetches live prices from CoinGecko
4
- - Provides real-time sentiment analysis via SSE
5
- - Compatible with Hugging Face Spaces
6
- """
7
 
8
- import json
 
 
 
 
 
9
  import asyncio
10
- from pathlib import Path
 
11
 
 
12
  from fastapi import FastAPI, Request, BackgroundTasks
13
- from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
14
  from fastapi.templating import Jinja2Templates
15
- from apscheduler.schedulers.background import BackgroundScheduler
 
 
 
 
 
 
 
 
 
16
 
17
- from price_fetcher import fetch_prices, CURRENT_PRICES
18
- from sentiment import SentimentCache
19
 
20
- # ────── Setup ───────────────────────────────────────────────────────────────
 
 
 
 
 
 
 
 
 
 
 
21
 
22
- BASE_DIR = Path(__file__).parent
23
- templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
 
 
24
 
25
- app = FastAPI(title="CryptoSentinel AI (CDN-only static)")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
 
27
- # Start background job to refresh prices every 10s
28
- scheduler = BackgroundScheduler(daemon=True)
29
- scheduler.add_job(fetch_prices, trigger="interval", seconds=10)
30
- scheduler.start()
 
31
 
32
- @app.on_event("shutdown")
33
- def shutdown():
34
- scheduler.shutdown(wait=False)
35
 
36
- @app.on_event("startup")
37
- def warmup_model():
38
- # Preload the sentiment model once on startup
39
- SentimentCache.compute("The market is pumping πŸš€")
40
 
41
- # ────── Routes ──────────────────────────────────────────────────────────────
42
 
43
  @app.get("/", response_class=HTMLResponse)
44
  async def index(request: Request):
 
 
 
 
 
45
  """
46
- Renders index.html which now should reference HTMX via CDN:
47
- <script src="https://unpkg.com/[email protected]"></script>
48
  """
49
- return templates.TemplateResponse("index.html", {"request": request})
 
50
 
51
- @app.get("/prices", response_class=JSONResponse)
52
- async def prices():
53
- """Return the latest cached crypto prices."""
54
- return CURRENT_PRICES
 
 
55
 
56
- @app.post("/sentiment")
57
- async def sentiment(request: Request, background_tasks: BackgroundTasks):
 
 
 
 
58
  """
59
- Queue sentiment analysis for the given text.
60
- Frontend will pick up results via SSE.
61
  """
62
- body = await request.json()
63
- background_tasks.add_task(SentimentCache.compute, body.get("text", ""))
64
- return {"status": "queued"}
 
 
 
 
 
 
 
65
 
66
- @app.get("/sentiment/stream")
67
- async def sentiment_stream():
68
  """
69
- Server-Sent Events endpoint that pushes new sentiment results
70
- as they become available in SentimentCache.latest_result.
 
71
  """
 
 
72
  async def event_generator():
73
- last_id = 0
74
  while True:
75
- if SentimentCache.latest_id != last_id:
76
- last_id = SentimentCache.latest_id
77
- payload = json.dumps(SentimentCache.latest_result)
78
- yield f"id:{last_id}\ndata:{payload}\n\n"
79
- await asyncio.sleep(1)
 
 
 
 
80
 
81
- return StreamingResponse(event_generator(), media_type="text/event-stream")
 
1
  """
2
+ CryptoSentinel AI β€” High-performance FastAPI application.
 
 
 
 
3
 
4
+ Features:
5
+ - Fully asynchronous architecture using modern FastAPI lifespan and background tasks.
6
+ - Integrates a robust, async PriceFetcher with multi-API fallback.
7
+ - Provides real-time sentiment analysis via an efficient, non-polling SSE stream.
8
+ - Centralized state management for testability and clarity.
9
+ """
10
  import asyncio
11
+ import json
12
+ from contextlib import asynccontextmanager
13
 
14
+ import httpx
15
  from fastapi import FastAPI, Request, BackgroundTasks
16
+ from fastapi.responses import HTMLResponse, StreamingResponse
17
  from fastapi.templating import Jinja2Templates
18
+ from pydantic import BaseModel, constr
19
+
20
+ from .price_fetcher import PriceFetcher
21
+ from .sentiment import SentimentAnalyzer
22
+
23
+ # --- Configuration & Models ---
24
+
25
+ class SentimentRequest(BaseModel):
26
+ """Pydantic model for validating sentiment analysis requests."""
27
+ text: constr(strip_whitespace=True, min_length=1)
28
 
29
+ # --- Application Lifespan Management ---
 
30
 
31
+ @asynccontextmanager
32
+ async def lifespan(app: FastAPI):
33
+ """
34
+ Manages application startup and shutdown events. This is the modern
35
+ replacement for @app.on_event("startup") and "shutdown".
36
+ """
37
+ # -- Startup --
38
+ # Create a single, shared httpx client for the application's lifespan.
39
+ async with httpx.AsyncClient() as client:
40
+ # Initialize our stateful services
41
+ price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
42
+ sentiment_analyzer = SentimentAnalyzer()
43
 
44
+ # Store service instances in the app's state for access in routes
45
+ app.state.price_fetcher = price_fetcher
46
+ app.state.sentiment_analyzer = sentiment_analyzer
47
+ app.state.request_counter = 0
48
 
49
+ # Create a cancellable background task for periodic price updates
50
+ price_update_task = asyncio.create_task(
51
+ run_periodic_updates(price_fetcher, interval_seconds=10)
52
+ )
53
+
54
+ print("πŸš€ CryptoSentinel AI started successfully.")
55
+ yield # The application is now running
56
+
57
+ # -- Shutdown --
58
+ print("⏳ Shutting down background tasks...")
59
+ price_update_task.cancel()
60
+ try:
61
+ await price_update_task
62
+ except asyncio.CancelledError:
63
+ print("Price update task cancelled successfully.")
64
+ print("βœ… Shutdown complete.")
65
 
66
+ async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
67
+ """A simple, robust asyncio background task runner."""
68
+ while True:
69
+ await fetcher.update_prices_async()
70
+ await asyncio.sleep(interval_seconds)
71
 
72
+ # --- FastAPI App Initialization ---
 
 
73
 
74
+ templates = Jinja2Templates(directory="app/templates")
75
+ app = FastAPI(title="CryptoSentinel AI", lifespan=lifespan)
 
 
76
 
77
+ # --- Routes ---
78
 
79
  @app.get("/", response_class=HTMLResponse)
80
  async def index(request: Request):
81
+ """Renders the main single-page application view."""
82
+ return templates.TemplateResponse("index.html", {"request": request})
83
+
84
+ @app.get("/api/prices", response_class=HTMLResponse)
85
+ async def get_prices_fragment(request: Request):
86
  """
87
+ Returns an HTML fragment with the latest crypto prices.
88
+ Designed to be called by HTMX.
89
  """
90
+ price_fetcher: PriceFetcher = request.app.state.price_fetcher
91
+ prices = price_fetcher.get_current_prices()
92
 
93
+ html_fragment = ""
94
+ for coin, price in prices.items():
95
+ price_str = f"${price:,.2f}" if isinstance(price, (int, float)) else price
96
+ html_fragment += f"<div><strong>{coin.capitalize()}:</strong> {price_str}</div>"
97
+
98
+ return HTMLResponse(content=html_fragment)
99
 
100
+ @app.post("/api/sentiment")
101
+ async def analyze_sentiment(
102
+ payload: SentimentRequest,
103
+ request: Request,
104
+ background_tasks: BackgroundTasks
105
+ ):
106
  """
107
+ Accepts text for sentiment analysis, validates it, and queues it
108
+ for processing in the background.
109
  """
110
+ analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
111
+
112
+ # Use a simple counter for unique event IDs
113
+ request.app.state.request_counter += 1
114
+ request_id = request.app.state.request_counter
115
+
116
+ # Add the heavy computation to the background so the API returns instantly
117
+ background_tasks.add_task(analyzer.compute_and_publish, payload.text, request_id)
118
+
119
+ return {"status": "queued", "request_id": request_id}
120
 
121
+ @app.get("/api/sentiment/stream")
122
+ async def sentiment_stream(request: Request):
123
  """
124
+ Server-Sent Events (SSE) endpoint.
125
+ This long-lived connection efficiently waits for new sentiment results
126
+ from the queue and pushes them to the client.
127
  """
128
+ analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
129
+
130
  async def event_generator():
 
131
  while True:
132
+ try:
133
+ # This is the key: efficiently wait for a result to be put in the queue
134
+ result_payload = await analyzer.get_next_result()
135
+ payload_str = json.dumps(result_payload)
136
+ yield f"id:{result_payload['id']}\nevent: sentiment_update\ndata:{payload_str}\n\n"
137
+ except asyncio.CancelledError:
138
+ # Handle client disconnect
139
+ print("Client disconnected from SSE stream.")
140
+ break
141
 
142
+ return StreamingResponse(event_generator(), media_type="text/event-stream")