mgbam commited on
Commit
a17b947
Β·
verified Β·
1 Parent(s): 42fc60a

Update app/app.py

Browse files
Files changed (1) hide show
  1. app/app.py +133 -129
app/app.py CHANGED
@@ -1,167 +1,171 @@
1
  """
2
- The Sentinel Protocol β€” An Autonomous AI Market Analyst.
3
- Connects real-time news events to market data visualizations.
 
 
 
 
 
 
4
  """
5
  import asyncio
6
  import json
7
  import os
8
  from contextlib import asynccontextmanager
9
- from typing import Optional, Union, List
10
- import pandas as pd
11
 
12
  import httpx
13
  from fastapi import FastAPI, Request, BackgroundTasks, Form
14
  from fastapi.responses import HTMLResponse, StreamingResponse
15
  from fastapi.templating import Jinja2Templates
16
- from markdown import markdown
17
- from newsapi import NewsApiClient
18
 
19
- # Import our modular service classes
20
- from .price_fetcher import PriceFetcher
21
  from .gemini_analyzer import GeminiAnalyzer
22
- from .chart_generator import generate_price_chart
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
 
24
- # --- Lifespan Manager ---
 
25
  @asynccontextmanager
26
  async def lifespan(app: FastAPI):
27
  """Manages application startup and shutdown events."""
28
  async with httpx.AsyncClient() as client:
29
- app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
30
  app.state.gemini_analyzer = GeminiAnalyzer(client=client)
31
- app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
32
- app.state.coingecko_client = client # For fetching chart data
33
-
34
- app.state.daily_briefing_cache = "### Briefing Unavailable\nSentinel is gathering initial intelligence..."
35
- app.state.analyzed_news_today: List[dict] = []
36
- app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
37
- app.state.news_queue: asyncio.Queue = asyncio.Queue()
38
-
39
- tasks = [
40
- run_periodic_updates(app.state.price_fetcher, 60),
41
- run_periodic_news_analysis(app, 900), # 15 mins
42
- run_daily_briefing_generation(app, 3600) # Every hour
43
- ]
44
- background_tasks = [asyncio.create_task(task) for task in tasks]
45
 
46
- print("πŸš€ The Sentinel Protocol is online.")
 
 
 
47
  yield
48
 
49
  print("⏳ Shutting down Sentinel protocols...")
50
- for task in background_tasks:
51
- task.cancel()
52
- await asyncio.gather(*background_tasks, return_exceptions=True)
53
- print("βœ… Sentinel Protocol offline.")
54
-
55
- # --- Background Tasks ---
56
- async def run_periodic_updates(fetcher: PriceFetcher, interval: int):
57
- while True:
58
- await fetcher.update_prices_async()
59
- await asyncio.sleep(interval)
60
-
61
- async def run_periodic_news_analysis(app: FastAPI, interval: int):
62
- """Fetches news, analyzes it, generates charts, and queues the full payload."""
63
- while True:
64
- print("πŸ“° Sentinel is scanning for new market intelligence...")
65
  try:
66
- headlines = app.state.news_api.get_everything(q='bitcoin OR ethereum OR solana', language='en', sort_by='publishedAt', page_size=5)
67
- analyzer: GeminiAnalyzer = app.state.gemini_analyzer
68
-
69
- for article in headlines.get('articles', []):
70
- title = article.get('title')
71
- if not title or "[Removed]" in title:
72
- continue
73
-
74
- analysis = await analyzer.analyze_text(title)
75
- if not analysis or analysis.get('sentiment') == 'ERROR':
76
- continue
77
-
78
- analysis['url'] = article.get('url')
79
-
80
- # --- CHART GENERATION LOGIC ---
81
- chart_base64 = ""
82
- main_entity = analysis.get('entities', [])[0].lower() if analysis.get('entities') else None
83
- if main_entity in ["bitcoin", "ethereum", "solana"]: # Add more as needed
84
- event_time = pd.to_datetime(article.get('publishedAt'))
85
- # Fetch last 24h of price data for charting
86
- cg_url = f"https://api.coingecko.com/api/v3/coins/{main_entity}/market_chart"
87
- params = {'vs_currency': 'usd', 'days': '1'}
88
- resp = await app.state.coingecko_client.get(cg_url, params=params)
89
- if resp.status_code == 200:
90
- price_data = resp.json().get('prices', [])
91
- chart_base64 = generate_price_chart(price_data, event_time, main_entity.capitalize())
92
-
93
- analysis['chart'] = chart_base64
94
- await app.state.news_queue.put(analysis)
95
- app.state.analyzed_news_today.append(analysis)
96
-
97
- except Exception as e:
98
- print(f"❌ News analysis pipeline failed: {e}")
99
- await asyncio.sleep(interval)
100
-
101
- async def run_daily_briefing_generation(app: FastAPI, interval: int):
102
- """Generates the daily briefing."""
103
- await asyncio.sleep(120) # Initial delay to gather some news first
104
- while True:
105
- print("πŸ“ Sentinel is synthesizing the Daily Briefing...")
106
- if app.state.analyzed_news_today:
107
- analyzer: GeminiAnalyzer = app.state.gemini_analyzer
108
- briefing = await analyzer.generate_daily_briefing(app.state.analyzed_news_today)
109
- app.state.daily_briefing_cache = briefing
110
- app.state.analyzed_news_today = []
111
- print("βœ… Daily briefing synthesized.")
112
- else:
113
- print("ℹ️ No new intelligence to synthesize. Waiting for next cycle.")
114
- await asyncio.sleep(interval)
115
 
