updating logging (commit 1360e33)
import logging
import os
from typing import Dict, Any, List, Optional
from transformers import pipeline
import numpy as np
import nltk
from nltk.tokenize import sent_tokenize
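
# sent_tokenize() depends on the NLTK "punkt" tokenizer data and raises a LookupError if it
# is missing; this guarded download is an assumption about the runtime environment and can
# be removed if the data is guaranteed to be pre-installed.
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt', quiet=True)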
logger = logging.getLogger(__name__)
class BiasAnalyzer:
def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
"""
Initialize bias analyzer with both LLM and traditional approaches.
Args:
use_ai: Boolean indicating whether to use AI-powered analysis (True) or traditional analysis (False)
model_registry: Optional shared model registry for better performance
"""
self.use_ai = use_ai
self.llm_available = False
self.model_registry = model_registry
# Load traditional keywords
self.resources_dir = os.path.join(os.path.dirname(__file__), '..', 'resources')
self.left_keywords = self._load_keywords('left_bias_words.txt')
self.right_keywords = self._load_keywords('right_bias_words.txt')
if use_ai:
try:
if model_registry and model_registry.is_available:
self.classifier = model_registry.zero_shot
self.llm_available = True
logger.info("Using shared model pipeline for bias analysis")
else:
# Initialize own pipeline if no shared registry
self.classifier = pipeline(
"zero-shot-classification",
model="facebook/bart-large-mnli",
device=-1,
batch_size=8
)
self.llm_available = True
logger.info("Initialized dedicated model pipeline for bias analysis")
except Exception as e:
logger.warning(f"Failed to initialize LLM pipeline: {str(e)}")
self.llm_available = False
else:
logger.info("Initializing bias analyzer in traditional mode")
def analyze(self, text: str) -> Dict[str, Any]:
"""
Analyze bias using LLM with fallback to traditional method.
Args:
text: The text to analyze
Returns:
Dict containing bias analysis results
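
        Example (illustrative shape; AI-mode results additionally include a
        "detailed_scores" mapping):
            {
                "bias": "Leaning Right",
                "bias_score": 0.15,
                "bias_percentage": 15.0,
                "flagged_phrases": [...]
            }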
"""
try:
# Try LLM analysis if enabled and available
if self.use_ai and self.llm_available:
llm_result = self._analyze_with_llm(text)
if llm_result:
return llm_result
# Use traditional analysis
logger.info("Using traditional bias analysis")
return self._analyze_traditional(text)
except Exception as e:
logger.error(f"Error in bias analysis: {str(e)}")
return {
"bias": "Error",
"bias_score": 0.0,
"bias_percentage": 0,
"flagged_phrases": []
}
def _load_keywords(self, filename: str) -> List[str]:
"""Load keywords from file."""
try:
filepath = os.path.join(self.resources_dir, filename)
with open(filepath, 'r', encoding='utf-8') as f:
return [line.strip().lower() for line in f if line.strip() and not line.startswith('#')]
except Exception as e:
logger.error(f"Error loading {filename}: {str(e)}")
return []
def _analyze_traditional(self, text: str) -> Dict[str, Any]:
"""Traditional keyword-based bias analysis."""
text_lower = text.lower()
# Count matches and collect flagged phrases
left_matches = [word for word in self.left_keywords if word in text_lower]
right_matches = [word for word in self.right_keywords if word in text_lower]
left_count = len(left_matches)
right_count = len(right_matches)
total_count = left_count + right_count
if total_count == 0:
return {
"bias": "Neutral",
"bias_score": 0.0,
"bias_percentage": 0,
"flagged_phrases": []
}
# Calculate bias score (-1 to 1)
bias_score = (right_count - left_count) / total_count
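        # Worked example: 3 right-keyword hits and 1 left-keyword hit give
        # (3 - 1) / 4 = 0.5, which lands in the "Moderately Right" band below.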
# Calculate bias percentage
bias_percentage = abs(bias_score * 100)
# Determine bias label
if bias_score < -0.6:
bias = "Strongly Left"
elif bias_score < -0.3:
bias = "Moderately Left"
elif bias_score < -0.1:
bias = "Leaning Left"
elif bias_score > 0.6:
bias = "Strongly Right"
elif bias_score > 0.3:
bias = "Moderately Right"
elif bias_score > 0.1:
bias = "Leaning Right"
else:
bias = "Neutral"
return {
"bias": bias,
"bias_score": round(bias_score, 2),
"bias_percentage": round(bias_percentage, 1),
"flagged_phrases": list(set(left_matches + right_matches))[:5] # Limit to top 5 unique phrases
}
    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
"""Analyze bias using LLM zero-shot classification with batch processing."""
try:
logger.info("\n" + "="*50)
logger.info("BIAS ANALYSIS STARTED")
logger.info("="*50)
# Define bias categories
bias_categories = [
"left-wing bias",
"right-wing bias",
"neutral/balanced perspective"
]
logger.info("Using categories for analysis:")
for cat in bias_categories:
logger.info(f" - {cat}")
# Clean and prepare text
logger.info("\nCleaning and preparing text...")
cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
cleaned_text = '\n'.join(line for line in cleaned_text.split('\n')
if not line.startswith('[') and not line.startswith('More on'))
logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")
            # Split into larger chunks (4000 chars) to reduce the number of classifier calls
chunks = [cleaned_text[i:i+4000] for i in range(0, len(cleaned_text), 4000)]
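            # Note: chunks are fixed 4000-character windows, so a sentence may be split
            # across a chunk boundary.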
logger.info(f"Split text into {len(chunks)} chunks for processing")
# Process chunks in batches
chunk_scores = []
flagged_phrases = []
for i, chunk in enumerate(chunks, 1):
logger.info(f"\n{'-'*30}")
logger.info(f"Processing chunk {i}/{len(chunks)}")
logger.info(f"Chunk length: {len(chunk)} characters")
# Analyze chunk as a whole first
logger.info("Analyzing chunk for overall bias...")
chunk_result = self.classifier(
chunk,
bias_categories,
multi_label=True
)
chunk_scores.append({
label: score
for label, score in zip(chunk_result['labels'], chunk_result['scores'])
})
logger.info("Chunk bias scores:")
for label, score in chunk_scores[-1].items():
logger.info(f" - {label}: {score:.3f}")
# Only analyze individual sentences if chunk shows strong bias
max_chunk_score = max(chunk_result['scores'])
if max_chunk_score > 0.6:
logger.info(f"Strong bias detected (score: {max_chunk_score:.3f}), analyzing individual sentences...")
sentences = sent_tokenize(chunk)
logger.info(f"Found {len(sentences)} sentences to analyze")
# Filter sentences for analysis (longer, potentially more meaningful ones)
relevant_sentences = [s.strip() for s in sentences if len(s.strip()) > 20]
logger.info(f"Filtered to {len(relevant_sentences)} relevant sentences")
# Process sentences in batches of 8
for j in range(0, len(relevant_sentences), 8):
batch = relevant_sentences[j:j+8]
try:
batch_results = self.classifier(
batch,
bias_categories,
multi_label=False
)
# Handle single or multiple results
if not isinstance(batch_results, list):
batch_results = [batch_results]
for sentence, result in zip(batch, batch_results):
max_score = max(result['scores'])
if max_score > 0.8 and result['labels'][0] != "neutral/balanced perspective":
logger.info(f"Found biased sentence (score: {max_score:.3f}, type: {result['labels'][0]}):")
logger.info(f" \"{sentence}\"")
flagged_phrases.append({
"text": sentence,
"type": result['labels'][0],
"score": max_score,
"highlight": f"[{result['labels'][0].upper()}] (Score: {round(max_score * 100, 1)}%) \"{sentence}\""
})
except Exception as batch_error:
logger.warning(f"Batch processing error: {str(batch_error)}")
continue
# Aggregate scores across chunks
logger.info("\nAggregating scores across all chunks...")
aggregated_scores = {
category: np.mean([
scores[category]
for scores in chunk_scores
])
for category in bias_categories
}
logger.info("\nFinal aggregated scores:")
for category, score in aggregated_scores.items():
logger.info(f" - {category}: {score:.3f}")
# Calculate bias metrics
left_score = aggregated_scores["left-wing bias"]
right_score = aggregated_scores["right-wing bias"]
neutral_score = aggregated_scores["neutral/balanced perspective"]
# Calculate bias score (-1 to 1)
bias_score = (right_score - left_score) / max(right_score + left_score, 0.0001)
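            # The denominator includes only the left/right mass (neutral is excluded), so
            # the score reflects the direction of whatever bias is present rather than its
            # strength relative to neutral content.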
logger.info(f"\nRaw bias score: {bias_score:.3f}")
# Determine bias label
if bias_score < -0.6:
bias = "Strongly Left"
elif bias_score < -0.3:
bias = "Moderately Left"
elif bias_score < -0.1:
bias = "Leaning Left"
elif bias_score > 0.6:
bias = "Strongly Right"
elif bias_score > 0.3:
bias = "Moderately Right"
elif bias_score > 0.1:
bias = "Leaning Right"
else:
bias = "Neutral"
logger.info(f"Determined bias label: {bias}")
# Calculate bias percentage (0-100)
bias_percentage = min(100, abs(bias_score * 100))
logger.info(f"Bias percentage: {bias_percentage:.1f}%")
# Sort and limit flagged phrases
sorted_phrases = sorted(flagged_phrases, key=lambda x: x['score'], reverse=True)
unique_phrases = []
seen = set()
for phrase in sorted_phrases:
if phrase['text'] not in seen:
unique_phrases.append(phrase)
seen.add(phrase['text'])
if len(unique_phrases) >= 5:
break
logger.info(f"\nFlagged {len(unique_phrases)} unique biased phrases")
logger.info("\nBias analysis completed successfully")
return {
"bias": bias,
"bias_score": round(bias_score, 2),
"bias_percentage": round(bias_percentage, 1),
"flagged_phrases": unique_phrases,
"detailed_scores": {
"left_bias": round(left_score * 100, 1),
"right_bias": round(right_score * 100, 1),
"neutral": round(neutral_score * 100, 1)
}
}
except Exception as e:
logger.error(f"LLM analysis failed: {str(e)}")
return None
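
if __name__ == "__main__":
    # Minimal usage sketch: assumes the keyword files under resources/ are present and,
    # when use_ai=True, that the facebook/bart-large-mnli weights can be downloaded.
    logging.basicConfig(level=logging.INFO)

    analyzer = BiasAnalyzer(use_ai=False)  # traditional keyword mode, no model load
    sample = "Lawmakers clashed over the proposed tax bill in a sharply partisan debate."
    result = analyzer.analyze(sample)
    print(result["bias"], result["bias_score"], result["bias_percentage"])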