Spaces:
Sleeping
Sleeping
""" | |
Sentinel TradeFlow Protocol β v3.0 FINAL | |
This is the definitive, robust version of the application, designed for | |
guaranteed functionality and a superior user experience. | |
""" | |
import asyncio | |
import os | |
from contextlib import asynccontextmanager | |
from datetime import datetime, timezone | |
import httpx | |
from fastapi import FastAPI, Request | |
from fastapi.responses import HTMLResponse, StreamingResponse | |
from fastapi.templating import Jinja2Templates | |
from .price_fetcher import PriceFetcher | |
from .gemini_analyzer import GeminiAnalyzer | |
from newsapi import NewsApiClient | |
async def lifespan(app: FastAPI): | |
async with httpx.AsyncClient() as client: | |
app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"]) | |
app.state.gemini_analyzer = GeminiAnalyzer(client=client) | |
app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY")) | |
app.state.signal_queue: asyncio.Queue = asyncio.Queue() | |
price_task = asyncio.create_task(run_periodic_updates(app.state.price_fetcher, 30)) | |
news_task = asyncio.create_task(run_periodic_news_analysis(app, 300)) | |
print("π Sentinel TradeFlow Protocol v3.0 FINAL started successfully.") | |
yield | |
print("β³ Shutting down background tasks...") | |
price_task.cancel() | |
news_task.cancel() | |
try: | |
await asyncio.gather(price_task, news_task) | |
except asyncio.CancelledError: | |
print("Background tasks cancelled.") | |
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int): | |
while True: | |
await fetcher.update_prices_async() | |
await asyncio.sleep(interval_seconds) | |
async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int): | |
processed_urls = set() | |
while True: | |
print("π° Fetching news...") | |
try: | |
top_headlines = app.state.news_api.get_everything( | |
q='(crypto OR bitcoin OR ethereum) AND (regulation OR partnership OR hack OR update OR adoption)', | |
language='en', sort_by='publishedAt', page_size=10 | |
) | |
articles = top_headlines.get('articles', []) | |
print(f"π° Found {len(articles)} articles.") | |
analyzer: GeminiAnalyzer = app.state.gemini_analyzer | |
for article in reversed(articles): | |
title = article.get('title') | |
url = article.get('url') | |
if title and "[Removed]" not in title and url not in processed_urls: | |
print(f"β¨ Analyzing: '{title}'") | |
analysis = await analyzer.analyze_text(title) | |
if not analysis.get("error"): | |
analysis['url'] = url | |
analysis['timestamp'] = datetime.now(timezone.utc).isoformat() | |
await app.state.signal_queue.put(analysis) | |
processed_urls.add(url) | |
print(f"β Queued: '{title}'") | |
except Exception as e: | |
print(f"β CRITICAL ERROR in news loop: {e}") | |
await asyncio.sleep(interval_seconds) | |
app = FastAPI(title="Sentinel TradeFlow Protocol", lifespan=lifespan) | |
templates = Jinja2Templates(directory="templates") | |
def render_signal_card(payload: dict) -> str: | |
s = payload | |
url = s.get('url', '#') | |
summary = s.get('summary', 'N/A') | |
time_str = datetime.fromisoformat(s['timestamp']).strftime('%H:%M:%S UTC') | |
# This entire block will be sent as a single data payload | |
return f""" | |
<div id="last-signal-time" hx-swap-oob="true">{time_str}</div> | |
<div class="card impact-{s.get('impact', 'low').lower()}" hx-swap-oob="afterbegin:#signal-container"> | |
<header class="card-header"> | |
<span>{s.get('topic', 'General News')}</span> | |
<span>{', '.join(s.get('entities', []))}</span> | |
</header> | |
<blockquote><a href="{url}" target="_blank" rel="noopener noreferrer">{summary}</a></blockquote> | |
<footer><strong>Sentiment:</strong> <span class="sentiment-{s.get('sentiment', 'neutral').lower()}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span> β’ <strong>Impact:</strong> {s.get('impact')}</footer> | |
</div> | |
""" | |
async def serve_dashboard(request: Request): | |
return templates.TemplateResponse("index.html", {"request": request}) | |
async def get_prices_fragment(request: Request): | |
prices = request.app.state.price_fetcher.get_current_prices() | |
return HTMLResponse("".join( | |
f"<span>{c.capitalize()}: <strong>${p:,.2f}</strong></span>" if isinstance(p, (int, float)) | |
else f"<span>{c.capitalize()}: <strong>{p}</strong></span>" | |
for c, p in prices.items() | |
)) | |
async def signal_stream(request: Request): | |
queue: asyncio.Queue = request.app.state.signal_queue | |
async def event_generator(): | |
# --- THE GUARANTEED "HELLO" MESSAGE --- | |
welcome_html = "<div id='signal-container' hx-swap-oob='innerHTML'></div>" | |
yield f"event: message\ndata: {welcome_html}\n\n" | |
while True: | |
payload = await queue.get() | |
html_card = render_signal_card(payload) | |
data_payload = html_card.replace('\n', ' ').strip() | |
yield f"event: message\ndata: {data_payload}\n\n" | |
return StreamingResponse(event_generator(), media_type="text/event-stream") |