116
- # --- App and Endpoints ---
117
- app = FastAPI(title="Sentinel Protocol", lifespan=lifespan)
118
  templates = Jinja2Templates(directory="templates")
119
 
120
  # --- HTML Rendering Helper ---
121
- def render_analysis_card(payload: dict) -> str:
122
- s = payload
123
- summary_link = f'<a href="{s.get("url", "#")}" target="_blank" rel="noopener noreferrer">{s.get("summary", "N/A")}</a>'
124
- impact_class = f"impact-{s.get('impact', 'low').lower()}"
125
- sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"
126
- chart_html = f'<img src="{s.get("chart")}" alt="Price chart" style="width:100%; margin-top:1rem; border-radius:var(--pico-border-radius);">' if s.get("chart") else ""
 
 
127
 
128
- return f"""<div class="card {impact_class}"><blockquote>{summary_link}</blockquote>{chart_html}<div class="grid"><div><strong>Sentiment:</strong> <span class="{sentiment_class}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span></div><div><strong>Impact:</strong> {s.get('impact')}</div></div><div class="grid"><div><strong>Topic:</strong> {s.get('topic')}</div><div><strong>Entities:</strong> {', '.join(s.get('entities', []))}</div></div></div>"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
129
 
130
  # --- API Endpoints ---
131
  @app.get("/", response_class=HTMLResponse)
132
- async def serve_dashboard(request: Request):
133
  return templates.TemplateResponse("index.html", {"request": request})
134
 
135
- # ... (keep /api/prices and /api/sentiment as they were) ...
136
- @app.get("/api/briefing", response_class=HTMLResponse)
137
- async def get_daily_briefing(request: Request):
138
- briefing_html = markdown(request.app.state.daily_briefing_cache)
139
- return HTMLResponse(content=briefing_html)
140
-
141
- @app.get("/api/news/stream")
142
- async def news_stream(request: Request):
143
- queue: asyncio.Queue = request.app.state.news_queue
144
  async def event_generator():
145
  while True:
146
  payload = await queue.get()
147
- html = render_analysis_card(payload)
148
  data_payload = html.replace('\n', '')
149
- yield f"event: news_update\ndata: {data_payload}\n\n"
150
- return StreamingResponse(event_generator(), media_type="text/event-stream")
151
-
152
- # Add back the other endpoints that were removed for brevity
153
- @app.get("/api/prices", response_class=HTMLResponse)
154
- async def get_prices_fragment(request: Request):
155
- prices = request.app.state.price_fetcher.get_current_prices()
156
- html = "".join(f"<div><strong>{k.capitalize()}:</strong> ${v:,.2f}</div>" if isinstance(v, (int, float)) else f"<div><strong>{k.capitalize()}:</strong> {v}</div>" for k, v in prices.items())
157
- return HTMLResponse(content=html)
158
-
159
- @app.post("/api/sentiment")
160
- async def analyze_sentiment(request: Request, background_tasks: BackgroundTasks, text: str = Form(...)):
161
- analyzer: GeminiAnalyzer = request.app.state.gemini_analyzer
162
- async def task_wrapper():
163
- analysis = await analyzer.analyze_text(text)
164
- # We can reuse the news queue for manual analysis results
165
- await request.app.state.news_queue.put(analysis)
166
- background_tasks.add_task(task_wrapper)
167
- return HTMLResponse(content="<small>βœ… Analysis protocol initiated...</small>")
 
1
  """
