mgbam's picture
Update app/app.py
ea758f6 verified
raw
history blame
5.57 kB
"""
Sentinel TradeFlow Protocol β€” v3.0 FINAL
This is the definitive, robust version of the application, designed for
guaranteed functionality and a superior user experience.
"""
import asyncio
import os
from contextlib import asynccontextmanager
from datetime import datetime, timezone
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from .price_fetcher import PriceFetcher
from .gemini_analyzer import GeminiAnalyzer
from newsapi import NewsApiClient
@asynccontextmanager
async def lifespan(app: FastAPI):
async with httpx.AsyncClient() as client:
app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
app.state.gemini_analyzer = GeminiAnalyzer(client=client)
app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
app.state.signal_queue: asyncio.Queue = asyncio.Queue()
price_task = asyncio.create_task(run_periodic_updates(app.state.price_fetcher, 30))
news_task = asyncio.create_task(run_periodic_news_analysis(app, 300))
print("πŸš€ Sentinel TradeFlow Protocol v3.0 FINAL started successfully.")
yield
print("⏳ Shutting down background tasks...")
price_task.cancel()
news_task.cancel()
try:
await asyncio.gather(price_task, news_task)
except asyncio.CancelledError:
print("Background tasks cancelled.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
while True:
await fetcher.update_prices_async()
await asyncio.sleep(interval_seconds)
async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
processed_urls = set()
while True:
print("πŸ“° Fetching news...")
try:
top_headlines = app.state.news_api.get_everything(
q='(crypto OR bitcoin OR ethereum) AND (regulation OR partnership OR hack OR update OR adoption)',
language='en', sort_by='publishedAt', page_size=10
)
articles = top_headlines.get('articles', [])
print(f"πŸ“° Found {len(articles)} articles.")
analyzer: GeminiAnalyzer = app.state.gemini_analyzer
for article in reversed(articles):
title = article.get('title')
url = article.get('url')
if title and "[Removed]" not in title and url not in processed_urls:
print(f"✨ Analyzing: '{title}'")
analysis = await analyzer.analyze_text(title)
if not analysis.get("error"):
analysis['url'] = url
analysis['timestamp'] = datetime.now(timezone.utc).isoformat()
await app.state.signal_queue.put(analysis)
processed_urls.add(url)
print(f"βœ… Queued: '{title}'")
except Exception as e:
print(f"❌ CRITICAL ERROR in news loop: {e}")
await asyncio.sleep(interval_seconds)
app = FastAPI(title="Sentinel TradeFlow Protocol", lifespan=lifespan)
templates = Jinja2Templates(directory="templates")
def render_signal_card(payload: dict) -> str:
s = payload
url = s.get('url', '#')
summary = s.get('summary', 'N/A')
time_str = datetime.fromisoformat(s['timestamp']).strftime('%H:%M:%S UTC')
# This entire block will be sent as a single data payload
return f"""
<div id="last-signal-time" hx-swap-oob="true">{time_str}</div>
<div class="card impact-{s.get('impact', 'low').lower()}" hx-swap-oob="afterbegin:#signal-container">
<header class="card-header">
<span>{s.get('topic', 'General News')}</span>
<span>{', '.join(s.get('entities', []))}</span>
</header>
<blockquote><a href="{url}" target="_blank" rel="noopener noreferrer">{summary}</a></blockquote>
<footer><strong>Sentiment:</strong> <span class="sentiment-{s.get('sentiment', 'neutral').lower()}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span> β€’ <strong>Impact:</strong> {s.get('impact')}</footer>
</div>
"""
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
prices = request.app.state.price_fetcher.get_current_prices()
return HTMLResponse("".join(
f"<span>{c.capitalize()}: <strong>${p:,.2f}</strong></span>" if isinstance(p, (int, float))
else f"<span>{c.capitalize()}: <strong>{p}</strong></span>"
for c, p in prices.items()
))
@app.get("/api/signals/stream")
async def signal_stream(request: Request):
queue: asyncio.Queue = request.app.state.signal_queue
async def event_generator():
# --- THE GUARANTEED "HELLO" MESSAGE ---
welcome_html = "<div id='signal-container' hx-swap-oob='innerHTML'></div>"
yield f"event: message\ndata: {welcome_html}\n\n"
while True:
payload = await queue.get()
html_card = render_signal_card(payload)
data_payload = html_card.replace('\n', ' ').strip()
yield f"event: message\ndata: {data_payload}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")