mgbam commited on
Commit
74169d6
Β·
verified Β·
1 Parent(s): 1fd5d5f

Update app/app.py

Browse files
Files changed (1) hide show
  1. app/app.py +93 -91
app/app.py CHANGED
@@ -1,17 +1,13 @@
1
  """
2
- CryptoSentinel Co-Pilot β€” Your AI-Powered Market Analyst.
3
-
4
- This application provides a multi-faceted view of the crypto market by:
5
- - Tracking live prices.
6
- - Proactively fetching, analyzing, and synthesizing top news into a daily briefing.
7
- - Offering on-demand deep analysis of any text.
8
- - Streaming all insights to a dynamic frontend in real-time.
9
  """
10
  import asyncio
11
  import json
12
  import os
13
  from contextlib import asynccontextmanager
14
  from typing import Optional, Union, List
 
15
 
16
  import httpx
17
  from fastapi import FastAPI, Request, BackgroundTasks, Form
@@ -23,143 +19,149 @@ from newsapi import NewsApiClient
23
  # Import our modular service classes
24
  from .price_fetcher import PriceFetcher
25
  from .gemini_analyzer import GeminiAnalyzer
 
26
 
27
- # --- Application Lifespan for Resource Management ---
28
-
29
  @asynccontextmanager
30
  async def lifespan(app: FastAPI):
31
  """Manages application startup and shutdown events."""
32
  async with httpx.AsyncClient() as client:
33
- # Instantiate and store services in the application state
34
  app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
35
  app.state.gemini_analyzer = GeminiAnalyzer(client=client)
36
  app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
 
37
 
38
- # State for caching and queues
39
- app.state.daily_briefing_cache = "### Briefing Unavailable\nGenerating the first daily briefing, please check back in a few minutes."
40
  app.state.analyzed_news_today: List[dict] = []
41
  app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
42
  app.state.news_queue: asyncio.Queue = asyncio.Queue()
43
 
44
- # Create cancellable background tasks
45
- price_task = asyncio.create_task(run_periodic_updates(app.state.price_fetcher, 60))
46
- news_task = asyncio.create_task(run_periodic_news_analysis(app, 900)) # 15 mins
47
- briefing_task = asyncio.create_task(run_daily_briefing_generation(app, 3600)) # Every hour
48
-
49
- print("πŸš€ CryptoSentinel Co-Pilot started.")
 
 
50
  yield
51
 
52
- print("⏳ Shutting down background tasks...")
53
- price_task.cancel()
54
- news_task.cancel()
55
- briefing_task.cancel()
56
- try:
57
- await asyncio.gather(price_task, news_task, briefing_task, return_exceptions=True)
58
- except asyncio.CancelledError:
59
- print("Background tasks cancelled successfully.")
60
- print("βœ… Shutdown complete.")
61
-
62
- async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
63
- """Periodically updates crypto prices."""
64
  while True:
65
  await fetcher.update_prices_async()
66
- await asyncio.sleep(interval_seconds)
67
 
68
- async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
69
- """Periodically fetches, analyzes, and queues top crypto news."""
70
  while True:
71
- print("πŸ“° Fetching latest crypto news...")
72
  try:
73
- top_headlines = app.state.news_api.get_everything(
74
- q='bitcoin OR ethereum OR crypto OR blockchain',
75
- language='en', sort_by='publishedAt', page_size=5
76
- )
77
  analyzer: GeminiAnalyzer = app.state.gemini_analyzer
78
- for article in top_headlines.get('articles', []):
 
79
  title = article.get('title')
80
- if title and "[Removed]" not in title:
81
- analysis = await analyzer.analyze_text(title)
82
- analysis['url'] = article.get('url')
83
- await app.state.news_queue.put(analysis)
84
- app.state.analyzed_news_today.append(analysis)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85
  except Exception as e:
86
- print(f"❌ Error during news analysis: {e}")
87
- await asyncio.sleep(interval_seconds)
88
 
89
- async def run_daily_briefing_generation(app: FastAPI, interval_seconds: int):
90
- """Periodically generates and caches the daily market briefing."""
 
91
  while True:
92
- await asyncio.sleep(interval_seconds) # Wait first, then generate
93
- print("πŸ“ Generating Daily Market Briefing...")
94
  if app.state.analyzed_news_today:
