File size: 6,660 Bytes
b159555
431c9a5
073930c
 
 
 
431c9a5
073930c
b159555
 
 
c6f94f2
b159555
073930c
b159555
 
431c9a5
b159555
 
 
01e217d
 
 
073930c
b349d30
b159555
073930c
a17b947
0e87c05
 
073930c
 
 
 
0e87c05
073930c
b159555
073930c
431c9a5
073930c
2f96557
073930c
431c9a5
073930c
 
2f96557
073930c
a17b947
431c9a5
c6f94f2
0e87c05
073930c
 
 
c6f94f2
073930c
a17b947
073930c
 
b159555
073930c
431c9a5
073930c
 
 
 
 
2f96557
073930c
2f96557
073930c
 
2f96557
073930c
 
 
 
2f96557
 
 
 
 
 
 
 
 
073930c
2f96557
073930c
2f96557
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
669ca48
073930c
2f96557
073930c
2f96557
073930c
 
 
 
431c9a5
b159555
 
 
431c9a5
073930c
 
431c9a5
 
 
 
073930c
 
431c9a5
a17b947
073930c
 
 
 
 
 
a17b947
073930c
 
a17b947
 
 
b159555
 
 
073930c
431c9a5
b159555
 
431c9a5
 
 
24a8706
c6f94f2
 
 
431c9a5
e446a42
431c9a5
073930c
669ca48
a17b947
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""
Sentinel TradeFlow Protocol β€” High-performance FastAPI application.

This is the main entry point that orchestrates the entire application.
- Integrates an asynchronous PriceFetcher for live market data.
- Integrates a sophisticated GeminiAnalyzer for deep text analysis.
- Implements an automated pipeline to fetch, analyze, and stream trading signals.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import json
import os
from contextlib import asynccontextmanager
from typing import Optional, Union

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates

# Use relative imports because these modules are in the same 'app' package.
from .price_fetcher import PriceFetcher
from .gemini_analyzer import GeminiAnalyzer
from newsapi import NewsApiClient


# --- Application Lifespan for Resource Management ---

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manages application startup and shutdown events using the modern
    lifespan context manager.
    """
    async with httpx.AsyncClient() as client:
        app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
        app.state.gemini_analyzer = GeminiAnalyzer(client=client)
        app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
        app.state.signal_queue: asyncio.Queue = asyncio.Queue()

        # Create cancellable background tasks. Let's use a shorter timer for testing.
        price_task = asyncio.create_task(
            run_periodic_updates(app.state.price_fetcher, interval_seconds=60)
        )
        news_task = asyncio.create_task(
            run_periodic_news_analysis(app, interval_seconds=300)  # Check news every 5 minutes for debugging
        )
        
        print("πŸš€ Sentinel TradeFlow Protocol started successfully.")
        yield
        
        print("⏳ Shutting down background tasks...")
        price_task.cancel()
        news_task.cancel()
        try:
            await asyncio.gather(price_task, news_task)
        except asyncio.CancelledError:
            print("Background tasks cancelled successfully.")
        print("βœ… Shutdown complete.")

async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """A robust asyncio background task that periodically updates prices."""
    while True:
        await fetcher.update_prices_async()
        await asyncio.sleep(interval_seconds)

async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
    """Fetches, analyzes, and queues top crypto news periodically with detailed logging."""
    while True:
        print("πŸ“° [1/5] Fetching latest crypto news...")
        try:
            top_headlines = app.state.news_api.get_everything(
                q='bitcoin OR ethereum OR "binance coin" OR solana OR ripple OR cardano',
                language='en',
                sort_by='publishedAt',
                page_size=5
            )
            
            articles = top_headlines.get('articles', [])
            print(f"πŸ“° [2/5] NewsAPI call successful. Found {len(articles)} articles.")
            
            if not articles:
                print("πŸ“° [SKIP] No new articles found in this cycle.")
                await asyncio.sleep(interval_seconds)
                continue

            analyzer: GeminiAnalyzer = app.state.gemini_analyzer
            for article in articles:
                title = article.get('title')
                print(f"πŸ“° [3/5] Processing article: '{title}'")

                if not title or "[Removed]" in title:
                    print(f"πŸ“° [SKIP] Article has no title or was removed.")
                    continue
                
                print(f"πŸ“° [4/5] Sending to Gemini for analysis...")
                analysis = await analyzer.analyze_text(title)
                
                if analysis.get("error"):
                    print(f"❌ [SKIP] Gemini analysis failed for '{title}'. Reason: {analysis.get('reason')}")
                    continue
                
                analysis['url'] = article.get('url')
                await app.state.signal_queue.put(analysis)
                print(f"βœ… [5/5] Signal generated and queued for: '{title}'")

        except Exception as e:
            print(f"❌❌❌ CRITICAL ERROR in news analysis loop: {e}")
            
        print(f"πŸ“° Loop finished. Waiting for {interval_seconds} seconds.")
        await asyncio.sleep(interval_seconds)

# --- FastAPI App Initialization ---

app = FastAPI(title="Sentinel TradeFlow Protocol", lifespan=lifespan)
templates = Jinja2Templates(directory="templates")

# --- HTML Rendering Helper ---
def render_signal_card(payload: dict) -> str:
    """Renders a dictionary of analysis into a styled HTML card."""
    s = payload
    url = s.get('url', '#')
    summary = s.get('summary', 'Analysis failed or not available.')
    text_to_show = f'<a href="{url}" target="_blank" rel="noopener noreferrer">{summary}</a>'

    impact_class = f"impact-{s.get('impact', 'low').lower()}"
    sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"

    return f"""
    <div class="card {impact_class}">
        <blockquote>{text_to_show}</blockquote>
        <div class="grid">
            <div><strong>Sentiment:</strong> <span class="{sentiment_class}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span></div>
            <div><strong>Impact:</strong> {s.get('impact')}</div>
        </div>
        <div class="grid">
            <div><strong>Topic:</strong> {s.get('topic')}</div>
            <div><strong>Entities:</strong> {', '.join(s.get('entities', []))}</div>
        </div>
    </div>
    """

# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
    """Serves the main interactive dashboard from `index.html`."""
    return templates.TemplateResponse("index.html", {"request": request})

@app.get("/api/signals/stream")
async def signal_stream(request: Request):
    """SSE stream for the automated Signal Stream."""
    queue: asyncio.Queue = request.app.state.signal_queue
    async def event_generator():
        while True:
            payload = await queue.get()
            html = render_signal_card(payload)
            data_payload = html.replace('\n', '')
            sse_message = f"event: message\ndata: {data_payload}\n\n"
            yield sse_message
            
    return StreamingResponse(event_generator(), media_type="text/event-stream")