mgbam commited on
Commit
d5b9420
Β·
verified Β·
1 Parent(s): 1a96a66

Update app/app.py

Browse files
Files changed (1) hide show
  1. app/app.py +61 -77
app/app.py CHANGED
@@ -1,85 +1,78 @@
1
  """
2
- CryptoSentinel Pro β€” High-performance FastAPI application.
3
 
4
- This is the main entry point that orchestrates the entire application.
5
- - Integrates an asynchronous PriceFetcher for live market data.
6
- - Integrates a sophisticated GeminiAnalyzer for deep text analysis.
7
- - Implements an automated pipeline to fetch, analyze, and stream top crypto news.
8
- - Serves the interactive frontend and provides all necessary API endpoints.
9
  """
10
  import asyncio
11
  import json
12
  import os
13
  from contextlib import asynccontextmanager
14
- from typing import Optional, Union
15
 
16
  import httpx
17
- # =================== FIX APPLIED HERE (1 of 2) ===================
18
  from fastapi import FastAPI, Request, BackgroundTasks, Form
19
- # =================================================================
20
  from fastapi.responses import HTMLResponse, StreamingResponse
21
  from fastapi.templating import Jinja2Templates
22
- from pydantic import BaseModel, constr
 
23
 
24
- # Import our modular, asynchronous service classes
25
  from .price_fetcher import PriceFetcher
26
  from .gemini_analyzer import GeminiAnalyzer
27
- from newsapi import NewsApiClient
28
 
29
  # --- Application Lifespan for Resource Management ---
30
 
31
  @asynccontextmanager
32
  async def lifespan(app: FastAPI):
33
- """
34
- Manages application startup and shutdown events using the modern
35
- lifespan context manager.
36
- """
37
  async with httpx.AsyncClient() as client:
38
- # Instantiate and store all services in the application state.
39
  app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
40
  app.state.gemini_analyzer = GeminiAnalyzer(client=client)
41
  app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
42
 
43
- # Create separate queues for the two real-time feeds
 
 
44
  app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
45
  app.state.news_queue: asyncio.Queue = asyncio.Queue()
46
 
47
- # Create cancellable background tasks for periodic updates.
48
- price_task = asyncio.create_task(
49
- run_periodic_updates(app.state.price_fetcher, interval_seconds=30)
50
- )
51
- news_task = asyncio.create_task(
52
- run_periodic_news_analysis(app, interval_seconds=900) # Run every 15 minutes
53
- )
54
-
55
- print("πŸš€ CryptoSentinel Pro started successfully.")
56
  yield
57
 
58
  print("⏳ Shutting down background tasks...")
59
  price_task.cancel()
60
  news_task.cancel()
 
61
  try:
62
- await asyncio.gather(price_task, news_task)
63
  except asyncio.CancelledError:
64
  print("Background tasks cancelled successfully.")
65
  print("βœ… Shutdown complete.")
66
 
67
  async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
68
- """A robust asyncio background task that periodically updates prices."""
69
  while True:
70
  await fetcher.update_prices_async()
71
  await asyncio.sleep(interval_seconds)
72
 
73
  async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
74
- """Fetches, analyzes, and queues top crypto news periodically."""
75
  while True:
76
- print("πŸ“° Fetching latest crypto news for automated analysis...")
77
  try:
78
  top_headlines = app.state.news_api.get_everything(
79
  q='bitcoin OR ethereum OR crypto OR blockchain',
80
- language='en',
81
- sort_by='publishedAt',
82
- page_size=5
83
  )
84
  analyzer: GeminiAnalyzer = app.state.gemini_analyzer
85
  for article in top_headlines.get('articles', []):
@@ -88,14 +81,28 @@ async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
88
  analysis = await analyzer.analyze_text(title)
89
  analysis['url'] = article.get('url')
90
  await app.state.news_queue.put(analysis)
 
91
  except Exception as e:
92
- print(f"❌ Error during news fetching or analysis: {e}")
93
-
94
  await asyncio.sleep(interval_seconds)
95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
  # --- FastAPI App Initialization ---
97
 