95
  analyzer: GeminiAnalyzer = app.state.gemini_analyzer
96
  briefing = await analyzer.generate_daily_briefing(app.state.analyzed_news_today)
97
  app.state.daily_briefing_cache = briefing
98
- app.state.analyzed_news_today = [] # Clear list for the next cycle
99
- print("βœ… Daily briefing generated and cached.")
100
  else:
101
- print("ℹ️ No new news items to analyze for briefing. Skipping.")
102
-
103
- # --- FastAPI App Initialization ---
104
 
105
- app = FastAPI(title="CryptoSentinel Co-Pilot", lifespan=lifespan)
 
106
  templates = Jinja2Templates(directory="templates")
107
 
108
  # --- HTML Rendering Helper ---
109
- def render_analysis_card(payload: dict, is_news: bool = False) -> str:
110
  s = payload
111
- text_to_show = s.get('summary', 'Analysis failed or not available.')
112
- if is_news:
113
- url = s.get('url', '#')
114
- text_to_show = f'<a href="{url}" target="_blank" rel="noopener noreferrer">{s.get("summary", "N/A")}</a>'
115
  impact_class = f"impact-{s.get('impact', 'low').lower()}"
116
  sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"
117
- return f"""<div class="card {impact_class}"><blockquote>{text_to_show}</blockquote><div class="grid"><div><strong>Sentiment:</strong> <span class="{sentiment_class}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span></div><div><strong>Impact:</strong> {s.get('impact')}</div></div><div class="grid"><div><strong>Topic:</strong> {s.get('topic')}</div><div><strong>Entities:</strong> {', '.join(s.get('entities', []))}</div></div></div>"""
 
 
118
 
119
  # --- API Endpoints ---
120
-
121
  @app.get("/", response_class=HTMLResponse)
122
  async def serve_dashboard(request: Request):
123
  return templates.TemplateResponse("index.html", {"request": request})
124
 
125
- @app.get("/api/prices", response_class=HTMLResponse)
126
- async def get_prices_fragment(request: Request):
127
- prices = request.app.state.price_fetcher.get_current_prices()
128
- html = "".join(f"<div><strong>{k.capitalize()}:</strong> ${v:,.2f}</div>" if isinstance(v, (int, float)) else f"<div><strong>{k.capitalize()}:</strong> {v}</div>" for k, v in prices.items())
129
- return HTMLResponse(content=html)
130
-
131
- @app.post("/api/sentiment")
132
- async def analyze_sentiment(request: Request, background_tasks: BackgroundTasks, text: str = Form(...)):
133
- analyzer: GeminiAnalyzer = request.app.state.gemini_analyzer
134
- async def task_wrapper():
135
- analysis = await analyzer.analyze_text(text)
136
- await request.app.state.sentiment_queue.put(analysis)
137
- background_tasks.add_task(task_wrapper)
138
- return HTMLResponse(content="<small>βœ… Queued for deep analysis...</small>")
139
-
140
  @app.get("/api/briefing", response_class=HTMLResponse)
141
  async def get_daily_briefing(request: Request):
142
  briefing_html = markdown(request.app.state.daily_briefing_cache)
143
  return HTMLResponse(content=briefing_html)
144
 
145
- @app.get("/api/sentiment/stream")
146
- async def sentiment_stream(request: Request):
147
- queue: asyncio.Queue = request.app.state.sentiment_queue
148
- async def event_generator():
149
- while True:
150
- payload = await queue.get()
151
- html = render_analysis_card(payload)
152
- data_payload = html.replace('\n', '')
153
- yield f"event: sentiment_update\ndata: {data_payload}\n\n"
154
- return StreamingResponse(event_generator(), media_type="text/event-stream")
155
-
156
  @app.get("/api/news/stream")
157
  async def news_stream(request: Request):
158
  queue: asyncio.Queue = request.app.state.news_queue
159
  async def event_generator():
160
  while True:
161
  payload = await queue.get()
162
- html = render_analysis_card(payload, is_news=True)
163
  data_payload = html.replace('\n', '')
164
  yield f"event: news_update\ndata: {data_payload}\n\n"
165
- return StreamingResponse(event_generator(), media_type="text/event-stream")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ The Sentinel Protocol β€” An Autonomous AI Market Analyst.
3
+ Connects real-time news events to market data visualizations.
 
 
 
 
 
