Spaces:
Sleeping
Sleeping
""" | |
CryptoSentinel AI β High-performance FastAPI application. | |
This is the main entry point that orchestrates the entire application. | |
- Integrates the asynchronous PriceFetcher for live market data. | |
- Integrates the asynchronous SentimentAnalyzer for real-time analysis. | |
- Serves the interactive frontend and provides all necessary API endpoints. | |
""" | |
import asyncio | |
import json | |
from contextlib import asynccontextmanager | |
import httpx | |
from fastapi import FastAPI, Request, BackgroundTasks | |
from fastapi.responses import HTMLResponse, StreamingResponse | |
from fastapi.templating import Jinja2Templates | |
from pydantic import BaseModel, constr | |
# These relative imports will now work correctly. | |
from .price_fetcher import PriceFetcher | |
from .sentiment import SentimentAnalyzer | |
# --- Pydantic Model for API Input Validation --- | |
class SentimentRequest(BaseModel): | |
"""Ensures the text for sentiment analysis is a non-empty string.""" | |
text: constr(strip_whitespace=True, min_length=1) | |
# --- Application Lifespan for Resource Management --- | |
async def lifespan(app: FastAPI): | |
""" | |
Manages application startup and shutdown events using the modern | |
lifespan context manager. | |
""" | |
# On startup: | |
async with httpx.AsyncClient() as client: | |
# Instantiate and store our services in the application state. | |
app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"]) | |
app.state.sentiment_analyzer = SentimentAnalyzer(client=client) | |
app.state.request_counter = 0 | |
# Create a cancellable background task for continuous price updates. | |
price_update_task = asyncio.create_task( | |
run_periodic_updates(app.state.price_fetcher, interval_seconds=10) | |
) | |
print("π CryptoSentinel AI started successfully.") | |
yield | |
# On shutdown: | |
print("β³ Shutting down background tasks...") | |
price_update_task.cancel() | |
try: | |
await price_update_task | |
except asyncio.CancelledError: | |
print("Price update task cancelled successfully.") | |
print("β Shutdown complete.") | |
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int): | |
"""A robust asyncio background task that periodically updates prices.""" | |
while True: | |
await fetcher.update_prices_async() | |
await asyncio.sleep(interval_seconds) | |
# --- FastAPI App Initialization --- | |
app = FastAPI(title="CryptoSentinel AI", lifespan=lifespan) | |
# This path assumes the app is run from the root directory. | |
templates = Jinja2Templates(directory="app/templates") | |
# --- API Endpoints --- | |
# ... (The rest of the file is unchanged) ... | |
async def serve_dashboard(request: Request): | |
return templates.TemplateResponse("index.html", {"request": request}) | |
async def get_prices_fragment(request: Request): | |
price_fetcher: PriceFetcher = request.app.state.price_fetcher | |
prices = price_fetcher.get_current_prices() | |
html_fragment = "" | |
for coin, price in prices.items(): | |
price_str = f"${price:,.2f}" if isinstance(price, (int, float)) else price | |
html_fragment += f"<div><strong>{coin.capitalize()}:</strong> {price_str}</div>" | |
return HTMLResponse(content=html_fragment) | |
async def analyze_sentiment(payload: SentimentRequest, request: Request, background_tasks: BackgroundTasks): | |
analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer | |
request.app.state.request_counter += 1 | |
request_id = request.app.state.request_counter | |
background_tasks.add_task(analyzer.compute_and_publish, payload.text, request_id) | |
return HTMLResponse(content="<small>Queued for analysis...</small>") | |
async def sentiment_stream(request: Request): | |
analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer | |
async def event_generator(): | |
yield f"event: sentiment_update\ndata: <div id='sentiment-results' hx-swap-oob='innerHTML'></div>\n\n" | |
async for result_payload in analyzer.stream_results(): | |
try: | |
result = result_payload['result'] | |
label = str(result.get('label', 'NEUTRAL')).lower() | |
score = result.get('score', 0.0) * 100 | |
text = result_payload['text'] | |
html_fragment = f""" | |
<div> | |
<blockquote>{text}</blockquote> | |
<p> | |
<strong>Result:</strong> | |
<span class="sentiment-{label}">{label.upper()}</span> | |
(Confidence: {score:.1f}%) | |
</p> | |
</div> | |
""" | |
data_payload = html_fragment.replace('\n', '') | |
sse_message = f"event: sentiment_update\ndata: {data_payload}\n\n" | |
yield sse_message | |
except (KeyError, TypeError): | |
continue | |
return StreamingResponse(event_generator(), media_type="text/event-stream") | |
# This block is now mostly for IDEs, the primary run method is the uvicorn command. | |
if __name__ == "__main__": | |
# Note: Running this file directly (`python app/app.py`) will fail due to relative imports. | |
# Use the command: `uvicorn app.app:app --reload` from the project root. | |
print("To run this application, use the command from the root directory:") | |
print("uvicorn app.app:app --host 0.0.0.0 --port 7860 --reload") |