Spaces:
Sleeping
Sleeping
""" | |
Sentinel TradeFlow Protocol — High-performance FastAPI application.
This is the main entry point that orchestrates the entire application.
- Integrates an asynchronous PriceFetcher for live market data.
- Integrates a sophisticated GeminiAnalyzer for deep text analysis.
- Implements an automated pipeline to fetch, analyze, and stream trading signals.
- Serves the interactive frontend and provides all necessary API endpoints.
""" | |
import asyncio | |
import json | |
import os | |
from contextlib import asynccontextmanager | |
from typing import Optional, Union | |
import httpx | |
from fastapi import FastAPI, Request | |
from fastapi.responses import HTMLResponse, StreamingResponse | |
from fastapi.templating import Jinja2Templates | |
# Use relative imports because these modules are in the same 'app' package. | |
from .price_fetcher import PriceFetcher | |
from .gemini_analyzer import GeminiAnalyzer | |
from newsapi import NewsApiClient | |
# --- Application Lifespan for Resource Management --- | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown for the FastAPI app.

    On startup this opens a shared ``httpx.AsyncClient``, stores the price
    fetcher, Gemini analyzer, NewsAPI client and signal queue on
    ``app.state``, and starts the two periodic background tasks. On shutdown
    it cancels those tasks and waits for them to finish before the HTTP
    client is closed.

    Args:
        app: The FastAPI application whose ``state`` receives the shared
            resources.

    Yields:
        None. Control returns to the framework while the application runs.
    """
    async with httpx.AsyncClient() as client:
        app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
        app.state.gemini_analyzer = GeminiAnalyzer(client=client)
        app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
        app.state.signal_queue = asyncio.Queue()

        # Create cancellable background tasks. Let's use a shorter timer for testing.
        price_task = asyncio.create_task(
            run_periodic_updates(app.state.price_fetcher, interval_seconds=60)
        )
        news_task = asyncio.create_task(
            run_periodic_news_analysis(app, interval_seconds=300)  # Check news every 5 minutes for debugging
        )
        print("π Sentinel TradeFlow Protocol started successfully.")
        try:
            yield
        finally:
            # Runs even if an error is raised into the context manager, so
            # the background tasks never outlive the HTTP client they use.
            print("β³ Shutting down background tasks...")
            price_task.cancel()
            news_task.cancel()
            # return_exceptions=True: a task that already crashed must not
            # abort shutdown, and both tasks are always awaited.
            results = await asyncio.gather(price_task, news_task, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
                    print(f"Background task ended with an error: {result}")
            if all(isinstance(result, asyncio.CancelledError) for result in results):
                print("Background tasks cancelled successfully.")
            print("β Shutdown complete.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """Periodically refresh prices until the task is cancelled.

    A failed update is logged and retried on the next cycle instead of
    terminating the background task.

    Args:
        fetcher: Object whose ``update_prices_async()`` coroutine is awaited
            once per cycle.
        interval_seconds: Delay between two update attempts, in seconds.
    """
    while True:
        try:
            await fetcher.update_prices_async()
        except asyncio.CancelledError:
            # Never swallow cancellation; shutdown relies on it.
            raise
        except Exception as e:
            print(f"Price update failed, retrying in {interval_seconds} seconds: {e}")
        await asyncio.sleep(interval_seconds)
async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
    """Fetch, analyze, and queue top crypto news periodically with detailed logging.

    Each cycle queries ``app.state.news_api`` for recent articles, sends every
    usable title to ``app.state.gemini_analyzer`` and puts each successful
    analysis (with the article URL attached under ``'url'``) on
    ``app.state.signal_queue``. The loop runs until the task is cancelled.

    Args:
        app: Application whose ``state`` holds ``news_api``,
            ``gemini_analyzer`` and ``signal_queue``.
        interval_seconds: Delay between two fetch cycles, in seconds.
    """
    while True:
        print("π° [1/5] Fetching latest crypto news...")
        try:
            # get_everything() is a synchronous call; run it in the default
            # executor so the event loop is not blocked while it waits.
            loop = asyncio.get_running_loop()
            top_headlines = await loop.run_in_executor(
                None,
                lambda: app.state.news_api.get_everything(
                    q='bitcoin OR ethereum OR "binance coin" OR solana OR ripple OR cardano',
                    language='en',
                    sort_by='publishedAt',
                    page_size=5,
                ),
            )
            articles = top_headlines.get('articles') or []
            print(f"π° [2/5] NewsAPI call successful. Found {len(articles)} articles.")
            if not articles:
                print("π° [SKIP] No new articles found in this cycle.")
                await asyncio.sleep(interval_seconds)
                continue

            analyzer: GeminiAnalyzer = app.state.gemini_analyzer
            for article in articles:
                title = article.get('title')
                print(f"π° [3/5] Processing article: '{title}'")
                if not title or "[Removed]" in title:
                    print("π° [SKIP] Article has no title or was removed.")
                    continue
                print("π° [4/5] Sending to Gemini for analysis...")
                try:
                    analysis = await analyzer.analyze_text(title)
                    if analysis.get("error"):
                        print(f"β [SKIP] Gemini analysis failed for '{title}'. Reason: {analysis.get('reason')}")
                        continue
                    analysis['url'] = article.get('url')
                    await app.state.signal_queue.put(analysis)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    # One bad article must not discard the rest of the batch.
                    print(f"β [SKIP] Gemini analysis failed for '{title}'. Reason: {e}")
                    continue
                print(f"β [5/5] Signal generated and queued for: '{title}'")
        except asyncio.CancelledError:
            # Never swallow cancellation; shutdown relies on it.
            raise
        except Exception as e:
            print(f"βββ CRITICAL ERROR in news analysis loop: {e}")
        print(f"π° Loop finished. Waiting for {interval_seconds} seconds.")
        await asyncio.sleep(interval_seconds)
# --- FastAPI App Initialization ---
# Jinja2 loader for the HTML templates; the directory is given as the
# relative path "templates".
templates = Jinja2Templates(directory="templates")

# The application object; `lifespan` handles startup and shutdown work.
app = FastAPI(
    title="Sentinel TradeFlow Protocol",
    lifespan=lifespan,
)
# --- HTML Rendering Helper --- | |
def render_signal_card(payload: dict) -> str:
    """Render an analysis payload as a styled HTML card.

    Every value interpolated into the markup is HTML-escaped, because the
    payload is built from external article data and analyzer output.

    Args:
        payload: Analysis dictionary. Keys read: ``url``, ``summary``,
            ``impact``, ``sentiment``, ``sentiment_score``, ``topic`` and
            ``entities``. Missing or ``None`` values fall back to defaults.

    Returns:
        The HTML fragment for a single signal card.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    def text(value) -> str:
        """Return *value* as text that is safe in HTML bodies and attributes."""
        return escape(str(value), quote=True)

    # Only plain web links are allowed in the href; anything else
    # (missing, None, javascript:, data:, ...) degrades to an inert "#".
    url = payload.get('url')
    if not isinstance(url, str) or not url.lower().startswith(('http://', 'https://')):
        url = '#'

    summary = payload.get('summary') or 'Analysis failed or not available.'
    text_to_show = (
        f'<a href="{text(url)}" target="_blank" rel="noopener noreferrer">{text(summary)}</a>'
    )

    # `or` also covers keys that are present but None, where .get()'s
    # default would not apply and .lower() would raise.
    impact_class = f"impact-{text(payload.get('impact') or 'low').lower()}"
    sentiment_class = f"sentiment-{text(payload.get('sentiment') or 'neutral').lower()}"

    try:
        score = float(payload.get('sentiment_score') or 0)
    except (TypeError, ValueError):
        score = 0.0

    entities = payload.get('entities') or []
    if not isinstance(entities, (list, tuple, set, frozenset)):
        # A lone value (e.g. a single string) is shown as one entity rather
        # than being joined character by character.
        entities = [entities]
    entity_text = ', '.join(text(entity) for entity in entities)

    return f"""
<div class="card {impact_class}">
    <blockquote>{text_to_show}</blockquote>
    <div class="grid">
        <div><strong>Sentiment:</strong> <span class="{sentiment_class}">{text(payload.get('sentiment'))} ({score:.2f})</span></div>
        <div><strong>Impact:</strong> {text(payload.get('impact'))}</div>
    </div>
    <div class="grid">
        <div><strong>Topic:</strong> {text(payload.get('topic'))}</div>
        <div><strong>Entities:</strong> {entity_text}</div>
    </div>
</div>
"""
# --- API Endpoints --- | |
async def serve_dashboard(request: Request):
    """Render `index.html` as the main interactive dashboard page."""
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file; confirm it is registered on the app.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
async def signal_stream(request: Request):
    """Stream queued trading signals to the client as Server-Sent Events.

    Each payload taken from ``request.app.state.signal_queue`` is rendered
    to an HTML card and sent as one ``message`` event.

    Args:
        request: Incoming request; used to reach the shared signal queue.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    queue: asyncio.Queue = request.app.state.signal_queue

    async def event_generator():
        """Yield one SSE frame per queued payload, waiting when the queue is empty."""
        # NOTE(review): queue.get() removes the item, so with several
        # connected clients each signal reaches only one of them.
        while True:
            payload = await queue.get()
            card_html = render_signal_card(payload)
            # An SSE frame's data must stay on one line. CR is a line
            # terminator in the event-stream format just like LF, so both
            # are stripped.
            data_payload = card_html.replace('\r', '').replace('\n', '')
            yield f"event: message\ndata: {data_payload}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")