4
  """
5
  import asyncio
6
  import json
7
  import os
8
  from contextlib import asynccontextmanager
9
  from typing import Optional, Union, List
10
+ import pandas as pd
11
 
12
  import httpx
13
  from fastapi import FastAPI, Request, BackgroundTasks, Form
 
19
  # Import our modular service classes
20
  from .price_fetcher import PriceFetcher
21
  from .gemini_analyzer import GeminiAnalyzer
22
+ from .chart_generator import generate_price_chart
23
 
24
+ # --- Lifespan Manager ---
 
25
  @asynccontextmanager
26
  async def lifespan(app: FastAPI):
27
  """Manages application startup and shutdown events."""
28
  async with httpx.AsyncClient() as client:
 
29
  app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
30
  app.state.gemini_analyzer = GeminiAnalyzer(client=client)
31
  app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
32
+ app.state.coingecko_client = client # For fetching chart data
33
 
34
+ app.state.daily_briefing_cache = "### Briefing Unavailable\nSentinel is gathering initial intelligence..."
 
35
  app.state.analyzed_news_today: List[dict] = []
36
  app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
37
  app.state.news_queue: asyncio.Queue = asyncio.Queue()
38
 
39
+ tasks = [
40
+ run_periodic_updates(app.state.price_fetcher, 60),
41
+ run_periodic_news_analysis(app, 900), # 15 mins
42
+ run_daily_briefing_generation(app, 3600) # Every hour
43
+ ]
44
+ background_tasks = [asyncio.create_task(task) for task in tasks]
45
+
46
+ print("πŸš€ The Sentinel Protocol is online.")
47
  yield
48
 
49
+ print("⏳ Shutting down Sentinel protocols...")
50
+ for task in background_tasks:
51
+ task.cancel()
52
+ await asyncio.gather(*background_tasks, return_exceptions=True)
53
+ print("βœ… Sentinel Protocol offline.")
54
+
55
+ # --- Background Tasks ---
56
+ async def run_periodic_updates(fetcher: PriceFetcher, interval: int):
 
 
 
 
57
  while True:
58
  await fetcher.update_prices_async()
59
+ await asyncio.sleep(interval)
60
 
61
+ async def run_periodic_news_analysis(app: FastAPI, interval: int):
62
+ """Fetches news, analyzes it, generates charts, and queues the full payload."""
63
  while True:
64
+ print("πŸ“° Sentinel is scanning for new market intelligence...")
65
  try:
66
+ headlines = app.state.news_api.get_everything(q='bitcoin OR ethereum OR solana', language='en', sort_by='publishedAt', page_size=5)
 
 
 
67
  analyzer: GeminiAnalyzer = app.state.gemini_analyzer
68
+
69
+ for article in headlines.get('articles', []):
70
  title = article.get('title')
71
+ if not title or "[Removed]" in title:
72
+ continue
73
+
74
+ analysis = await analyzer.analyze_text(title)
75
+ if not analysis or analysis.get('sentiment') == 'ERROR':
76
+ continue
77
+
78
+ analysis['url'] = article.get('url')
79
+
80
+ # --- CHART GENERATION LOGIC ---
81
+ chart_base64 = ""
82
+ main_entity = analysis.get('entities', [])[0].lower() if analysis.get('entities') else None
83
+ if main_entity in ["bitcoin", "ethereum", "solana"]: # Add more as needed
84
+ event_time = pd.to_datetime(article.get('publishedAt'))
85
+ # Fetch last 24h of price data for charting
86
+ cg_url = f"https://api.coingecko.com/api/v3/coins/{main_entity}/market_chart"
87
+ params = {'vs_currency': 'usd', 'days': '1'}
88
+ resp = await app.state.coingecko_client.get(cg_url, params=params)
89
+ if resp.status_code == 200:
90
+ price_data = resp.json().get('prices', [])
91
+ chart_base64 = generate_price_chart(price_data, event_time, main_entity.capitalize())
92
+
93
+ analysis['chart'] = chart_base64
94
+ await app.state.news_queue.put(analysis)
95
+ app.state.analyzed_news_today.append(analysis)
96
+
97
  except Exception as e:
98
+ print(f"❌ News analysis pipeline failed: {e}")
99
+ await asyncio.sleep(interval)
100
 
101
+ async def run_daily_briefing_generation(app: FastAPI, interval: int):
102
+ """Generates the daily briefing."""
103
+ await asyncio.sleep(120) # Initial delay to gather some news first
104
  while True:
105
+ print("πŸ“ Sentinel is synthesizing the Daily Briefing...")
 
106
  if app.state.analyzed_news_today:
107
  analyzer: GeminiAnalyzer = app.state.gemini_analyzer
108
  briefing = await analyzer.generate_daily_briefing(app.state.analyzed_news_today)
109
  app.state.daily_briefing_cache = briefing
110
+ app.state.analyzed_news_today = []
111
+ print("βœ… Daily briefing synthesized.")
112
  else:
113
+ print("ℹ️ No new intelligence to synthesize. Waiting for next cycle.")
114
+ await asyncio.sleep(interval)
 
115
 
116
+ # --- App and Endpoints ---
117
+ app = FastAPI(title="Sentinel Protocol", lifespan=lifespan)
118
  templates = Jinja2Templates(directory="templates")
119
 
120
  # --- HTML Rendering Helper ---
121
+ def render_analysis_card(payload: dict) -> str:
122
  s = payload
123
+ summary_link = f'<a href="{s.get("url", "#")}" target="_blank" rel="noopener noreferrer">{s.get("summary", "N/A")}</a>'
 
 
 
124
  impact_class = f"impact-{s.get('impact', 'low').lower()}"
125
  sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"
126
+ chart_html = f'<img src="{s.get("chart")}" alt="Price chart" style="width:100%; margin-top:1rem; border-radius:var(--pico-border-radius);">' if s.get("chart") else ""
127
+
128
+ return f"""<div class="card {impact_class}"><blockquote>{summary_link}</blockquote>{chart_html}<div class="grid"><div><strong>Sentiment:</strong> <span class="{sentiment_class}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span></div><div><strong>Impact:</strong> {s.get('impact')}</div></div><div class="grid"><div><strong>Topic:</strong> {s.get('topic')}</div><div><strong>Entities:</strong> {', '.join(s.get('entities', []))}</div></div></div>"""
129
 
