File size: 7,956 Bytes
b159555
d5b9420
b159555
d5b9420
 
 
 
 
b159555
 
 
c6f94f2
b159555
d5b9420
b159555
 
78decd4
b159555
 
d5b9420
 
b159555
d5b9420
b159555
 
 
 
448d7a9
0e87c05
 
d5b9420
0e87c05
d5b9420
175724c
b159555
c6f94f2
eaf2b94
d5b9420
 
 
b159555
 
c6f94f2
d5b9420
 
 
 
 
 
c6f94f2
0e87c05
 
c6f94f2
 
d5b9420
b159555
d5b9420
b159555
 
 
 
 
d5b9420
b159555
 
 
eaf2b94
c6f94f2
d5b9420
0e87c05
d5b9420
c6f94f2
 
b159555
d5b9420
c6f94f2
 
 
 
 
 
 
 
d5b9420
c6f94f2
d5b9420
0e87c05
448d7a9
d5b9420
 
 
 
 
 
 
 
 
 
 
 
 
 
b159555
 
d5b9420
b159555
 
 
 
 
 
 
 
 
 
 
d5b9420
b159555
 
 
 
 
 
 
 
 
d5b9420
 
 
 
0e87c05
d5b9420
c6f94f2
d5b9420
 
 
 
b159555
 
d5b9420
 
 
 
 
0e87c05
 
c6f94f2
cb01390
c6f94f2
 
 
e446a42
d5b9420
c6f94f2
f29497b
c6f94f2
 
 
 
 
 
 
e446a42
d5b9420
b159555
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""
CryptoSentinel Co-Pilot β€” Your AI-Powered Market Analyst.

This application provides a multi-faceted view of the crypto market by:
- Tracking live prices.
- Proactively fetching, analyzing, and synthesizing top news into a daily briefing.
- Offering on-demand deep analysis of any text.
- Streaming all insights to a dynamic frontend in real-time.
"""
import asyncio
import json
import os
from contextlib import asynccontextmanager
from typing import Optional, Union, List

import httpx
from fastapi import FastAPI, Request, BackgroundTasks, Form
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from markdown import markdown
from newsapi import NewsApiClient

# Import our modular service classes
from .price_fetcher import PriceFetcher
from .gemini_analyzer import GeminiAnalyzer

# --- Application Lifespan for Resource Management ---

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manages application startup and shutdown events."""
    async with httpx.AsyncClient() as client:
        # Instantiate and store services in the application state
        app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
        app.state.gemini_analyzer = GeminiAnalyzer(client=client)
        app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))

        # State for caching and queues
        app.state.daily_briefing_cache = "### Briefing Unavailable\nGenerating the first daily briefing, please check back in a few minutes."
        app.state.analyzed_news_today: List[dict] = []
        app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
        app.state.news_queue: asyncio.Queue = asyncio.Queue()

        # Create cancellable background tasks
        price_task = asyncio.create_task(run_periodic_updates(app.state.price_fetcher, 60))
        news_task = asyncio.create_task(run_periodic_news_analysis(app, 900))  # 15 mins
        briefing_task = asyncio.create_task(run_daily_briefing_generation(app, 3600)) # Every hour

        print("πŸš€ CryptoSentinel Co-Pilot started.")
        yield
        
        print("⏳ Shutting down background tasks...")
        price_task.cancel()
        news_task.cancel()
        briefing_task.cancel()
        try:
            await asyncio.gather(price_task, news_task, briefing_task, return_exceptions=True)
        except asyncio.CancelledError:
            print("Background tasks cancelled successfully.")
        print("βœ… Shutdown complete.")

async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """Periodically updates crypto prices."""
    while True:
        await fetcher.update_prices_async()
        await asyncio.sleep(interval_seconds)

async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
    """Periodically fetches, analyzes, and queues top crypto news."""
    while True:
        print("πŸ“° Fetching latest crypto news...")
        try:
            top_headlines = app.state.news_api.get_everything(
                q='bitcoin OR ethereum OR crypto OR blockchain',
                language='en', sort_by='publishedAt', page_size=5
            )
            analyzer: GeminiAnalyzer = app.state.gemini_analyzer
            for article in top_headlines.get('articles', []):
                title = article.get('title')
                if title and "[Removed]" not in title:
                    analysis = await analyzer.analyze_text(title)
                    analysis['url'] = article.get('url')
                    await app.state.news_queue.put(analysis)
                    app.state.analyzed_news_today.append(analysis)
        except Exception as e:
            print(f"❌ Error during news analysis: {e}")
        await asyncio.sleep(interval_seconds)