98
- app = FastAPI(title="CryptoSentinel Pro", lifespan=lifespan)
99
  templates = Jinja2Templates(directory="templates")
100
 
101
  # --- HTML Rendering Helper ---
@@ -107,19 +114,7 @@ def render_analysis_card(payload: dict, is_news: bool = False) -> str:
107
  text_to_show = f'<a href="{url}" target="_blank" rel="noopener noreferrer">{s.get("summary", "N/A")}</a>'
108
  impact_class = f"impact-{s.get('impact', 'low').lower()}"
109
  sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"
110
- return f"""
111
- <div class="card {impact_class}">
112
- <blockquote>{text_to_show}</blockquote>
113
- <div class="grid">
114
- <div><strong>Sentiment:</strong> <span class="{sentiment_class}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span></div>
115
- <div><strong>Impact:</strong> {s.get('impact')}</div>
116
- </div>
117
- <div class="grid">
118
- <div><strong>Topic:</strong> {s.get('topic')}</div>
119
- <div><strong>Entities:</strong> {', '.join(s.get('entities', []))}</div>
120
- </div>
121
- </div>
122
- """
123
 
124
  # --- API Endpoints ---
125
 
@@ -129,33 +124,24 @@ async def serve_dashboard(request: Request):
129
 
130
  @app.get("/api/prices", response_class=HTMLResponse)
131
  async def get_prices_fragment(request: Request):
132
- price_fetcher: PriceFetcher = request.app.state.price_fetcher
133
- prices = price_fetcher.get_current_prices()
134
- html_fragment = "".join(
135
- f"<div><strong>{coin.capitalize()}:</strong> ${price:,.2f}</div>" if isinstance(price, (int, float))
136
- else f"<div><strong>{coin.capitalize()}:</strong> {price}</div>"
137
- for coin, price in prices.items()
138
- )
139
- return HTMLResponse(content=html_fragment)
140
-
141
- # =================== FIX APPLIED HERE (2 of 2) ===================
142
  @app.post("/api/sentiment")
143
- async def analyze_sentiment(
144
- request: Request,
145
- background_tasks: BackgroundTasks,
146
- text: str = Form(...) # Tell FastAPI to expect a form field named 'text'
147
- ):
148
- # =================================================================
149
- """Queues user-submitted text for a full Gemini-powered analysis."""
150
  analyzer: GeminiAnalyzer = request.app.state.gemini_analyzer
151
-
152
- async def analysis_task_wrapper():
153
- analysis_result = await analyzer.analyze_text(text) # Use the 'text' variable directly
154
- await request.app.state.sentiment_queue.put(analysis_result)
155
-
156
- background_tasks.add_task(analysis_task_wrapper)
157
  return HTMLResponse(content="<small>βœ… Queued for deep analysis...</small>")
158
 
 
 
 
 
 
159
  @app.get("/api/sentiment/stream")
160
  async def sentiment_stream(request: Request):
161
  queue: asyncio.Queue = request.app.state.sentiment_queue
@@ -164,8 +150,7 @@ async def sentiment_stream(request: Request):
164
  payload = await queue.get()
165
  html = render_analysis_card(payload)
166
  data_payload = html.replace('\n', '')
167
- sse_message = f"event: sentiment_update\ndata: {data_payload}\n\n"
168
- yield sse_message
169
  return StreamingResponse(event_generator(), media_type="text/event-stream")
170
 
171
  @app.get("/api/news/stream")
@@ -176,6 +161,5 @@ async def news_stream(request: Request):
176
  payload = await queue.get()
177
  html = render_analysis_card(payload, is_news=True)
178
  data_payload = html.replace('\n', '')
179
- sse_message = f"event: news_update\ndata: {data_payload}\n\n"
180
- yield sse_message
181
  return StreamingResponse(event_generator(), media_type="text/event-stream")
 
1
  """
2
+ CryptoSentinel Co-Pilot β€” Your AI-Powered Market Analyst.
3
 
4
+ This application provides a multi-faceted view of the crypto market by:
5
+ - Tracking live prices.
6
+ - Proactively fetching, analyzing, and synthesizing top news into a daily briefing.
7
+ - Offering on-demand deep analysis of any text.
8
+ - Streaming all insights to a dynamic frontend in real-time.
9
  """
