""" Core sentiment analysis engine for MCP server. This module provides sentiment analysis functionality using both TextBlob for simplicity and Transformers for accuracy, with confidence scoring and comprehensive error handling. """ import logging from typing import Dict, Any, Optional, Tuple from enum import Enum import asyncio from concurrent.futures import ThreadPoolExecutor try: from textblob import TextBlob TEXTBLOB_AVAILABLE = True except ImportError: TEXTBLOB_AVAILABLE = False logging.warning("TextBlob not available. Install with: pip install textblob") try: from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification import torch TRANSFORMERS_AVAILABLE = True except ImportError: TRANSFORMERS_AVAILABLE = False logging.warning("Transformers not available. Install with: pip install transformers torch") class SentimentLabel(Enum): """Sentiment classification labels.""" POSITIVE = "positive" NEGATIVE = "negative" NEUTRAL = "neutral" class SentimentResult: """Container for sentiment analysis results.""" def __init__(self, label: SentimentLabel, confidence: float, raw_scores: Optional[Dict[str, float]] = None): self.label = label self.confidence = confidence self.raw_scores = raw_scores or {} def to_dict(self) -> Dict[str, Any]: """Convert result to dictionary format.""" return { "label": self.label.value, "confidence": round(self.confidence, 4), "raw_scores": self.raw_scores } class SentimentAnalyzer: """ Advanced sentiment analysis engine supporting multiple backends. Supports both TextBlob (simple) and Transformers (accurate) for sentiment analysis with confidence scoring and async processing capabilities. """ def __init__(self, backend: str = "auto", model_name: str = "cardiffnlp/twitter-roberta-base-sentiment-latest"): """ Initialize sentiment analyzer. Args: backend: Analysis backend ("textblob", "transformers", or "auto") model_name: Hugging Face model name for transformers backend """ self.backend = backend self.model_name = model_name self.logger = logging.getLogger(__name__) self.executor = ThreadPoolExecutor(max_workers=2) # Model caching self._transformer_pipeline = None self._model_loaded = False # Initialize backend self._initialize_backend() def _initialize_backend(self) -> None: """Initialize the selected backend.""" if self.backend == "auto": if TRANSFORMERS_AVAILABLE: self.backend = "transformers" self.logger.info("Auto-selected Transformers backend") elif TEXTBLOB_AVAILABLE: self.backend = "textblob" self.logger.info("Auto-selected TextBlob backend") else: raise RuntimeError("No sentiment analysis backend available. Install textblob or transformers.") if self.backend == "transformers" and not TRANSFORMERS_AVAILABLE: raise RuntimeError("Transformers backend requested but not available") if self.backend == "textblob" and not TEXTBLOB_AVAILABLE: raise RuntimeError("TextBlob backend requested but not available") async def _load_transformer_model(self) -> None: """Load transformer model asynchronously.""" if self._model_loaded: return try: self.logger.info(f"Loading transformer model: {self.model_name}") # Load model in thread pool to avoid blocking loop = asyncio.get_event_loop() self._transformer_pipeline = await loop.run_in_executor( self.executor, lambda: pipeline( "sentiment-analysis", model=self.model_name, tokenizer=self.model_name, device=0 if torch.cuda.is_available() else -1, return_all_scores=True ) ) self._model_loaded = True self.logger.info("Transformer model loaded successfully") except Exception as e: self.logger.error(f"Failed to load transformer model: {e}") raise RuntimeError(f"Model loading failed: {e}") def _validate_input(self, text: str) -> str: """ Validate and sanitize input text. Args: text: Input text to validate Returns: Sanitized text Raises: ValueError: If text is invalid """ if not isinstance(text, str): raise ValueError("Input must be a string") text = text.strip() if not text: raise ValueError("Input text cannot be empty") if len(text) > 10000: # Reasonable limit raise ValueError("Input text too long (max 10,000 characters)") # Basic sanitization text = text.replace('\x00', '') # Remove null bytes return text def _analyze_with_textblob(self, text: str) -> SentimentResult: """ Analyze sentiment using TextBlob. Args: text: Text to analyze Returns: SentimentResult with classification and confidence """ try: blob = TextBlob(text) polarity = blob.sentiment.polarity # Convert polarity to label and confidence if polarity > 0.1: label = SentimentLabel.POSITIVE confidence = min(polarity, 1.0) elif polarity < -0.1: label = SentimentLabel.NEGATIVE confidence = min(abs(polarity), 1.0) else: label = SentimentLabel.NEUTRAL confidence = 1.0 - abs(polarity) raw_scores = { "polarity": polarity, "subjectivity": blob.sentiment.subjectivity } return SentimentResult(label, confidence, raw_scores) except Exception as e: self.logger.error(f"TextBlob analysis failed: {e}") raise RuntimeError(f"Sentiment analysis failed: {e}") async def _analyze_with_transformers(self, text: str) -> SentimentResult: """ Analyze sentiment using Transformers. Args: text: Text to analyze Returns: SentimentResult with classification and confidence """ try: await self._load_transformer_model() # Run inference in thread pool loop = asyncio.get_event_loop() results = await loop.run_in_executor( self.executor, lambda: self._transformer_pipeline(text) ) # Process results scores = {result['label'].lower(): result['score'] for result in results[0]} # Map model labels to our labels label_mapping = { 'positive': SentimentLabel.POSITIVE, 'negative': SentimentLabel.NEGATIVE, 'neutral': SentimentLabel.NEUTRAL, 'label_0': SentimentLabel.NEGATIVE, # Some models use numeric labels 'label_1': SentimentLabel.NEUTRAL, 'label_2': SentimentLabel.POSITIVE } # Find best match best_score = 0 best_label = SentimentLabel.NEUTRAL for model_label, score in scores.items(): if model_label in label_mapping and score > best_score: best_score = score best_label = label_mapping[model_label] return SentimentResult(best_label, best_score, scores) except Exception as e: self.logger.error(f"Transformers analysis failed: {e}") raise RuntimeError(f"Sentiment analysis failed: {e}") async def analyze(self, text: str) -> SentimentResult: """ Analyze sentiment of input text. Args: text: Text to analyze Returns: SentimentResult with label, confidence, and raw scores Raises: ValueError: If input is invalid RuntimeError: If analysis fails """ # Validate input text = self._validate_input(text) try: if self.backend == "transformers": return await self._analyze_with_transformers(text) elif self.backend == "textblob": # Run TextBlob in thread pool since it's CPU-bound loop = asyncio.get_event_loop() return await loop.run_in_executor( self.executor, self._analyze_with_textblob, text ) else: raise RuntimeError(f"Unknown backend: {self.backend}") except Exception as e: self.logger.error(f"Sentiment analysis failed for text: {text[:100]}... Error: {e}") raise async def analyze_batch(self, texts: list[str]) -> list[SentimentResult]: """ Analyze sentiment for multiple texts concurrently. Args: texts: List of texts to analyze Returns: List of SentimentResult objects """ if not texts: return [] # Analyze all texts concurrently tasks = [self.analyze(text) for text in texts] results = await asyncio.gather(*tasks, return_exceptions=True) # Handle exceptions processed_results = [] for i, result in enumerate(results): if isinstance(result, Exception): self.logger.error(f"Failed to analyze text {i}: {result}") # Return neutral result for failed analysis processed_results.append( SentimentResult(SentimentLabel.NEUTRAL, 0.0, {"error": str(result)}) ) else: processed_results.append(result) return processed_results def get_info(self) -> Dict[str, Any]: """Get information about the analyzer configuration.""" return { "backend": self.backend, "model_name": self.model_name if self.backend == "transformers" else None, "model_loaded": self._model_loaded, "textblob_available": TEXTBLOB_AVAILABLE, "transformers_available": TRANSFORMERS_AVAILABLE, "cuda_available": torch.cuda.is_available() if TRANSFORMERS_AVAILABLE else False } async def cleanup(self) -> None: """Clean up resources.""" self.executor.shutdown(wait=True) self.logger.info("Sentiment analyzer cleaned up") # Global analyzer instance for reuse _global_analyzer: Optional[SentimentAnalyzer] = None async def get_analyzer(backend: str = "auto") -> SentimentAnalyzer: """ Get or create global sentiment analyzer instance. Args: backend: Analysis backend to use Returns: SentimentAnalyzer instance """ global _global_analyzer if _global_analyzer is None: _global_analyzer = SentimentAnalyzer(backend=backend) return _global_analyzer async def analyze_sentiment(text: str, backend: str = "auto") -> Dict[str, Any]: """ Convenience function for sentiment analysis. Args: text: Text to analyze backend: Analysis backend to use Returns: Dictionary with sentiment analysis results """ analyzer = await get_analyzer(backend) result = await analyzer.analyze(text) return result.to_dict() if __name__ == "__main__": # Example usage async def main(): analyzer = SentimentAnalyzer(backend="textblob") test_texts = [ "I love this product! It's amazing!", "This is terrible and I hate it.", "It's okay, nothing special.", "The weather is nice today." ] for text in test_texts: result = await analyzer.analyze(text) print(f"Text: {text}") print(f"Result: {result.to_dict()}") print("-" * 50) await analyzer.cleanup() asyncio.run(main())