mgbam commited on
Commit
175724c
Β·
verified Β·
1 Parent(s): ff9180e

Update app/main.py

Browse files
Files changed (1) hide show
  1. app/main.py +56 -67
app/main.py CHANGED
@@ -1,11 +1,10 @@
1
  """
2
  CryptoSentinel AI β€” High-performance FastAPI application.
3
 
4
- Features:
5
- - Fully asynchronous architecture using modern FastAPI lifespan and background tasks.
6
- - Integrates a robust, async PriceFetcher with multi-API fallback.
7
- - Provides real-time sentiment analysis via an efficient, non-polling SSE stream.
8
- - Centralized state management for testability and clarity.
9
  """
10
  import asyncio
11
  import json
@@ -17,44 +16,41 @@ from fastapi.responses import HTMLResponse, StreamingResponse
17
  from fastapi.templating import Jinja2Templates
18
  from pydantic import BaseModel, constr
19
 
 
20
  from .price_fetcher import PriceFetcher
21
  from .sentiment import SentimentAnalyzer
22
 
23
- # --- Configuration & Models ---
 
24
 
25
  class SentimentRequest(BaseModel):
26
- """Pydantic model for validating sentiment analysis requests."""
27
  text: constr(strip_whitespace=True, min_length=1)
28
 
29
- # --- Application Lifespan Management ---
30
 
31
  @asynccontextmanager
32
  async def lifespan(app: FastAPI):
33
  """
34
- Manages application startup and shutdown events. This is the modern
35
- replacement for @app.on_event("startup") and "shutdown".
36
  """
37
- # -- Startup --
38
- # Create a single, shared httpx client for the application's lifespan.
39
  async with httpx.AsyncClient() as client:
40
- # Initialize our stateful services
41
- price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
42
- sentiment_analyzer = SentimentAnalyzer()
43
-
44
- # Store service instances in the app's state for access in routes
45
- app.state.price_fetcher = price_fetcher
46
- app.state.sentiment_analyzer = sentiment_analyzer
47
  app.state.request_counter = 0
48
 
49
- # Create a cancellable background task for periodic price updates
50
  price_update_task = asyncio.create_task(
51
- run_periodic_updates(price_fetcher, interval_seconds=10)
52
  )
53
 
54
  print("πŸš€ CryptoSentinel AI started successfully.")
55
- yield # The application is now running
56
 
57
- # -- Shutdown --
58
  print("⏳ Shutting down background tasks...")
59
  price_update_task.cancel()
60
  try:
@@ -64,79 +60,72 @@ async def lifespan(app: FastAPI):
64
  print("βœ… Shutdown complete.")
65
 
66
  async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
67
- """A simple, robust asyncio background task runner."""
68
  while True:
69
  await fetcher.update_prices_async()
70
  await asyncio.sleep(interval_seconds)
71
 
72
  # --- FastAPI App Initialization ---
73
 
74
- templates = Jinja2Templates(directory="app/templates")
75
  app = FastAPI(title="CryptoSentinel AI", lifespan=lifespan)
 
 
76
 
77
- # --- Routes ---
78
-
79
  @app.get("/", response_class=HTMLResponse)
80
- async def index(request: Request):
81
- """Renders the main single-page application view."""
82
  return templates.TemplateResponse("index.html", {"request": request})
83
 
84
  @app.get("/api/prices", response_class=HTMLResponse)
85
  async def get_prices_fragment(request: Request):
86
- """
87
- Returns an HTML fragment with the latest crypto prices.
88
- Designed to be called by HTMX.
89
- """
90
  price_fetcher: PriceFetcher = request.app.state.price_fetcher
91
  prices = price_fetcher.get_current_prices()
92
-
93
  html_fragment = ""
94
  for coin, price in prices.items():
95
  price_str = f"${price:,.2f}" if isinstance(price, (int, float)) else price
96
  html_fragment += f"<div><strong>{coin.capitalize()}:</strong> {price_str}</div>"
97
-
98
  return HTMLResponse(content=html_fragment)
99
 
100
  @app.post("/api/sentiment")
101
- async def analyze_sentiment(
102
- payload: SentimentRequest,
103
- request: Request,
104
- background_tasks: BackgroundTasks
105
- ):
106
- """
107
- Accepts text for sentiment analysis, validates it, and queues it
108
- for processing in the background.
109
- """
110
  analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
111
-
112
- # Use a simple counter for unique event IDs
113
  request.app.state.request_counter += 1
114
  request_id = request.app.state.request_counter
115
-
116
- # Add the heavy computation to the background so the API returns instantly
117
  background_tasks.add_task(analyzer.compute_and_publish, payload.text, request_id)
118
-
119
- return {"status": "queued", "request_id": request_id}
120
 
121
  @app.get("/api/sentiment/stream")
122
  async def sentiment_stream(request: Request):
123
- """
124
- Server-Sent Events (SSE) endpoint.
125
- This long-lived connection efficiently waits for new sentiment results
126
- from the queue and pushes them to the client.
127
- """
128
  analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
129
-
130
  async def event_generator():
131
- while True:
 
132
  try:
133
- # This is the key: efficiently wait for a result to be put in the queue
134
- result_payload = await analyzer.get_next_result()
135
- payload_str = json.dumps(result_payload)
136
- yield f"id:{result_payload['id']}\nevent: sentiment_update\ndata:{payload_str}\n\n"
137
- except asyncio.CancelledError:
138
- # Handle client disconnect
139
- print("Client disconnected from SSE stream.")
140
- break
141
-
142
- return StreamingResponse(event_generator(), media_type="text/event-stream")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
  CryptoSentinel AI β€” High-performance FastAPI application.
