Spaces:
Sleeping
Sleeping
File size: 7,198 Bytes
7239f1c 526c84c 7239f1c 526c84c 7239f1c 526c84c ff9180e 526c84c 7239f1c 526c84c 7239f1c 526c84c 7239f1c 526c84c 7239f1c 526c84c 7239f1c ff9180e 7239f1c 526c84c 5e698c6 7239f1c 526c84c 7239f1c 526c84c 7239f1c 526c84c 7239f1c 526c84c 7239f1c 526c84c 7239f1c 526c84c 7239f1c 526c84c 7239f1c 526c84c 7239f1c ff9180e 67188f2 ff9180e 67188f2 7239f1c 5e698c6 526c84c ff9180e 7239f1c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
"""
CryptoSentinel AI — High-performance FastAPI application.
This is the main entry point that orchestrates the entire application.
- Integrates the asynchronous PriceFetcher for live market data.
- Integrates the asynchronous SentimentAnalyzer for real-time analysis.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import html
import json
from contextlib import asynccontextmanager
from pathlib import Path

import httpx
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, constr
# ====================================================================
# FIX APPLIED HERE
# ====================================================================
# Use relative imports because price_fetcher and sentiment are in the same 'app' directory.
from .price_fetcher import PriceFetcher
from .sentiment import SentimentAnalyzer
# ====================================================================
# --- Pydantic Model for API Input Validation ---
class SentimentRequest(BaseModel):
    """Request body carrying the text to run sentiment analysis on.

    The value is stripped of surrounding whitespace and must still contain at
    least one character afterwards, so blank submissions fail validation.
    """

    text: constr(min_length=1, strip_whitespace=True)
# --- Application Lifespan for Resource Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared services on startup and tear them down on shutdown.

    One `httpx.AsyncClient` is opened for the lifetime of the application and
    handed to both services. The services and a request counter are stored on
    `app.state`, which makes them reachable from any request handler via
    `request.app.state`. A background task keeps refreshing prices until the
    application shuts down.

    Args:
        app: The FastAPI application whose `state` receives the services.

    Yields:
        None, for as long as the application is serving requests.
    """
    # On startup:
    async with httpx.AsyncClient() as client:
        app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
        app.state.sentiment_analyzer = SentimentAnalyzer(client=client)
        app.state.request_counter = 0  # Simple counter for unique SSE event IDs

        # Create a cancellable background task for continuous price updates.
        price_update_task = asyncio.create_task(
            run_periodic_updates(app.state.price_fetcher, interval_seconds=10)
        )
        print("🚀 CryptoSentinel AI started successfully.")
        try:
            yield  # The application is now running and ready to accept requests.
        finally:
            # On shutdown. Running this in `finally` guarantees the task is
            # cancelled even when an exception is thrown in at the yield, and
            # it happens while the shared HTTP client is still open.
            print("⏳ Shutting down background tasks...")
            price_update_task.cancel()
            try:
                await price_update_task
            except asyncio.CancelledError:
                print("Price update task cancelled successfully.")
    print("✅ Shutdown complete.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """Refresh prices forever, pausing `interval_seconds` between attempts.

    The loop only ends when the task running it is cancelled. A refresh that
    raises is reported and retried on the next tick instead of terminating the
    loop, so a single transient failure does not stop price updates for the
    rest of the process lifetime.

    Args:
        fetcher: Service whose `update_prices_async()` coroutine is awaited on
            every iteration.
        interval_seconds: Delay between refresh attempts, in seconds.
    """
    while True:
        try:
            await fetcher.update_prices_async()
        except Exception as exc:
            # asyncio.CancelledError derives from BaseException (Python 3.8+),
            # so task cancellation is not swallowed here and still propagates.
            print(f"Price update failed; retrying in {interval_seconds}s: {exc!r}")
        await asyncio.sleep(interval_seconds)
# --- FastAPI App Initialization ---
app = FastAPI(title="CryptoSentinel AI", lifespan=lifespan)
# Resolve the templates folder relative to this file instead of the current
# working directory, so the app finds it no matter where it is launched from.
# Assumes `templates/` sits next to this module (project layout app/templates).
templates = Jinja2Templates(directory=str(Path(__file__).resolve().parent / "templates"))
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
    """Render `index.html`, the main interactive dashboard page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
    """Return an HTML fragment listing the fetcher's current prices for HTMX.

    Each coin becomes one `<div>` row; the rows are concatenated in the order
    the fetcher reports them.
    """
    fetcher: PriceFetcher = request.app.state.price_fetcher

    def render_row(coin, price):
        # Numbers get currency formatting; any other value (such as the '--'
        # shown in the initial state) is passed through unchanged.
        if isinstance(price, (int, float)):
            shown = f"${price:,.2f}"
        else:
            shown = price
        return f"<div><strong>{coin.capitalize()}:</strong> {shown}</div>"

    rows = [
        render_row(coin, price)
        for coin, price in fetcher.get_current_prices().items()
    ]
    return HTMLResponse(content="".join(rows))
@app.post("/api/sentiment")
async def analyze_sentiment(
    payload: SentimentRequest,
    request: Request,
    background_tasks: BackgroundTasks
):
    """Queue the submitted text for sentiment analysis and reply immediately.

    The analysis itself is registered as a background task, so it runs after
    this response has been sent. Each submission takes the next value of the
    application-wide request counter as its id.
    """
    state = request.app.state
    sentiment_service: SentimentAnalyzer = state.sentiment_analyzer

    # Bump the shared counter first, then use the new value as this request's id.
    state.request_counter += 1
    event_id = state.request_counter

    background_tasks.add_task(
        sentiment_service.compute_and_publish,
        payload.text,
        event_id,
    )
    return HTMLResponse(content="<small>Queued for analysis...</small>")
def _render_sentiment_event(result_payload) -> str:
    """Format one analyzer payload as an SSE `sentiment_update` message.

    Expects a mapping with a `text` entry and a `result` mapping that may
    carry `label` (default 'NEUTRAL') and `score` (default 0.0, shown as a
    percentage).

    Raises:
        KeyError, TypeError, AttributeError, ValueError: If the payload does
            not have that shape or the score is not numeric.
    """
    result = result_payload['result']
    raw_label = str(result.get('label', 'NEUTRAL')).lower()
    score = result.get('score', 0.0) * 100

    # The text was submitted by a user and the label is not produced by this
    # module either, so escape both before embedding them in markup.
    text = html.escape(str(result_payload['text']))
    label = html.escape(raw_label)
    label_upper = html.escape(raw_label.upper())

    html_fragment = f"""
        <div>
            <blockquote>{text}</blockquote>
            <p>
                <strong>Result:</strong>
                <span class="sentiment-{label}">{label_upper}</span>
                (Confidence: {score:.1f}%)
            </p>
        </div>
    """
    # An SSE field ends at the first CR or LF, so the fragment must be sent as
    # a single line; strip both characters, including any inside the text.
    data_payload = html_fragment.replace('\r', '').replace('\n', '')
    return f"event: sentiment_update\ndata: {data_payload}\n\n"


@app.get("/api/sentiment/stream")
async def sentiment_stream(request: Request):
    """Open a Server-Sent Events stream of sentiment results.

    The stream starts with an event that empties the client's
    `sentiment-results` element, then emits one `sentiment_update` event with
    an HTML fragment for every result the analyzer yields. Payloads that
    cannot be rendered are skipped without ending the stream.
    """
    analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer

    async def event_generator():
        # Clear the initial "waiting..." message on the client.
        # hx-swap-oob="innerHTML" swaps this div out-of-band without affecting the target.
        yield "event: sentiment_update\ndata: <div id='sentiment-results' hx-swap-oob='innerHTML'></div>\n\n"

        # Listen for new results from the analyzer.
        async for result_payload in analyzer.stream_results():
            try:
                sse_message = _render_sentiment_event(result_payload)
            except (KeyError, TypeError, AttributeError, ValueError):
                continue  # Ignore malformed payloads
            yield sse_message

    return StreamingResponse(event_generator(), media_type="text/event-stream")
# --- Main execution block for local development ---
if __name__ == "__main__":
    import uvicorn

    # The relative imports at the top of this module only resolve when it is
    # run as part of its package, i.e. `python -m <package>.<module>` from the
    # parent directory. In that case __spec__.name is the dotted module path
    # that uvicorn has to import; the bare "app" is kept as the fallback for a
    # plain script run, where __spec__ is None.
    module_path = __spec__.name if __spec__ is not None else "app"
    uvicorn.run(f"{module_path}:app", host="0.0.0.0", port=7860, reload=True)