# File size: 12,897 Bytes
# 776e7c0
"""
Core sentiment analysis engine for MCP server.
This module provides sentiment analysis functionality using both TextBlob
for simplicity and Transformers for accuracy, with confidence scoring
and comprehensive error handling.
"""
import logging
from typing import Dict, Any, Optional, Tuple
from enum import Enum
import asyncio
from concurrent.futures import ThreadPoolExecutor
try:
from textblob import TextBlob
TEXTBLOB_AVAILABLE = True
except ImportError:
TEXTBLOB_AVAILABLE = False
logging.warning("TextBlob not available. Install with: pip install textblob")
try:
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import torch
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
logging.warning("Transformers not available. Install with: pip install transformers torch")
class SentimentLabel(Enum):
    """Closed set of sentiment classes an analysis can resolve to.

    Each member's value is the lowercase string used when a result is
    serialized.
    """

    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"
class SentimentResult:
    """Holds the outcome of one analysis: label, confidence and raw scores."""

    def __init__(self, label: SentimentLabel, confidence: float, raw_scores: Optional[Dict[str, float]] = None):
        # A missing or empty score mapping is stored as a new empty dict.
        self.raw_scores = raw_scores if raw_scores else {}
        self.confidence = confidence
        self.label = label

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dictionary.

        The label is emitted as its string value and the confidence is
        rounded to four decimal places; the raw scores are passed through
        unchanged.
        """
        payload: Dict[str, Any] = {}
        payload["label"] = self.label.value
        payload["confidence"] = round(self.confidence, 4)
        payload["raw_scores"] = self.raw_scores
        return payload
class SentimentAnalyzer:
    """
    Sentiment analysis engine supporting multiple backends.

    Supports both TextBlob (simple) and Transformers (accurate) for sentiment
    analysis, with confidence scoring. Blocking work (model loading and
    inference) is pushed to a small thread pool so the async entry points do
    not block the event loop.
    """

    # Maximum accepted input length, in characters (after sanitization).
    MAX_TEXT_LENGTH = 10000

    def __init__(self, backend: str = "auto", model_name: str = "cardiffnlp/twitter-roberta-base-sentiment-latest"):
        """
        Initialize sentiment analyzer.

        Args:
            backend: Analysis backend ("textblob", "transformers", or "auto")
            model_name: Hugging Face model name for transformers backend

        Raises:
            RuntimeError: If the requested backend (or, for "auto", every
                backend) is unavailable.
        """
        self.backend = backend
        self.model_name = model_name
        self.logger = logging.getLogger(__name__)
        self.executor = ThreadPoolExecutor(max_workers=2)

        # Model caching
        self._transformer_pipeline = None
        self._model_loaded = False
        # Serializes the first model load; created lazily from inside a
        # coroutine (see _load_transformer_model).
        self._load_lock: Optional[asyncio.Lock] = None

        # Initialize backend
        self._initialize_backend()

    def _initialize_backend(self) -> None:
        """
        Resolve "auto" to a concrete backend and check that it is available.

        Raises:
            RuntimeError: If no backend is installed, or the explicitly
                requested one is not installed.
        """
        if self.backend == "auto":
            # Prefer Transformers when both libraries are importable.
            if TRANSFORMERS_AVAILABLE:
                self.backend = "transformers"
                self.logger.info("Auto-selected Transformers backend")
            elif TEXTBLOB_AVAILABLE:
                self.backend = "textblob"
                self.logger.info("Auto-selected TextBlob backend")
            else:
                raise RuntimeError("No sentiment analysis backend available. Install textblob or transformers.")

        if self.backend == "transformers" and not TRANSFORMERS_AVAILABLE:
            raise RuntimeError("Transformers backend requested but not available")
        if self.backend == "textblob" and not TEXTBLOB_AVAILABLE:
            raise RuntimeError("TextBlob backend requested but not available")

    async def _load_transformer_model(self) -> None:
        """
        Load the transformer pipeline once, without blocking the event loop.

        Concurrent callers (for example from ``analyze_batch``) wait on a
        lock so the model is only loaded by the first of them.

        Raises:
            RuntimeError: If the pipeline cannot be created.
        """
        if self._model_loaded:
            return

        # There is no await between this check and the assignment, so two
        # coroutines on the same loop cannot both create a lock.
        if self._load_lock is None:
            self._load_lock = asyncio.Lock()

        async with self._load_lock:
            # Another coroutine may have finished loading while we waited.
            if self._model_loaded:
                return
            try:
                self.logger.info(f"Loading transformer model: {self.model_name}")
                # Load model in thread pool to avoid blocking
                loop = asyncio.get_running_loop()
                self._transformer_pipeline = await loop.run_in_executor(
                    self.executor,
                    lambda: pipeline(
                        "sentiment-analysis",
                        model=self.model_name,
                        tokenizer=self.model_name,
                        device=0 if torch.cuda.is_available() else -1,
                        return_all_scores=True
                    )
                )
                self._model_loaded = True
                self.logger.info("Transformer model loaded successfully")
            except Exception as e:
                self.logger.error(f"Failed to load transformer model: {e}")
                raise RuntimeError(f"Model loading failed: {e}") from e

    def _validate_input(self, text: str) -> str:
        """
        Validate and sanitize input text.

        Null bytes are removed and surrounding whitespace is trimmed before
        the emptiness and length checks, so those checks apply to the text
        that is actually returned.

        Args:
            text: Input text to validate

        Returns:
            Sanitized text

        Raises:
            ValueError: If text is not a string, is empty after
                sanitization, or exceeds ``MAX_TEXT_LENGTH`` characters
        """
        if not isinstance(text, str):
            raise ValueError("Input must be a string")

        # Sanitize first: stripping before removing null bytes would let
        # input such as "\x00" through as an empty string.
        text = text.replace('\x00', '').strip()

        if not text:
            raise ValueError("Input text cannot be empty")
        if len(text) > self.MAX_TEXT_LENGTH:
            raise ValueError(f"Input text too long (max {self.MAX_TEXT_LENGTH:,} characters)")
        return text

    def _analyze_with_textblob(self, text: str) -> "SentimentResult":
        """
        Analyze sentiment using TextBlob.

        Polarity above 0.1 is positive and below -0.1 is negative, with the
        absolute polarity (capped at 1.0) as confidence; anything in between
        is neutral with confidence ``1.0 - abs(polarity)``.

        Args:
            text: Text to analyze

        Returns:
            SentimentResult with classification and confidence; raw scores
            hold the polarity and subjectivity reported by TextBlob

        Raises:
            RuntimeError: If TextBlob fails
        """
        try:
            blob = TextBlob(text)
            polarity = blob.sentiment.polarity

            # Convert polarity to label and confidence
            if polarity > 0.1:
                label = SentimentLabel.POSITIVE
                confidence = min(polarity, 1.0)
            elif polarity < -0.1:
                label = SentimentLabel.NEGATIVE
                confidence = min(abs(polarity), 1.0)
            else:
                label = SentimentLabel.NEUTRAL
                confidence = 1.0 - abs(polarity)

            raw_scores = {
                "polarity": polarity,
                "subjectivity": blob.sentiment.subjectivity
            }
            return SentimentResult(label, confidence, raw_scores)
        except Exception as e:
            self.logger.error(f"TextBlob analysis failed: {e}")
            raise RuntimeError(f"Sentiment analysis failed: {e}") from e

    async def _analyze_with_transformers(self, text: str) -> "SentimentResult":
        """
        Analyze sentiment using Transformers.

        Args:
            text: Text to analyze

        Returns:
            SentimentResult with classification and confidence; raw scores
            map each lowercased model label to its score

        Raises:
            RuntimeError: If model loading or inference fails
        """
        try:
            await self._load_transformer_model()

            # Run inference in thread pool. The character limit enforced at
            # validation is not a token limit, so ask the tokenizer to
            # truncate instead of failing on input longer than the model
            # accepts.
            loop = asyncio.get_running_loop()
            results = await loop.run_in_executor(
                self.executor,
                lambda: self._transformer_pipeline(text, truncation=True)
            )

            # Accept both a nested list (one list of score dicts per input)
            # and a flat list of score dicts.
            entries = results[0] if results and isinstance(results[0], list) else results
            scores = {entry['label'].lower(): entry['score'] for entry in entries}

            # Map model labels to our labels
            label_mapping = {
                'positive': SentimentLabel.POSITIVE,
                'negative': SentimentLabel.NEGATIVE,
                'neutral': SentimentLabel.NEUTRAL,
                'label_0': SentimentLabel.NEGATIVE,  # Some models use numeric labels
                'label_1': SentimentLabel.NEUTRAL,
                'label_2': SentimentLabel.POSITIVE
            }

            # Find best match; labels missing from the mapping are ignored,
            # and with no match at all the result is neutral with score 0.
            best_score = 0
            best_label = SentimentLabel.NEUTRAL
            for model_label, score in scores.items():
                if model_label in label_mapping and score > best_score:
                    best_score = score
                    best_label = label_mapping[model_label]

            return SentimentResult(best_label, best_score, scores)
        except Exception as e:
            self.logger.error(f"Transformers analysis failed: {e}")
            raise RuntimeError(f"Sentiment analysis failed: {e}") from e

    async def analyze(self, text: str) -> "SentimentResult":
        """
        Analyze sentiment of input text.

        Args:
            text: Text to analyze

        Returns:
            SentimentResult with label, confidence, and raw scores

        Raises:
            ValueError: If input is invalid
            RuntimeError: If analysis fails or the backend is unknown
        """
        # Validate input
        text = self._validate_input(text)

        try:
            if self.backend == "transformers":
                return await self._analyze_with_transformers(text)
            elif self.backend == "textblob":
                # Run TextBlob in thread pool since it's CPU-bound
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(
                    self.executor,
                    self._analyze_with_textblob,
                    text
                )
            else:
                raise RuntimeError(f"Unknown backend: {self.backend}")
        except Exception as e:
            self.logger.error(f"Sentiment analysis failed for text: {text[:100]}... Error: {e}")
            raise

    async def analyze_batch(self, texts: list[str]) -> "list[SentimentResult]":
        """
        Analyze sentiment for multiple texts concurrently.

        A failure on one text does not abort the batch: its slot is filled
        with a neutral result of confidence 0.0 whose raw scores carry the
        error message.

        Args:
            texts: List of texts to analyze

        Returns:
            List of SentimentResult objects, in the same order as ``texts``
        """
        if not texts:
            return []

        # Analyze all texts concurrently
        tasks = [self.analyze(text) for text in texts]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Handle exceptions. BaseException is checked so that a cancelled
        # task (CancelledError) is not mistaken for a result.
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, BaseException):
                self.logger.error(f"Failed to analyze text {i}: {result}")
                # Return neutral result for failed analysis
                processed_results.append(
                    SentimentResult(SentimentLabel.NEUTRAL, 0.0, {"error": str(result)})
                )
            else:
                processed_results.append(result)
        return processed_results

    def get_info(self) -> Dict[str, Any]:
        """
        Get information about the analyzer configuration.

        Returns:
            Dictionary with the active backend, the model name (only for the
            transformers backend), whether the model is loaded, which
            libraries are importable, and whether CUDA is available
        """
        return {
            "backend": self.backend,
            "model_name": self.model_name if self.backend == "transformers" else None,
            "model_loaded": self._model_loaded,
            "textblob_available": TEXTBLOB_AVAILABLE,
            "transformers_available": TRANSFORMERS_AVAILABLE,
            # torch is only imported when the transformers import succeeded.
            "cuda_available": torch.cuda.is_available() if TRANSFORMERS_AVAILABLE else False
        }

    async def cleanup(self) -> None:
        """Shut down the thread pool, waiting for submitted work to finish."""
        self.executor.shutdown(wait=True)
        self.logger.info("Sentiment analyzer cleaned up")
# Global analyzer instance for reuse (the first analyzer created)
_global_analyzer: Optional[SentimentAnalyzer] = None

# Analyzers created so far, keyed by both the requested backend name and the
# backend it resolved to, so "auto" and its concrete backend share an instance.
_analyzers_by_backend: Dict[str, SentimentAnalyzer] = {}


async def get_analyzer(backend: str = "auto") -> SentimentAnalyzer:
    """
    Get or create a shared sentiment analyzer instance.

    The first analyzer created is kept as the global instance and is returned
    for "auto" and for requests naming its own backend. A request for a
    different backend gets its own cached analyzer instead of silently
    receiving the global one.

    Args:
        backend: Analysis backend to use

    Returns:
        SentimentAnalyzer instance for the requested backend

    Raises:
        RuntimeError: If a new analyzer must be created and its backend is
            unavailable
    """
    global _global_analyzer

    # Reuse the global instance whenever it satisfies the request.
    if _global_analyzer is not None and backend in ("auto", _global_analyzer.backend):
        return _global_analyzer

    analyzer = _analyzers_by_backend.get(backend)
    if analyzer is None:
        analyzer = SentimentAnalyzer(backend=backend)
        _analyzers_by_backend[backend] = analyzer
        # analyzer.backend is the resolved name (never "auto" after init).
        _analyzers_by_backend.setdefault(analyzer.backend, analyzer)

    if _global_analyzer is None:
        _global_analyzer = analyzer
    return analyzer
async def analyze_sentiment(text: str, backend: str = "auto") -> Dict[str, Any]:
    """
    Analyze one text with the shared analyzer and return a plain dictionary.

    Args:
        text: Text to analyze
        backend: Analysis backend to use

    Returns:
        Dictionary form of the sentiment result (see SentimentResult.to_dict)
    """
    engine = await get_analyzer(backend)
    outcome = await engine.analyze(text)
    as_mapping = outcome.to_dict()
    return as_mapping
if __name__ == "__main__":
    # Example usage
    async def main():
        """Run a few sample sentences through the TextBlob backend and print each result."""
        analyzer = SentimentAnalyzer(backend="textblob")
        test_texts = [
            "I love this product! It's amazing!",
            "This is terrible and I hate it.",
            "It's okay, nothing special.",
            "The weather is nice today."
        ]
        try:
            for text in test_texts:
                result = await analyzer.analyze(text)
                print(f"Text: {text}")
                print(f"Result: {result.to_dict()}")
                print("-" * 50)
        finally:
            # Release the thread pool even if one of the analyses raised.
            await analyzer.cleanup()

    asyncio.run(main())