Spaces:
Sleeping
Sleeping
""" | |
Sentinel TradeFlow Protocol β High-performance FastAPI application. | |
This is the main entry point that orchestrates the entire application. | |
- Integrates an asynchronous PriceFetcher for live market data. | |
- Integrates a sophisticated GeminiAnalyzer for deep text analysis. | |
- Implements an automated pipeline to fetch, analyze, and stream trading signals. | |
- Serves the interactive frontend and provides all necessary API endpoints. | |
""" | |
import asyncio | |
import json | |
import os | |
from contextlib import asynccontextmanager | |
from typing import Optional, Union | |
import httpx | |
from fastapi import FastAPI, Request | |
from fastapi.responses import HTMLResponse, StreamingResponse | |
from fastapi.templating import Jinja2Templates | |
# Use relative imports if your files are in an 'app' package, | |
# or direct imports if they are at the root. Assuming root for simplicity now. | |
from price_fetcher import PriceFetcher | |
from gemini_analyzer import GeminiAnalyzer | |
from newsapi import NewsApiClient | |
# --- Application Lifespan for Resource Management --- | |
async def lifespan(app: FastAPI): | |
""" | |
Manages application startup and shutdown events using the modern | |
lifespan context manager. | |
""" | |
async with httpx.AsyncClient() as client: | |
# Instantiate and store all services in the application state. | |
app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"]) | |
app.state.gemini_analyzer = GeminiAnalyzer(client=client) | |
app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY")) | |
# =================== FIX APPLIED HERE (1/2) =================== | |
# We only need ONE queue for the autonomous signal stream. | |
app.state.signal_queue: asyncio.Queue = asyncio.Queue() | |
# ============================================================= | |
# Create cancellable background tasks for periodic updates. | |
price_task = asyncio.create_task( | |
run_periodic_updates(app.state.price_fetcher, interval_seconds=60) | |
) | |
news_task = asyncio.create_task( | |
run_periodic_news_analysis(app, interval_seconds=900) # Check news every 15 minutes | |
) | |
print("π Sentinel TradeFlow Protocol started successfully.") | |
yield | |
print("β³ Shutting down background tasks...") | |
price_task.cancel() | |
news_task.cancel() | |
try: | |
await asyncio.gather(price_task, news_task) | |
except asyncio.CancelledError: | |
print("Background tasks cancelled successfully.") | |
print("β Shutdown complete.") | |
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int): | |
"""A robust asyncio background task that periodically updates prices.""" | |
while True: | |
await fetcher.update_prices_async() | |
await asyncio.sleep(interval_seconds) | |
async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int): | |
"""Fetches, analyzes, and queues top crypto news periodically.""" | |
while True: | |
print("π° Fetching latest crypto news for automated analysis...") | |
try: | |
top_headlines = app.state.news_api.get_everything( | |
q='bitcoin OR ethereum OR crypto OR blockchain', | |
language='en', | |
sort_by='publishedAt', | |
page_size=5 | |
) | |
analyzer: GeminiAnalyzer = app.state.gemini_analyzer | |
for article in top_headlines.get('articles', []): | |
title = article.get('title') | |
if title and "[Removed]" not in title: | |
analysis = await analyzer.analyze_text(title) | |
# Don't proceed if Gemini returned an error | |
if analysis.get("error"): | |
print(f"Skipping article due to Gemini error: {analysis['reason']}") | |
continue | |
analysis['url'] = article.get('url') | |
# =================== FIX APPLIED HERE (2/2) =================== | |
# Ensure the result is put into the one and only signal_queue. | |
await app.state.signal_queue.put(analysis) | |
print(f"β Signal generated and queued for: {title}") | |
# ============================================================= | |
except Exception as e: | |
print(f"β Error during news fetching or analysis: {e}") | |
await asyncio.sleep(interval_seconds) | |
# --- FastAPI App Initialization --- | |
app = FastAPI(title="Sentinel TradeFlow Protocol", lifespan=lifespan) | |
templates = Jinja2Templates(directory="templates") | |
# --- HTML Rendering Helper --- | |
def render_signal_card(payload: dict) -> str: | |
"""Renders a dictionary of analysis into a styled HTML card.""" | |
s = payload | |
url = s.get('url', '#') | |
summary = s.get('summary', 'Analysis failed or not available.') | |
text_to_show = f'<a href="{url}" target="_blank" rel="noopener noreferrer">{summary}</a>' | |
impact_class = f"impact-{s.get('impact', 'low').lower()}" | |
sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}" | |
return f""" | |
<div class="card {impact_class}"> | |
<blockquote>{text_to_show}</blockquote> | |
<div class="grid"> | |
<div><strong>Sentiment:</strong> <span class="{sentiment_class}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span></div> | |
<div><strong>Impact:</strong> {s.get('impact')}</div> | |
</div> | |
<div class="grid"> | |
<div><strong>Topic:</strong> {s.get('topic')}</div> | |
<div><strong>Entities:</strong> {', '.join(s.get('entities', []))}</div> | |
</div> | |
</div> | |
""" | |
# --- API Endpoints --- | |
async def serve_dashboard(request: Request): | |
"""Serves the main interactive dashboard from `index.html`.""" | |
return templates.TemplateResponse("index.html", {"request": request}) | |
async def signal_stream(request: Request): | |
"""SSE stream for the automated Signal Stream.""" | |
# This was already correct from the last fix, but we confirm it points to signal_queue. | |
queue: asyncio.Queue = request.app.state.signal_queue | |
async def event_generator(): | |
# Let's send a confirmation that the stream is connected and ready | |
yield f"event: message\ndata: <div hx-swap-oob='innerHTML' id='signal-stream-container'><p>Status: ONLINE - Listening for new market signals...</p></div>\n\n" | |
while True: | |
payload = await queue.get() | |
html = render_signal_card(payload) | |
data_payload = html.replace('\n', '') | |
sse_message = f"event: message\ndata: {data_payload}\n\n" | |
yield sse_message | |
return StreamingResponse(event_generator(), media_type="text/event-stream") |