Spaces:
Sleeping
Sleeping
""" | |
CryptoSentinel Co-Pilot — Your AI-Powered Market Analyst.
This application provides a multi-faceted view of the crypto market by: | |
- Tracking live prices. | |
- Proactively fetching, analyzing, and synthesizing top news into a daily briefing. | |
- Offering on-demand deep analysis of any text. | |
- Streaming all insights to a dynamic frontend in real-time. | |
""" | |
import asyncio | |
import json | |
import os | |
from contextlib import asynccontextmanager | |
from typing import Optional, Union, List | |
import httpx | |
from fastapi import FastAPI, Request, BackgroundTasks, Form | |
from fastapi.responses import HTMLResponse, StreamingResponse | |
from fastapi.templating import Jinja2Templates | |
from markdown import markdown | |
from newsapi import NewsApiClient | |
# Import our modular service classes | |
from .price_fetcher import PriceFetcher | |
from .gemini_analyzer import GeminiAnalyzer | |
# --- Application Lifespan for Resource Management --- | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup: opens the shared ``httpx.AsyncClient``, stores the service
    objects, the briefing cache and the two queues on ``app.state``, and
    launches the three periodic background tasks.

    Shutdown: cancels the background tasks and waits for them to finish
    while the HTTP client is still open, then lets the client close.

    The function is wrapped with ``asynccontextmanager`` because it is handed
    to ``FastAPI(lifespan=...)``, which expects an async context manager.
    """
    async with httpx.AsyncClient() as client:
        # Instantiate and store services in the application state
        app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
        app.state.gemini_analyzer = GeminiAnalyzer(client=client)
        app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
        # State for caching and queues
        app.state.daily_briefing_cache = "### Briefing Unavailable\nGenerating the first daily briefing, please check back in a few minutes."
        app.state.analyzed_news_today: List[dict] = []
        app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
        app.state.news_queue: asyncio.Queue = asyncio.Queue()
        # Create cancellable background tasks
        background_jobs = [
            asyncio.create_task(run_periodic_updates(app.state.price_fetcher, 60)),
            asyncio.create_task(run_periodic_news_analysis(app, 900)),  # 15 mins
            asyncio.create_task(run_daily_briefing_generation(app, 3600)),  # Every hour
        ]
        print("π CryptoSentinel Co-Pilot started.")
        try:
            yield
        finally:
            # Runs on normal shutdown AND when an error is thrown into the
            # lifespan, so the tasks never outlive the HTTP client they use.
            print("β³ Shutting down background tasks...")
            for job in background_jobs:
                job.cancel()
            # return_exceptions=True collects each task's outcome instead of
            # raising, so every task is awaited even if one of them crashed.
            outcomes = await asyncio.gather(*background_jobs, return_exceptions=True)
            for outcome in outcomes:
                if isinstance(outcome, Exception) and not isinstance(outcome, asyncio.CancelledError):
                    print(f"Background task ended with an error: {outcome!r}")
            print("Background tasks cancelled successfully.")
    print("β Shutdown complete.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """Refresh crypto prices forever, once every ``interval_seconds``.

    A failed refresh is logged and the loop keeps running, so a transient
    upstream error does not permanently stop price updates. Cancellation is
    always propagated so shutdown can stop the task.
    """
    while True:
        try:
            await fetcher.update_prices_async()
        except asyncio.CancelledError:
            # Never swallow cancellation (it is an Exception subclass on 3.7).
            raise
        except Exception as e:
            print(f"Error during price update: {e}")
        await asyncio.sleep(interval_seconds)
async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
    """Periodically fetch, analyze, and queue top crypto news.

    Every ``interval_seconds`` the latest headlines are requested from the
    news client on ``app.state.news_api``. Each usable title is analyzed with
    ``app.state.gemini_analyzer``; the result (with the article URL attached)
    is pushed onto ``app.state.news_queue`` and appended to
    ``app.state.analyzed_news_today``.

    Errors are logged and never stop the loop; a failure on one article does
    not prevent the remaining articles from being processed.
    """
    while True:
        print("π° Fetching latest crypto news...")
        try:
            # get_everything() is a plain synchronous call; run it in the
            # default executor so the event loop is not blocked while it waits
            # on the network.
            loop = asyncio.get_running_loop()
            top_headlines = await loop.run_in_executor(
                None,
                lambda: app.state.news_api.get_everything(
                    q='bitcoin OR ethereum OR crypto OR blockchain',
                    language='en', sort_by='publishedAt', page_size=5
                ),
            )
            analyzer: GeminiAnalyzer = app.state.gemini_analyzer
            for article in top_headlines.get('articles') or []:
                title = article.get('title')
                if not title or "[Removed]" in title:
                    continue
                try:
                    analysis = await analyzer.analyze_text(title)
                    analysis['url'] = article.get('url')
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    # Isolate the failure to this article only.
                    print(f"β Error during news analysis: {e}")
                    continue
                await app.state.news_queue.put(analysis)
                app.state.analyzed_news_today.append(analysis)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            print(f"β Error during news analysis: {e}")
        await asyncio.sleep(interval_seconds)
async def run_daily_briefing_generation(app: FastAPI, interval_seconds: int):
    """Periodically generate and cache the daily market briefing.

    After each ``interval_seconds`` wait, the items collected in
    ``app.state.analyzed_news_today`` are handed to the analyzer and the
    result is stored in ``app.state.daily_briefing_cache``.

    The collected list is swapped for a fresh one *before* awaiting the
    analyzer, so items appended by the news task while the briefing is being
    generated are kept for the next cycle instead of being wiped. If
    generation fails, the items are put back and the loop continues.
    """
    while True:
        await asyncio.sleep(interval_seconds)  # Wait first, then generate
        print("π Generating Daily Market Briefing...")
        pending_items = app.state.analyzed_news_today
        if not pending_items:
            print("βΉοΈ No new news items to analyze for briefing. Skipping.")
            continue
        # Start the next cycle's list now; see docstring for why.
        app.state.analyzed_news_today = []
        analyzer: GeminiAnalyzer = app.state.gemini_analyzer
        try:
            briefing = await analyzer.generate_daily_briefing(pending_items)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # Restore the unprocessed items ahead of anything collected since.
            app.state.analyzed_news_today[:0] = pending_items
            print(f"Error during daily briefing generation: {e}")
            continue
        app.state.daily_briefing_cache = briefing
        print("β Daily briefing generated and cached.")
# --- FastAPI App Initialization --- | |
# The ASGI application; startup/shutdown work is delegated to `lifespan`.
app = FastAPI(
    title="CryptoSentinel Co-Pilot",
    lifespan=lifespan,
)
# Template loader rooted at the "templates" directory.
templates = Jinja2Templates(directory="templates")
# --- HTML Rendering Helper --- | |
def render_analysis_card(payload: dict, is_news: bool = False) -> str:
    """Render one analysis result as a single-line HTML card.

    Args:
        payload: Analysis dict. The keys read are ``summary``, ``url`` (news
            only), ``impact``, ``sentiment``, ``sentiment_score``, ``topic``
            and ``entities``; any of them may be missing or ``None``.
        is_news: When true, the summary is rendered as a link to ``url``.

    Returns:
        The card markup, containing no newline characters of its own.

    Every value taken from ``payload`` is HTML-escaped because it originates
    from external text (user input, news feeds, model output). Link targets
    are limited to http/https; anything else falls back to ``#``.
    """
    from html import escape
    from urllib.parse import urlsplit

    def _safe(value) -> str:
        # Escape for use in both element content and quoted attributes.
        return escape(str(value), quote=True)

    summary = payload.get('summary')
    if is_news:
        url = str(payload.get('url') or '#')
        try:
            scheme = urlsplit(url).scheme.lower()
        except ValueError:
            scheme = ''
        if scheme not in ('http', 'https'):
            # Blocks javascript:, data: and other non-web schemes.
            url = '#'
        link_text = _safe('N/A' if summary is None else summary)
        text_to_show = f'<a href="{_safe(url)}" target="_blank" rel="noopener noreferrer">{link_text}</a>'
    else:
        text_to_show = _safe('Analysis failed or not available.' if summary is None else summary)

    # `or` covers both a missing key and an explicit None value.
    impact_class = _safe(f"impact-{str(payload.get('impact') or 'low').lower()}")
    sentiment_class = _safe(f"sentiment-{str(payload.get('sentiment') or 'neutral').lower()}")

    try:
        score = float(payload.get('sentiment_score') or 0)
    except (TypeError, ValueError):
        score = 0.0

    raw_entities = payload.get('entities') or []
    if not isinstance(raw_entities, (list, tuple, set, frozenset)):
        # A lone string (or other scalar) is treated as a single entity.
        raw_entities = [raw_entities]
    entities_text = ', '.join(_safe(entity) for entity in raw_entities)

    return (
        f'<div class="card {impact_class}"><blockquote>{text_to_show}</blockquote>'
        f'<div class="grid"><div><strong>Sentiment:</strong> '
        f'<span class="{sentiment_class}">{_safe(payload.get("sentiment"))} ({score:.2f})</span></div>'
        f'<div><strong>Impact:</strong> {_safe(payload.get("impact"))}</div></div>'
        f'<div class="grid"><div><strong>Topic:</strong> {_safe(payload.get("topic"))}</div>'
        f'<div><strong>Entities:</strong> {entities_text}</div></div></div>'
    )
# --- API Endpoints --- | |
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered with the app.
async def serve_dashboard(request: Request):
    """Render the dashboard page from the ``index.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered with the app.