10
  import asyncio
11
  import json
12
  import os
13
  from contextlib import asynccontextmanager
14
+ from typing import Optional, Union, List
15
 
16
  import httpx
 
17
  from fastapi import FastAPI, Request, BackgroundTasks, Form
 
18
  from fastapi.responses import HTMLResponse, StreamingResponse
19
  from fastapi.templating import Jinja2Templates
20
+ from markdown import markdown
21
+ from newsapi import NewsApiClient
22
 
23
+ # Import our modular service classes
24
  from .price_fetcher import PriceFetcher
25
  from .gemini_analyzer import GeminiAnalyzer
 
26
 
27
  # --- Application Lifespan for Resource Management ---
28
 
29
  @asynccontextmanager
30
  async def lifespan(app: FastAPI):
31
+ """Manages application startup and shutdown events."""
 
 
 
32
  async with httpx.AsyncClient() as client:
33
+ # Instantiate and store services in the application state
34
  app.state.price_fetcher = PriceFetcher(client=client, coins=["bitcoin", "ethereum", "dogecoin"])
35
  app.state.gemini_analyzer = GeminiAnalyzer(client=client)
36
  app.state.news_api = NewsApiClient(api_key=os.getenv("NEWS_API_KEY"))
37
 
38
+ # State for caching and queues
39
+ app.state.daily_briefing_cache = "### Briefing Unavailable\nGenerating the first daily briefing, please check back in a few minutes."
40
+ app.state.analyzed_news_today: List[dict] = []
41
  app.state.sentiment_queue: asyncio.Queue = asyncio.Queue()
42
  app.state.news_queue: asyncio.Queue = asyncio.Queue()
43
 
44
+ # Create cancellable background tasks
45
+ price_task = asyncio.create_task(run_periodic_updates(app.state.price_fetcher, 60))
46
+ news_task = asyncio.create_task(run_periodic_news_analysis(app, 900)) # 15 mins
47
+ briefing_task = asyncio.create_task(run_daily_briefing_generation(app, 3600)) # Every hour
48
+
49
+ print("πŸš€ CryptoSentinel Co-Pilot started.")
 
 
 
50
  yield
51
 
52
  print("⏳ Shutting down background tasks...")
53
  price_task.cancel()
54
  news_task.cancel()
55
+ briefing_task.cancel()
56
  try:
57
+ await asyncio.gather(price_task, news_task, briefing_task, return_exceptions=True)
58
  except asyncio.CancelledError:
59
  print("Background tasks cancelled successfully.")
60
  print("βœ… Shutdown complete.")
61
 
62
  async def run_periodic_updates(fetcher: PriceFetcher, interval_seconds: int):
63
+ """Periodically updates crypto prices."""
64
  while True:
65
  await fetcher.update_prices_async()
66
  await asyncio.sleep(interval_seconds)
67
 
68
  async def run_periodic_news_analysis(app: FastAPI, interval_seconds: int):
69
+ """Periodically fetches, analyzes, and queues top crypto news."""
70
  while True:
71
+ print("πŸ“° Fetching latest crypto news...")
72
  try:
73
  top_headlines = app.state.news_api.get_everything(
74
  q='bitcoin OR ethereum OR crypto OR blockchain',
75
+ language='en', sort_by='publishedAt', page_size=5
 
 
76
  )
77
  analyzer: GeminiAnalyzer = app.state.gemini_analyzer
78
  for article in top_headlines.get('articles', []):
 
81
  analysis = await analyzer.analyze_text(title)
82
  analysis['url'] = article.get('url')
83
  await app.state.news_queue.put(analysis)
84
+ app.state.analyzed_news_today.append(analysis)
85
  except Exception as e:
86
+ print(f"❌ Error during news analysis: {e}")
 
87
  await asyncio.sleep(interval_seconds)
88
 
