File size: 7,712 Bytes
b159555
 
 
 
 
 
 
 
 
 
 
c6f94f2
b159555
 
 
 
78decd4
 
 
b159555
 
 
 
 
 
 
 
 
 
448d7a9
0e87c05
 
b159555
 
 
 
0e87c05
b159555
175724c
b159555
c6f94f2
eaf2b94
b159555
 
 
c6f94f2
b159555
 
 
 
 
 
 
 
c6f94f2
 
0e87c05
 
c6f94f2
 
b159555
 
 
 
 
 
 
 
 
 
 
eaf2b94
c6f94f2
 
0e87c05
b159555
c6f94f2
 
b159555
c6f94f2
 
78decd4
c6f94f2
 
 
 
 
 
 
 
 
b159555
c6f94f2
0e87c05
448d7a9
b159555
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
448d7a9
78decd4
0e87c05
78decd4
 
 
 
 
 
b159555
c6f94f2
b159555
 
78decd4
b159555
 
 
 
 
0e87c05
 
c6f94f2
cb01390
c6f94f2
 
 
e446a42
 
 
c6f94f2
f29497b
c6f94f2
 
 
 
 
 
 
e446a42
 
 
b159555
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""
CryptoSentinel Pro β€” High-performance FastAPI application.

This is the main entry point that orchestrates the entire application.
- Integrates an asynchronous PriceFetcher for live market data.
- Integrates a sophisticated GeminiAnalyzer for deep text analysis.
- Implements an automated pipeline to fetch, analyze, and stream top crypto news.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import json
import os
from contextlib import asynccontextmanager
from typing import Optional, Union

import httpx
# =================== FIX APPLIED HERE (1 of 2) ===================
from fastapi import FastAPI, Request, BackgroundTasks, Form
# =================================================================
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, constr

# Import our modular, asynchronous service classes
from .price_fetcher import PriceFetcher
from .gemini_analyzer import GeminiAnalyzer
from newsapi import NewsApiClient

# --- Application Lifespan for Resource Management ---

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manages application startup and shutdown events using the modern
    lifespan context manager.
    """
    async with httpx.AsyncClient() as client:
        # Instantiate and store all services in the application state.
        app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
        app.state.gemini_analyzer = GeminiAnalyzer(client=client)
        app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))

        # Create separate queues for the two real-time feeds
        app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
        app.state.news_queue: asyncio.Queue = asyncio.Queue()

        # Create cancellable background tasks for periodic updates.
        price_task = asyncio.create_task(
            run_periodic_updates(app.state.price_fetcher, interval_seconds=30)
        )
        news_task = asyncio.create_task(
            run_periodic_news_analysis(app, interval_seconds=900)  # Run every 15 minutes
        )
        
        print("πŸš€ CryptoSentinel Pro started successfully.")
        yield
        
        print("⏳ Shutting down background tasks...")
        price_task.cancel()
        news_task.cancel()
        try:
            await asyncio.gather(price_task, news_task)
        except asyncio.CancelledError:
            print("Background tasks cancelled successfully.")
        print("βœ… Shutdown complete.")

async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """A robust asyncio background task that periodically updates prices."""
    while True:
        await fetcher.update_prices_async()
        await asyncio.sleep(interval_seconds)

async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
    """Fetches, analyzes, and queues top crypto news periodically."""
    while True:
        print("πŸ“° Fetching latest crypto news for automated analysis...")
        try:
            top_headlines = app.state.news_api.get_everything(
                q='bitcoin OR ethereum OR crypto OR blockchain',
                language='en',
                sort_by='publishedAt',
                page_size=5
            )
            analyzer: GeminiAnalyzer = app.state.gemini_analyzer
            for article in top_headlines.get('articles', []):
                title = article.get('title')
                if title and "[Removed]" not in title:
                    analysis = await analyzer.analyze_text(title)
                    analysis['url'] = article.get('url')
                    await app.state.news_queue.put(analysis)
        except Exception as e:
            print(f"❌ Error during news fetching or analysis: {e}")
            
        await asyncio.sleep(interval_seconds)

# --- FastAPI App Initialization ---

app = FastAPI(title="CryptoSentinel Pro", lifespan=lifespan)
templates = Jinja2Templates(directory="templates")

# --- HTML Rendering Helper ---
def render_analysis_card(payload: dict, is_news: bool = False) -> str:
    s = payload
    text_to_show = s.get('summary', 'Analysis failed or not available.')
    if is_news:
        url = s.get('url', '#')
        text_to_show = f'<a href="{url}" target="_blank" rel="noopener noreferrer">{s.get("summary", "N/A")}</a>'
    impact_class = f"impact-{s.get('impact', 'low').lower()}"
    sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"
    return f"""
    <div class="card {impact_class}">
        <blockquote>{text_to_show}</blockquote>
        <div class="grid">
            <div><strong>Sentiment:</strong> <span class="{sentiment_class}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span></div>
            <div><strong>Impact:</strong> {s.get('impact')}</div>
        </div>
        <div class="grid">
            <div><strong>Topic:</strong> {s.get('topic')}</div>
            <div><strong>Entities:</strong> {', '.join(s.get('entities', []))}</div>
        </div>
    </div>
    """

# --- API Endpoints ---

@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})

@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
    price_fetcher: PriceFetcher = request.app.state.price_fetcher
    prices = price_fetcher.get_current_prices()
    html_fragment = "".join(
        f"<div><strong>{coin.capitalize()}:</strong> ${price:,.2f}</div>" if isinstance(price, (int, float))
        else f"<div><strong>{coin.capitalize()}:</strong> {price}</div>"
        for coin, price in prices.items()
    )
    return HTMLResponse(content=html_fragment)

# =================== FIX APPLIED HERE (2 of 2) ===================
@app.post("/api/sentiment")
async def analyze_sentiment(
    request: Request,
    background_tasks: BackgroundTasks,
    text: str = Form(...)  # Tell FastAPI to expect a form field named 'text'
):
# =================================================================
    """Queues user-submitted text for a full Gemini-powered analysis."""
    analyzer: GeminiAnalyzer = request.app.state.gemini_analyzer

    async def analysis_task_wrapper():
        analysis_result = await analyzer.analyze_text(text) # Use the 'text' variable directly
        await request.app.state.sentiment_queue.put(analysis_result)

    background_tasks.add_task(analysis_task_wrapper)
    return HTMLResponse(content="<small>βœ… Queued for deep analysis...</small>")

@app.get("/api/sentiment/stream")
async def sentiment_stream(request: Request):
    queue: asyncio.Queue = request.app.state.sentiment_queue
    async def event_generator():
        while True:
            payload = await queue.get()
            html = render_analysis_card(payload)
            data_payload = html.replace('\n', '')
            sse_message = f"event: sentiment_update\ndata: {data_payload}\n\n"
            yield sse_message
    return StreamingResponse(event_generator(), media_type="text/event-stream")

@app.get("/api/news/stream")
async def news_stream(request: Request):
    queue: asyncio.Queue = request.app.state.news_queue
    async def event_generator():
        while True:
            payload = await queue.get()
            html = render_analysis_card(payload, is_news=True)
            data_payload = html.replace('\n', '')
            sse_message = f"event: news_update\ndata: {data_payload}\n\n"
            yield sse_message
    return StreamingResponse(event_generator(), media_type="text/event-stream")