130
  # --- API Endpoints ---
 
131
  @app.get("/", response_class=HTMLResponse)
132
  async def serve_dashboard(request: Request):
133
  return templates.TemplateResponse("index.html", {"request": request})
134
 
135
+ # ... (keep /api/prices and /api/sentiment as they were) ...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
136
  @app.get("/api/briefing", response_class=HTMLResponse)
137
  async def get_daily_briefing(request: Request):
138
  briefing_html = markdown(request.app.state.daily_briefing_cache)
139
  return HTMLResponse(content=briefing_html)
140
 
 
 
 
 
 
 
 
 
 
 
 
141
  @app.get("/api/news/stream")
142
  async def news_stream(request: Request):
143
  queue: asyncio.Queue = request.app.state.news_queue
144
  async def event_generator():
145
  while True:
146
  payload = await queue.get()
147
+ html = render_analysis_card(payload)
148
  data_payload = html.replace('\n', '')
149
  yield f"event: news_update\ndata: {data_payload}\n\n"
150
+ return StreamingResponse(event_generator(), media_type="text/event-stream")
151
+
152
+ # Add back the other endpoints that were removed for brevity
153
+ @app.get("/api/prices", response_class=HTMLResponse)
154
+ async def get_prices_fragment(request: Request):
155
+ prices = request.app.state.price_fetcher.get_current_prices()
156
+ html = "".join(f"<div><strong>{k.capitalize()}:</strong> ${v:,.2f}</div>" if isinstance(v, (int, float)) else f"<div><strong>{k.capitalize()}:</strong> {v}</div>" for k, v in prices.items())
157
+ return HTMLResponse(content=html)
158
+
159
+ @app.post("/api/sentiment")
160
+ async def analyze_sentiment(request: Request, background_tasks: BackgroundTasks, text: str = Form(...)):
161
+ analyzer: GeminiAnalyzer = request.app.state.gemini_analyzer
162
+ async def task_wrapper():
163
+ analysis = await analyzer.analyze_text(text)
164
+ # We can reuse the news queue for manual analysis results
165
+ await request.app.state.news_queue.put(analysis)
166
+ background_tasks.add_task(task_wrapper)
167
+ return HTMLResponse(content="<small>βœ… Analysis protocol initiated...</small>")