File size: 7,198 Bytes
7239f1c
 
 
 
 
 
 
 
 
 
 
 
526c84c
7239f1c
 
526c84c
7239f1c
526c84c
ff9180e
 
 
 
 
 
 
 
526c84c
7239f1c
526c84c
7239f1c
 
 
526c84c
7239f1c
526c84c
 
 
7239f1c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
526c84c
 
 
7239f1c
ff9180e
7239f1c
526c84c
 
5e698c6
 
7239f1c
 
526c84c
 
 
7239f1c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
526c84c
7239f1c
 
526c84c
7239f1c
 
 
526c84c
7239f1c
 
 
 
526c84c
7239f1c
 
526c84c
7239f1c
 
526c84c
7239f1c
 
 
 
 
 
526c84c
7239f1c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ff9180e
 
67188f2
 
ff9180e
67188f2
 
 
 
7239f1c
 
 
 
 
 
5e698c6
526c84c
ff9180e
 
7239f1c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
"""
CryptoSentinel AI — High-performance FastAPI application.

This is the main entry point that orchestrates the entire application.
- Integrates the asynchronous PriceFetcher for live market data.
- Integrates the asynchronous SentimentAnalyzer for real-time analysis.
- Serves the interactive frontend and provides all necessary API endpoints.
"""
import asyncio
import json
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, constr

# ====================================================================
#                       FIX APPLIED HERE
# ====================================================================
# Use relative imports because price_fetcher and sentiment are in the same 'app' directory.
from .price_fetcher import PriceFetcher
from .sentiment import SentimentAnalyzer
# ====================================================================


# --- Pydantic Model for API Input Validation ---

class SentimentRequest(BaseModel):
    """Request body accepted by the sentiment endpoint.

    `text` is whitespace-stripped and must be at least one character long.
    """

    text: constr(min_length=1, strip_whitespace=True)

# --- Application Lifespan for Resource Management ---

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Create the shared services on startup and tear them down on shutdown.

    For as long as the application runs, one `httpx.AsyncClient` stays open
    and is shared by the `PriceFetcher` and `SentimentAnalyzer` instances
    stored on `app.state`, and a background task refreshes prices every
    10 seconds.

    Args:
        app: The FastAPI application whose `state` receives the services.

    Yields:
        None. Control returns to the server while the app handles requests.
    """
    # On startup:
    async with httpx.AsyncClient() as client:
        # Store the services in the application state so any request handler
        # can reach them via `request.app.state`.
        app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
        app.state.sentiment_analyzer = SentimentAnalyzer(client=client)
        app.state.request_counter = 0  # Simple counter for unique SSE event IDs

        # Create a cancellable background task for continuous price updates.
        price_update_task = asyncio.create_task(
            run_periodic_updates(app.state.price_fetcher, interval_seconds=10)
        )

        print("🚀 CryptoSentinel AI started successfully.")
        try:
            yield  # The application is now running and ready to accept requests.
        finally:
            # On shutdown. `finally` guarantees the task is cancelled even if an
            # exception is thrown into the context manager at the `yield`.
            print("⏳ Shutting down background tasks...")
            price_update_task.cancel()
            try:
                await price_update_task
            except asyncio.CancelledError:
                print("Price update task cancelled successfully.")
            except Exception as exc:
                # The task had already died on its own; report it instead of
                # letting a stale error abort the rest of the shutdown.
                print(f"Price update task had already failed: {exc!r}")
            print("✅ Shutdown complete.")

async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
    """
    Refresh prices forever, pausing `interval_seconds` between attempts.

    A failed refresh is reported and retried on the next cycle instead of
    terminating the loop, so one transient error does not leave the price
    cache stale for the rest of the process lifetime.

    Args:
        fetcher: Object exposing an awaitable `update_prices_async()`.
        interval_seconds: Delay between consecutive refresh attempts.

    Raises:
        asyncio.CancelledError: When the surrounding task is cancelled; this
            is the only way the loop ends.
    """
    while True:
        try:
            await fetcher.update_prices_async()
        except Exception as exc:
            # Top-level boundary of a background task: report and keep going.
            # CancelledError derives from BaseException, so cancellation is
            # not swallowed here.
            print(f"Price update failed; retrying in {interval_seconds}s: {exc!r}")
        await asyncio.sleep(interval_seconds)

# --- FastAPI App Initialization ---

# The application instance; `lifespan` handles startup and shutdown.
app = FastAPI(title="CryptoSentinel AI", lifespan=lifespan)
# The template directory is a relative path, so it is presumably resolved
# against the process's working directory — TODO confirm the server is always
# launched from the project root (the directory that contains `app/`).
templates = Jinja2Templates(directory="app/templates")

# --- API Endpoints ---

@app.get("/", response_class=HTMLResponse)
async def serve_dashboard(request: Request):
    """Render `index.html`, the main interactive dashboard page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)