async def run_daily_briefing_generation(app: FastAPI, interval_seconds: int):
    """Periodically generates and caches the daily market briefing."""
    while True:
        await asyncio.sleep(interval_seconds) # Wait first, then generate
        print("πŸ“ Generating Daily Market Briefing...")
        if app.state.analyzed_news_today:
            analyzer: GeminiAnalyzer = app.state.gemini_analyzer
            briefing = await analyzer.generate_daily_briefing(app.state.analyzed_news_today)
            app.state.daily_briefing_cache = briefing
            app.state.analyzed_news_today = [] # Clear list for the next cycle
            print("βœ… Daily briefing generated and cached.")
        else:
            print("ℹ️ No new news items to analyze for briefing. Skipping.")

# --- FastAPI App Initialization ---

app = FastAPI(title="CryptoSentinel Co-Pilot", lifespan=lifespan)
templates = Jinja2Templates(directory="templates")

# --- HTML Rendering Helper ---
def render_analysis_card(payload: dict, is_news: bool = False) -> str:
    s = payload
    text_to_show = s.get('summary', 'Analysis failed or not available.')
    if is_news:
        url = s.get('url', '#')
        text_to_show = f'<a href="{url}" target="_blank" rel="noopener noreferrer">{s.get("summary", "N/A")}</a>'
    impact_class = f"impact-{s.get('impact', 'low').lower()}"
    sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"
    return f"""<div class="card {impact_class}"><blockquote>{text_to_show}</blockquote><div class="grid"><div><strong>Sentiment:</strong> <span class="{sentiment_class}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span></div><div><strong>Impact:</strong> {s.get('impact')}</div></div><div class="grid"><div><strong>Topic:</strong> {s.get('topic')}</div><div><strong>Entities:</strong> {', '.join(s.get('entities', []))}</div></div></div>"""

# --- API Endpoints ---

@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})

@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
    prices = request.app.state.price_fetcher.get_current_prices()
    html = "".join(f"<div><strong>{k.capitalize()}:</strong> ${v:,.2f}</div>" if isinstance(v, (int, float)) else f"<div><strong>{k.capitalize()}:</strong> {v}</div>" for k, v in prices.items())
    return HTMLResponse(content=html)

@app.post("/api/sentiment")
async def analyze_sentiment(request: Request, background_tasks: BackgroundTasks, text: str = Form(...)):
    analyzer: GeminiAnalyzer = request.app.state.gemini_analyzer
    async def task_wrapper():
        analysis = await analyzer.analyze_text(text)
        await request.app.state.sentiment_queue.put(analysis)
    background_tasks.add_task(task_wrapper)
    return HTMLResponse(content="<small>βœ… Queued for deep analysis...</small>")

@app.get("/api/briefing", response_class=HTMLResponse)
async def get_daily_briefing(request: Request):
    briefing_html = markdown(request.app.state.daily_briefing_cache)
    return HTMLResponse(content=briefing_html)

@app.get("/api/sentiment/stream")
async def sentiment_stream(request: Request):
    queue: asyncio.Queue = request.app.state.sentiment_queue
    async def event_generator():
        while True:
            payload = await queue.get()
            html = render_analysis_card(payload)
            data_payload = html.replace('\n', '')
            yield f"event: sentiment_update\ndata: {data_payload}\n\n"
    return StreamingResponse(event_generator(), media_type="text/event-stream")

@app.get("/api/news/stream")
async def news_stream(request: Request):
    queue: asyncio.Queue = request.app.state.news_queue
    async def event_generator():
        while True:
            payload = await queue.get()
            html = render_analysis_card(payload, is_news=True)
            data_payload = html.replace('\n', '')
            yield f"event: news_update\ndata: {data_payload}\n\n"
    return StreamingResponse(event_generator(), media_type="text/event-stream")