entropy25's picture
Update app.py
061ab6f verified
raw
history blame
53.4 kB
import torch
import gradio as gr
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import numpy as np
from wordcloud import WordCloud
from collections import Counter, defaultdict
import re
import json
import csv
import io
import tempfile
from datetime import datetime
import logging
from functools import lru_cache, wraps
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Any, Callable
from contextlib import contextmanager
import nltk
from nltk.corpus import stopwords
import langdetect
import pandas as pd
import gc
# Advanced analysis imports
import shap
import lime
from lime.lime_text import LimeTextExplainer
# Configuration
@dataclass
class Config:
    """Central collection of application tunables and lookup tables.

    The annotated attributes are dataclass fields and can be overridden per
    instance; the un-annotated dictionaries are ordinary class attributes
    shared by every instance.
    """

    # Limits
    MAX_HISTORY_SIZE: int = 1000     # entries retained by the history store
    BATCH_SIZE_LIMIT: int = 50       # maximum number of texts per batch
    MAX_TEXT_LENGTH: int = 512       # tokenizer truncation length
    MIN_WORD_LENGTH: int = 2         # shorter words are dropped when cleaning
    CACHE_SIZE: int = 128            # lru_cache capacity for text cleaning
    BATCH_PROCESSING_SIZE: int = 8   # chunk size used while walking a batch

    # Language code -> name shown in the UI dropdown
    SUPPORTED_LANGUAGES = dict(
        auto='Auto Detect',
        en='English',
        zh='Chinese',
        es='Spanish',
        fr='French',
        de='German',
        sv='Swedish',
    )

    # Model key -> Hugging Face model identifier
    MODELS = dict(
        en="cardiffnlp/twitter-roberta-base-sentiment-latest",
        multilingual="cardiffnlp/twitter-xlm-roberta-base-sentiment",
        zh="uer/roberta-base-finetuned-dianping-chinese",
    )

    # Theme name -> colors for positive / negative / neutral in Plotly charts
    THEMES = dict(
        default=dict(pos='#4CAF50', neg='#F44336', neu='#FF9800'),
        ocean=dict(pos='#0077BE', neg='#FF6B35', neu='#00BCD4'),
        dark=dict(pos='#66BB6A', neg='#EF5350', neu='#FFA726'),
        rainbow=dict(pos='#9C27B0', neg='#E91E63', neu='#FF5722'),
    )


