Spaces:
Sleeping
Sleeping
File size: 6,660 Bytes
b159555 431c9a5 073930c 431c9a5 073930c b159555 c6f94f2 b159555 073930c b159555 431c9a5 b159555 01e217d 073930c b349d30 b159555 073930c a17b947 0e87c05 073930c 0e87c05 073930c b159555 073930c 431c9a5 073930c 2f96557 073930c 431c9a5 073930c 2f96557 073930c a17b947 431c9a5 c6f94f2 0e87c05 073930c c6f94f2 073930c a17b947 073930c b159555 073930c 431c9a5 073930c 2f96557 073930c 2f96557 073930c 2f96557 073930c 2f96557 073930c 2f96557 073930c 2f96557 669ca48 073930c 2f96557 073930c 2f96557 073930c 431c9a5 b159555 431c9a5 073930c 431c9a5 073930c 431c9a5 a17b947 073930c a17b947 073930c a17b947 b159555 073930c 431c9a5 b159555 431c9a5 24a8706 c6f94f2 431c9a5 e446a42 431c9a5 073930c 669ca48 a17b947 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
"""
Sentinel TradeFlow Protocol β High-performance FastAPI application.
This is the main entry point that orchestrates the entire application.
- Integrates an asynchronous PriceFetcher for live market data.
- Integrates a sophisticated GeminiAnalyzer for deep text analysis.
- Implements an automated pipeline to fetch, analyze, and stream trading signals.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import json
import os
from contextlib import asynccontextmanager
from typing import Optional, Union
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
# Use relative imports because these modules are in the same 'app' package.
from .price_fetcher import PriceFetcher
from .gemini_analyzer import GeminiAnalyzer
from newsapi import NewsApiClient
# --- Application Lifespan for Resource Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Manages application startup and shutdown events using the modern
lifespan context manager.
"""
async with httpx.AsyncClient() as client:
app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
app.state.gemini_analyzer = GeminiAnalyzer(client=client)
app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
app.state.signal_queue: asyncio.Queue = asyncio.Queue()
# Create cancellable background tasks. Let's use a shorter timer for testing.
price_task = asyncio.create_task(
run_periodic_updates(app.state.price_fetcher, interval_seconds=60)
)
news_task = asyncio.create_task(
run_periodic_news_analysis(app, interval_seconds=300) # Check news every 5 minutes for debugging
)
print("π Sentinel TradeFlow Protocol started successfully.")
yield
print("β³ Shutting down background tasks...")
price_task.cancel()
news_task.cancel()
try:
await asyncio.gather(price_task, news_task)
except asyncio.CancelledError:
print("Background tasks cancelled successfully.")
print("β
Shutdown complete.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
"""A robust asyncio background task that periodically updates prices."""
while True:
await fetcher.update_prices_async()
await asyncio.sleep(interval_seconds)
async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
"""Fetches, analyzes, and queues top crypto news periodically with detailed logging."""
while True:
print("π° [1/5] Fetching latest crypto news...")
try:
top_headlines = app.state.news_api.get_everything(
q='bitcoin OR ethereum OR "binance coin" OR solana OR ripple OR cardano',
language='en',
sort_by='publishedAt',
page_size=5
)
articles = top_headlines.get('articles', [])
print(f"π° [2/5] NewsAPI call successful. Found {len(articles)} articles.")
if not articles:
print("π° [SKIP] No new articles found in this cycle.")
await asyncio.sleep(interval_seconds)
continue
analyzer: GeminiAnalyzer = app.state.gemini_analyzer
for article in articles:
title = article.get('title')
print(f"π° [3/5] Processing article: '{title}'")
if not title or "[Removed]" in title:
print(f"π° [SKIP] Article has no title or was removed.")
continue
print(f"π° [4/5] Sending to Gemini for analysis...")
analysis = await analyzer.analyze_text(title)
if analysis.get("error"):
print(f"β [SKIP] Gemini analysis failed for '{title}'. Reason: {analysis.get('reason')}")
continue
analysis['url'] = article.get('url')
await app.state.signal_queue.put(analysis)
print(f"β
[5/5] Signal generated and queued for: '{title}'")
except Exception as e:
print(f"βββ CRITICAL ERROR in news analysis loop: {e}")
print(f"π° Loop finished. Waiting for {interval_seconds} seconds.")
await asyncio.sleep(interval_seconds)
# --- FastAPI App Initialization ---
app = FastAPI(title="Sentinel TradeFlow Protocol", lifespan=lifespan)
templates = Jinja2Templates(directory="templates")
# --- HTML Rendering Helper ---
def render_signal_card(payload: dict) -> str:
"""Renders a dictionary of analysis into a styled HTML card."""
s = payload
url = s.get('url', '#')
summary = s.get('summary', 'Analysis failed or not available.')
text_to_show = f'<a href="{url}" target="_blank" rel="noopener noreferrer">{summary}</a>'
impact_class = f"impact-{s.get('impact', 'low').lower()}"
sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"
return f"""
<div class="card {impact_class}">
<blockquote>{text_to_show}</blockquote>
<div class="grid">
<div><strong>Sentiment:</strong> <span class="{sentiment_class}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span></div>
<div><strong>Impact:</strong> {s.get('impact')}</div>
</div>
<div class="grid">
<div><strong>Topic:</strong> {s.get('topic')}</div>
<div><strong>Entities:</strong> {', '.join(s.get('entities', []))}</div>
</div>
</div>
"""
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
"""Serves the main interactive dashboard from `index.html`."""
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/signals/stream")
async def signal_stream(request: Request):
"""SSE stream for the automated Signal Stream."""
queue: asyncio.Queue = request.app.state.signal_queue
async def event_generator():
while True:
payload = await queue.get()
html = render_signal_card(payload)
data_payload = html.replace('\n', '')
sse_message = f"event: message\ndata: {data_payload}\n\n"
yield sse_message
return StreamingResponse(event_generator(), media_type="text/event-stream") |