Spaces:
Sleeping
Sleeping
File size: 7,956 Bytes
b159555 d5b9420 b159555 d5b9420 b159555 c6f94f2 b159555 d5b9420 b159555 78decd4 b159555 d5b9420 b159555 d5b9420 b159555 448d7a9 0e87c05 d5b9420 0e87c05 d5b9420 175724c b159555 c6f94f2 eaf2b94 d5b9420 b159555 c6f94f2 d5b9420 c6f94f2 0e87c05 c6f94f2 d5b9420 b159555 d5b9420 b159555 d5b9420 b159555 eaf2b94 c6f94f2 d5b9420 0e87c05 d5b9420 c6f94f2 b159555 d5b9420 c6f94f2 d5b9420 c6f94f2 d5b9420 0e87c05 448d7a9 d5b9420 b159555 d5b9420 b159555 d5b9420 b159555 d5b9420 0e87c05 d5b9420 c6f94f2 d5b9420 b159555 d5b9420 0e87c05 c6f94f2 cb01390 c6f94f2 e446a42 d5b9420 c6f94f2 f29497b c6f94f2 e446a42 d5b9420 b159555 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
"""
CryptoSentinel Co-Pilot — Your AI-Powered Market Analyst.
This application provides a multi-faceted view of the crypto market by:
- Tracking live prices.
- Proactively fetching, analyzing, and synthesizing top news into a daily briefing.
- Offering on-demand deep analysis of any text.
- Streaming all insights to a dynamic frontend in real-time.
"""
import asyncio
import json
import os
from contextlib import asynccontextmanager
from typing import Optional, Union, List
import httpx
from fastapi import FastAPI, Request, BackgroundTasks, Form
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from markdown import markdown
from newsapi import NewsApiClient
# Import our modular service classes
from .price_fetcher import PriceFetcher
from .gemini_analyzer import GeminiAnalyzer
# --- Application Lifespan for Resource Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup: opens one shared ``httpx.AsyncClient``, stores the service
    objects, the briefing cache, the analyzed-news list and the two event
    queues on ``app.state``, then launches the three periodic background
    tasks.

    Shutdown: cancels the background tasks and waits for them to finish
    before the ``async with`` block closes the HTTP client.  The cleanup runs
    in a ``finally`` clause so the tasks are also stopped when an exception
    is thrown into the generator at ``yield``.
    """
    async with httpx.AsyncClient() as client:
        # Instantiate and store services in the application state.
        app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
        app.state.gemini_analyzer = GeminiAnalyzer(client=client)
        # NOTE(review): os.getenv returns None when NEWS_API_KEY is unset;
        # confirm the deployment always provides it.
        app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))

        # State for caching and queues.
        app.state.daily_briefing_cache = "### Briefing Unavailable\nGenerating the first daily briefing, please check back in a few minutes."
        app.state.analyzed_news_today: List[dict] = []
        app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
        app.state.news_queue: asyncio.Queue = asyncio.Queue()

        # Create cancellable background tasks.
        background_tasks = (
            asyncio.create_task(run_periodic_updates(app.state.price_fetcher, 60)),
            asyncio.create_task(run_periodic_news_analysis(app, 900)),  # 15 mins
            asyncio.create_task(run_daily_briefing_generation(app, 3600)),  # Every hour
        )

        # NOTE(review): the leading characters of the next two log strings
        # look like mis-decoded emoji; confirm against the original file.
        print("π CryptoSentinel Co-Pilot started.")
        try:
            yield
        finally:
            print("β³ Shutting down background tasks...")
            for task in background_tasks:
                task.cancel()
            # return_exceptions=True makes gather hand back each task's
            # CancelledError as a result instead of raising it, so the
            # confirmation is printed once gather has returned.
            await asyncio.gather(*background_tasks, return_exceptions=True)
            print("Background tasks cancelled successfully.")
            print("✅ Shutdown complete.")
async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """Refresh crypto prices forever, pausing between refreshes.

    Args:
        fetcher: Object whose ``update_prices_async()`` coroutine is awaited
            once per cycle.
        interval_seconds: Seconds to sleep after each refresh attempt.

    The coroutine never returns; it ends only when its task is cancelled.
    A failed refresh is logged and retried on the next cycle, so a single
    transient error does not stop price updates for the rest of the
    process lifetime.
    """
    while True:
        try:
            await fetcher.update_prices_async()
        except asyncio.CancelledError:
            # Always let task cancellation propagate so shutdown can complete.
            raise
        except Exception as e:
            print(f"Error during price update: {e}")
        await asyncio.sleep(interval_seconds)
async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
    """Periodically fetch, analyze, and queue top crypto news.

    Args:
        app: Application whose ``state`` provides ``news_api``,
            ``gemini_analyzer``, ``news_queue`` and ``analyzed_news_today``.
        interval_seconds: Seconds to sleep after each cycle.

    Each cycle requests the five most recently published English articles
    matching the crypto query, analyzes every usable title, attaches the
    article URL to the analysis, pushes it onto ``news_queue`` and appends it
    to ``analyzed_news_today``.  Any error inside a cycle is logged and the
    loop carries on after the sleep.

    NOTE(review): nothing here de-duplicates articles, so a headline that is
    still among the latest five on the next cycle is analyzed and queued
    again; confirm whether that is intended.
    """
    loop = asyncio.get_running_loop()
    while True:
        # NOTE(review): the leading characters of the log strings in this
        # function look like mis-decoded emoji; confirm against the original.
        print("π° Fetching latest crypto news...")
        try:
            news_api = app.state.news_api
            # get_everything is an ordinary synchronous call; running it in
            # the default executor keeps the event loop (price updates, SSE
            # streams, request handling) responsive while it waits.
            top_headlines = await loop.run_in_executor(
                None,
                lambda: news_api.get_everything(
                    q='bitcoin OR ethereum OR crypto OR blockchain',
                    language='en', sort_by='publishedAt', page_size=5,
                ),
            )
            analyzer: GeminiAnalyzer = app.state.gemini_analyzer
            for article in top_headlines.get('articles', []):
                title = article.get('title')
                # Skip entries without a title and "[Removed]" placeholders.
                if title and "[Removed]" not in title:
                    analysis = await analyzer.analyze_text(title)
                    analysis['url'] = article.get('url')
                    await app.state.news_queue.put(analysis)
                    app.state.analyzed_news_today.append(analysis)
        except Exception as e:
            print(f"β Error during news analysis: {e}")
        await asyncio.sleep(interval_seconds)
async def run_daily_briefing_generation(app: FastAPI, interval_seconds: int):
    """Periodically generate and cache the market briefing.

    Args:
        app: Application whose ``state`` provides ``gemini_analyzer``,
            ``analyzed_news_today`` and ``daily_briefing_cache``.
        interval_seconds: Seconds to sleep before each generation attempt.

    Each cycle sleeps first, then turns the analyses collected since the last
    briefing into a new briefing stored in ``daily_briefing_cache``.  The
    collected list is swapped for a fresh one *before* awaiting the analyzer,
    so items appended while the briefing is being generated are kept for the
    next cycle instead of being wiped.  If generation fails, the items are
    put back and the loop continues.
    """
    while True:
        await asyncio.sleep(interval_seconds)  # Wait first, then generate
        # NOTE(review): the leading characters of this and the "No new news"
        # log string look like mis-decoded emoji; confirm against the original.
        print("π Generating Daily Market Briefing...")

        pending = app.state.analyzed_news_today
        if not pending:
            print("βΉοΈ No new news items to analyze for briefing. Skipping.")
            continue

        # Hand the news task a fresh list for the next cycle right away.
        app.state.analyzed_news_today = []
        analyzer: GeminiAnalyzer = app.state.gemini_analyzer
        try:
            briefing = await analyzer.generate_daily_briefing(pending)
        except asyncio.CancelledError:
            # Always let task cancellation propagate so shutdown can complete.
            raise
        except Exception as e:
            # Keep the unprocessed items (ahead of anything added meanwhile).
            app.state.analyzed_news_today = pending + app.state.analyzed_news_today
            print(f"Error during briefing generation: {e}")
            continue

        app.state.daily_briefing_cache = briefing
        print("✅ Daily briefing generated and cached.")
# --- FastAPI App Initialization ---
# The ASGI application; startup and shutdown are handled by `lifespan`.
app = FastAPI(
    title="CryptoSentinel Co-Pilot",
    lifespan=lifespan,
)

# Jinja2 environment that loads page templates from the "templates" directory.
templates = Jinja2Templates(
    directory="templates",
)
# --- HTML Rendering Helper ---
def render_analysis_card(payload: dict, is_news: bool = False) -> str:
    """Render one analysis result as a single-line HTML card fragment.

    Args:
        payload: Analysis dict. The keys read are ``summary``, ``sentiment``,
            ``sentiment_score``, ``impact``, ``topic``, ``entities`` and, for
            news cards, ``url``.  Every key is optional.
        is_news: When true, the summary is rendered as a link to ``url``.

    Returns:
        The card markup.  Every payload value is HTML-escaped before it is
        inserted, because the values come from outside this application
        (submitted text, news articles, model output).

    Missing or ``None`` values fall back to safe defaults instead of raising:
    ``impact`` -> ``low`` and ``sentiment`` -> ``neutral`` for the CSS
    classes, a non-numeric score -> ``0.00``, and a missing or non-http(s)
    ``url`` -> ``#``.
    """
    # Function-scope import: the module already uses `html` as a local
    # variable name in other functions, so keep this name out of module scope.
    from html import escape

    def _safe(value) -> str:
        """Escape *value* for use in HTML text or a quoted attribute."""
        return escape(str(value), quote=True)

    if is_news:
        link_target = payload.get('url') or '#'
        # Only plain web links are allowed in href; anything else
        # (e.g. "javascript:") is replaced by an inert anchor.
        if not str(link_target).lower().startswith(('http://', 'https://')):
            link_target = '#'
        text_to_show = (
            f'<a href="{_safe(link_target)}" target="_blank" rel="noopener noreferrer">'
            f'{_safe(payload.get("summary", "N/A"))}</a>'
        )
    else:
        text_to_show = _safe(payload.get('summary', 'Analysis failed or not available.'))

    # `or` also covers keys that are present but set to None.
    impact_class = f"impact-{_safe(str(payload.get('impact') or 'low').lower())}"
    sentiment_class = f"sentiment-{_safe(str(payload.get('sentiment') or 'neutral').lower())}"

    try:
        score = float(payload.get('sentiment_score', 0))
    except (TypeError, ValueError):
        score = 0.0

    entities = payload.get('entities') or []
    if isinstance(entities, str):
        # A bare string is one entity, not a sequence of characters.
        entities = [entities]
    entity_text = ', '.join(_safe(entity) for entity in entities)

    return (
        f'<div class="card {impact_class}">'
        f'<blockquote>{text_to_show}</blockquote>'
        f'<div class="grid">'
        f'<div><strong>Sentiment:</strong> '
        f'<span class="{sentiment_class}">{_safe(payload.get("sentiment"))} ({score:.2f})</span></div>'
        f'<div><strong>Impact:</strong> {_safe(payload.get("impact"))}</div>'
        f'</div>'
        f'<div class="grid">'
        f'<div><strong>Topic:</strong> {_safe(payload.get("topic"))}</div>'
        f'<div><strong>Entities:</strong> {entity_text}</div>'
        f'</div>'
        f'</div>'
    )
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
    """Serve the dashboard page rendered from the ``index.html`` template."""
    # The template context carries only the incoming request.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
    """Return the current prices as an HTML fragment, one ``<div>`` per coin.

    Numeric values are shown as dollar amounts with thousands separators and
    two decimals; any other value is shown as-is.
    """
    current = request.app.state.price_fetcher.get_current_prices()
    rows = []
    for coin, value in current.items():
        label = coin.capitalize()
        if isinstance(value, (int, float)):
            rows.append(f"<div><strong>{label}:</strong> ${value:,.2f}</div>")
        else:
            rows.append(f"<div><strong>{label}:</strong> {value}</div>")
    return HTMLResponse(content="".join(rows))
@app.post("/api/sentiment")
async def analyze_sentiment(request: Request, background_tasks: BackgroundTasks, text: str = Form(...)):
    """Queue the submitted text for analysis and acknowledge immediately.

    Args:
        request: Incoming request; gives access to ``app.state``.
        background_tasks: FastAPI task collector the analysis job is added to.
        text: Required form field holding the text to analyze.

    Returns:
        A short HTML acknowledgement.  The analysis itself runs as a
        background task and its result is put on ``sentiment_queue``.
    """
    analyzer: GeminiAnalyzer = request.app.state.gemini_analyzer

    async def task_wrapper():
        # Runs as a background task: analyze, then publish to the queue.
        analysis = await analyzer.analyze_text(text)
        await request.app.state.sentiment_queue.put(analysis)

    background_tasks.add_task(task_wrapper)
    return HTMLResponse(content="<small>✅ Queued for deep analysis...</small>")
@app.get("/api/briefing", response_class=HTMLResponse)
async def get_daily_briefing(request: Request):
    """Return the cached briefing, converted from Markdown to HTML."""
    cached_markdown = request.app.state.daily_briefing_cache
    return HTMLResponse(content=markdown(cached_markdown))
@app.get("/api/sentiment/stream")
async def sentiment_stream(request: Request):
    """Stream analysis cards to the client as server-sent events.

    Each item taken from ``sentiment_queue`` is rendered to an HTML card and
    sent as one ``sentiment_update`` event.

    NOTE(review): ``Queue.get()`` removes the item it returns, so with
    several connected clients each result reaches only one of them; confirm
    that is intended.
    """
    queue: asyncio.Queue = request.app.state.sentiment_queue

    async def event_generator():
        while True:
            payload = await queue.get()
            html = render_analysis_card(payload)
            # An SSE data field must stay on one line.  Both CR and LF end a
            # line in the event-stream format, so strip both.
            data_payload = html.replace('\r', '').replace('\n', '')
            yield f"event: sentiment_update\ndata: {data_payload}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/api/news/stream")
async def news_stream(request: Request):
    """Stream news analysis cards to the client as server-sent events.

    Each item taken from ``news_queue`` is rendered to an HTML news card
    (summary linked to the article URL) and sent as one ``news_update`` event.

    NOTE(review): ``Queue.get()`` removes the item it returns, so with
    several connected clients each result reaches only one of them; confirm
    that is intended.
    """
    queue: asyncio.Queue = request.app.state.news_queue

    async def event_generator():
        while True:
            payload = await queue.get()
            html = render_analysis_card(payload, is_news=True)
            # An SSE data field must stay on one line.  Both CR and LF end a
            # line in the event-stream format, so strip both.
            data_payload = html.replace('\r', '').replace('\n', '')
            yield f"event: news_update\ndata: {data_payload}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")