Spaces:
Sleeping
Sleeping
File size: 5,565 Bytes
b159555 ea758f6 b159555 c6f94f2 b159555 56eb560 b159555 431c9a5 b159555 01e217d 073930c b349d30 0e87c05 073930c b159555 073930c 431c9a5 073930c ea758f6 56eb560 a17b947 ea758f6 c6f94f2 0e87c05 073930c c6f94f2 073930c a17b947 ea758f6 b159555 073930c 56eb560 073930c ea758f6 073930c 56eb560 073930c 2f96557 56eb560 2f96557 073930c ea758f6 073930c 56eb560 ea758f6 56eb560 ea758f6 073930c ea758f6 56eb560 073930c 431c9a5 b159555 431c9a5 073930c 431c9a5 56eb560 ea758f6 56eb560 ea758f6 a17b947 ea758f6 a17b947 b159555 073930c b159555 56eb560 ea758f6 56eb560 431c9a5 24a8706 c6f94f2 ea758f6 c6f94f2 ea758f6 a17b947 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
"""
Sentinel TradeFlow Protocol β v3.0 FINAL
This is the definitive, robust version of the application, designed for
guaranteed functionality and a superior user experience.
"""
import asyncio
import os
from contextlib import asynccontextmanager
from datetime import datetime, timezone
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from .price_fetcher import PriceFetcher
from .gemini_analyzer import GeminiAnalyzer
from newsapi import NewsApiClient
@asynccontextmanager
async def lifespan(app: FastAPI):
async with httpx.AsyncClient() as client:
app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
app.state.gemini_analyzer = GeminiAnalyzer(client=client)
app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
app.state.signal_queue: asyncio.Queue = asyncio.Queue()
price_task = asyncio.create_task(run_periodic_updates(app.state.price_fetcher, 30))
news_task = asyncio.create_task(run_periodic_news_analysis(app, 300))
print("π Sentinel TradeFlow Protocol v3.0 FINAL started successfully.")
yield
print("β³ Shutting down background tasks...")
price_task.cancel()
news_task.cancel()
try:
await asyncio.gather(price_task, news_task)
except asyncio.CancelledError:
print("Background tasks cancelled.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
while True:
await fetcher.update_prices_async()
await asyncio.sleep(interval_seconds)
async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
processed_urls = set()
while True:
print("π° Fetching news...")
try:
top_headlines = app.state.news_api.get_everything(
q='(crypto OR bitcoin OR ethereum) AND (regulation OR partnership OR hack OR update OR adoption)',
language='en', sort_by='publishedAt', page_size=10
)
articles = top_headlines.get('articles', [])
print(f"π° Found {len(articles)} articles.")
analyzer: GeminiAnalyzer = app.state.gemini_analyzer
for article in reversed(articles):
title = article.get('title')
url = article.get('url')
if title and "[Removed]" not in title and url not in processed_urls:
print(f"β¨ Analyzing: '{title}'")
analysis = await analyzer.analyze_text(title)
if not analysis.get("error"):
analysis['url'] = url
analysis['timestamp'] = datetime.now(timezone.utc).isoformat()
await app.state.signal_queue.put(analysis)
processed_urls.add(url)
print(f"β
Queued: '{title}'")
except Exception as e:
print(f"β CRITICAL ERROR in news loop: {e}")
await asyncio.sleep(interval_seconds)
app = FastAPI(title="Sentinel TradeFlow Protocol", lifespan=lifespan)
templates = Jinja2Templates(directory="templates")
def render_signal_card(payload: dict) -> str:
s = payload
url = s.get('url', '#')
summary = s.get('summary', 'N/A')
time_str = datetime.fromisoformat(s['timestamp']).strftime('%H:%M:%S UTC')
# This entire block will be sent as a single data payload
return f"""
<div id="last-signal-time" hx-swap-oob="true">{time_str}</div>
<div class="card impact-{s.get('impact', 'low').lower()}" hx-swap-oob="afterbegin:#signal-container">
<header class="card-header">
<span>{s.get('topic', 'General News')}</span>
<span>{', '.join(s.get('entities', []))}</span>
</header>
<blockquote><a href="{url}" target="_blank" rel="noopener noreferrer">{summary}</a></blockquote>
<footer><strong>Sentiment:</strong> <span class="sentiment-{s.get('sentiment', 'neutral').lower()}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span> β’ <strong>Impact:</strong> {s.get('impact')}</footer>
</div>
"""
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
prices = request.app.state.price_fetcher.get_current_prices()
return HTMLResponse("".join(
f"<span>{c.capitalize()}: <strong>${p:,.2f}</strong></span>" if isinstance(p, (int, float))
else f"<span>{c.capitalize()}: <strong>{p}</strong></span>"
for c, p in prices.items()
))
@app.get("/api/signals/stream")
async def signal_stream(request: Request):
queue: asyncio.Queue = request.app.state.signal_queue
async def event_generator():
# --- THE GUARANTEED "HELLO" MESSAGE ---
welcome_html = "<div id='signal-container' hx-swap-oob='innerHTML'></div>"
yield f"event: message\ndata: {welcome_html}\n\n"
while True:
payload = await queue.get()
html_card = render_signal_card(payload)
data_payload = html_card.replace('\n', ' ').strip()
yield f"event: message\ndata: {data_payload}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream") |