mgbam's picture
Update app/app.py
669ca48 verified
raw
history blame
6.91 kB
"""
Sentinel TradeFlow Protocol — High-performance FastAPI application.
This is the main entry point that orchestrates the entire application.
- Integrates an asynchronous PriceFetcher for live market data.
- Integrates a sophisticated GeminiAnalyzer for deep text analysis.
- Implements an automated pipeline to fetch, analyze, and stream trading signals.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import json
import os
from contextlib import asynccontextmanager
from typing import Optional, Union
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
# Use relative imports if your files are in an 'app' package,
# or direct imports if they are at the root. Assuming root for simplicity now.
from price_fetcher import PriceFetcher
from gemini_analyzer import GeminiAnalyzer
from newsapi import NewsApiClient
# --- Application Lifespan for Resource Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manage application startup and shutdown via the lifespan context manager.

    Startup: creates one shared ``httpx.AsyncClient``, stores the price
    fetcher, Gemini analyzer, news client and the signal queue on
    ``app.state``, and launches the two periodic background tasks.

    Shutdown: cancels both background tasks and waits for them to finish
    before the shared HTTP client is closed.
    """
    async with httpx.AsyncClient() as client:
        # Instantiate and store all services in the application state.
        app.state.price_fetcher = PriceFetcher(
            client=client, coins=["bitcoin", "ethereum", "dogecoin"]
        )
        app.state.gemini_analyzer = GeminiAnalyzer(client=client)
        app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
        # Single queue feeding the autonomous signal stream endpoint.
        app.state.signal_queue = asyncio.Queue()

        # Cancellable background tasks for the periodic updates.
        background_tasks = [
            asyncio.create_task(
                run_periodic_updates(app.state.price_fetcher, interval_seconds=60)
            ),
            asyncio.create_task(
                # Check news every 15 minutes.
                run_periodic_news_analysis(app, interval_seconds=900)
            ),
        ]
        print("πŸš€ Sentinel TradeFlow Protocol started successfully.")
        try:
            yield
        finally:
            # Runs on normal shutdown AND when the application exits with an
            # error, so the tasks never outlive the HTTP client they use.
            print("⏳ Shutting down background tasks...")
            for task in background_tasks:
                task.cancel()
            # return_exceptions=True: a task that already died with its own
            # error must not abort the shutdown sequence.
            results = await asyncio.gather(*background_tasks, return_exceptions=True)
            failures = [
                outcome
                for outcome in results
                if isinstance(outcome, BaseException)
                and not isinstance(outcome, asyncio.CancelledError)
            ]
            if failures:
                for failure in failures:
                    print(f"Background task ended with an error: {failure!r}")
            else:
                print("Background tasks cancelled successfully.")
            print("βœ… Shutdown complete.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """
    Background task that refreshes prices every ``interval_seconds`` seconds.

    Calls ``fetcher.update_prices_async()`` in an endless loop. A failed
    update is logged and retried on the next cycle instead of terminating
    the task. The loop only ends when the task is cancelled.
    """
    while True:
        try:
            await fetcher.update_prices_async()
        except Exception as e:
            # One failed refresh must not stop all future price updates.
            # Task cancellation is not swallowed here and still propagates.
            print(f"❌ Error during price update: {e}")
        await asyncio.sleep(interval_seconds)
async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
    """
    Fetch, analyze, and queue top crypto news every ``interval_seconds`` seconds.

    Each cycle requests up to five recent articles from ``app.state.news_api``,
    sends every usable title to ``app.state.gemini_analyzer.analyze_text`` and
    puts each successful analysis (with the article ``url`` added) on
    ``app.state.signal_queue``. Any error in a cycle is logged and the loop
    continues with the next cycle. The loop only ends when the task is
    cancelled.
    """
    while True:
        print("πŸ“° Fetching latest crypto news for automated analysis...")
        try:
            # get_everything() is a plain synchronous call; run it in the
            # default executor so the event loop (and every open SSE stream)
            # is not blocked while the HTTP request is in flight.
            loop = asyncio.get_running_loop()
            top_headlines = await loop.run_in_executor(
                None,
                lambda: app.state.news_api.get_everything(
                    q='bitcoin OR ethereum OR crypto OR blockchain',
                    language='en',
                    sort_by='publishedAt',
                    page_size=5,
                ),
            )
            analyzer: GeminiAnalyzer = app.state.gemini_analyzer
            for article in top_headlines.get('articles', []):
                title = article.get('title')
                # Skip articles without a title and placeholder entries.
                if not title or "[Removed]" in title:
                    continue
                analysis = await analyzer.analyze_text(title)
                # Don't proceed if Gemini returned an error. The reason is
                # read with .get() so an error payload without a 'reason'
                # key cannot abort the rest of the batch.
                if analysis.get("error"):
                    reason = analysis.get('reason', 'unknown reason')
                    print(f"Skipping article due to Gemini error: {reason}")
                    continue
                analysis['url'] = article.get('url')
                await app.state.signal_queue.put(analysis)
                print(f"βœ… Signal generated and queued for: {title}")
        except Exception as e:
            print(f"❌ Error during news fetching or analysis: {e}")
        await asyncio.sleep(interval_seconds)
# --- FastAPI application object and template engine ---
# The lifespan handler wires up services and background tasks at startup.
app = FastAPI(
    title="Sentinel TradeFlow Protocol",
    lifespan=lifespan,
)
# Templates are loaded from the "templates" directory.
templates = Jinja2Templates(
    directory="templates",
)
# --- HTML Rendering Helper ---
def render_signal_card(payload: dict) -> str:
    """
    Render a dictionary of analysis results into a styled HTML card.

    Keys read from ``payload`` (all optional): ``url``, ``summary``,
    ``impact``, ``sentiment``, ``sentiment_score``, ``topic``, ``entities``.

    Every value is HTML-escaped before interpolation because the text comes
    from external news articles and model output. Links are only emitted for
    ``http://`` / ``https://`` URLs; anything else falls back to ``#``.
    Missing or ``None`` values fall back to defaults instead of raising.

    Returns:
        The card markup as a multi-line HTML string.
    """
    from html import escape  # local import keeps this block self-contained

    def _text(value) -> str:
        """Return ``value`` as escaped text, safe in element and attribute context."""
        return escape(str(value), quote=True)

    # Only allow web links; this blocks e.g. "javascript:" URLs and href="None".
    url = str(payload.get('url') or '#')
    if not url.lower().startswith(('http://', 'https://')):
        url = '#'

    summary = payload.get('summary') or 'Analysis failed or not available.'
    text_to_show = (
        f'<a href="{_text(url)}" target="_blank" rel="noopener noreferrer">'
        f'{_text(summary)}</a>'
    )

    impact = payload.get('impact')
    sentiment = payload.get('sentiment')
    impact_class = f"impact-{_text(impact or 'low').lower()}"
    sentiment_class = f"sentiment-{_text(sentiment or 'neutral').lower()}"

    # The score may be missing or non-numeric in a malformed analysis.
    try:
        score = float(payload.get('sentiment_score', 0))
    except (TypeError, ValueError):
        score = 0.0

    entities = payload.get('entities') or []
    if isinstance(entities, str):
        # A bare string would otherwise be joined character by character.
        entities = [entities]
    entity_text = ', '.join(_text(entity) for entity in entities)

    return f"""
    <div class="card {impact_class}">
        <blockquote>{text_to_show}</blockquote>
        <div class="grid">
            <div><strong>Sentiment:</strong> <span class="{sentiment_class}">{_text(sentiment)} ({score:.2f})</span></div>
            <div><strong>Impact:</strong> {_text(impact)}</div>
        </div>
        <div class="grid">
            <div><strong>Topic:</strong> {_text(payload.get('topic'))}</div>
            <div><strong>Entities:</strong> {entity_text}</div>
        </div>
    </div>
    """
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
    """Render `index.html`, the main interactive dashboard page."""
    # The template context carries the current request object.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/api/signals/stream")
async def signal_stream(request: Request):
    """
    SSE stream for the automated Signal Stream.

    Sends one status event on connect, then one ``message`` event per payload
    taken from ``app.state.signal_queue``, rendered as an HTML card.

    NOTE(review): all connections read from the same queue, so each signal is
    delivered to only one connected client. Confirm whether a per-client
    fan-out is needed.
    """
    queue: asyncio.Queue = request.app.state.signal_queue

    async def event_generator():
        # Confirm to the client that the stream is connected and ready.
        yield (
            "event: message\n"
            "data: <div hx-swap-oob='innerHTML' id='signal-stream-container'>"
            "<p>Status: ONLINE - Listening for new market signals...</p></div>\n\n"
        )
        while True:
            payload = await queue.get()
            try:
                card_html = render_signal_card(payload)
            except Exception as e:
                # A single malformed payload must not end this client's stream.
                print(f"❌ Error rendering signal card: {e}")
                continue
            # An SSE data field must stay on one line. Both CR and LF end a
            # line in the event-stream format, so strip both.
            data_payload = card_html.replace('\r', '').replace('\n', '')
            yield f"event: message\ndata: {data_payload}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        # Ask intermediaries not to cache or buffer the event stream.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )