# CryptoSentinel_AI / app/sentiment.py
# Repository page header (not part of the program): author mgbam,
# commit "Update app/sentiment.py" (347f446, verified).
"""
The Tier 1 Intelligence Sieve.
This module uses a locally-hosted, finance-optimized transformer model (FinBERT)
to perform initial, high-speed sentiment analysis. It acts as a gatekeeper,
only escalating high-conviction events to the more powerful Tier 2 analyst.
"""
from transformers import pipeline
from threading import Lock
class SentimentEngine:
    """
    Singleton wrapper around a local FinBERT sentiment-analysis pipeline.

    The first instantiation loads the ``ProsusAI/finbert`` model; every later
    ``SentimentEngine()`` call returns that same object. Instance creation is
    guarded by a class-level lock so concurrent first calls load the model
    only once. If the model cannot be loaded, the engine stays usable but
    ``analyze`` always returns the neutral fallback.

    Attributes:
        sentiment_pipeline: The loaded pipeline callable, or ``None`` when
            loading failed.
    """

    _instance = None
    _lock = Lock()

    # BERT-style models such as FinBERT accept at most 512 tokens per input;
    # longer texts must be truncated or the forward pass raises.
    _MAX_TOKENS = 512

    def __new__(cls):
        """Return the shared engine, loading the model on first use."""
        with cls._lock:
            if cls._instance is None:
                print("🧠 [Tier 1] Initializing local sentiment model (FinBERT)... This may take a moment.")
                # Build the object outside the try block so the failure path
                # below never has to dereference a half-created instance.
                engine = super().__new__(cls)
                try:
                    # Using a model specifically fine-tuned on financial text
                    # for superior accuracy.
                    engine.sentiment_pipeline = pipeline(
                        "sentiment-analysis",
                        model="ProsusAI/finbert",
                    )
                    print("✅ [Tier 1] FinBERT model is online and ready.")
                except Exception as e:
                    # Deliberate best-effort: the app keeps running with
                    # Tier 1 filtering disabled instead of crashing on import.
                    print(f"❌ CRITICAL: Failed to load local FinBERT model. Tier 1 filtering will be disabled. Error: {e}")
                    engine.sentiment_pipeline = None
                # Publish only once the instance is fully initialised.
                cls._instance = engine
            return cls._instance

    @staticmethod
    def _neutral_result() -> dict:
        """Return a fresh neutral fallback result (never a shared dict)."""
        return {"label": "neutral", "score": 0.0}

    def analyze(self, text: str) -> dict:
        """
        Classify the sentiment of ``text`` with the local model.

        Args:
            text: The text to classify.

        Returns:
            The first result produced by the pipeline, a dictionary with
            ``'label'`` and ``'score'`` keys. The neutral fallback
            ``{"label": "neutral", "score": 0.0}`` is returned when the model
            is unavailable, when ``text`` is empty or whitespace-only, or
            when the pipeline raises.
        """
        if self.sentiment_pipeline is None:
            return self._neutral_result()

        # Nothing to classify: skip the model call entirely.
        if isinstance(text, str) and not text.strip():
            return self._neutral_result()

        try:
            # FinBERT labels are 'positive', 'negative', 'neutral'.
            # Truncate over-long inputs so they are still scored instead of
            # raising inside the model and being reported as neutral.
            results = self.sentiment_pipeline(
                text,
                truncation=True,
                max_length=self._MAX_TOKENS,
            )
            return results[0]
        except Exception as e:
            print(f"Error in local sentiment analysis: {e}")
            return self._neutral_result()
# Module-level singleton imported by the main app. Instantiating
# SentimentEngine here means the FinBERT load is attempted at import time.
LocalSentimentFilter = SentimentEngine()