2
+ The Sentinel TradeFlow Protocol - Main Application
3
+
4
+ Orchestrates the 3-tier intelligence funnel:
5
+ 1. Ingests a (mock) real-time data stream.
6
+ 2. Filters events with the local Tier 1 SentimentEngine.
7
+ 3. Escalates high-conviction events to the Tier 2 GeminiAnalyzer.
8
+ 4. Generates actionable trade hypotheses with the Tier 3 Strategy Engine.
9
+ 5. Pushes final signals to the Command Center UI.
10
  """
11
  import asyncio
12
  import json
13
  import os
14
  from contextlib import asynccontextmanager
15
+ from typing import Optional, Dict
 
16
 
17
  import httpx
18
  from fastapi import FastAPI, Request, BackgroundTasks, Form
19
  from fastapi.responses import HTMLResponse, StreamingResponse
20
  from fastapi.templating import Jinja2Templates
 
 
21
 
22
+ # Import our intelligence modules
 
23
  from .gemini_analyzer import GeminiAnalyzer
24
+ from .sentiment_engine import LocalSentimentFilter
25
+
26
+ # --- Tier 3: The Strategist ---
27
+ def generate_trade_hypothesis(analysis: dict) -> Optional[Dict]:
28
+ """A simple rules-based engine to generate an actionable signal."""
29
+ sentiment = analysis.get("sentiment")
30
+ impact = analysis.get("impact")
31
+ score = analysis.get("sentiment_score", 0.0)
32
+
33
+ # High-conviction rules
34
+ if impact == "HIGH" and sentiment == "NEGATIVE" and score < -0.7:
35
+ return {"type": "HYPOTHETICAL SHORT", "confidence": "HIGH", "reason": "High impact, strongly negative news detected."}
36
+ if impact == "HIGH" and sentiment == "POSITIVE" and score > 0.7:
37
+ return {"type": "HYPOTHETICAL LONG", "confidence": "HIGH", "reason": "High impact, strongly positive news detected."}
38
+
39
+ # Medium-conviction rules
40
+ if impact == "MEDIUM" and sentiment == "NEGATIVE" and score < -0.5:
41
+ return {"type": "HYPOTHETICAL SHORT", "confidence": "MEDIUM", "reason": "Medium impact, negative news."}
42
+ if impact == "MEDIUM" and sentiment == "POSITIVE" and score > 0.5:
43
+ return {"type": "HYPOTHETICAL LONG", "confidence": "MEDIUM", "reason": "Medium impact, positive news."}
44
+
45
+ return None
46
+
47
+ # --- Mock Real-Time Data Feed & Pipeline Orchestration ---
48
+ async def real_time_intelligence_pipeline(app: FastAPI):
49
+ """Mocks a high-frequency WebSocket news feed and runs it through the 3-tier funnel."""
50
+ await asyncio.sleep(5) # Initial delay to let UI connect
51
+ print("πŸš€ [Pipeline] Real-time intelligence pipeline is active.")
52
+
53
+ # A more realistic stream of headlines
54
+ mock_headlines = [
55
+ ("Coinbase reports minor outage, services restored.", 5),
56
+ ("New memecoin 'ShibaCat' gains 20% on low volume.", 3),
57
+ ("BREAKING: US Federal Reserve signals potential for surprise interest rate hike next month.", 8),
58
+ ("Ethereum developer announces successful testnet merge for upcoming 'Prague' upgrade.", 6),
59
+ ("CEO of major crypto fund says market is 'overheated'.", 4),
60
+ ("MASSIVE EXPLOIT: Cross-chain bridge 'Wormhole' drained of $150M in ETH and SOL.", 7),
61
+ ("BlackRock files updated S-1 form for its spot Bitcoin ETF.", 5),
62
+ ("Polygon announces major partnership with a leading gaming studio.", 4),
63
+ ]
64
+
65
+ for headline, delay in mock_headlines:
66
+ print(f"πŸ”₯ [Tier 1] Ingested: '{headline}'")
67
+
68
+ # Tier 1 Analysis: Fast, local filtering
69
+ local_analysis = LocalSentimentFilter.analyze(headline)
70
+
71
+ # Trigger Condition for Tier 2: Is sentiment strong enough?
72
+ if abs(local_analysis['score']) > 0.65 or local_analysis['label'].lower() != 'neutral':
73
+ print(f"⚑️ [Tier 2 Triggered] Event '{headline[:30]}...' escalated to Gemini. Reason: Local sentiment {local_analysis['label']} ({local_analysis['score']:.2f})")
74
+
75
+ analyzer: GeminiAnalyzer = app.state.gemini_analyzer
76
+ gemini_analysis = await analyzer.analyze_text(headline)
77
+
78
+ # Tier 3: Generate actionable signal
79
+ signal = generate_trade_hypothesis(gemini_analysis)
80
+
81
+ if signal:
82
+ print(f"🎯 [Tier 3] Actionable Signal Generated: {signal['type']} with {signal['confidence']} confidence.")
83
+ final_payload = {"signal": signal, "analysis": gemini_analysis, "headline": headline}
84
+ await app.state.signal_queue.put(final_payload)
85
+
86
+ await asyncio.sleep(delay)
87
+ print("βœ… [Pipeline] Mock real-time feed complete.")
88
 
89
+
90
+ # --- Application Lifespan ---
91
  @asynccontextmanager
92
  async def lifespan(app: FastAPI):
93
  """Manages application startup and shutdown events."""
94
  async with httpx.AsyncClient() as client:
 
95
  app.state.gemini_analyzer = GeminiAnalyzer(client=client)
96
+ app.state.signal_queue = asyncio.Queue()
97
+
98
+ # Warm up the local model on startup
99
+ LocalSentimentFilter.analyze("Warming up FinBERT model...")
 
 
 
 
 
 
 
 
 
 
100
 
101
+ # Start the intelligence pipeline as a background task
102
+ pipeline_task = asyncio.create_task(real_time_intelligence_pipeline(app))
103
+
104
+ print("πŸš€ Sentinel TradeFlow Protocol is online.")
105
  yield
106
 
107
  print("⏳ Shutting down Sentinel protocols...")
108
+ pipeline_task.cancel()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
109
  try:
110
+ await pipeline_task
111
+ except asyncio.CancelledError:
112
+ print("Intelligence pipeline successfully shut down.")
113
+ print("βœ… Sentinel Protocol offline.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
114
 
115
+ # --- App Initialization ---
116
+ app = FastAPI(title="Sentinel TradeFlow", lifespan=lifespan)
117
  templates = Jinja2Templates(directory="templates")
118
 
119
  # --- HTML Rendering Helper ---
120
+ def render_signal_card(payload: dict) -> str:
121
+ """Renders the final signal payload into a rich HTML card."""
122
+ signal = payload.get("signal", {})
123
+ analysis = payload.get("analysis", {})
124
+ headline = payload.get("headline", "N/A")
125
+
126
+ signal_type = signal.get("type", "INFO")
127
+ confidence = signal.get("confidence", "N/A")
128
 
129
+ # Dynamic styling based on signal
130
+ if "SHORT" in signal_type:
131
+ card_class = "signal-short"
132
+ icon = "πŸ“‰"
133
+ elif "LONG" in signal_type:
134
+ card_class = "signal-long"
135
+ icon = "πŸ“ˆ"
136
+ else:
137
+ card_class = ""
138
+ icon = "ℹ️"
139
+
140
+ return f"""
141
+ <div class="card {card_class}">
142
+ <header class="signal-header">
143
+ <span>{icon} {signal_type}</span>
144
+ <span>Confidence: <strong>{confidence}</strong></span>
145
+ </header>
146
+ <p class="headline"><strong>Source Headline:</strong> {headline}</p>
147
+ <p><strong>Sentinel's Assessment:</strong> {analysis.get('summary', 'N/A')}</p>
148
+ <div class="grid">
149
+ <div><strong>Impact:</strong> {analysis.get('impact')}</div>
150
+ <div><strong>Topic:</strong> {analysis.get('topic')}</div>
151
+ <div><strong>Entities:</strong> {', '.join(analysis.get('entities', []))}</div>
152
+ </div>
153
+ </div>
154
+ """
155
 
156
  # --- API Endpoints ---
157
  @app.get("/", response_class=HTMLResponse)
158
+ async def serve_command_center(request: Request):
159
  return templates.TemplateResponse("index.html", {"request": request})
160
 
161
+ @app.get("/api/signals/stream")
162
+ async def signal_stream(request: Request):
163
+ """SSE stream for pushing generated trade hypotheses to the UI."""
164
+ queue: asyncio.Queue = request.app.state.signal_queue
 
 
 
 
 
165
  async def event_generator():
166
  while True:
167
  payload = await queue.get()
168
+ html = render_signal_card(payload)
169
  data_payload = html.replace('\n', '')
170
+ yield f"event: new_signal\ndata: {data_payload}\n\n"
171
+ return StreamingResponse(event_generator(), media_type="text/event-stream")