# Module-wide configuration instance used by the rest of the file
config = Config()
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize NLTK. Downloading / loading the corpus can fail (for example
# when the resource cannot be fetched), in which case a small built-in stop
# word list is used so the rest of the module still works.
try:
    nltk.download('stopwords', quiet=True)
    nltk.download('punkt', quiet=True)
    STOP_WORDS = set(stopwords.words('english'))
except Exception as exc:
    # `Exception` rather than a bare `except:` so Ctrl-C / SystemExit during
    # start-up are not swallowed; the reason for the fallback is logged.
    logger.warning("NLTK stop words unavailable (%s); using built-in fallback list", exc)
    STOP_WORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
# Decorators and Context Managers
def handle_errors(default_return=None):
    """Build a decorator that logs exceptions and returns a fallback value.

    Args:
        default_return: Value returned when the wrapped callable raises. When
            it is ``None`` the string ``"Error: <message>"`` is returned
            instead.

    Returns:
        A decorator. The decorated callable never propagates ``Exception``
        subclasses; it logs them and returns the fallback.

    Each failing call receives its own deep copy of ``default_return``.
    Handing out the very same dict/list every time would let one caller's
    mutation leak into every later fallback result.
    """
    import copy  # local import keeps this block self-contained

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                logger.error(f"{func.__name__} failed: {e}")
                if default_return is None:
                    return f"Error: {str(e)}"
                return copy.deepcopy(default_return)
        return wrapper
    return decorator
@contextmanager
def memory_cleanup():
    """Run the managed block, then force a garbage-collection pass.

    The collection is triggered on exit whether or not the block raised;
    any exception from the block still propagates to the caller.
    """
    try:
        yield None
    finally:
        # Always reclaim memory, even on the error path
        gc.collect()
class ThemeContext:
    """Pairs a theme name with the color palette resolved from ``config.THEMES``."""

    def __init__(self, theme: str = 'default'):
        palettes = config.THEMES
        fallback = palettes['default']
        self.theme = theme
        # Unknown theme names keep their name but use the default palette
        self.colors = palettes.get(theme, fallback)
# Enhanced Model Manager with Multi-language Support
class ModelManager:
    """Process-wide singleton owning the sentiment models and tokenizers.

    The first ``ModelManager()`` call loads the multilingual model (stored
    under the key ``'default'``) and the Chinese model (``'zh'``) onto the
    selected device; later calls return the same, already initialised object.
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every ModelManager() call; only the first does work
        if self._initialized:
            return
        self.models = {}
        self.tokenizers = {}
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self._load_default_models()
        self._initialized = True

    def _load_model(self, key: str, model_name: str) -> None:
        """Load tokenizer and model ``model_name`` and register them under ``key``."""
        self.tokenizers[key] = AutoTokenizer.from_pretrained(model_name)
        self.models[key] = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.models[key].to(self.device)

    def _load_default_models(self):
        """Load the multilingual and Chinese models.

        Raises:
            Exception: whatever the underlying loaders raise; it is logged
                and re-raised so start-up fails loudly.
        """
        try:
            # Load multilingual model as default
            model_name = config.MODELS['multilingual']
            self._load_model('default', model_name)
            logger.info(f"Default model loaded: {model_name}")
            # Load Chinese model
            zh_model_name = config.MODELS['zh']
            self._load_model('zh', zh_model_name)
            logger.info(f"Chinese model loaded: {zh_model_name}")
        except Exception as e:
            logger.error(f"Failed to load models: {e}")
            raise

    def get_model(self, language='en'):
        """Return ``(model, tokenizer)`` for ``language``.

        ``'zh'`` selects the Chinese model; every other code uses the
        multilingual default.
        """
        key = 'zh' if language == 'zh' else 'default'
        return self.models[key], self.tokenizers[key]

    @staticmethod
    def detect_language(text: str) -> str:
        """Detect the language of ``text`` and return a supported code.

        Returns ``'en'`` when detection fails or the detected language is not
        a key of ``config.SUPPORTED_LANGUAGES``.
        """
        try:
            detected = langdetect.detect(text)
        except Exception:
            # Detection can fail on input it cannot classify; default to
            # English. `Exception` (not a bare except) keeps Ctrl-C working.
            return 'en'
        # Fold the regional Chinese codes into the single 'zh' code
        detected = {'zh-cn': 'zh', 'zh-tw': 'zh'}.get(detected, detected)
        return detected if detected in config.SUPPORTED_LANGUAGES else 'en'
# Simplified Text Processing
class TextProcessor:
    """Stateless text helpers: cleaning, keyword extraction, batch parsing."""

    @staticmethod
    @lru_cache(maxsize=config.CACHE_SIZE)
    def clean_text(text: str, remove_punctuation: bool = True, remove_numbers: bool = False) -> str:
        """Normalise ``text`` for analysis.

        Text containing CJK ideographs is only stripped of surrounding
        whitespace. Anything else is lower-cased, optionally has digits and
        punctuation removed, and loses stop words and words shorter than
        ``config.MIN_WORD_LENGTH``.
        """
        stripped = text.strip()
        if re.search(r'[\u4e00-\u9fff]', stripped):
            # Chinese text is left as-is
            return stripped

        lowered = stripped.lower()
        if remove_numbers:
            lowered = re.sub(r'\d+', '', lowered)
        if remove_punctuation:
            lowered = re.sub(r'[^\w\s]', '', lowered)

        return ' '.join(
            token
            for token in lowered.split()
            if token not in STOP_WORDS and len(token) >= config.MIN_WORD_LENGTH
        )

    @staticmethod
    def extract_keywords(text: str, top_k: int = 5) -> List[str]:
        """Return up to ``top_k`` most frequent items of ``text``.

        For text containing CJK ideographs the items are single characters;
        otherwise they are the words left after ``clean_text``.
        """
        if re.search(r'[\u4e00-\u9fff]', text):
            han_runs = re.findall(r'[\u4e00-\u9fff]+', text)
            ranked = Counter(''.join(han_runs)).most_common(top_k)
        else:
            ranked = Counter(TextProcessor.clean_text(text).split()).most_common(top_k)
        return [item for item, _ in ranked]

    @staticmethod
    def parse_batch_input(text: str) -> List[str]:
        """Split textarea input into non-empty, whitespace-trimmed lines."""
        trimmed = (line.strip() for line in text.strip().split('\n'))
        return [line for line in trimmed if line]
# Enhanced History Manager
class HistoryManager:
    """In-memory, size-bounded log of analysis results with simple queries."""

    def __init__(self):
        self._history = []

    def add(self, entry: Dict):
        """Stamp ``entry`` with the current time and append it.

        The ``timestamp`` key is written onto the passed dict itself. When
        the log exceeds ``config.MAX_HISTORY_SIZE`` the oldest entries are
        dropped.
        """
        entry['timestamp'] = datetime.now().isoformat()
        self._history.append(entry)
        if len(self._history) > config.MAX_HISTORY_SIZE:
            self._history = self._history[-config.MAX_HISTORY_SIZE:]

    def add_batch(self, entries: List[Dict]):
        """Add every entry of ``entries`` via :meth:`add`."""
        for entry in entries:
            self.add(entry)

    def get_all(self) -> List[Dict]:
        """Return a shallow copy of the whole history."""
        return self._history.copy()

    def get_recent(self, n: int = 10) -> List[Dict]:
        """Return the last ``n`` entries (a new list); empty for ``n <= 0``."""
        # Guard needed: a slice of [-0:] would return the *entire* history
        if n <= 0:
            return []
        return self._history[-n:]

    def filter_by(self, sentiment: str = None, language: str = None,
                  min_confidence: float = None) -> List[Dict]:
        """Return entries matching all given criteria.

        Args:
            sentiment: keep entries whose ``sentiment`` equals this value.
            language: keep entries whose ``language`` (default ``'en'``)
                equals this value.
            min_confidence: keep entries with ``confidence`` at or above it.

        Returns:
            A new list; the internal history is never handed out, so callers
            may modify the result freely.
        """
        filtered = list(self._history)
        if sentiment:
            filtered = [h for h in filtered if h.get('sentiment') == sentiment]
        if language:
            filtered = [h for h in filtered if h.get('language', 'en') == language]
        if min_confidence is not None:
            filtered = [h for h in filtered if h.get('confidence', 0.0) >= min_confidence]
        return filtered

    def clear(self) -> int:
        """Remove all entries and return how many were removed."""
        count = len(self._history)
        self._history.clear()
        return count

    def size(self) -> int:
        """Return the number of stored entries."""
        return len(self._history)

    def get_stats(self) -> Dict:
        """Return aggregate statistics, or ``{}`` when the history is empty."""
        if not self._history:
            return {}
        sentiments = [item['sentiment'] for item in self._history]
        confidences = [item['confidence'] for item in self._history]
        languages = [item.get('language', 'en') for item in self._history]
        return {
            'total_analyses': len(self._history),
            'positive_count': sentiments.count('Positive'),
            'negative_count': sentiments.count('Negative'),
            'neutral_count': sentiments.count('Neutral'),
            'avg_confidence': np.mean(confidences),
            'max_confidence': np.max(confidences),
            'min_confidence': np.min(confidences),
            'languages_detected': len(set(languages)),
            'most_common_language': Counter(languages).most_common(1)[0][0] if languages else 'en'
        }
# Core Sentiment Analysis Engine (Modified - removed attention analysis)
class SentimentEngine:
    """Multi-language sentiment analysis engine."""

    def __init__(self):
        self.model_manager = ModelManager()

    @handle_errors(default_return={'sentiment': 'Unknown', 'confidence': 0.0, 'keywords': []})
    def analyze_single(self, text: str, language: str = 'auto', preprocessing_options: Dict = None) -> Dict:
        """Analyze one text; on failure return the placeholder from ``handle_errors``.

        Args:
            text: text to classify.
            language: language code, or ``'auto'`` to detect it.
            preprocessing_options: optional dict with the boolean keys
                ``clean_text``, ``remove_punctuation`` and ``remove_numbers``.

        Returns:
            Dict with ``sentiment``, ``confidence``, the three ``*_prob``
            values, ``has_neutral``, ``language``, ``keywords`` (list of
            ``(word, score)`` tuples), ``word_count`` and ``char_count``.
        """
        return self._analyze_single_impl(text, language, preprocessing_options)

    def _analyze_single_impl(self, text: str, language: str = 'auto',
                             preprocessing_options: Dict = None) -> Dict:
        """Do the actual single-text analysis, raising on any failure.

        Kept separate from :meth:`analyze_single` so that batch processing
        can observe the exception and record a per-text error entry instead
        of receiving an anonymous placeholder.

        Raises:
            ValueError: if ``text`` is empty or whitespace only.
        """
        if not text.strip():
            raise ValueError("Empty text provided")

        # Detect language
        if language == 'auto':
            detected_lang = self.model_manager.detect_language(text)
        else:
            detected_lang = language

        # Get appropriate model
        model, tokenizer = self.model_manager.get_model(detected_lang)

        # Preprocessing (never applied to text containing CJK ideographs)
        options = preprocessing_options or {}
        processed_text = text
        if options.get('clean_text', False) and not re.search(r'[\u4e00-\u9fff]', text):
            processed_text = TextProcessor.clean_text(
                text,
                options.get('remove_punctuation', True),
                options.get('remove_numbers', False)
            )

        # Tokenize and analyze
        inputs = tokenizer(processed_text, return_tensors="pt", padding=True,
                           truncation=True, max_length=config.MAX_TEXT_LENGTH).to(self.model_manager.device)
        with torch.no_grad():
            outputs = model(**inputs)
            probs = torch.nn.functional.softmax(outputs.logits, dim=-1).cpu().numpy()[0]

        # Handle different model outputs
        if len(probs) == 3:  # negative, neutral, positive
            sentiment_idx = int(np.argmax(probs))
            sentiment_labels = ['Negative', 'Neutral', 'Positive']
            result = {
                'sentiment': sentiment_labels[sentiment_idx],
                'confidence': float(probs[sentiment_idx]),
                'neg_prob': float(probs[0]),
                'neu_prob': float(probs[1]),
                'pos_prob': float(probs[2]),
                'has_neutral': True
            }
        else:  # negative, positive
            pred = int(np.argmax(probs))
            result = {
                'sentiment': "Positive" if pred == 1 else "Negative",
                'confidence': float(probs[pred]),
                'neg_prob': float(probs[0]),
                'pos_prob': float(probs[1]),
                'neu_prob': 0.0,
                'has_neutral': False
            }

        # Extract basic keywords; the 0.1 score is a fixed placeholder
        keywords = TextProcessor.extract_keywords(text, 10)
        keyword_tuples = [(word, 0.1) for word in keywords]

        # Add metadata
        result.update({
            'language': detected_lang,
            'keywords': keyword_tuples,
            'word_count': len(text.split()),
            'char_count': len(text)
        })
        return result

    @handle_errors(default_return=[])
    def analyze_batch(self, texts: List[str], language: str = 'auto',
                      preprocessing_options: Dict = None, progress_callback=None) -> List[Dict]:
        """Analyze several texts, one result dict per text.

        At most ``config.BATCH_SIZE_LIMIT`` texts are processed. A text that
        fails yields an entry with ``sentiment == 'Error'`` and an ``error``
        message; successful entries are the single-text result plus
        ``batch_index``, ``text`` (preview of up to 100 characters) and
        ``full_text``.

        Args:
            progress_callback: optional callable receiving the completed
                fraction (0..1) once per processed chunk.
        """
        if len(texts) > config.BATCH_SIZE_LIMIT:
            texts = texts[:config.BATCH_SIZE_LIMIT]

        results = []
        batch_size = config.BATCH_PROCESSING_SIZE
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            if progress_callback:
                progress_callback((i + len(batch)) / len(texts))
            for text in batch:
                preview = text[:100] + '...' if len(text) > 100 else text
                try:
                    # Call the raising implementation: the decorated
                    # analyze_single never raises, which would make the
                    # except branch below unreachable.
                    result = self._analyze_single_impl(text, language, preprocessing_options)
                    result['batch_index'] = len(results)
                    result['text'] = preview
                    result['full_text'] = text
                    results.append(result)
                except Exception as e:
                    logger.error(f"analyze_single failed: {e}")
                    results.append({
                        'sentiment': 'Error',
                        'confidence': 0.0,
                        'error': str(e),
                        'batch_index': len(results),
                        'text': preview,
                        'full_text': text
                    })
        return results
# Advanced Analysis Engine (NEW)
class AdvancedAnalysisEngine:
    """Explainability analysis (SHAP and LIME) on top of the sentiment models."""

    def __init__(self):
        self.model_manager = ModelManager()

    def _resolve_model(self, text: str, language: str):
        """Return ``(language_code, model, tokenizer)`` for ``text``.

        ``language == 'auto'`` triggers detection; any other value is used
        as the language code directly.
        """
        if language == 'auto':
            detected_lang = self.model_manager.detect_language(text)
        else:
            detected_lang = language
        model, tokenizer = self.model_manager.get_model(detected_lang)
        return detected_lang, model, tokenizer

    def create_prediction_function(self, model, tokenizer, device):
        """Create the ``texts -> class probabilities`` callable for LIME/SHAP.

        The returned function accepts a sequence of strings and returns a
        2-D array with one row of softmax probabilities per input text.
        """
        def predict_proba(texts):
            samples = [str(t) for t in texts]
            if not samples:
                return np.array([])
            chunks = []
            step = config.BATCH_PROCESSING_SIZE
            # The explainers submit many perturbed texts at once; run them in
            # padded mini-batches instead of one forward pass per text.
            for start in range(0, len(samples), step):
                inputs = tokenizer(samples[start:start + step], return_tensors="pt", padding=True,
                                   truncation=True, max_length=config.MAX_TEXT_LENGTH).to(device)
                with torch.no_grad():
                    outputs = model(**inputs)
                    chunks.append(torch.nn.functional.softmax(outputs.logits, dim=-1).cpu().numpy())
            return np.concatenate(chunks, axis=0)
        return predict_proba

    @handle_errors(default_return=("Analysis failed", None, None))
    def analyze_with_shap(self, text: str, language: str = 'auto') -> Tuple[str, go.Figure, Dict]:
        """Explain the prediction for ``text`` with SHAP.

        Returns:
            ``(markdown_summary, figure, analysis_data)``. On failure the
            figure is ``None`` and the summary carries the error message.
        """
        if not text.strip():
            return "Please enter text for analysis", None, {}

        detected_lang, model, tokenizer = self._resolve_model(text, language)
        predict_fn = self.create_prediction_function(model, tokenizer, self.model_manager.device)

        try:
            # Initialize SHAP explainer and get SHAP values
            explainer = shap.Explainer(predict_fn, tokenizer)
            shap_values = explainer([text])

            # Extract token importance
            tokens = shap_values.data[0]
            values = shap_values.values[0]

            # Contributions towards the positive class: the last column of a
            # three-class output, column 1 otherwise
            if len(values.shape) > 1:
                pos_values = values[:, -1] if values.shape[1] == 3 else values[:, 1]
            else:
                pos_values = values

            # Create SHAP plot
            positions = list(range(len(tokens)))
            fig = go.Figure()
            colors = ['red' if v < 0 else 'green' for v in pos_values]
            fig.add_trace(go.Bar(
                x=positions,
                y=pos_values,
                text=tokens,
                textposition='outside',
                marker_color=colors,
                name='SHAP Values'
            ))
            fig.update_layout(
                title="SHAP Analysis - Token Importance",
                xaxis_title="Token Index",
                yaxis_title="SHAP Value",
                height=500,
                xaxis=dict(tickmode='array', tickvals=positions, ticktext=tokens)
            )

            # Create analysis summary
            analysis_data = {
                'method': 'SHAP',
                'language': detected_lang,
                'total_tokens': len(tokens),
                'positive_influence': sum(1 for v in pos_values if v > 0),
                'negative_influence': sum(1 for v in pos_values if v < 0),
                'most_important_tokens': [(tokens[i], float(pos_values[i]))
                                          for i in np.argsort(np.abs(pos_values))[-5:]]
            }
            top_tokens = ', '.join(
                [f"{token}({score:.3f})" for token, score in analysis_data['most_important_tokens']]
            )
            summary_text = f"""
**SHAP Analysis Results:**
- **Language:** {detected_lang.upper()}
- **Total Tokens:** {analysis_data['total_tokens']}
- **Positive Influence Tokens:** {analysis_data['positive_influence']}
- **Negative Influence Tokens:** {analysis_data['negative_influence']}
- **Most Important Tokens:** {top_tokens}
"""
            return summary_text, fig, analysis_data
        except Exception as e:
            logger.error(f"SHAP analysis failed: {e}")
            return f"SHAP analysis failed: {str(e)}", None, {}

    @handle_errors(default_return=("Analysis failed", None, None))
    def analyze_with_lime(self, text: str, language: str = 'auto') -> Tuple[str, go.Figure, Dict]:
        """Explain the prediction for ``text`` with LIME.

        The explanation targets the positive class, matching the SHAP
        analysis, so positive scores push towards a positive sentiment.

        Returns:
            ``(markdown_summary, figure, analysis_data)``. On failure the
            figure is ``None`` and the summary carries the error message.
        """
        if not text.strip():
            return "Please enter text for analysis", None, {}

        detected_lang, model, tokenizer = self._resolve_model(text, language)
        predict_fn = self.create_prediction_function(model, tokenizer, self.model_manager.device)

        try:
            # Class layout depends on the model: three-way (negative, neutral,
            # positive) or two-way (negative, positive). LIME explains label 1
            # unless told otherwise, which is *Neutral* for a three-way model.
            num_labels = int(getattr(model.config, 'num_labels', 3))
            if num_labels == 3:
                class_names = ['Negative', 'Neutral', 'Positive']
                positive_label = 2
            else:
                class_names = ['Negative', 'Positive']
                positive_label = 1

            # Initialize LIME explainer and get the explanation
            explainer = LimeTextExplainer(class_names=class_names)
            exp = explainer.explain_instance(text, predict_fn,
                                             labels=(positive_label,), num_features=20)

            # Extract feature importance as (word, weight) pairs
            lime_data = exp.as_list(label=positive_label)

            # Create visualization
            words = [item[0] for item in lime_data]
            scores = [item[1] for item in lime_data]
            fig = go.Figure()
            colors = ['red' if s < 0 else 'green' for s in scores]
            fig.add_trace(go.Bar(
                y=words,
                x=scores,
                orientation='h',
                marker_color=colors,
                text=[f'{s:.3f}' for s in scores],
                textposition='auto',
                name='LIME Importance'
            ))
            fig.update_layout(
                title="LIME Analysis - Feature Importance",
                xaxis_title="Importance Score",
                yaxis_title="Words/Phrases",
                height=500
            )

            # Create analysis summary
            analysis_data = {
                'method': 'LIME',
                'language': detected_lang,
                'features_analyzed': len(lime_data),
                'positive_features': sum(1 for _, score in lime_data if score > 0),
                'negative_features': sum(1 for _, score in lime_data if score < 0),
                'feature_importance': lime_data
            }
            top_features = ', '.join([f"{word}({score:.3f})" for word, score in lime_data[:5]])
            summary_text = f"""
**LIME Analysis Results:**
- **Language:** {detected_lang.upper()}
- **Features Analyzed:** {analysis_data['features_analyzed']}
- **Positive Features:** {analysis_data['positive_features']}
- **Negative Features:** {analysis_data['negative_features']}
- **Top Features:** {top_features}
"""
            return summary_text, fig, analysis_data
        except Exception as e:
            logger.error(f"LIME analysis failed: {e}")
            return f"LIME analysis failed: {str(e)}", None, {}
# Advanced Plotly Visualization System (Updated - removed attention visualization)
class PlotlyVisualizer:
    """Factory of Plotly figures for the single, batch and history views.

    Every public factory is wrapped by ``handle_errors(default_return=None)``.
    With a ``None`` default that decorator returns an ``"Error: ..."``
    string, so on failure callers receive that string instead of a figure.
    """

    @staticmethod
    def _sentiment_color(sentiment: str, theme: ThemeContext) -> str:
        """Map a sentiment label to its theme color.

        ``'Positive'`` and ``'Neutral'`` use their own colors; any other
        label uses the negative color.
        """
        palette = theme.colors
        if sentiment == 'Positive':
            return palette['pos']
        if sentiment == 'Neutral':
            return palette['neu']
        return palette['neg']

    @staticmethod
    @handle_errors(default_return=None)
    def create_sentiment_gauge(result: Dict, theme: ThemeContext) -> go.Figure:
        """Create a gauge for one analysis result.

        Three-class results show the positive probability (0-100) over
        negative / neutral / positive bands; two-class results show the
        confidence of the predicted label.
        """
        colors = theme.colors
        bar_color = PlotlyVisualizer._sentiment_color(result['sentiment'], theme)

        if result.get('has_neutral', False):
            # Three-way gauge
            fig = go.Figure(go.Indicator(
                mode="gauge+number+delta",
                value=result['pos_prob'] * 100,
                domain={'x': [0, 1], 'y': [0, 1]},
                title={'text': f"Sentiment: {result['sentiment']}"},
                delta={'reference': 50},
                gauge={
                    'axis': {'range': [None, 100]},
                    'bar': {'color': bar_color},
                    'steps': [
                        {'range': [0, 33], 'color': colors['neg']},
                        {'range': [33, 67], 'color': colors['neu']},
                        {'range': [67, 100], 'color': colors['pos']}
                    ],
                    'threshold': {
                        'line': {'color': "red", 'width': 4},
                        'thickness': 0.75,
                        'value': 90
                    }
                }
            ))
        else:
            # Two-way gauge
            fig = go.Figure(go.Indicator(
                mode="gauge+number",
                value=result['confidence'] * 100,
                domain={'x': [0, 1], 'y': [0, 1]},
                title={'text': f"Confidence: {result['sentiment']}"},
                gauge={
                    'axis': {'range': [None, 100]},
                    'bar': {'color': bar_color},
                    'steps': [
                        {'range': [0, 50], 'color': "lightgray"},
                        {'range': [50, 100], 'color': "gray"}
                    ]
                }
            ))
        fig.update_layout(height=400, font={'size': 16})
        return fig

    @staticmethod
    @handle_errors(default_return=None)
    def create_probability_bars(result: Dict, theme: ThemeContext) -> go.Figure:
        """Create a bar chart of the class probabilities in ``result``."""
        colors = theme.colors
        if result.get('has_neutral', False):
            labels = ['Negative', 'Neutral', 'Positive']
            values = [result['neg_prob'], result['neu_prob'], result['pos_prob']]
            bar_colors = [colors['neg'], colors['neu'], colors['pos']]
        else:
            labels = ['Negative', 'Positive']
            values = [result['neg_prob'], result['pos_prob']]
            bar_colors = [colors['neg'], colors['pos']]

        fig = go.Figure(data=[
            go.Bar(x=labels, y=values, marker_color=bar_colors,
                   text=[f'{v:.3f}' for v in values], textposition='outside')
        ])
        fig.update_layout(
            title="Sentiment Probabilities",
            yaxis_title="Probability",
            height=400,
            showlegend=False
        )
        return fig

    @staticmethod
    @handle_errors(default_return=None)
    def create_keyword_chart(keywords: List[Tuple[str, float]], sentiment: str, theme: ThemeContext) -> go.Figure:
        """Create a horizontal bar chart of ``(word, score)`` keywords.

        An empty ``keywords`` list yields a figure carrying only a
        "No keywords extracted" annotation.
        """
        if not keywords:
            fig = go.Figure()
            fig.add_annotation(text="No keywords extracted",
                               xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False)
            fig.update_layout(height=400, title="Keywords")
            return fig

        words = [word for word, score in keywords]
        scores = [score for word, score in keywords]
        color = PlotlyVisualizer._sentiment_color(sentiment, theme)

        fig = go.Figure(data=[
            go.Bar(
                y=words,
                x=scores,
                orientation='h',
                marker_color=color,
                text=[f'{score:.3f}' for score in scores],
                textposition='auto'
            )
        ])
        fig.update_layout(
            title=f"Top Keywords ({sentiment})",
            xaxis_title="Frequency Score",
            yaxis_title="Keywords",
            height=400,
            showlegend=False
        )
        return fig

    @staticmethod
    @handle_errors(default_return=None)
    def create_batch_summary(results: List[Dict], theme: ThemeContext) -> go.Figure:
        """Create a donut chart of sentiment counts, ignoring ``'Error'`` entries."""
        colors = theme.colors
        # Count sentiments
        sentiments = [r['sentiment'] for r in results if 'sentiment' in r and r['sentiment'] != 'Error']
        sentiment_counts = Counter(sentiments)

        # The first three lower-cased letters of a label ('pos', 'neg', 'neu')
        # are exactly the theme's color keys; other labels fall back to grey.
        fig = go.Figure(data=[go.Pie(
            labels=list(sentiment_counts.keys()),
            values=list(sentiment_counts.values()),
            marker_colors=[colors.get(s.lower()[:3], '#999999') for s in sentiment_counts.keys()],
            textinfo='label+percent',
            hole=0.3
        )])
        fig.update_layout(
            title=f"Batch Analysis Summary ({len(results)} texts)",
            height=400
        )
        return fig

    @staticmethod
    @handle_errors(default_return=None)
    def create_confidence_distribution(results: List[Dict]) -> go.Figure:
        """Create a histogram of confidences, ignoring ``'Error'`` entries.

        Returns an empty figure when there is nothing to plot.
        """
        confidences = [r['confidence'] for r in results if 'confidence' in r and r['sentiment'] != 'Error']
        if not confidences:
            return go.Figure()

        fig = go.Figure(data=[go.Histogram(
            x=confidences,
            nbinsx=20,
            marker_color='skyblue',
            opacity=0.7
        )])
        fig.update_layout(
            title="Confidence Distribution",
            xaxis_title="Confidence Score",
            yaxis_title="Frequency",
            height=400
        )
        return fig

    @staticmethod
    @handle_errors(default_return=None)
    def create_history_dashboard(history: List[Dict], theme: ThemeContext) -> go.Figure:
        """Create a 2x2 dashboard over the analysis history.

        Panels: positive-probability timeline, confidence histogram,
        language pie and sentiment counts. Fewer than two entries yield an
        empty figure.
        """
        if len(history) < 2:
            return go.Figure()

        # Create subplots
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=['Sentiment Timeline', 'Confidence Distribution',
                            'Language Distribution', 'Sentiment Summary'],
            specs=[[{"secondary_y": False}, {"secondary_y": False}],
                   [{"type": "pie"}, {"type": "bar"}]]
        )

        # Extract data
        indices = list(range(len(history)))
        pos_probs = [item.get('pos_prob', 0) for item in history]
        confidences = [item['confidence'] for item in history]
        sentiments = [item['sentiment'] for item in history]
        languages = [item.get('language', 'en') for item in history]

        # Sentiment timeline
        colors_map = {'Positive': theme.colors['pos'], 'Negative': theme.colors['neg'], 'Neutral': theme.colors['neu']}
        colors = [colors_map.get(s, '#999999') for s in sentiments]
        fig.add_trace(
            go.Scatter(x=indices, y=pos_probs, mode='lines+markers',
                       marker=dict(color=colors, size=8),
                       name='Positive Probability'),
            row=1, col=1
        )

        # Confidence distribution
        fig.add_trace(
            go.Histogram(x=confidences, nbinsx=10, name='Confidence'),
            row=1, col=2
        )

        # Language distribution
        lang_counts = Counter(languages)
        fig.add_trace(
            go.Pie(labels=list(lang_counts.keys()), values=list(lang_counts.values()),
                   name="Languages"),
            row=2, col=1
        )

        # Sentiment summary
        sent_counts = Counter(sentiments)
        sent_colors = [colors_map.get(k, '#999999') for k in sent_counts.keys()]
        fig.add_trace(
            go.Bar(x=list(sent_counts.keys()), y=list(sent_counts.values()),
                   marker_color=sent_colors),
            row=2, col=2
        )

        fig.update_layout(height=800, showlegend=False)
        return fig
# Universal Data Handler
class DataHandler:
    """Import and export helpers for analysis data."""

    @staticmethod
    @handle_errors(default_return=(None, "Export failed"))
    def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]:
        """Write ``data`` to a temporary file and return its path.

        Args:
            data: history/result entries to export.
            format_type: ``'csv'`` or ``'json'``.

        Returns:
            ``(path, message)``; ``path`` is ``None`` when nothing was
            written (no data, unsupported format, or a write error).
        """
        if not data:
            return None, "No data to export"
        if format_type not in ('csv', 'json'):
            # Previously an empty file was created and reported as a success
            return None, f"Unsupported export format: {format_type}"

        # `with` closes the handle even if writing fails; newline='' is what
        # the csv module requires to control line endings itself.
        with tempfile.NamedTemporaryFile(mode='w', delete=False,
                                         suffix=f'.{format_type}', encoding='utf-8',
                                         newline='') as temp_file:
            if format_type == 'csv':
                writer = csv.writer(temp_file)
                writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence', 'Language',
                                 'Pos_Prob', 'Neg_Prob', 'Neu_Prob', 'Keywords', 'Word_Count'])
                for entry in data:
                    keywords_str = "|".join([f"{word}:{score:.3f}" for word, score in entry.get('keywords', [])])
                    writer.writerow([
                        entry.get('timestamp', ''),
                        entry.get('text', ''),
                        entry.get('sentiment', ''),
                        f"{entry.get('confidence', 0):.4f}",
                        entry.get('language', 'en'),
                        f"{entry.get('pos_prob', 0):.4f}",
                        f"{entry.get('neg_prob', 0):.4f}",
                        f"{entry.get('neu_prob', 0):.4f}",
                        keywords_str,
                        entry.get('word_count', 0)
                    ])
            else:
                json.dump(data, temp_file, indent=2, ensure_ascii=False)
        return temp_file.name, f"Exported {len(data)} entries"

    @staticmethod
    @handle_errors(default_return="")
    def process_file(file) -> str:
        """Read an uploaded file and return its texts, one per line.

        For ``.csv`` files the header row is skipped and the first column of
        every remaining row is used; other files are returned as decoded.
        ``file`` must offer ``read()`` and ``name``.
        """
        if not file:
            return ""

        raw = file.read()
        # utf-8-sig also strips a leading byte-order mark if one is present
        content = raw.decode('utf-8-sig') if isinstance(raw, bytes) else raw
        if not file.name.endswith('.csv'):
            return content

        try:
            reader = csv.reader(io.StringIO(content))
            next(reader, None)  # Skip header (no-op for an empty file)
            cells = (row[0].strip().strip('"') for row in reader if row and row[0].strip())
            texts = [cell for cell in cells if cell]
        except csv.Error:
            # Malformed CSV: fall back to one text per line after the header
            lines = (line.strip().strip('"') for line in content.strip().split('\n')[1:] if line.strip())
            texts = [line for line in lines if line]
        return '\n'.join(texts)
# Main Application Class
class SentimentApp:
    """Main multilingual sentiment analysis application.

    Glues the analysis engines, the history store and the visualizer into
    the callbacks used by the user interface.
    """

    def __init__(self):
        self.engine = SentimentEngine()
        self.advanced_engine = AdvancedAnalysisEngine()
        self.history = HistoryManager()
        self.data_handler = DataHandler()
        # Multi-language examples
        self.examples = [
            ["This movie was absolutely fantastic! The acting was superb and the plot kept me engaged throughout."],
            ["The film was disappointing with poor character development and a confusing storyline."],
            ["这部电影真的很棒!演技精湛,情节引人入胜。"],  # Chinese
            ["Esta película fue increíble, me encantó la cinematografía."],  # Spanish
            ["Ce film était magnifique, j'ai adoré la réalisation."],  # French
        ]

    @staticmethod
    def _language_code(language: str) -> str:
        """Map a dropdown display name to its language code (``'auto'`` if unknown)."""
        language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()}
        return language_map.get(language, 'auto')

    @staticmethod
    def _is_success(result) -> bool:
        """Tell a complete analysis result from an error/placeholder entry.

        The engine reports failures either with an ``error`` key or with a
        sparse placeholder dict that has no ``language``.
        """
        return isinstance(result, dict) and 'error' not in result and 'language' in result

    @handle_errors(default_return=("Please enter text", None, None, None))
    def analyze_single(self, text: str, language: str, theme: str, clean_text: bool,
                       remove_punct: bool, remove_nums: bool):
        """Analyze one text and build its visualizations.

        Returns:
            ``(markdown_info, gauge_figure, probability_figure, keyword_figure)``;
            the figures are ``None`` when there is nothing to show.
        """
        if not text.strip():
            return "Please enter text", None, None, None

        language_code = self._language_code(language)
        preprocessing_options = {
            'clean_text': clean_text,
            'remove_punctuation': remove_punct,
            'remove_numbers': remove_nums
        }
        with memory_cleanup():
            result = self.engine.analyze_single(text, language_code, preprocessing_options)

            # The engine swallows exceptions and returns a placeholder; report
            # that honestly instead of failing later on a missing key.
            if not self._is_success(result):
                return "Analysis failed. Please check the input and try again.", None, None, None

            # Add to history
            history_entry = {
                'text': text[:100] + '...' if len(text) > 100 else text,
                'full_text': text,
                'sentiment': result['sentiment'],
                'confidence': result['confidence'],
                'pos_prob': result.get('pos_prob', 0),
                'neg_prob': result.get('neg_prob', 0),
                'neu_prob': result.get('neu_prob', 0),
                'language': result['language'],
                'keywords': result['keywords'],
                'word_count': result['word_count'],
                'analysis_type': 'single'
            }
            self.history.add(history_entry)

            # Create visualizations
            theme_ctx = ThemeContext(theme)
            gauge_fig = PlotlyVisualizer.create_sentiment_gauge(result, theme_ctx)
            bars_fig = PlotlyVisualizer.create_probability_bars(result, theme_ctx)
            keyword_fig = PlotlyVisualizer.create_keyword_chart(result['keywords'], result['sentiment'], theme_ctx)

            # Create comprehensive result text
            keywords_str = ", ".join([f"{word}({score:.3f})" for word, score in result['keywords'][:5]])
            info_text = f"""
**Analysis Results:**
- **Sentiment:** {result['sentiment']} ({result['confidence']:.3f} confidence)
- **Language:** {result['language'].upper()}
- **Keywords:** {keywords_str}
- **Statistics:** {result['word_count']} words, {result['char_count']} characters
"""
            return info_text, gauge_fig, bars_fig, keyword_fig

    @handle_errors(default_return=("Please enter texts", None, None, None))
    def analyze_batch(self, batch_text: str, language: str, theme: str,
                      clean_text: bool, remove_punct: bool, remove_nums: bool):
        """Analyze several texts given one per line.

        Returns:
            ``(markdown_summary, results_dataframe, summary_figure,
            confidence_figure)``; the last three are ``None`` when the input
            is rejected.
        """
        if not batch_text.strip():
            return "Please enter texts (one per line)", None, None, None

        # Parse batch input
        texts = TextProcessor.parse_batch_input(batch_text)
        if len(texts) > config.BATCH_SIZE_LIMIT:
            return f"Too many texts. Maximum {config.BATCH_SIZE_LIMIT} allowed.", None, None, None
        if not texts:
            return "No valid texts found", None, None, None

        language_code = self._language_code(language)
        preprocessing_options = {
            'clean_text': clean_text,
            'remove_punctuation': remove_punct,
            'remove_numbers': remove_nums
        }
        with memory_cleanup():
            results = self.engine.analyze_batch(texts, language_code, preprocessing_options)
            successful_results = [r for r in results if self._is_success(r)]

            # Add to history (successful analyses only)
            batch_entries = [
                {
                    'text': result['text'],
                    'full_text': result['full_text'],
                    'sentiment': result['sentiment'],
                    'confidence': result['confidence'],
                    'pos_prob': result.get('pos_prob', 0),
                    'neg_prob': result.get('neg_prob', 0),
                    'neu_prob': result.get('neu_prob', 0),
                    'language': result['language'],
                    'keywords': result['keywords'],
                    'word_count': result['word_count'],
                    'analysis_type': 'batch',
                    'batch_index': result['batch_index']
                }
                for result in successful_results
            ]
            self.history.add_batch(batch_entries)

            # Create visualizations
            theme_ctx = ThemeContext(theme)
            summary_fig = PlotlyVisualizer.create_batch_summary(results, theme_ctx)
            confidence_fig = PlotlyVisualizer.create_confidence_distribution(results)

            # Create results DataFrame
            df_data = []
            for position, result in enumerate(results):
                index = result.get('batch_index', position) + 1
                if self._is_success(result):
                    keywords_str = ', '.join([word for word, _ in result['keywords'][:3]])
                    df_data.append({
                        'Index': index,
                        'Text': result['text'],
                        'Sentiment': result['sentiment'],
                        'Confidence': f"{result['confidence']:.3f}",
                        'Language': result['language'].upper(),
                        'Keywords': keywords_str
                    })
                else:
                    df_data.append({
                        'Index': index,
                        'Text': result.get('text', ''),
                        'Sentiment': 'Error',
                        'Confidence': 0.0,
                        'Language': 'Unknown',
                        'Error': result.get('error', 'Analysis failed')
                    })
            df = pd.DataFrame(df_data)

            # Create summary text
            error_count = len(results) - len(successful_results)
            if successful_results:
                sentiment_counts = Counter([r['sentiment'] for r in successful_results])
                avg_confidence = np.mean([r['confidence'] for r in successful_results])
                languages = Counter([r['language'] for r in successful_results])
                summary_text = f"""
**Batch Analysis Summary:**
- **Total Texts:** {len(texts)}
- **Successful:** {len(successful_results)}
- **Errors:** {error_count}
- **Average Confidence:** {avg_confidence:.3f}
- **Sentiments:** {dict(sentiment_counts)}
- **Languages Detected:** {dict(languages)}
"""
            else:
                summary_text = f"All {len(texts)} texts failed to analyze."
            return summary_text, df, summary_fig, confidence_fig

    # The fallbacks below have three elements because the advanced engine
    # returns (summary, figure, analysis_data).
    @handle_errors(default_return=("Please enter text", None, None))
    def analyze_with_shap(self, text: str, language: str):
        """Run the SHAP analysis for ``text``; ``language`` is a display name."""
        return self.advanced_engine.analyze_with_shap(text, self._language_code(language))

    @handle_errors(default_return=("Please enter text", None, None))
    def analyze_with_lime(self, text: str, language: str):
        """Run the LIME analysis for ``text``; ``language`` is a display name."""
        return self.advanced_engine.analyze_with_lime(text, self._language_code(language))

    @handle_errors(default_return=(None, "No history available"))
    def plot_history(self, theme: str = 'default'):
        """Build the history dashboard.

        Returns:
            ``(figure, markdown_stats)``; the figure is ``None`` while fewer
            than two analyses have been recorded.
        """
        history = self.history.get_all()
        if len(history) < 2:
            return None, f"Need at least 2 analyses for trends. Current: {len(history)}"

        theme_ctx = ThemeContext(theme)
        with memory_cleanup():
            fig = PlotlyVisualizer.create_history_dashboard(history, theme_ctx)
            stats = self.history.get_stats()
            stats_text = f"""
**History Statistics:**
- **Total Analyses:** {stats.get('total_analyses', 0)}
- **Positive:** {stats.get('positive_count', 0)}
- **Negative:** {stats.get('negative_count', 0)}
- **Neutral:** {stats.get('neutral_count', 0)}
- **Average Confidence:** {stats.get('avg_confidence', 0):.3f}
- **Languages:** {stats.get('languages_detected', 0)}
- **Most Common Language:** {stats.get('most_common_language', 'N/A').upper()}
"""
            return fig, stats_text

    # Fallback is a plain string, like the normal return value
    @handle_errors(default_return="No data available")
    def get_history_status(self):
        """Return a markdown summary of the current history statistics."""
        stats = self.history.get_stats()
        if not stats:
            return "No analyses performed yet"
        return f"""
**Current Status:**
- **Total Analyses:** {stats['total_analyses']}
- **Recent Sentiment Distribution:**
* Positive: {stats['positive_count']}
* Negative: {stats['negative_count']}
* Neutral: {stats['neutral_count']}
- **Average Confidence:** {stats['avg_confidence']:.3f}
- **Languages Detected:** {stats['languages_detected']}
"""
# Gradio Interface (Updated with Advanced Analysis tab)
def create_interface():
    """Build the Gradio Blocks UI for the sentiment analyzer and return it.

    Creates one ``SentimentApp`` instance and lays out four tabs:

    * "Single Analysis"     - one text in, result text plus three plots out.
    * "Advanced Analysis"   - SHAP / LIME explanations for one text.
    * "Batch Analysis"      - many texts (typed or loaded from a file).
    * "History & Analytics" - dashboard, status, clearing and export.

    After the layout is declared, every button is wired to the matching
    ``SentimentApp`` method (or a small lambda around it).

    Returns:
        The ``gr.Blocks`` object (``demo``); the caller is responsible for
        launching it.
    """
    # Single app instance shared by every event handler below.
    app = SentimentApp()
    with gr.Blocks(theme=gr.themes.Soft(), title="Multilingual Sentiment Analyzer") as demo:
        gr.Markdown("# 🌍 Advanced Multilingual Sentiment Analyzer")
        gr.Markdown("AI-powered sentiment analysis with support for multiple languages, advanced visualizations, and explainable AI features")
        # ---- Tab 1: analysis of a single text ----
        with gr.Tab("Single Analysis"):
            with gr.Row():
                # Left column: input text, options and the trigger button.
                with gr.Column():
                    text_input = gr.Textbox(
                        label="Enter Text for Analysis",
                        placeholder="Enter your text in any supported language...",
                        lines=5
                    )
                    with gr.Row():
                        # Dropdown shows display labels (the dict values),
                        # not language codes.
                        language_selector = gr.Dropdown(
                            choices=list(config.SUPPORTED_LANGUAGES.values()),
                            value="Auto Detect",
                            label="Language"
                        )
                        theme_selector = gr.Dropdown(
                            choices=list(config.THEMES.keys()),
                            value="default",
                            label="Theme"
                        )
                    # Text preprocessing switches, all off by default.
                    with gr.Row():
                        clean_text_cb = gr.Checkbox(label="Clean Text", value=False)
                        remove_punct_cb = gr.Checkbox(label="Remove Punctuation", value=False)
                        remove_nums_cb = gr.Checkbox(label="Remove Numbers", value=False)
                    analyze_btn = gr.Button("Analyze", variant="primary", size="lg")
                    # Clicking an example only fills the text box; results
                    # are not pre-computed (cache_examples=False).
                    gr.Examples(
                        examples=app.examples,
                        inputs=text_input,
                        cache_examples=False
                    )
                # Right column: textual result.
                with gr.Column():
                    result_output = gr.Textbox(label="Analysis Results", lines=8)
            with gr.Row():
                gauge_plot = gr.Plot(label="Sentiment Gauge")
                probability_plot = gr.Plot(label="Probability Distribution")
            with gr.Row():
                keyword_plot = gr.Plot(label="Basic Keywords")
        # ---- Tab 2: explainability (SHAP / LIME) ----
        with gr.Tab("Advanced Analysis"):
            gr.Markdown("## 🔬 Explainable AI Analysis")
            gr.Markdown("Use SHAP and LIME to understand which words and phrases most influence the sentiment prediction.")
            with gr.Row():
                with gr.Column():
                    advanced_text_input = gr.Textbox(
                        label="Enter Text for Advanced Analysis",
                        placeholder="Enter text to analyze with SHAP and LIME...",
                        lines=6
                    )
                    advanced_language = gr.Dropdown(
                        choices=list(config.SUPPORTED_LANGUAGES.values()),
                        value="Auto Detect",
                        label="Language"
                    )
                    with gr.Row():
                        shap_btn = gr.Button("SHAP Analysis", variant="primary")
                        lime_btn = gr.Button("LIME Analysis", variant="secondary")
                    gr.Markdown("""
**Analysis Methods:**
- **SHAP**: Shows token-level importance scores
- **LIME**: Explains predictions by perturbing input features
""")
                with gr.Column():
                    advanced_results = gr.Textbox(label="Analysis Summary", lines=10)
            with gr.Row():
                advanced_plot = gr.Plot(label="Feature Importance Visualization")
        # ---- Tab 3: batch analysis ----
        with gr.Tab("Batch Analysis"):
            with gr.Row():
                with gr.Column():
                    # Upload is restricted to .csv / .txt by the file picker.
                    file_upload = gr.File(
                        label="Upload File (CSV/TXT)",
                        file_types=[".csv", ".txt"]
                    )
                    batch_input = gr.Textbox(
                        label="Batch Input (one text per line)",
                        placeholder="Enter multiple texts, one per line...",
                        lines=10
                    )
                    with gr.Row():
                        batch_language = gr.Dropdown(
                            choices=list(config.SUPPORTED_LANGUAGES.values()),
                            value="Auto Detect",
                            label="Language"
                        )
                        batch_theme = gr.Dropdown(
                            choices=list(config.THEMES.keys()),
                            value="default",
                            label="Theme"
                        )
                    with gr.Row():
                        batch_clean_cb = gr.Checkbox(label="Clean Text", value=False)
                        batch_punct_cb = gr.Checkbox(label="Remove Punctuation", value=False)
                        batch_nums_cb = gr.Checkbox(label="Remove Numbers", value=False)
                    with gr.Row():
                        load_file_btn = gr.Button("Load File")
                        analyze_batch_btn = gr.Button("Analyze Batch", variant="primary")
                with gr.Column():
                    batch_summary = gr.Textbox(label="Batch Summary", lines=8)
                    # Confidence is declared as "str" here, i.e. the table
                    # expects an already formatted value in that column.
                    batch_results_df = gr.Dataframe(
                        label="Detailed Results",
                        headers=["Index", "Text", "Sentiment", "Confidence", "Language", "Keywords"],
                        datatype=["number", "str", "str", "str", "str", "str"]
                    )
            with gr.Row():
                batch_plot = gr.Plot(label="Batch Analysis Summary")
                confidence_dist_plot = gr.Plot(label="Confidence Distribution")
        # ---- Tab 4: history dashboard, status and export ----
        with gr.Tab("History & Analytics"):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        refresh_history_btn = gr.Button("Refresh History")
                        clear_history_btn = gr.Button("Clear History", variant="stop")
                        status_btn = gr.Button("Get Status")
                    history_theme = gr.Dropdown(
                        choices=list(config.THEMES.keys()),
                        value="default",
                        label="Dashboard Theme"
                    )
                    with gr.Row():
                        export_csv_btn = gr.Button("Export CSV")
                        export_json_btn = gr.Button("Export JSON")
                with gr.Column():
                    # Shared status box: refresh, clear, status and both
                    # export buttons all write their message here.
                    history_status = gr.Textbox(label="History Status", lines=8)
            history_dashboard = gr.Plot(label="History Analytics Dashboard")
            with gr.Row():
                csv_download = gr.File(label="CSV Download", visible=True)
                json_download = gr.File(label="JSON Download", visible=True)
        # ---- Event wiring ----
        # Single analysis: six inputs in, result text plus three plots out.
        analyze_btn.click(
            app.analyze_single,
            inputs=[text_input, language_selector, theme_selector,
                    clean_text_cb, remove_punct_cb, remove_nums_cb],
            outputs=[result_output, gauge_plot, probability_plot, keyword_plot]
        )
        # Advanced analysis: SHAP and LIME share the same inputs and write to
        # the same summary box and plot, so the latest click replaces the
        # previous explanation.
        shap_btn.click(
            app.analyze_with_shap,
            inputs=[advanced_text_input, advanced_language],
            outputs=[advanced_results, advanced_plot]
        )
        lime_btn.click(
            app.analyze_with_lime,
            inputs=[advanced_text_input, advanced_language],
            outputs=[advanced_results, advanced_plot]
        )
        # Batch analysis: "Load File" writes the handler's result into the
        # batch text box; "Analyze Batch" then works from that text box.
        load_file_btn.click(
            app.data_handler.process_file,
            inputs=file_upload,
            outputs=batch_input
        )
        analyze_batch_btn.click(
            app.analyze_batch,
            inputs=[batch_input, batch_language, batch_theme,
                    batch_clean_cb, batch_punct_cb, batch_nums_cb],
            outputs=[batch_summary, batch_results_df, batch_plot, confidence_dist_plot]
        )
        # History & analytics.
        refresh_history_btn.click(
            app.plot_history,
            inputs=history_theme,
            outputs=[history_dashboard, history_status]
        )
        # The message embeds the return value of history.clear()
        # (presumably the number of removed entries -- confirm).
        clear_history_btn.click(
            lambda: f"Cleared {app.history.clear()} entries",
            outputs=history_status
        )
        status_btn.click(
            app.get_history_status,
            outputs=history_status
        )
        # Exports take no UI inputs; the lambdas read the full history at
        # click time. Two outputs are declared, so export_data is expected
        # to return a (file, status message) pair -- confirm against it.
        export_csv_btn.click(
            lambda: app.data_handler.export_data(app.history.get_all(), 'csv'),
            outputs=[csv_download, history_status]
        )
        export_json_btn.click(
            lambda: app.data_handler.export_data(app.history.get_all(), 'json'),
            outputs=[json_download, history_status]
        )
    return demo
# Application Entry Point
if __name__ == "__main__":
    # The root logger is already configured by the module-level
    # basicConfig() call near the top of this file, and basicConfig() does
    # nothing once the root logger has handlers. force=True replaces those
    # handlers so the timestamped format below actually takes effect.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        force=True,
    )
    try:
        demo = create_interface()
        # Listen on all interfaces on port 7860, request a public share
        # link, and surface handler errors in the UI.
        demo.launch(
            share=True,
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True
        )
    except Exception as e:
        # Log for the record, then re-raise so the process still fails
        # with the original traceback.
        logger.error(f"Failed to launch application: {e}")
        raise