@app.get("/api/prices", response_class=HTMLResponse)
async def get_prices_fragment(request: Request):
    """
    Build the HTMX price fragment from the fetcher's current prices.

    Numeric prices are shown as dollar amounts with thousands separators and
    two decimals; any other value (such as the initial '--' placeholder) is
    inserted unchanged.
    """
    def render_row(coin, price):
        # One <div> per coin, in the order the fetcher reports them.
        if isinstance(price, (int, float)):
            shown = f"${price:,.2f}"
        else:
            shown = price
        return f"<div><strong>{coin.capitalize()}:</strong> {shown}</div>"

    fetcher: PriceFetcher = request.app.state.price_fetcher
    rows = [
        render_row(coin, price)
        for coin, price in fetcher.get_current_prices().items()
    ]
    return HTMLResponse(content="".join(rows))

@app.post("/api/sentiment")
async def analyze_sentiment(
    payload: SentimentRequest,
    request: Request,
    background_tasks: BackgroundTasks
):
    """
    Accept a validated text and schedule its sentiment analysis.

    The analysis is registered as a background task, so this handler only
    returns a short acknowledgement fragment.
    """
    state = request.app.state
    analyzer: SentimentAnalyzer = state.sentiment_analyzer

    # Each submission gets the next counter value as its request id.
    state.request_counter += 1

    # `compute_and_publish` is scheduled as a background task rather than awaited here.
    background_tasks.add_task(
        analyzer.compute_and_publish,
        payload.text,
        state.request_counter,
    )

    return HTMLResponse(content="<small>Queued for analysis...</small>")

def _render_sentiment_fragment(result_payload) -> str:
    """
    Turn one analyzer payload into a single-line, HTML-escaped fragment.

    Args:
        result_payload: Mapping with a 'text' entry (the analysed text) and a
            'result' mapping that may carry 'label' and 'score'.

    Returns:
        The HTML fragment with every CR/LF removed, safe to place after a
        single SSE `data:` field.

    Raises:
        KeyError, TypeError, AttributeError, ValueError: If the payload does
            not have the expected shape or the score is not numeric.
    """
    from html import escape  # local import keeps this block self-contained

    result = result_payload['result']
    label = str(result.get('label', 'NEUTRAL')).lower()
    score = float(result.get('score', 0.0)) * 100
    text = str(result_payload['text'])

    # `text` is user-submitted and `label` comes from an external service:
    # escape both before they reach the browser to prevent HTML/script injection.
    css_label = escape(label, quote=True)
    shown_label = escape(label.upper(), quote=True)
    shown_text = escape(text, quote=True)

    fragment = (
        "<div>"
        f"<blockquote>{shown_text}</blockquote>"
        "<p>"
        "<strong>Result:</strong> "
        f'<span class="sentiment-{css_label}">{shown_label}</span> '
        f"(Confidence: {score:.1f}%)"
        "</p>"
        "</div>"
    )
    # SSE treats CR, LF and CRLF as line terminators, so none of them may
    # survive inside the data field.
    return fragment.replace('\r', '').replace('\n', '')


@app.get("/api/sentiment/stream")
async def sentiment_stream(request: Request):
    """
    Open a Server-Sent Events (SSE) connection for sentiment results.

    The stream first sends an empty out-of-band swap for `#sentiment-results`
    and then one `sentiment_update` event per result yielded by the analyzer.
    Malformed payloads are skipped without closing the stream.

    Returns:
        A `StreamingResponse` with media type `text/event-stream`.
    """
    analyzer: SentimentAnalyzer = request.app.state.sentiment_analyzer

    async def event_generator():
        # Clear the initial "waiting..." message on the client.
        # hx-swap-oob="innerHTML" swaps this div out-of-band without affecting the target.
        yield "event: sentiment_update\ndata: <div id='sentiment-results' hx-swap-oob='innerHTML'></div>\n\n"

        # Relay each result the analyzer yields.
        async for result_payload in analyzer.stream_results():
            try:
                data_payload = _render_sentiment_fragment(result_payload)
            except (KeyError, TypeError, AttributeError, ValueError):
                continue  # Ignore malformed payloads

            yield f"event: sentiment_update\ndata: {data_payload}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

# --- Main execution block for local development ---
if __name__ == "__main__":
    import uvicorn
    # NOTE(review): this module uses relative imports (`from .price_fetcher ...`),
    # so running it directly as a script (`python app.py`) is expected to fail
    # with an ImportError. Launching it as a module from the parent directory
    # (`python -m app.app`) looks like the intended path — confirm.
    # NOTE(review): the "app:app" import string has to resolve to this module's
    # `app` object from the directory uvicorn is started in — verify it matches
    # the package layout.
    # Development server: binds to all interfaces on port 7860 with auto-reload.
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)