89
+ async def run_daily_briefing_generation(app: FastAPI, interval_seconds: int):
90
+ """Periodically generates and caches the daily market briefing."""
91
+ while True:
92
+ await asyncio.sleep(interval_seconds) # Wait first, then generate
93
+ print("πŸ“ Generating Daily Market Briefing...")
94
+ if app.state.analyzed_news_today:
95
+ analyzer: GeminiAnalyzer = app.state.gemini_analyzer
96
+ briefing = await analyzer.generate_daily_briefing(app.state.analyzed_news_today)
97
+ app.state.daily_briefing_cache = briefing
98
+ app.state.analyzed_news_today = [] # Clear list for the next cycle
99
+ print("βœ… Daily briefing generated and cached.")
100
+ else:
101
+ print("ℹ️ No new news items to analyze for briefing. Skipping.")
102
+
103
  # --- FastAPI App Initialization ---
104
 
105
+ app = FastAPI(title="CryptoSentinel Co-Pilot", lifespan=lifespan)
106
  templates = Jinja2Templates(directory="templates")
107
 
108
  # --- HTML Rendering Helper ---
 
114
  text_to_show = f'<a href="{url}" target="_blank" rel="noopener noreferrer">{s.get("summary", "N/A")}</a>'
115
  impact_class = f"impact-{s.get('impact', 'low').lower()}"
116
  sentiment_class = f"sentiment-{s.get('sentiment', 'neutral').lower()}"
117
+ return f"""<div class="card {impact_class}"><blockquote>{text_to_show}</blockquote><div class="grid"><div><strong>Sentiment:</strong> <span class="{sentiment_class}">{s.get('sentiment')} ({s.get('sentiment_score', 0):.2f})</span></div><div><strong>Impact:</strong> {s.get('impact')}</div></div><div class="grid"><div><strong>Topic:</strong> {s.get('topic')}</div><div><strong>Entities:</strong> {', '.join(s.get('entities', []))}</div></div></div>"""
 
 
 
 
 
 
 
 
 
 
 
 
118
 
119
  # --- API Endpoints ---
120
 
 
124
 
125
  @app.get("/api/prices", response_class=HTMLResponse)
126
  async def get_prices_fragment(request: Request):
127
+ prices = request.app.state.price_fetcher.get_current_prices()
128
+ html = "".join(f"<div><strong>{k.capitalize()}:</strong> ${v:,.2f}</div>" if isinstance(v, (int, float)) else f"<div><strong>{k.capitalize()}:</strong> {v}</div>" for k, v in prices.items())
129
+ return HTMLResponse(content=html)
130
+
 
 
 
 
 
 
131
  @app.post("/api/sentiment")
132
+ async def analyze_sentiment(request: Request, background_tasks: BackgroundTasks, text: str = Form(...)):
 
 
 
 
 
 
133
  analyzer: GeminiAnalyzer = request.app.state.gemini_analyzer
134
+ async def task_wrapper():
135
+ analysis = await analyzer.analyze_text(text)
136
+ await request.app.state.sentiment_queue.put(analysis)
137
+ background_tasks.add_task(task_wrapper)
 
 
138
  return HTMLResponse(content="<small>βœ… Queued for deep analysis...</small>")
139
 
140
+ @app.get("/api/briefing", response_class=HTMLResponse)
141
+ async def get_daily_briefing(request: Request):
142
+ briefing_html = markdown(request.app.state.daily_briefing_cache)
143
+ return HTMLResponse(content=briefing_html)
144
+
145
  @app.get("/api/sentiment/stream")
146
  async def sentiment_stream(request: Request):
147
  queue: asyncio.Queue = request.app.state.sentiment_queue
 
150
  payload = await queue.get()
151
  html = render_analysis_card(payload)
152
  data_payload = html.replace('\n', '')
153
+ yield f"event: sentiment_update\ndata: {data_payload}\n\n"
 
154
  return StreamingResponse(event_generator(), media_type="text/event-stream")
155
 
156
  @app.get("/api/news/stream")
 
161
  payload = await queue.get()
162
  html = render_analysis_card(payload, is_news=True)
163
  data_payload = html.replace('\n', '')
164
+ yield f"event: news_update\ndata: {data_payload}\n\n"
 
165
  return StreamingResponse(event_generator(), media_type="text/event-stream")