Commit 1360e33: updating logging
import logging
from typing import Dict, Any, List, Optional
from textblob import TextBlob
from transformers import pipeline
import numpy as np
logger = logging.getLogger(__name__)
class SentimentAnalyzer:
def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
"""
Initialize sentiment analyzer with both traditional and LLM-based approaches.
Args:
use_ai: Whether to use AI-powered (LLM) analysis; when False, only traditional analysis is used
model_registry: Optional shared model registry for better performance
"""
self.use_ai = use_ai
self.llm_available = False
self.model_registry = model_registry
self.toxicity_available = False
# Traditional manipulation patterns
self.manipulative_patterns = [
"experts say",
"sources claim",
"many believe",
"some say",
"everyone knows",
"clearly",
"obviously",
"without doubt",
"certainly"
]
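# Patterns are matched case-insensitively as substrings by _detect_manipulative_phrases below.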
if use_ai:
try:
if model_registry and model_registry.is_available:
# Use shared models
self.sentiment_pipeline = model_registry.sentiment
self.zero_shot = model_registry.zero_shot
self.toxicity_pipeline = getattr(model_registry, 'toxicity', None)
self.toxicity_available = self.toxicity_pipeline is not None
self.llm_available = True
logger.info("Using shared model pipelines for sentiment analysis")
if self.toxicity_available:
logger.info("Toxicity analysis enabled")
else:
logger.info("Toxicity analysis not available")
else:
# Initialize own pipelines
self.sentiment_pipeline = pipeline(
"text-classification",
model="SamLowe/roberta-base-go_emotions",
device=-1,
batch_size=16
)
self.zero_shot = pipeline(
"zero-shot-classification",
model="facebook/bart-large-mnli",
device=-1,
batch_size=8
)
try:
self.toxicity_pipeline = pipeline(
"text-classification",
model="unitary/toxic-bert",
device=-1,
batch_size=16
)
self.toxicity_available = True
logger.info("Toxicity analysis enabled")
except Exception as tox_error:
logger.warning(f"Toxicity pipeline initialization failed: {str(tox_error)}")
self.toxicity_available = False
self.llm_available = True
logger.info("Initialized dedicated model pipelines for sentiment analysis")
except Exception as e:
logger.warning(f"Failed to initialize LLM pipelines: {str(e)}")
self.llm_available = False
else:
logger.info("Initializing sentiment analyzer in traditional mode")
def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
"""Perform sentiment analysis using LLM models. Returns None if analysis fails."""
try:
logger.info("\n" + "="*50)
logger.info("SENTIMENT ANALYSIS STARTED")
logger.info("="*50)
# Clean the text of formatting markers
logger.info("Cleaning and preparing text...")
cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
cleaned_text = '\n'.join(line for line in cleaned_text.split('\n')
if not line.startswith('[') and not line.startswith('More on'))
logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")
# Split text into ~2000-character chunks (roughly 512 tokens) to fit model input limits
chunks = [cleaned_text[i:i+2000] for i in range(0, len(cleaned_text), 2000)]
logger.info(f"Split text into {len(chunks)} chunks for processing")
# Initialize aggregation variables
sentiment_scores = []
toxicity_scores = []
manipulation_scores = []
flagged_phrases = []
manipulation_categories = [
"emotional manipulation",
"fear mongering",
"propaganda",
"factual reporting",
"balanced perspective"
]
# Process each chunk
for i, chunk in enumerate(chunks, 1):
logger.info(f"\n{'-'*30}")
logger.info(f"Processing chunk {i}/{len(chunks)}")
logger.info(f"Chunk length: {len(chunk)} characters")
try:
# Get emotion scores
logger.info("Analyzing emotions...")
emotions = self.sentiment_pipeline(chunk)
logger.debug(f"Raw emotion response: {emotions}")
# Handle different response formats
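# For a single string input, the text-classification pipeline typically returns
# a list like [{'label': 'admiration', 'score': 0.92}]; a bare dict is also
# tolerated below in case the response format differs.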
if isinstance(emotions, list):
for emotion in emotions:
if isinstance(emotion, dict) and 'label' in emotion and 'score' in emotion:
sentiment_scores.append(emotion)
logger.info(f"Detected emotion: {emotion['label']} (score: {emotion['score']:.3f})")
elif isinstance(emotions, dict) and 'label' in emotions and 'score' in emotions:
sentiment_scores.append(emotions)
logger.info(f"Detected emotion: {emotions['label']} (score: {emotions['score']:.3f})")
# Get toxicity scores if available
if self.toxicity_available:
logger.info("Analyzing toxicity...")
try:
toxicity = self.toxicity_pipeline(chunk)
if isinstance(toxicity, list):
toxicity_scores.extend(toxicity)
else:
toxicity_scores.append(toxicity)
logger.info(f"Toxicity analysis complete for chunk {i}")
logger.debug(f"Toxicity scores: {toxicity_scores[-1]}")
except Exception as tox_error:
logger.warning(f"Toxicity analysis failed for chunk {i}: {str(tox_error)}")
# Get manipulation scores
logger.info("Analyzing manipulation patterns...")
manipulation = self.zero_shot(
chunk,
manipulation_categories,
multi_label=True
)
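# The zero-shot pipeline normally returns {'sequence': ..., 'labels': [...], 'scores': [...]}
# with scores sorted in descending order; the check below guards against other shapes.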
if isinstance(manipulation, dict) and 'labels' in manipulation and 'scores' in manipulation:
chunk_scores = {
label: score
for label, score in zip(manipulation['labels'], manipulation['scores'])
}
manipulation_scores.append(chunk_scores)
logger.info("Manipulation scores for chunk:")
for label, score in chunk_scores.items():
logger.info(f" - {label}: {score:.3f}")
# Analyze sentences for manipulation
logger.info("Analyzing individual sentences for manipulation...")
sentences = chunk.split('.')
for sentence in sentences:
if len(sentence.strip()) > 10:
sent_result = self.zero_shot(
sentence.strip(),
manipulation_categories,
multi_label=False
)
if (sent_result['labels'][0] in ["emotional manipulation", "fear mongering", "propaganda"]
and sent_result['scores'][0] > 0.7):
logger.info(f"Found manipulative content (score: {sent_result['scores'][0]:.3f}): {sentence.strip()}")
flagged_phrases.append({
'text': sentence.strip(),
'type': sent_result['labels'][0],
'score': sent_result['scores'][0]
})
except Exception as chunk_error:
logger.error(f"Error processing chunk {i}: {str(chunk_error)}")
continue
logger.info("\nAggregating final scores...")
# Aggregate scores with error handling
def aggregate_scores(scores_list, score_type: str):
try:
if not scores_list:
logger.warning(f"No {score_type} scores to aggregate")
return {}
all_scores = {}
for scores in scores_list:
if isinstance(scores, dict):
if 'label' in scores and 'score' in scores:
label = scores['label']
score = scores['score']
else:
# Handle direct label-score mapping
for label, score in scores.items():
if label not in all_scores:
all_scores[label] = []
if isinstance(score, (int, float)):
all_scores[label].append(score)
continue
else:
logger.warning(f"Unexpected score format in {score_type}: {scores}")
continue
if isinstance(label, (str, bytes)):
if label not in all_scores:
all_scores[label] = []
if isinstance(score, (int, float)):
all_scores[label].append(score)
return {k: float(np.mean(v)) for k, v in all_scores.items() if v}
except Exception as agg_error:
logger.error(f"Error aggregating {score_type} scores: {str(agg_error)}")
return {}
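# Illustrative example of the aggregation (values are hypothetical):
# [{'label': 'joy', 'score': 0.8}, {'label': 'joy', 'score': 0.6}] -> {'joy': 0.7}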
emotion_scores = aggregate_scores(sentiment_scores, "emotion")
toxicity_scores = aggregate_scores(toxicity_scores, "toxicity") if self.toxicity_available else {}
logger.info("\nFinal emotion scores:")
for emotion, score in emotion_scores.items():
logger.info(f" - {emotion}: {score:.3f}")
if toxicity_scores:
logger.info("\nFinal toxicity scores:")
for category, score in toxicity_scores.items():
logger.info(f" - {category}: {score:.3f}")
# Aggregate manipulation scores
manipulation_agg = {
category: float(np.mean([
scores.get(category, 0)
for scores in manipulation_scores
]))
for category in manipulation_categories
if manipulation_scores
}
logger.info("\nFinal manipulation scores:")
for category, score in manipulation_agg.items():
logger.info(f" - {category}: {score:.3f}")
# Calculate manipulation score based on multiple factors
manipulation_indicators = {
'emotional manipulation': 0.4,
'fear mongering': 0.3,
'propaganda': 0.3
}
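# Heuristic weights; the weighted sum below is scaled by 100 and capped at 100.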
if self.toxicity_available:
manipulation_indicators.update({
'toxic': 0.2,
'severe_toxic': 0.3,
'threat': 0.2
})
# Combine toxicity and manipulation scores
combined_scores = {**toxicity_scores, **manipulation_agg}
# Calculate manipulation score with fallback
if combined_scores:
manipulation_score = min(100, sum(
combined_scores.get(k, 0) * weight
for k, weight in manipulation_indicators.items()
) * 100)
else:
# Fallback to traditional analysis if no scores available
manipulation_score = min(100, len(self._detect_manipulative_phrases(text)) * 10)
logger.info(f"\nFinal manipulation score: {manipulation_score:.1f}")
# Determine overall sentiment
positive_emotions = ['admiration', 'joy', 'amusement', 'approval']
negative_emotions = ['disgust', 'anger', 'disappointment', 'fear']
neutral_emotions = ['neutral', 'confusion', 'realization']
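# These buckets cover only a subset of the GoEmotions label set; emotions not
# listed here are ignored in the positive/negative/neutral rollup.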
pos_score = sum(emotion_scores.get(emotion, 0) for emotion in positive_emotions)
neg_score = sum(emotion_scores.get(emotion, 0) for emotion in negative_emotions)
neu_score = sum(emotion_scores.get(emotion, 0) for emotion in neutral_emotions)
logger.info(f"\nSentiment component scores:")
logger.info(f" - Positive: {pos_score:.3f}")
logger.info(f" - Negative: {neg_score:.3f}")
logger.info(f" - Neutral: {neu_score:.3f}")
# Determine sentiment based on highest score
max_score = max(pos_score, neg_score, neu_score)
if max_score == pos_score and pos_score > 0.3:
sentiment = "Positive"
elif max_score == neg_score and neg_score > 0.3:
sentiment = "Negative"
else:
sentiment = "Neutral"
logger.info(f"\nFinal sentiment determination: {sentiment}")
# Sort and limit flagged phrases by manipulation score
sorted_phrases = sorted(flagged_phrases, key=lambda x: x['score'], reverse=True)
unique_phrases = []
seen = set()
for phrase in sorted_phrases:
clean_text = phrase['text'].strip()
if clean_text not in seen:
unique_phrases.append(clean_text)
seen.add(clean_text)
if len(unique_phrases) >= 5:
break
logger.info(f"\nFlagged {len(unique_phrases)} unique manipulative phrases")
logger.info("\nSentiment analysis completed successfully")
return {
"sentiment": sentiment,
"manipulation_score": round(manipulation_score, 1),
"flagged_phrases": unique_phrases,
"detailed_scores": {
"emotions": emotion_scores,
"manipulation": manipulation_agg,
"toxicity": toxicity_scores
}
}
except Exception as e:
logger.error(f"LLM analysis failed: {str(e)}", exc_info=True)
return None
def analyze(self, text: str) -> Dict[str, Any]:
"""
Analyze sentiment using LLM with fallback to traditional methods.
Args:
text: The text to analyze
Returns:
Dict containing sentiment analysis results
"""
try:
# Try LLM analysis if enabled and available
if self.use_ai and self.llm_available:
llm_result = self._analyze_with_llm(text)
if llm_result:
return llm_result
# Fall back to traditional analysis (AI disabled, unavailable, or failed)
logger.info("Using traditional sentiment analysis")
blob = TextBlob(text)
sentiment_score = blob.sentiment.polarity
manipulative_phrases = self._detect_manipulative_phrases(text)
manipulation_score = len(manipulative_phrases) * 10
if sentiment_score > 0.2:
sentiment = "Positive"
elif sentiment_score < -0.2:
sentiment = "Negative"
else:
sentiment = "Neutral"
return {
"sentiment": sentiment,
"manipulation_score": min(manipulation_score, 100),
"flagged_phrases": manipulative_phrases[:5] # Limit to top 5 phrases
}
except Exception as e:
logger.error(f"Error in sentiment analysis: {str(e)}")
return {
"sentiment": "Error",
"manipulation_score": 0,
"flagged_phrases": []
}
def _detect_manipulative_phrases(self, text: str) -> List[str]:
"""Detect potentially manipulative phrases."""
found_phrases = []
text_lower = text.lower()
for pattern in self.manipulative_patterns:
if pattern in text_lower:
start = text_lower.find(pattern)
context = text[max(0, start-20):min(len(text), start+len(pattern)+20)]
found_phrases.append(context.strip())
return found_phrases
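# Minimal usage sketch (not from the original module; the sample text below is
# hypothetical). Traditional mode is shown so the example runs without
# downloading the transformer models; pass use_ai=True to exercise the LLM path.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sample = "Experts say the new policy will clearly fail, and everyone knows it."
    analyzer = SentimentAnalyzer(use_ai=False)
    result = analyzer.analyze(sample)
    print(result["sentiment"], result["manipulation_score"], result["flagged_phrases"])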