3
 
4
+ This is the main entry point that orchestrates the entire application.
5
+ - Integrates the asynchronous PriceFetcher for live market data.
6
+ - Integrates the asynchronous SentimentAnalyzer for real-time analysis.
7
+ - Serves the interactive frontend and provides all necessary API endpoints.
 
8
  """
9
  import asyncio
10
  import json
 
16
  from fastapi.templating import Jinja2Templates
17
  from pydantic import BaseModel, constr
18
 
19
+ # These relative imports will now work correctly.
20
  from .price_fetcher import PriceFetcher
21
  from .sentiment import SentimentAnalyzer
22
 
23
+
24
+ # --- Pydantic Model for API Input Validation ---
25
 
26
  class SentimentRequest(BaseModel):
27
+ """Ensures the text for sentiment analysis is a non-empty string."""
28
  text: constr(strip_whitespace=True, min_length=1)
29
 
30
+ # --- Application Lifespan for Resource Management ---
31
 
32
  @asynccontextmanager
33
  async def lifespan(app: FastAPI):
34
  """
35
+ Manages application startup and shutdown events using the modern
36
+ lifespan context manager.
37
  """
38
+ # On startup:
 
39
  async with httpx.AsyncClient() as client:
40
+ # Instantiate and store our services in the application state.
41
+ app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
42
+ app.state.sentiment_analyzer = SentimentAnalyzer(client=client)
 
 
 
 
43
  app.state.request_counter = 0
44
 
45
+ # Create a cancellable background task for continuous price updates.
46
  price_update_task = asyncio.create_task(
47
+ run_periodic_updates(app.state.price_fetcher, interval_seconds=10)
48
  )
49
 
50
  print("πŸš€ CryptoSentinel AI started successfully.")
51
+ yield
52
 
53
+ # On shutdown:
54
  print("⏳ Shutting down background tasks...")
55
  price_update_task.cancel()
56
  try:
 
60
  print("βœ… Shutdown complete.")
61
 
62
  async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
63
+ """A robust asyncio background task that periodically updates prices."""
64
  while True:
65
  await fetcher.update_prices_async()
66
  await asyncio.sleep(interval_seconds)
67
 
68
  # --- FastAPI App Initialization ---
69
 
 
70
  app = FastAPI(title="CryptoSentinel AI", lifespan=lifespan)
71
+ # This path assumes the app is run from the root directory.
72
+ templates = Jinja2Templates(directory="app/templates")
73
 
74
+ # --- API Endpoints ---
75
+ # ... (The rest of the file is unchanged) ...
76
  @app.get("/", response_class=HTMLResponse)
77
+ async def serve_dashboard(request: Request):
 
78
  return templates.TemplateResponse("index.html", {"request": request})
79
 
80
  @app.get("/api/prices", response_class=HTMLResponse)
81
  async def get_prices_fragment(request: Request):
 
 
 
 
82
  price_fetcher: PriceFetcher = request.app.state.price_fetcher
83
  prices = price_fetcher.get_current_prices()
 
84
  html_fragment = ""
85
  for coin, price in prices.items():
86
  price_str = f"${price:,.2f}" if isinstance(price, (int, float)) else price
87
  html_fragment += f"<div><strong>{coin.capitalize()}:</strong> {price_str}</div>"
 
88
  return HTMLResponse(content=html_fragment)
89
 
90
  @app.post("/api/sentiment")
91
+ async def analyze_sentiment(payload: SentimentRequest, request: Request, background_tasks: BackgroundTasks):
 
 
 
 
 
 
 
 
92
  analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
 
 
93
  request.app.state.request_counter += 1
94
  request_id = request.app.state.request_counter
 
 
95
  background_tasks.add_task(analyzer.compute_and_publish, payload.text, request_id)
96
+ return HTMLResponse(content="<small>Queued for analysis...</small>")
 
97
 
98
  @app.get("/api/sentiment/stream")
99
  async def sentiment_stream(request: Request):
 
 
 
 
 
100
  analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
 
101
  async def event_generator():
102
+ yield f"event: sentiment_update\ndata: <div id='sentiment-results' hx-swap-oob='innerHTML'></div>\n\n"
103
+ async for result_payload in analyzer.stream_results():
104
  try:
105
+ result = result_payload['result']
106
+ label = str(result.get('label', 'NEUTRAL')).lower()
107
+ score = result.get('score', 0.0) * 100
108
+ text = result_payload['text']
109
+ html_fragment = f"""
110
+ <div>
111
+ <blockquote>{text}</blockquote>
112
+ <p>
113
+ <strong>Result:</strong>
114
+ <span class="sentiment-{label}">{label.upper()}</span>
115
+ (Confidence: {score:.1f}%)
116
+ </p>
117
+ </div>
118
+ """
119
+ data_payload = html_fragment.replace('\n', '')
120
+ sse_message = f"event: sentiment_update\ndata: {data_payload}\n\n"
121
+ yield sse_message
122
+ except (KeyError, TypeError):
123
+ continue
124
+ return StreamingResponse(event_generator(), media_type="text/event-stream")
125
+
126
+ # This block is now mostly for IDEs, the primary run method is the uvicorn command.
127
+ if __name__ == "__main__":
128
+ # Note: Running this file directly (`python app/app.py`) will fail due to relative imports.
129
+ # Use the command: `uvicorn app.app:app --reload` from the project root.
130
+ print("To run this application, use the command from the root directory:")
131
+ print("uvicorn app.app:app --host 0.0.0.0 --port 7860 --reload")