mgbam's picture
Update app/app.py
d5b9420 verified
raw
history blame
7.96 kB
"""
CryptoSentinel Co-Pilot — Your AI-Powered Market Analyst.
This application provides a multi-faceted view of the crypto market by:
- Tracking live prices.
- Proactively fetching, analyzing, and synthesizing top news into a daily briefing.
- Offering on-demand deep analysis of any text.
- Streaming all insights to a dynamic frontend in real-time.
"""
import asyncio
import json
import os
from contextlib import asynccontextmanager
from typing import Optional, Union, List
import httpx
from fastapi import FastAPI, Request, BackgroundTasks, Form
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from markdown import markdown
from newsapi import NewsApiClient
# Import our modular service classes
from .price_fetcher import PriceFetcher
from .gemini_analyzer import GeminiAnalyzer
# --- Application Lifespan for Resource Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup opens one shared ``httpx.AsyncClient``, stores the service
    objects, caches and queues on ``app.state``, and launches three
    periodic background tasks (prices every 60 s, news analysis every
    900 s, briefing generation every 3600 s).

    Shutdown cancels those tasks and waits for them to finish before the
    HTTP client is closed. The teardown lives in a ``finally`` so it also
    runs when an exception is thrown into the generator at the ``yield``.
    """
    async with httpx.AsyncClient() as client:
        # Instantiate and store services in the application state.
        app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
        app.state.gemini_analyzer = GeminiAnalyzer(client=client)
        app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))

        # State for caching and queues.
        app.state.daily_briefing_cache = "### Briefing Unavailable\nGenerating the first daily briefing, please check back in a few minutes."
        app.state.analyzed_news_today: List[dict] = []
        app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
        app.state.news_queue: asyncio.Queue = asyncio.Queue()

        # Create cancellable background tasks.
        background_tasks = (
            asyncio.create_task(run_periodic_updates(app.state.price_fetcher, 60)),
            asyncio.create_task(run_periodic_news_analysis(app, 900)),  # 15 mins
            asyncio.create_task(run_daily_briefing_generation(app, 3600)),  # Every hour
        )
        print("πŸš€ CryptoSentinel Co-Pilot started.")
        try:
            yield
        finally:
            print("⏳ Shutting down background tasks...")
            for task in background_tasks:
                task.cancel()
            # return_exceptions=True collects each task's CancelledError (or
            # an earlier failure) as a result instead of raising it here, so
            # no except clause is needed around the gather.
            await asyncio.gather(*background_tasks, return_exceptions=True)
            print("Background tasks cancelled successfully.")
            print("βœ… Shutdown complete.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """Refresh crypto prices forever, once per interval.

    Args:
        fetcher: Object whose ``update_prices_async()`` coroutine is awaited
            on every cycle.
        interval_seconds: Seconds to sleep between refresh attempts.

    A failed refresh is reported and retried after the normal interval
    instead of terminating the loop. Cancellation is always propagated so
    the task can be stopped at shutdown.
    """
    while True:
        try:
            await fetcher.update_prices_async()
        except asyncio.CancelledError:
            # Explicit re-raise: on older Python versions CancelledError is an
            # Exception subclass and would otherwise be swallowed below.
            raise
        except Exception as e:
            print(f"❌ Error during price update: {e}")
        await asyncio.sleep(interval_seconds)
async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
    """Periodically fetch, analyze, and queue top crypto news.

    Args:
        app: Application whose ``state`` holds ``news_api``,
            ``gemini_analyzer``, ``news_queue`` and ``analyzed_news_today``.
        interval_seconds: Seconds to sleep between fetch cycles.

    Each cycle requests up to 5 recent articles, analyzes every usable title,
    attaches the article URL to the analysis, then puts it on the news queue
    and appends it to the list consumed by the briefing task. Errors are
    printed and the loop continues; cancellation is propagated.
    """
    while True:
        print("πŸ“° Fetching latest crypto news...")
        try:
            news_api = app.state.news_api
            # get_everything() is a plain (non-awaited) call, so run it in the
            # default executor to keep the event loop free while it waits.
            loop = asyncio.get_running_loop()
            top_headlines = await loop.run_in_executor(
                None,
                lambda: news_api.get_everything(
                    q='bitcoin OR ethereum OR crypto OR blockchain',
                    language='en', sort_by='publishedAt', page_size=5,
                ),
            )
            analyzer: GeminiAnalyzer = app.state.gemini_analyzer
            # `or []` also covers a response that carries "articles": None.
            for article in top_headlines.get('articles') or []:
                title = article.get('title')
                if not title or "[Removed]" in title:
                    continue
                try:
                    analysis = await analyzer.analyze_text(title)
                    analysis['url'] = article.get('url')
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    # One bad article must not discard the rest of the batch.
                    print(f"❌ Error during news analysis: {e}")
                    continue
                await app.state.news_queue.put(analysis)
                app.state.analyzed_news_today.append(analysis)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            print(f"❌ Error during news analysis: {e}")
        await asyncio.sleep(interval_seconds)
async def run_daily_briefing_generation(app: FastAPI, interval_seconds: int):
    """Periodically generate and cache the market briefing.

    Args:
        app: Application whose ``state`` holds ``analyzed_news_today``,
            ``gemini_analyzer`` and ``daily_briefing_cache``.
        interval_seconds: Seconds to sleep before each generation attempt.

    The accumulated analyses are detached from ``app.state`` *before* the
    analyzer is awaited, so items appended by the news task while the
    briefing is being generated land in the fresh list and are kept for the
    next cycle. If generation fails, the detached items are put back and the
    loop continues.
    """
    while True:
        await asyncio.sleep(interval_seconds)  # Wait first, then generate
        print("πŸ“ Generating Daily Market Briefing...")
        pending = app.state.analyzed_news_today
        if not pending:
            print("ℹ️ No new news items to analyze for briefing. Skipping.")
            continue

        # Swap in a new list now; resetting after the await would drop
        # anything appended during generation.
        app.state.analyzed_news_today = []
        try:
            analyzer: GeminiAnalyzer = app.state.gemini_analyzer
            briefing = await analyzer.generate_daily_briefing(pending)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # Restore the unprocessed items ahead of any newer ones.
            app.state.analyzed_news_today = pending + app.state.analyzed_news_today
            print(f"❌ Error during briefing generation: {e}")
            continue
        app.state.daily_briefing_cache = briefing
        print("βœ… Daily briefing generated and cached.")
# --- FastAPI App Initialization ---
# ASGI application object; `lifespan` is passed in to handle startup/shutdown.
app = FastAPI(title="CryptoSentinel Co-Pilot", lifespan=lifespan)
# Template loader for the dashboard page.
# NOTE(review): "templates" is a relative path — presumably resolved against
# the process working directory; confirm against the deployment layout.
templates = Jinja2Templates(directory="templates")
# --- HTML Rendering Helper ---
def render_analysis_card(payload: dict, is_news: bool = False) -> str:
    """Render one analysis result as an HTML card fragment.

    Args:
        payload: Analysis dict. Keys read: ``summary``, ``sentiment``,
            ``sentiment_score``, ``impact``, ``topic``, ``entities`` and,
            for news cards, ``url``.
        is_news: When true, the summary is rendered as a link to ``url``.

    Returns:
        A single ``<div class="card ...">`` HTML string.

    Every payload value is HTML-escaped before interpolation because the
    text comes from outside the application (analysis output, article
    URLs). Link targets other than http(s) fall back to ``#``. Missing or
    ``None`` impact/sentiment/entities and non-numeric scores are tolerated
    instead of raising.
    """
    from html import escape  # local import keeps this block self-contained

    def _text(value) -> str:
        # Escape for use in both element content and quoted attributes.
        return escape(str(value), quote=True)

    summary = payload.get('summary')
    if is_news:
        url = str(payload.get('url') or '#')
        # Only allow web links; blocks javascript:/data: style URLs.
        if not url.lower().startswith(('http://', 'https://')):
            url = '#'
        label = _text('N/A' if summary is None else summary)
        text_to_show = f'<a href="{_text(url)}" target="_blank" rel="noopener noreferrer">{label}</a>'
    else:
        text_to_show = _text('Analysis failed or not available.' if summary is None else summary)

    impact = payload.get('impact')
    sentiment = payload.get('sentiment')
    impact_class = f"impact-{_text(str(impact or 'low').lower())}"
    sentiment_class = f"sentiment-{_text(str(sentiment or 'neutral').lower())}"

    try:
        score = float(payload.get('sentiment_score') or 0)
    except (TypeError, ValueError):
        score = 0.0

    entities = payload.get('entities') or []
    if isinstance(entities, str) or not isinstance(entities, (list, tuple, set, frozenset)):
        # A bare value would otherwise be joined character by character
        # (str) or fail to iterate at all.
        entities = [entities]
    entities_text = ', '.join(_text(entity) for entity in entities)

    return (
        f'<div class="card {impact_class}">'
        f'<blockquote>{text_to_show}</blockquote>'
        f'<div class="grid">'
        f'<div><strong>Sentiment:</strong> <span class="{sentiment_class}">{_text(sentiment)} ({score:.2f})</span></div>'
        f'<div><strong>Impact:</strong> {_text(impact)}</div>'
        f'</div>'
        f'<div class="grid">'
        f'<div><strong>Topic:</strong> {_text(payload.get("topic"))}</div>'
        f'<div><strong>Entities:</strong> {entities_text}</div>'
        f'</div>'
        f'</div>'
    )
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
    """Serve the dashboard page rendered from the ``index.html`` template."""
    template_context = {"request": request}
    return templates.TemplateResponse("index.html", template_context)
@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
    """Return the current prices as an HTML fragment, one ``<div>`` per coin.

    Numeric prices are formatted as dollars with thousands separators and
    two decimals; any other value is shown as-is.
    """
    current = request.app.state.price_fetcher.get_current_prices()
    rows = []
    for coin, price in current.items():
        if isinstance(price, (int, float)):
            rows.append(f"<div><strong>{coin.capitalize()}:</strong> ${price:,.2f}</div>")
        else:
            rows.append(f"<div><strong>{coin.capitalize()}:</strong> {price}</div>")
    return HTMLResponse(content="".join(rows))
@app.post("/api/sentiment")
async def analyze_sentiment(request: Request, background_tasks: BackgroundTasks, text: str = Form(...)):
    """Queue submitted text for analysis and acknowledge immediately.

    The analysis runs as a background task after the response is sent; its
    result is put on the sentiment queue.
    """
    state = request.app.state
    gemini: GeminiAnalyzer = state.gemini_analyzer

    async def _analyze_and_enqueue():
        result = await gemini.analyze_text(text)
        await state.sentiment_queue.put(result)

    background_tasks.add_task(_analyze_and_enqueue)
    return HTMLResponse(content="<small>βœ… Queued for deep analysis...</small>")
@app.get("/api/briefing", response_class=HTMLResponse)
async def get_daily_briefing(request: Request):
    """Return the cached briefing, converted from Markdown to HTML."""
    cached_markdown = request.app.state.daily_briefing_cache
    return HTMLResponse(content=markdown(cached_markdown))
@app.get("/api/sentiment/stream")
async def sentiment_stream(request: Request):
    """Stream sentiment analysis cards as server-sent events.

    Each item taken from the sentiment queue is rendered to an HTML card and
    emitted as one ``sentiment_update`` event.

    NOTE(review): all connections read from the same queue, and ``get()``
    removes the item, so each result reaches only one connected client.
    """
    queue: asyncio.Queue = request.app.state.sentiment_queue

    async def event_generator():
        while True:
            payload = await queue.get()
            card_html = render_analysis_card(payload)
            # SSE treats CR, LF and CRLF as line terminators, so both
            # characters must go to keep the card on a single data line.
            data_payload = card_html.replace('\r', '').replace('\n', '')
            yield f"event: sentiment_update\ndata: {data_payload}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/api/news/stream")
async def news_stream(request: Request):
    """Stream analyzed news cards as server-sent events.

    Each item taken from the news queue is rendered to an HTML card (with the
    summary linked to the article) and emitted as one ``news_update`` event.

    NOTE(review): all connections read from the same queue, and ``get()``
    removes the item, so each result reaches only one connected client.
    """
    queue: asyncio.Queue = request.app.state.news_queue

    async def event_generator():
        while True:
            payload = await queue.get()
            card_html = render_analysis_card(payload, is_news=True)
            # SSE treats CR, LF and CRLF as line terminators, so both
            # characters must go to keep the card on a single data line.
            data_payload = card_html.replace('\r', '').replace('\n', '')
            yield f"event: news_update\ndata: {data_payload}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")