Spaces:
Sleeping
Sleeping
File size: 5,584 Bytes
448d7a9 0e87c05 eaf2b94 175724c 0e87c05 cb01390 0e87c05 eaf2b94 0e87c05 448d7a9 0e87c05 eaf2b94 0e87c05 175724c 0e87c05 175724c 0e87c05 175724c 0e87c05 eaf2b94 175724c 448d7a9 0e87c05 175724c 0e87c05 175724c 0e87c05 175724c 0e87c05 eaf2b94 175724c 0e87c05 175724c 0e87c05 175724c 0e87c05 175724c 0e87c05 eaf2b94 0e87c05 175724c 0e87c05 448d7a9 0e87c05 448d7a9 0e87c05 175724c eaf2b94 175724c 448d7a9 175724c 0e87c05 448d7a9 0e87c05 175724c 0e87c05 175724c 448d7a9 0e87c05 cb01390 175724c 0e87c05 175724c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
"""
CryptoSentinel AI β High-performance FastAPI application.
This is the main entry point that orchestrates the entire application.
- Integrates the asynchronous PriceFetcher for live market data.
- Integrates the asynchronous SentimentAnalyzer for real-time analysis.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import json
from contextlib import asynccontextmanager
import httpx
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, constr
# These relative imports will now work correctly.
from .price_fetcher import PriceFetcher
from .sentiment import SentimentAnalyzer
# --- Pydantic Model for API Input Validation ---
class SentimentRequest(BaseModel):
"""Ensures the text for sentiment analysis is a non-empty string."""
text: constr(strip_whitespace=True, min_length=1)
# --- Application Lifespan for Resource Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Manages application startup and shutdown events using the modern
lifespan context manager.
"""
# On startup:
async with httpx.AsyncClient() as client:
# Instantiate and store our services in the application state.
app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
app.state.sentiment_analyzer = SentimentAnalyzer(client=client)
app.state.request_counter = 0
# Create a cancellable background task for continuous price updates.
price_update_task = asyncio.create_task(
run_periodic_updates(app.state.price_fetcher, interval_seconds=10)
)
print("π CryptoSentinel AI started successfully.")
yield
# On shutdown:
print("β³ Shutting down background tasks...")
price_update_task.cancel()
try:
await price_update_task
except asyncio.CancelledError:
print("Price update task cancelled successfully.")
print("β
Shutdown complete.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
"""A robust asyncio background task that periodically updates prices."""
while True:
await fetcher.update_prices_async()
await asyncio.sleep(interval_seconds)
# --- FastAPI App Initialization ---
app = FastAPI(title="CryptoSentinel AI", lifespan=lifespan)
# This path assumes the app is run from the root directory.
templates = Jinja2Templates(directory="app/templates")
# --- API Endpoints ---
# ... (The rest of the file is unchanged) ...
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
price_fetcher: PriceFetcher = request.app.state.price_fetcher
prices = price_fetcher.get_current_prices()
html_fragment = ""
for coin, price in prices.items():
price_str = f"${price:,.2f}" if isinstance(price, (int, float)) else price
html_fragment += f"<div><strong>{coin.capitalize()}:</strong> {price_str}</div>"
return HTMLResponse(content=html_fragment)
@app.post("/api/sentiment")
async def analyze_sentiment(payload: SentimentRequest, request: Request, background_tasks: BackgroundTasks):
analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
request.app.state.request_counter += 1
request_id = request.app.state.request_counter
background_tasks.add_task(analyzer.compute_and_publish, payload.text, request_id)
return HTMLResponse(content="<small>Queued for analysis...</small>")
@app.get("/api/sentiment/stream")
async def sentiment_stream(request: Request):
analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
async def event_generator():
yield f"event: sentiment_update\ndata: <div id='sentiment-results' hx-swap-oob='innerHTML'></div>\n\n"
async for result_payload in analyzer.stream_results():
try:
result = result_payload['result']
label = str(result.get('label', 'NEUTRAL')).lower()
score = result.get('score', 0.0) * 100
text = result_payload['text']
html_fragment = f"""
<div>
<blockquote>{text}</blockquote>
<p>
<strong>Result:</strong>
<span class="sentiment-{label}">{label.upper()}</span>
(Confidence: {score:.1f}%)
</p>
</div>
"""
data_payload = html_fragment.replace('\n', '')
sse_message = f"event: sentiment_update\ndata: {data_payload}\n\n"
yield sse_message
except (KeyError, TypeError):
continue
return StreamingResponse(event_generator(), media_type="text/event-stream")
# This block is now mostly for IDEs, the primary run method is the uvicorn command.
if __name__ == "__main__":
# Note: Running this file directly (`python app/app.py`) will fail due to relative imports.
# Use the command: `uvicorn app.app:app --reload` from the project root.
print("To run this application, use the command from the root directory:")
print("uvicorn app.app:app --host 0.0.0.0 --port 7860 --reload") |