mgbam committed
Commit 347f446 · verified · 1 Parent(s): a17b947

Update app/sentiment.py

Files changed (1)
  1. app/sentiment.py +42 -123
app/sentiment.py CHANGED
@@ -1,133 +1,52 @@
 """
-Provides a robust, asynchronous SentimentAnalyzer class.
 
-This module communicates with the Hugging Face Inference API to perform sentiment
-analysis without requiring local model downloads. It's designed for use within
-an asynchronous application like FastAPI.
 """
-import asyncio
-import logging
-import os
-# ====================================================================
-# FINAL FIX APPLIED HERE
-# ====================================================================
-# Import Optional and Union for Python 3.9 compatibility.
-from typing import TypedDict, Union, Optional
-# ====================================================================
 
-
-import httpx
-
-# --- Configuration & Models ---
-
-# Configure logging for this module
-logger = logging.getLogger(__name__)
-
-# Define the expected structure of a result payload for type hinting
-class SentimentResult(TypedDict):
-    id: int
-    text: str
-    # Using Union for Python 3.9 compatibility
-    result: dict[str, Union[str, float]]
-
-
-# --- Main Class: SentimentAnalyzer ---
-class SentimentAnalyzer:
     """
-    Manages sentiment analysis requests to the Hugging Face Inference API.
-
-    This class handles asynchronous API communication, manages a result queue for
-    Server-Sent Events (SSE), and encapsulates all related state and logic.
     """
-
-    HF_API_URL = "https://api-inference.huggingface.co/models/distilbert-base-uncased-finetuned-sst-2-english"
-
-    # ====================================================================
-    # FINAL FIX APPLIED HERE
-    # ====================================================================
-    # Changed `str | None` to `Optional[str]` for Python 3.9 compatibility.
-    def __init__(self, client: httpx.AsyncClient, api_token: Optional[str] = None):
-        # ====================================================================
        """
-        Initializes the SentimentAnalyzer.
-
-        Args:
-            client: An instance of httpx.AsyncClient for making API calls.
-            api_token: The Hugging Face API token.
        """
-        self.client = client
-        self.api_token = api_token or os.getenv("HF_API_TOKEN")
-
-        if not self.api_token:
-            raise ValueError("Hugging Face API token is not set. Please set the HF_API_TOKEN environment variable.")
 
-        self.headers = {"Authorization": f"Bearer {self.api_token}"}
-
-        # A queue is the ideal structure for a producer-consumer pattern,
-        # where the API endpoint is the producer and SSE streamers are consumers.
-        self.result_queue: asyncio.Queue[SentimentResult] = asyncio.Queue()
-
-    async def compute_and_publish(self, text: str, request_id: int) -> None:
-        """
-        Performs sentiment analysis via an external API and places the result
-        into a queue for consumption by SSE streams.
-
-        Args:
-            text: The input text to analyze.
-            request_id: A unique identifier for this request.
-        """
-        analysis_result: dict[str, Union[str, float]] = {"label": "ERROR", "score": 0.0, "error": "Unknown failure"}
        try:
-            response = await self.client.post(
-                self.HF_API_URL,
-                headers=self.headers,
-                json={"inputs": text, "options": {"wait_for_model": True}},
-                timeout=20.0
-            )
-            response.raise_for_status()
-            data = response.json()
-
-            # Validate the expected response structure from the Inference API
-            if isinstance(data, list) and data and isinstance(data[0], list) and data[0]:
-                # The model returns a list containing a list of results
-                res = data[0][0]
-                analysis_result = {"label": res.get("label"), "score": round(res.get("score", 0.0), 4)}
-                logger.info("✅ Sentiment computed for request #%d", request_id)
-            else:
-                raise ValueError(f"Unexpected API response format: {data}")
-
-        except httpx.HTTPStatusError as e:
-            error_msg = f"API returned status {e.response.status_code}"
-            logger.error("❌ Sentiment API error for request #%d: %s", request_id, error_msg)
-            analysis_result["error"] = error_msg
-        except httpx.RequestError as e:
-            error_msg = f"Network request failed: {e}"
-            logger.error("❌ Sentiment network error for request #%d: %s", request_id, error_msg)
-            analysis_result["error"] = error_msg
-        except (ValueError, KeyError) as e:
-            error_msg = f"Failed to parse API response: {e}"
-            logger.error("❌ Sentiment parsing error for request #%d: %s", request_id, error_msg)
-            analysis_result["error"] = error_msg
-
-        # Always publish a result to the queue, even if it's an error state
-        payload: SentimentResult = {
-            "id": request_id,
-            "text": text,
-            "result": analysis_result
-        }
-        await self.result_queue.put(payload)
-
-    async def stream_results(self):  # Type hint removed for simplicity, was -> SentimentResult
-        """
-        An async generator that yields new results as they become available.
-        This is the consumer part of the pattern.
-        """
-        while True:
-            try:
-                # This efficiently waits until an item is available in the queue
-                result = await self.result_queue.get()
-                yield result
-                self.result_queue.task_done()
-            except asyncio.CancelledError:
-                logger.info("Result stream has been cancelled.")
-                break
 """
+The Tier 1 Intelligence Sieve.
 
+This module uses a locally-hosted, finance-optimized transformer model (FinBERT)
+to perform initial, high-speed sentiment analysis. It acts as a gatekeeper,
+only escalating high-conviction events to the more powerful Tier 2 analyst.
 """
+from transformers import pipeline
+from threading import Lock
 
+class SentimentEngine:
     """
+    A thread-safe, high-performance sentiment analysis engine using a local model.
+    Implemented as a singleton to ensure the model is loaded only once.
     """
+    _instance = None
+    _lock = Lock()
+
+    def __new__(cls):
+        with cls._lock:
+            if cls._instance is None:
+                print("🧠 [Tier 1] Initializing local sentiment model (FinBERT)... This may take a moment.")
+                try:
+                    cls._instance = super().__new__(cls)
+                    # Using a model specifically fine-tuned on financial text for superior accuracy.
+                    cls._instance.sentiment_pipeline = pipeline(
+                        "sentiment-analysis",
+                        model="ProsusAI/finbert"
+                    )
+                    print("✅ [Tier 1] FinBERT model is online and ready.")
+                except Exception as e:
+                    print(f"❌ CRITICAL: Failed to load local FinBERT model. Tier 1 filtering will be disabled. Error: {e}")
+                    cls._instance.sentiment_pipeline = None
+        return cls._instance
+
+    def analyze(self, text: str) -> dict:
        """
+        Analyzes text using the local model if available.
+        Returns a dictionary with 'label' and 'score'.
        """
+        if not self.sentiment_pipeline:
+            return {"label": "neutral", "score": 0.0}
 
        try:
+            # FinBERT labels are 'positive', 'negative', 'neutral'
+            return self.sentiment_pipeline(text)[0]
+        except Exception as e:
+            print(f"Error in local sentiment analysis: {e}")
+            return {"label": "neutral", "score": 0.0}
+
+# Create a singleton instance that will be imported by the main app.
+LocalSentimentFilter = SentimentEngine()
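
For reference, a minimal consumer sketch (not part of this commit) showing how the main app might use the exported singleton. The import path follows the file location (app/sentiment.py); the 0.9 score threshold and the escalate_to_tier2 helper are hypothetical stand-ins for the Tier 2 escalation described in the module docstring.

# Hypothetical usage sketch -- not part of this commit.
from app.sentiment import LocalSentimentFilter

def triage(headline: str) -> None:
    # analyze() returns a dict such as {"label": "positive", "score": 0.98},
    # or a neutral fallback if the FinBERT pipeline failed to load.
    result = LocalSentimentFilter.analyze(headline)

    # Assumed gatekeeping rule: only high-conviction, non-neutral events
    # are passed on to the more powerful Tier 2 analyst.
    if result["label"] != "neutral" and result["score"] >= 0.9:
        escalate_to_tier2(headline, result)

def escalate_to_tier2(headline: str, result: dict) -> None:
    # Hypothetical placeholder for the Tier 2 hand-off, which lives outside this file.
    print(f"[Tier 2] Escalating: {headline!r} -> {result}")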