Spaces:
Sleeping
Sleeping
| """ | |
| The Tier 1 Intelligence Sieve. | |
| This module uses a locally-hosted, finance-optimized transformer model (FinBERT) | |
| to perform initial, high-speed sentiment analysis. It acts as a gatekeeper, | |
| only escalating high-conviction events to the more powerful Tier 2 analyst. | |
| """ | |
| from transformers import pipeline | |
| from threading import Lock | |
| class SentimentEngine: | |
| """ | |
| A thread-safe, high-performance sentiment analysis engine using a local model. | |
| Implemented as a singleton to ensure the model is loaded only once. | |
| """ | |
| _instance = None | |
| _lock = Lock() | |
| def __new__(cls): | |
| with cls._lock: | |
| if cls._instance is None: | |
| print("π§ [Tier 1] Initializing local sentiment model (FinBERT)... This may take a moment.") | |
| try: | |
| cls._instance = super().__new__(cls) | |
| # Using a model specifically fine-tuned on financial text for superior accuracy. | |
| cls._instance.sentiment_pipeline = pipeline( | |
| "sentiment-analysis", | |
| model="ProsusAI/finbert" | |
| ) | |
| print("β [Tier 1] FinBERT model is online and ready.") | |
| except Exception as e: | |
| print(f"β CRITICAL: Failed to load local FinBERT model. Tier 1 filtering will be disabled. Error: {e}") | |
| cls._instance.sentiment_pipeline = None | |
| return cls._instance | |
| def analyze(self, text: str) -> dict: | |
| """ | |
| Analyzes text using the local model if available. | |
| Returns a dictionary with 'label' and 'score'. | |
| """ | |
| if not self.sentiment_pipeline: | |
| return {"label": "neutral", "score": 0.0} | |
| try: | |
| # FinBERT labels are 'positive', 'negative', 'neutral' | |
| return self.sentiment_pipeline(text)[0] | |
| except Exception as e: | |
| print(f"Error in local sentiment analysis: {e}") | |
| return {"label": "neutral", "score": 0.0} | |
| # Create a singleton instance that will be imported by the main app. | |
| LocalSentimentFilter = SentimentEngine() |