async def get_prices_fragment(request: Request):
    """Return the current prices as an HTML fragment, one ``<div>`` per coin.

    Numeric values are shown as dollar amounts with thousands separators and
    two decimals; any other value is shown as-is.
    """
    current = request.app.state.price_fetcher.get_current_prices()
    rows = []
    for coin, value in current.items():
        if isinstance(value, (int, float)):
            shown = f"${value:,.2f}"
        else:
            shown = f"{value}"
        rows.append(f"<div><strong>{coin.capitalize()}:</strong> {shown}</div>")
    return HTMLResponse(content="".join(rows))
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered with the app.
async def analyze_sentiment(request: Request, background_tasks: BackgroundTasks, text: str = Form(...)):
    """Queue the submitted form text for analysis and acknowledge immediately.

    The analysis runs as a background task after the response is sent; its
    result is put on ``app.state.sentiment_queue``.
    """
    gemini: GeminiAnalyzer = request.app.state.gemini_analyzer

    async def _analyze_then_enqueue():
        result = await gemini.analyze_text(text)
        await request.app.state.sentiment_queue.put(result)

    background_tasks.add_task(_analyze_then_enqueue)
    return HTMLResponse(content="<small>β Queued for deep analysis...</small>")
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered with the app.
async def get_daily_briefing(request: Request):
    """Return the cached daily briefing, converted from Markdown to HTML."""
    cached_markdown = request.app.state.daily_briefing_cache
    return HTMLResponse(content=markdown(cached_markdown))
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered with the app.
async def sentiment_stream(request: Request):
    """Stream on-demand analysis results as Server-Sent Events.

    Each item taken from ``app.state.sentiment_queue`` is rendered as an HTML
    card and emitted as a ``sentiment_update`` event.
    """
    queue: asyncio.Queue = request.app.state.sentiment_queue

    async def event_generator():
        while True:
            payload = await queue.get()
            try:
                card_html = render_analysis_card(payload)
            except Exception as e:
                # One malformed payload must not end the stream for the client.
                print(f"Error rendering sentiment card: {e}")
                continue
            # An SSE field ends at the first CR or LF, so both are removed to
            # keep the whole card inside a single "data:" line.
            data_payload = card_html.replace('\r', '').replace('\n', '')
            yield f"event: sentiment_update\ndata: {data_payload}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered with the app.
async def news_stream(request: Request):
    """Stream analyzed news items as Server-Sent Events.

    Each item taken from ``app.state.news_queue`` is rendered as a linked
    HTML card and emitted as a ``news_update`` event.
    """
    queue: asyncio.Queue = request.app.state.news_queue

    async def event_generator():
        while True:
            payload = await queue.get()
            try:
                card_html = render_analysis_card(payload, is_news=True)
            except Exception as e:
                # One malformed payload must not end the stream for the client.
                print(f"Error rendering news card: {e}")
                continue
            # An SSE field ends at the first CR or LF, so both are removed to
            # keep the whole card inside a single "data:" line.
            data_payload = card_html.replace('\r', '').replace('\n', '')
            yield f"event: news_update\ndata: {data_payload}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")