File size: 5,584 Bytes
448d7a9
0e87c05
eaf2b94
175724c
 
 
 
0e87c05
cb01390
0e87c05
 
eaf2b94
0e87c05
448d7a9
0e87c05
eaf2b94
0e87c05
 
175724c
0e87c05
 
 
175724c
 
0e87c05
 
175724c
0e87c05
eaf2b94
175724c
448d7a9
0e87c05
 
 
175724c
 
0e87c05
175724c
0e87c05
175724c
 
 
0e87c05
eaf2b94
175724c
0e87c05
175724c
0e87c05
 
 
175724c
0e87c05
175724c
0e87c05
 
 
 
 
 
 
eaf2b94
0e87c05
175724c
0e87c05
 
 
448d7a9
0e87c05
448d7a9
0e87c05
175724c
 
eaf2b94
175724c
 
448d7a9
175724c
0e87c05
 
 
 
 
 
 
 
 
 
 
448d7a9
0e87c05
175724c
0e87c05
 
 
 
175724c
448d7a9
0e87c05
 
 
cb01390
175724c
 
0e87c05
175724c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
CryptoSentinel AI β€” High-performance FastAPI application.

This is the main entry point that orchestrates the entire application.
- Integrates the asynchronous PriceFetcher for live market data.
- Integrates the asynchronous SentimentAnalyzer for real-time analysis.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import json
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, constr

# These relative imports will now work correctly.
from .price_fetcher import PriceFetcher
from .sentiment import SentimentAnalyzer


# --- Pydantic Model for API Input Validation ---

class SentimentRequest(BaseModel):
    """Ensures the text for sentiment analysis is a non-empty string."""
    text: constr(strip_whitespace=True, min_length=1)

# --- Application Lifespan for Resource Management ---

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manages application startup and shutdown events using the modern
    lifespan context manager.
    """
    # On startup:
    async with httpx.AsyncClient() as client:
        # Instantiate and store our services in the application state.
        app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
        app.state.sentiment_analyzer = SentimentAnalyzer(client=client)
        app.state.request_counter = 0

        # Create a cancellable background task for continuous price updates.
        price_update_task = asyncio.create_task(
            run_periodic_updates(app.state.price_fetcher, interval_seconds=10)
        )
        
        print("πŸš€ CryptoSentinel AI started successfully.")
        yield
        
        # On shutdown:
        print("⏳ Shutting down background tasks...")
        price_update_task.cancel()
        try:
            await price_update_task
        except asyncio.CancelledError:
            print("Price update task cancelled successfully.")
        print("βœ… Shutdown complete.")

async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """A robust asyncio background task that periodically updates prices."""
    while True:
        await fetcher.update_prices_async()
        await asyncio.sleep(interval_seconds)

# --- FastAPI App Initialization ---

app = FastAPI(title="CryptoSentinel AI", lifespan=lifespan)
# This path assumes the app is run from the root directory.
templates = Jinja2Templates(directory="app/templates")

# --- API Endpoints ---
# ... (The rest of the file is unchanged) ...
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})

@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
    price_fetcher: PriceFetcher = request.app.state.price_fetcher
    prices = price_fetcher.get_current_prices()
    html_fragment = ""
    for coin, price in prices.items():
        price_str = f"${price:,.2f}" if isinstance(price, (int, float)) else price
        html_fragment += f"<div><strong>{coin.capitalize()}:</strong> {price_str}</div>"
    return HTMLResponse(content=html_fragment)

@app.post("/api/sentiment")
async def analyze_sentiment(payload: SentimentRequest, request: Request, background_tasks: BackgroundTasks):
    analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
    request.app.state.request_counter += 1
    request_id = request.app.state.request_counter
    background_tasks.add_task(analyzer.compute_and_publish, payload.text, request_id)
    return HTMLResponse(content="<small>Queued for analysis...</small>")

@app.get("/api/sentiment/stream")
async def sentiment_stream(request: Request):
    analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer
    async def event_generator():
        yield f"event: sentiment_update\ndata: <div id='sentiment-results' hx-swap-oob='innerHTML'></div>\n\n"
        async for result_payload in analyzer.stream_results():
            try:
                result = result_payload['result']
                label = str(result.get('label', 'NEUTRAL')).lower()
                score = result.get('score', 0.0) * 100
                text = result_payload['text']
                html_fragment = f"""
                <div>
                    <blockquote>{text}</blockquote>
                    <p>
                        <strong>Result:</strong> 
                        <span class="sentiment-{label}">{label.upper()}</span> 
                        (Confidence: {score:.1f}%)
                    </p>
                </div>
                """
                data_payload = html_fragment.replace('\n', '')
                sse_message = f"event: sentiment_update\ndata: {data_payload}\n\n"
                yield sse_message
            except (KeyError, TypeError):
                continue
    return StreamingResponse(event_generator(), media_type="text/event-stream")

# This block is now mostly for IDEs, the primary run method is the uvicorn command.
if __name__ == "__main__":
    # Note: Running this file directly (`python app/app.py`) will fail due to relative imports.
    # Use the command: `uvicorn app.app:app --reload` from the project root.
    print("To run this application, use the command from the root directory:")
    print("uvicorn app.app:app --host 0.0.0.0 --port 7860 --reload")