entropy25's picture
Update app.py
1ad3bd1 verified
raw
history blame
59 kB
import torch
import gradio as gr
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import numpy as np
from wordcloud import WordCloud
from collections import Counter, defaultdict
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Any, Callable
from contextlib import contextmanager
import nltk
from nltk.corpus import stopwords
import langdetect
import pandas as pd
import gc
# Configuration
CACHE_SIZE: int = 128
BATCH_PROCESSING_SIZE: int = 8
# Supported languages and models
SUPPORTED_LANGUAGES = {
'auto': 'Auto Detect',
'en': 'English',
'zh': 'Chinese',
'es': 'Spanish',
'fr': 'French',
'de': 'German',
'sv': 'Swedish'
}
MODELS = {
'en': "cardiffnlp/twitter-roberta-base-sentiment-latest",
'multilingual': "cardiffnlp/twitter-xlm-roberta-base-sentiment",
'zh': "uer/roberta-base-finetuned-dianping-chinese"
}
# Color themes for Plotly
THEMES = {
'default': {'pos': '#4CAF50', 'neg': '#F44336', 'neu': '#FF9800'},
'ocean': {'pos': '#0077BE', 'neg': '#FF6B35', 'neu': '#00BCD4'},
'dark': {'pos': '#66BB6A', 'neg': '#EF5350', 'neu': '#FFA726'},
'rainbow': {'pos': '#9C27B0', 'neg': '#E91E63', 'neu': '#FF5722'}
}
config = Config()
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize NLTK
try:
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)
STOP_WORDS = set(stopwords.words('english'))
except:
STOP_WORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
# Decorators and Context Managers
def handle_errors(default_return=None):
"""Centralized error handling decorator"""
return decorator
@contextmanager
def memory_cleanup():
"""Context manager for memory cleanup"""
try:
yield
finally:
gc.collect()
class ThemeContext:
self.theme = theme
self.colors = config.THEMES.get(theme, config.THEMES['default'])
# Enhanced Model Manager with Multi-language Support
class ModelManager:
"""Multi-language model manager with lazy loading"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if not self._initialized:
self.models = {}
self.tokenizers = {}
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self._load_default_models()
self._initialized = True
def _load_default_models(self):
"""Load default models"""
try:
# Load multilingual model as default
model_name = config.MODELS['multilingual']
self.tokenizers['default'] = AutoTokenizer.from_pretrained(model_name)
self.models['default'] = AutoModelForSequenceClassification.from_pretrained(model_name)
self.models['default'].to(self.device)
logger.info(f"Default model loaded: {model_name}")
# Load Chinese model
zh_model_name = config.MODELS['zh']
self.tokenizers['zh'] = AutoTokenizer.from_pretrained(zh_model_name)
self.models['zh'] = AutoModelForSequenceClassification.from_pretrained(zh_model_name)
self.models['zh'].to(self.device)
logger.info(f"Chinese model loaded: {zh_model_name}")
except Exception as e:
logger.error(f"Failed to load models: {e}")
raise
def get_model(self, language='en'):
"""Get model for specific language"""
if language == 'zh':
return self.models['zh'], self.tokenizers['zh']
return self.models['default'], self.tokenizers['default']
@staticmethod
def detect_language(text: str) -> str:
"""Detect text language"""
try:
detected = langdetect.detect(text)
language_mapping = {
'zh-cn': 'zh',
'zh-tw': 'zh'
}
detected = language_mapping.get(detected, detected)
return detected if detected in config.SUPPORTED_LANGUAGES else 'en'
except:
return 'en'
# Simplified Text Processing
class TextProcessor:
"""Optimized text processing with multi-language support"""
@staticmethod
@lru_cache(maxsize=config.CACHE_SIZE)
def clean_text(text: str, remove_punctuation: bool = True, remove_numbers: bool = False) -> str:
"""Clean text with language awareness"""
text = text.strip()
# Don't clean Chinese text aggressively
if re.search(r'[\u4e00-\u9fff]', text):
return text
text = text.lower()
if remove_numbers:
text = re.sub(r'\d+', '', text)
if remove_punctuation:
text = re.sub(r'[^\w\s]', '', text)
words = text.split()
cleaned_words = [w for w in words if w not in STOP_WORDS and len(w) >= config.MIN_WORD_LENGTH]
return ' '.join(cleaned_words)
@staticmethod
def extract_keywords(text: str, top_k: int = 5) -> List[str]:
"""Extract keywords with language support"""
if re.search(r'[\u4e00-\u9fff]', text):
# Chinese text processing
words = re.findall(r'[\u4e00-\u9fff]+', text)
all_chars = ''.join(words)
char_freq = Counter(all_chars)
return [char for char, _ in char_freq.most_common(top_k)]
else:
# Other languages
cleaned = TextProcessor.clean_text(text)
words = cleaned.split()
word_freq = Counter(words)
return [word for word, _ in word_freq.most_common(top_k)]
@staticmethod
def parse_batch_input(text: str) -> List[str]:
"""Parse batch input from textarea"""
lines = text.strip().split('\n')
return [line.strip() for line in lines if line.strip()]
# Enhanced History Manager
class HistoryManager:
"""Enhanced history management with filtering"""
def __init__(self):
self._history = []
def add(self, entry: Dict):
"""Add entry with timestamp"""
entry['timestamp'] = datetime.now().isoformat()
self._history.append(entry)
if len(self._history) > config.MAX_HISTORY_SIZE:
self._history = self._history[-config.MAX_HISTORY_SIZE:]
def add_batch(self, entries: List[Dict]):
"""Add multiple entries"""
for entry in entries:
self.add(entry)
def get_all(self) -> List[Dict]:
return self._history.copy()
def get_recent(self, n: int = 10) -> List[Dict]:
return self._history[-n:] if self._history else []
def filter_by(self, sentiment: str = None, language: str = None,
min_confidence: float = None) -> List[Dict]:
"""Filter history by criteria"""
filtered = self._history
if sentiment:
filtered = [h for h in filtered if h['sentiment'] == sentiment]
if language:
filtered = [h for h in filtered if h.get('language', 'en') == language]
if min_confidence:
filtered = [h for h in filtered if h['confidence'] >= min_confidence]
return filtered
def clear(self) -> int:
count = len(self._history)
self._history.clear()
def size(self) -> int:
return len(self._history)
def get_stats(self) -> Dict:
"""Get comprehensive statistics"""
if not self._history:
return {}
sentiments = [item['sentiment'] for item in self._history]
confidences = [item['confidence'] for item in self._history]
languages = [item.get('language', 'en') for item in self._history]
return {
'total_analyses': len(self._history),
'positive_count': sentiments.count('Positive'),
'negative_count': sentiments.count('Negative'),
'neutral_count': sentiments.count('Neutral'),
'avg_confidence': np.mean(confidences),
'max_confidence': np.max(confidences),
'min_confidence': np.min(confidences),
'languages_detected': len(set(languages)),
'most_common_language': Counter(languages).most_common(1)[0][0] if languages else 'en'
}
# Core Sentiment Analysis Engine
class SentimentEngine:
"""Multi-language sentiment analysis engine"""
def __init__(self):
self.model_manager = ModelManager()
def extract_attention_keywords(self, text: str, language: str = 'auto', top_k: int = 10) -> List[Tuple[str, float]]:
"""Extract keywords using attention weights"""
try:
if language == 'auto':
language = self.model_manager.detect_language(text)
model, tokenizer = self.model_manager.get_model(language)
inputs = tokenizer(
text, return_tensors="pt", padding=True,
truncation=True, max_length=config.MAX_TEXT_LENGTH
).to(self.model_manager.device)
with torch.no_grad():
outputs = model(**inputs, output_attentions=True)
if hasattr(outputs, 'attentions') and outputs.attentions:
# Use attention weights
attention = outputs.attentions[-1]
avg_attention = attention.mean(dim=1)[0, 0, :]
tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])
attention_scores = avg_attention.cpu().numpy()
# Process tokens and scores
word_scores = {}
current_word = ""
current_score = 0.0
for token, score in zip(tokens, attention_scores):
if token in ['[CLS]', '[SEP]', '[PAD]', '<s>', '</s>']:
continue
if token.startswith('##') or token.startswith('▁'):
current_word += token.replace('##', '').replace('▁', '')
current_score = max(current_score, score)
else:
if current_word and len(current_word) >= config.MIN_WORD_LENGTH:
word_scores[current_word.lower()] = current_score
current_word = token
current_score = score
if current_word and len(current_word) >= config.MIN_WORD_LENGTH:
word_scores[current_word.lower()] = current_score
# Filter and sort
filtered_words = {
word: score for word, score in word_scores.items()
if word not in STOP_WORDS and len(word) >= config.MIN_WORD_LENGTH
}
sorted_words = sorted(filtered_words.items(), key=lambda x: x[1], reverse=True)
return sorted_words[:top_k]
except Exception as e:
logger.error(f"Attention keyword extraction failed: {e}")
# Fallback to simple keyword extraction
keywords = TextProcessor.extract_keywords(text, top_k)
return [(word, 0.1) for word in keywords]
@handle_errors(default_return={'sentiment': 'Unknown', 'confidence': 0.0, 'keywords': []})
def analyze_single(self, text: str, language: str = 'auto', preprocessing_options: Dict = None) -> Dict:
"""Analyze single text with enhanced features"""
if not text.strip():
raise ValueError("Empty text provided")
# Detect language
if language == 'auto':
detected_lang = self.model_manager.detect_language(text)
else:
detected_lang = language
# Get appropriate model
model, tokenizer = self.model_manager.get_model(detected_lang)
# Preprocessing
options = preprocessing_options or {}
processed_text = text
if options.get('clean_text', False) and not re.search(r'[\u4e00-\u9fff]', text):
processed_text = TextProcessor.clean_text(
text,
options.get('remove_punctuation', True),
options.get('remove_numbers', False)
)
# Tokenize and analyze
inputs = tokenizer(processed_text, return_tensors="pt", padding=True,
truncation=True, max_length=config.MAX_TEXT_LENGTH).to(self.model_manager.device)
with torch.no_grad():
outputs = model(**inputs)
probs = torch.nn.functional.softmax(outputs.logits, dim=-1).cpu().numpy()[0]
# Handle different model outputs
if len(probs) == 3: # negative, neutral, positive
sentiment_idx = np.argmax(probs)
sentiment_labels = ['Negative', 'Neutral', 'Positive']
sentiment = sentiment_labels[sentiment_idx]
confidence = float(probs[sentiment_idx])
result = {
'sentiment': sentiment,
'confidence': confidence,
'neg_prob': float(probs[0]),
'neu_prob': float(probs[1]),
'pos_prob': float(probs[2]),
'has_neutral': True
}
else: # negative, positive
pred = np.argmax(probs)
sentiment = "Positive" if pred == 1 else "Negative"
confidence = float(probs[pred])
result = {
'sentiment': sentiment,
'confidence': confidence,
'neg_prob': float(probs[0]),
'pos_prob': float(probs[1]),
'neu_prob': 0.0,
'has_neutral': False
}
# Extract keywords
keywords = self.extract_attention_keywords(text, detected_lang)
# Add metadata
result.update({
'language': detected_lang,
'keywords': keywords,
'word_count': len(text.split()),
'char_count': len(text)
})
return result
@handle_errors(default_return=[])
def analyze_batch(self, texts: List[str], language: str = 'auto',
preprocessing_options: Dict = None, progress_callback=None) -> List[Dict]:
"""Optimized batch processing"""
if len(texts) > config.BATCH_SIZE_LIMIT:
texts = texts[:config.BATCH_SIZE_LIMIT]
if progress_callback:
progress_callback((i + len(batch)) / len(texts))
for text in batch:
try:
result = self.analyze_single(text, language, preprocessing_options)
result['batch_index'] = len(results)
result['text'] = text[:100] + '...' if len(text) > 100 else text
result['full_text'] = text
results.append(result)
except Exception as e:
results.append({
'sentiment': 'Error',
'confidence': 0.0,
'error': str(e),
'batch_index': len(results),
'text': text[:100] + '...' if len(text) > 100 else text,
'full_text': text
})
return results
# Advanced Plotly Visualization System
class PlotlyVisualizer:
"""Enhanced Plotly visualizations"""
@staticmethod
@handle_errors(default_return=None)
def create_sentiment_gauge(result: Dict, theme: ThemeContext) -> go.Figure:
"""Create animated sentiment gauge"""
colors = theme.colors
if result.get('has_neutral', False):
# Three-way gauge
fig = go.Figure(go.Indicator(
mode="gauge+number+delta",
value=result['pos_prob'] * 100,
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': f"Sentiment: {result['sentiment']}"},
delta={'reference': 50},
gauge={
'axis': {'range': [None, 100]},
'bar': {'color': colors['pos'] if result['sentiment'] == 'Positive' else colors['neg']},
'steps': [
{'range': [0, 33], 'color': colors['neg']},
{'range': [33, 67], 'color': colors['neu']},
{'range': [67, 100], 'color': colors['pos']}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 90
}
}
))
else:
# Two-way gauge
fig = go.Figure(go.Indicator(
mode="gauge+number",
value=result['confidence'] * 100,
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': f"Confidence: {result['sentiment']}"},
gauge={
'axis': {'range': [None, 100]},
'bar': {'color': colors['pos'] if result['sentiment'] == 'Positive' else colors['neg']},
'steps': [
{'range': [0, 50], 'color': "lightgray"},
{'range': [50, 100], 'color': "gray"}
]
}
))
fig.update_layout(height=400, font={'size': 16})
return fig
@staticmethod
@handle_errors(default_return=None)
def create_probability_bars(result: Dict, theme: ThemeContext) -> go.Figure:
"""Create probability bar chart"""
colors = theme.colors
if result.get('has_neutral', False):
labels = ['Negative', 'Neutral', 'Positive']
values = [result['neg_prob'], result['neu_prob'], result['pos_prob']]
bar_colors = [colors['neg'], colors['neu'], colors['pos']]
else:
labels = ['Negative', 'Positive']
values = [result['neg_prob'], result['pos_prob']]
bar_colors = [colors['neg'], colors['pos']]
fig = go.Figure(data=[
go.Bar(x=labels, y=values, marker_color=bar_colors,
text=[f'{v:.3f}' for v in values], textposition='outside')
])
fig.update_layout(
title="Sentiment Probabilities",
yaxis_title="Probability",
height=400,
showlegend=False
)
return fig
@staticmethod
@handle_errors(default_return=None)
def create_keyword_chart(keywords: List[Tuple[str, float]], sentiment: str, theme: ThemeContext) -> go.Figure:
"""Create keyword importance chart"""
if not keywords:
fig = go.Figure()
fig.add_annotation(text="No keywords extracted",
xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False)
fig.update_layout(height=400, title="Keywords")
return fig
words = [word for word, score in keywords]
scores = [score for word, score in keywords]
color = theme.colors['pos'] if sentiment == 'Positive' else theme.colors['neg']
fig = go.Figure(data=[
go.Bar(
y=words,
x=scores,
orientation='h',
marker_color=color,
text=[f'{score:.3f}' for score in scores],
textposition='auto'
)
])
fig.update_layout(
title=f"Top Keywords ({sentiment})",
xaxis_title="Attention Weight",
yaxis_title="Keywords",
height=400,
showlegend=False
)
return fig
@staticmethod
@handle_errors(default_return=None)
def create_batch_summary(results: List[Dict], theme: ThemeContext) -> go.Figure:
"""Create batch analysis summary"""
colors = theme.colors
# Count sentiments
sentiments = [r['sentiment'] for r in results if 'sentiment' in r and r['sentiment'] != 'Error']
sentiment_counts = Counter(sentiments)
# Create pie chart
fig = go.Figure(data=[go.Pie(
labels=list(sentiment_counts.keys()),
values=list(sentiment_counts.values()),
marker_colors=[colors.get(s.lower()[:3], '#999999') for s in sentiment_counts.keys()],
textinfo='label+percent',
hole=0.3
)])
fig.update_layout(
title=f"Batch Analysis Summary ({len(results)} texts)",
height=400
)
return fig
@staticmethod
@handle_errors(default_return=None)
def create_confidence_distribution(results: List[Dict]) -> go.Figure:
"""Create confidence distribution plot"""
confidences = [r['confidence'] for r in results if 'confidence' in r and r['sentiment'] != 'Error']
if not confidences:
return go.Figure()
fig = go.Figure(data=[go.Histogram(
x=confidences,
nbinsx=20,
marker_color='skyblue',
opacity=0.7
)])
fig.update_layout(
title="Confidence Distribution",
xaxis_title="Confidence Score",
yaxis_title="Frequency",
height=400
)
return fig
@staticmethod
@handle_errors(default_return=None)
def create_history_dashboard(history: List[Dict], theme: ThemeContext) -> go.Figure:
"""Create comprehensive history dashboard"""
if len(history) < 2:
return go.Figure()
# Create subplots
fig = make_subplots(
rows=2, cols=2,
subplot_titles=['Sentiment Timeline', 'Confidence Distribution',
'Language Distribution', 'Sentiment Summary'],
specs=[[{"secondary_y": False}, {"secondary_y": False}],
[{"type": "pie"}, {"type": "bar"}]]
)
# Extract data
indices = list(range(len(history)))
pos_probs = [item.get('pos_prob', 0) for item in history]
confidences = [item['confidence'] for item in history]
sentiments = [item['sentiment'] for item in history]
languages = [item.get('language', 'en') for item in history]
# Sentiment timeline
colors_map = {'Positive': theme.colors['pos'], 'Negative': theme.colors['neg'], 'Neutral': theme.colors['neu']}
colors = [colors_map.get(s, '#999999') for s in sentiments]
fig.add_trace(
go.Scatter(x=indices, y=pos_probs, mode='lines+markers',
marker=dict(color=colors, size=8),
name='Positive Probability'),
row=1, col=1
)
# Confidence distribution
fig.add_trace(
go.Histogram(x=confidences, nbinsx=10, name='Confidence'),
row=1, col=2
)
# Language distribution
lang_counts = Counter(languages)
fig.add_trace(
go.Pie(labels=list(lang_counts.keys()), values=list(lang_counts.values()),
name="Languages"),
row=2, col=1
)
# Sentiment summary
sent_counts = Counter(sentiments)
sent_colors = [colors_map.get(k, '#999999') for k in sent_counts.keys()]
fig.add_trace(
go.Bar(x=list(sent_counts.keys()), y=list(sent_counts.values()),
marker_color=sent_colors),
row=2, col=2
)
fig.update_layout(height=800, showlegend=False)
return fig
# Universal Data Handler
class DataHandler:
"""Enhanced data operations"""
@staticmethod
@handle_errors(default_return=(None, "Export failed"))
def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]:
"""Export data with comprehensive information"""
if not data:
return None, "No data to export"
if format_type == 'csv':
writer = csv.writer(temp_file)
writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence', 'Language',
'Pos_Prob', 'Neg_Prob', 'Neu_Prob', 'Keywords', 'Word_Count'])
for entry in data:
keywords_str = "|".join([f"{word}:{score:.3f}" for word, score in entry.get('keywords', [])])
writer.writerow([
entry.get('timestamp', ''),
entry.get('text', ''),
entry.get('sentiment', ''),
f"{entry.get('confidence', 0):.4f}",
entry.get('language', 'en'),
f"{entry.get('pos_prob', 0):.4f}",
f"{entry.get('neg_prob', 0):.4f}",
f"{entry.get('neu_prob', 0):.4f}",
keywords_str,
entry.get('word_count', 0)
])
elif format_type == 'json':
json.dump(data, temp_file, indent=2, ensure_ascii=False)
temp_file.close()
return temp_file.name, f"Exported {len(data)} entries"
@staticmethod
@handle_errors(default_return="")
def process_file(file) -> str:
"""Process uploaded files"""
if not file:
return ""
content = file.read().decode('utf-8')
if file.name.endswith('.csv'):
csv_file = io.StringIO(content)
reader = csv.reader(csv_file)
try:
next(reader) # Skip header
texts = []
for row in reader:
if row and row[0].strip():
text = row[0].strip().strip('"')
if text:
texts.append(text)
return '\n'.join(texts)
except:
lines = content.strip().split('\n')[1:]
texts = []
for line in lines:
if line.strip():
if text:
texts.append(text)
return '\n'.join(texts)
return content
# Main Application Class
class SentimentApp:
"""Main multilingual sentiment analysis application"""
def __init__(self):
self.engine = SentimentEngine()
self.history = HistoryManager()
self.data_handler = DataHandler()
# Multi-language examples
self.examples = [
["This movie was absolutely fantastic! The acting was superb and the plot kept me engaged throughout."],
["The film was disappointing with poor character development and a confusing storyline."],
["这部电影真的很棒!演技精湛,情节引人入胜。"], # Chinese
["Esta película fue increíble, me encantó la cinematografía."], # Spanish
["Ce film était magnifique, j'ai adoré la réalisation."], # French
]
@handle_errors(default_return=("Please enter text", None, None, None))
def analyze_single(self, text: str, language: str, theme: str, clean_text: bool,
remove_punct: bool, remove_nums: bool):
"""Single text analysis with enhanced visualizations"""
if not text.strip():
return "Please enter text", None, None, None
# Map display names to language codes
language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()}
language_code = language_map.get(language, 'auto')
preprocessing_options = {
'clean_text': clean_text,
'remove_punctuation': remove_punct,
'remove_numbers': remove_nums
}
with memory_cleanup():
result = self.engine.analyze_single(text, language_code, preprocessing_options)
# Add to history
history_entry = {
'text': text[:100] + '...' if len(text) > 100 else text,
'full_text': text,
'sentiment': result['sentiment'],
'confidence': result['confidence'],
'pos_prob': result.get('pos_prob', 0),
'neg_prob': result.get('neg_prob', 0),
'neu_prob': result.get('neu_prob', 0),
'language': result['language'],
'keywords': result['keywords'],
'word_count': result['word_count'],
'analysis_type': 'single'
}
self.history.add(history_entry)
# Create visualizations
theme_ctx = ThemeContext(theme)
gauge_fig = PlotlyVisualizer.create_sentiment_gauge(result, theme_ctx)
bars_fig = PlotlyVisualizer.create_probability_bars(result, theme_ctx)
keyword_fig = PlotlyVisualizer.create_keyword_chart(result['keywords'], result['sentiment'], theme_ctx)
# Create comprehensive result text
keywords_str = ", ".join([f"{word}({score:.3f})" for word, score in result['keywords'][:5]])
info_text = f"""
**Analysis Results:**
- **Sentiment:** {result['sentiment']} ({result['confidence']:.3f} confidence)
- **Language:** {result['language'].upper()}
- **Keywords:** {keywords_str}
- **Statistics:** {result['word_count']} words, {result['char_count']} characters
"""
return info_text, gauge_fig, bars_fig, keyword_fig
@handle_errors(default_return=("Please enter texts", None, None, None))
def analyze_batch(self, batch_text: str, language: str, theme: str,
clean_text: bool, remove_punct: bool, remove_nums: bool):
"""Enhanced batch analysis"""
if not batch_text.strip():
return "Please enter texts (one per line)", None, None, None
# Parse batch input
texts = TextProcessor.parse_batch_input(batch_text)
if len(texts) > config.BATCH_SIZE_LIMIT:
return f"Too many texts. Maximum {config.BATCH_SIZE_LIMIT} allowed.", None, None, None
if not texts:
return "No valid texts found", None, None, None
# Map display names to language codes
language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()}
language_code = language_map.get(language, 'auto')
preprocessing_options = {
'clean_text': clean_text,
'remove_punctuation': remove_punct,
'remove_numbers': remove_nums
}
with memory_cleanup():
results = self.engine.analyze_batch(texts, language_code, preprocessing_options)
# Add to history
batch_entries = []
for result in results:
if 'error' not in result:
entry = {
'text': result['text'],
'full_text': result['full_text'],
'sentiment': result['sentiment'],
'confidence': result['confidence'],
'pos_prob': result.get('pos_prob', 0),
'neg_prob': result.get('neg_prob', 0),
'neu_prob': result.get('neu_prob', 0),
'language': result['language'],
'keywords': result['keywords'],
'word_count': result['word_count'],
'analysis_type': 'batch',
'batch_index': result['batch_index']
}
batch_entries.append(entry)
self.history.add_batch(batch_entries)
# Create visualizations
theme_ctx = ThemeContext(theme)
summary_fig = PlotlyVisualizer.create_batch_summary(results, theme_ctx)
confidence_fig = PlotlyVisualizer.create_confidence_distribution(results)
# Create results DataFrame
df_data = []
for result in results:
if 'error' in result:
df_data.append({
'Index': result['batch_index'] + 1,
'Text': result['text'],
'Sentiment': 'Error',
'Confidence': 0.0,
'Language': 'Unknown',
'Error': result['error']
})
else:
keywords_str = ', '.join([word for word, _ in result['keywords'][:3]])
df_data.append({
'Index': result['batch_index'] + 1,
'Text': result['text'],
'Sentiment': result['sentiment'],
'Confidence': f"{result['confidence']:.3f}",
'Language': result['language'].upper(),
'Keywords': keywords_str
})
df = pd.DataFrame(df_data)
# Create summary text
successful_results = [r for r in results if 'error' not in r]
error_count = len(results) - len(successful_results)
if successful_results:
sentiment_counts = Counter([r['sentiment'] for r in successful_results])
avg_confidence = np.mean([r['confidence'] for r in successful_results])
languages = Counter([r['language'] for r in successful_results])
summary_text = f"""
**Batch Analysis Summary:**
- **Total Texts:** {len(texts)}
- **Successful:** {len(successful_results)}
- **Errors:** {error_count}
- **Average Confidence:** {avg_confidence:.3f}
- **Sentiments:** {dict(sentiment_counts)}
- **Languages Detected:** {dict(languages)}
"""
else:
summary_text = f"All {len(texts)} texts failed to analyze."
return summary_text, df, summary_fig, confidence_fig
@handle_errors(default_return=(None, "No history available"))
def plot_history(self, theme: str = 'default'):
"""Plot comprehensive history analysis"""
history = self.history.get_all()
if len(history) < 2:
return None, f"Need at least 2 analyses for trends. Current: {len(history)}"
theme_ctx = ThemeContext(theme)
with memory_cleanup():
fig = PlotlyVisualizer.create_history_dashboard(history, theme_ctx)
stats = self.history.get_stats()
stats_text = f"""
**History Statistics:**
- **Total Analyses:** {stats.get('total_analyses', 0)}
- **Positive:** {stats.get('positive_count', 0)}
- **Negative:** {stats.get('negative_count', 0)}
- **Neutral:** {stats.get('neutral_count', 0)}
- **Average Confidence:** {stats.get('avg_confidence', 0):.3f}
- **Languages:** {stats.get('languages_detected', 0)}
- **Most Common Language:** {stats.get('most_common_language', 'N/A').upper()}
"""
return fig, stats_text
@handle_errors(default_return=("No data available",))
def get_history_status(self):
"""Get current history status"""
stats = self.history.get_stats()
if not stats:
return "No analyses performed yet"
return f"""
**Current Status:**
- **Total Analyses:** {stats['total_analyses']}
- **Recent Sentiment Distribution:**
* Positive: {stats['positive_count']}
* Negative: {stats['negative_count']}
* Neutral: {stats['neutral_count']}
- **Average Confidence:** {stats['avg_confidence']:.3f}
- **Languages Detected:** {stats['languages_detected']}
"""
# Gradio Interface
def create_interface():
"""Create comprehensive Gradio interface"""
app = SentimentApp()
with gr.Blocks(theme=gr.themes.Soft(), title="Multilingual Sentiment Analyzer") as demo:
gr.Markdown("# 🌍 Advanced Multilingual Sentiment Analyzer")
gr.Markdown("AI-powered sentiment analysis with support for multiple languages, advanced visualizations, and explainable AI features")
with gr.Tab("Single Analysis"):
with gr.Row():
with gr.Column():
text_input = gr.Textbox(
label="Enter Text for Analysis",
placeholder="Enter your text in any supported language...",
lines=5
)
with gr.Row():
language_selector = gr.Dropdown(
choices=list(config.SUPPORTED_LANGUAGES.values()),
value="Auto Detect",
label="Language"
)
theme_selector = gr.Dropdown(
choices=list(config.THEMES.keys()),
value="default",
label="Theme"
)
with gr.Row():
clean_text_cb = gr.Checkbox(label="Clean Text", value=False)
remove_punct_cb = gr.Checkbox(label="Remove Punctuation", value=False)
remove_nums_cb = gr.Checkbox(label="Remove Numbers", value=False)
analyze_btn = gr.Button("Analyze", variant="primary", size="lg")
gr.Examples(
examples=app.examples,
inputs=text_input,
cache_examples=False
)
with gr.Column():
result_output = gr.Textbox(label="Analysis Results", lines=8)
with gr.Row():
gauge_plot = gr.Plot(label="Sentiment Gauge")
probability_plot = gr.Plot(label="Probability Distribution")
with gr.Row():
keyword_plot = gr.Plot(label="Key Contributing Words")
with gr.Tab("Batch Analysis"):
with gr.Row():
with gr.Column():
file_upload = gr.File(
label="Upload File (CSV/TXT)",
file_types=[".csv", ".txt"]
)
batch_input = gr.Textbox(
label="Batch Input (one text per line)",
placeholder="Enter multiple texts, one per line...",
lines=10
)
with gr.Row():
batch_language = gr.Dropdown(
choices=list(config.SUPPORTED_LANGUAGES.values()),
value="Auto Detect",
label="Language"
)
batch_theme = gr.Dropdown(
choices=list(config.THEMES.keys()),
value="default",
label="Theme"
)
with gr.Row():
batch_clean_cb = gr.Checkbox(label="Clean Text", value=False)
batch_punct_cb = gr.Checkbox(label="Remove Punctuation", value=False)
batch_nums_cb = gr.Checkbox(label="Remove Numbers", value=False)
with gr.Row():
load_file_btn = gr.Button("Load File")
analyze_batch_btn = gr.Button("Analyze Batch", variant="primary")
with gr.Column():
batch_summary = gr.Textbox(label="Batch Summary", lines=8)
batch_results_df = gr.Dataframe(
label="Detailed Results",
headers=["Index", "Text", "Sentiment", "Confidence", "Language", "Keywords"],
datatype=["number", "str", "str", "str", "str", "str"]
)
with gr.Row():
batch_plot = gr.Plot(label="Batch Analysis Summary")
confidence_dist_plot = gr.Plot(label="Confidence Distribution")
with gr.Tab("History & Analytics"):
with gr.Row():
with gr.Column():
with gr.Row():
refresh_history_btn = gr.Button("Refresh History")
clear_history_btn = gr.Button("Clear History", variant="stop")
status_btn = gr.Button("Get Status")
history_theme = gr.Dropdown(
choices=list(config.THEMES.keys()),
value="default",
label="Dashboard Theme"
)
with gr.Row():
export_csv_btn = gr.Button("Export CSV")
export_json_btn = gr.Button("Export JSON")
with gr.Column():
history_status = gr.Textbox(label="History Status", lines=8)
history_dashboard = gr.Plot(label="History Analytics Dashboard")
with gr.Row():
csv_download = gr.File(label="CSV Download", visible=True)
json_download = gr.File(label="JSON Download", visible=True)
# Event Handlers
analyze_btn.click(
app.analyze_single,
inputs=[text_input, language_selector, theme_selector,
clean_text_cb, remove_punct_cb, remove_nums_cb],
outputs=[result_output, gauge_plot, probability_plot, keyword_plot]
)
load_file_btn.click(
app.data_handler.process_file,
inputs=file_upload,
outputs=batch_input
)
analyze_batch_btn.click(
app.analyze_batch,
inputs=[batch_input, batch_language, batch_theme,
batch_clean_cb, batch_punct_cb, batch_nums_cb],
outputs=[batch_summary, batch_results_df, batch_plot, confidence_dist_plot]
)
refresh_history_btn.click(
app.plot_history,
inputs=history_theme,
outputs=[history_dashboard, history_status]
)
clear_history_btn.click(
lambda: f"Cleared {app.history.clear()} entries",
outputs=history_status
)
status_btn.click(
app.get_history_status,
outputs=history_status
)
export_csv_btn.click(
lambda: app.data_handler.export_data(app.history.get_all(), 'csv'),
outputs=[csv_download, history_status]
)
export_json_btn.click(
lambda: app.data_handler.export_data(app.history.get_all(), 'json'),
outputs=[json_download, history_status]
)
return demo
# Application Entry Point
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
try:
demo = create_interface()
demo.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)
except Exception as e:
logger.error(f"Failed to launch application: {e}")
raise
@handle_errors(default_return=("Please enter texts", None, None, None))
def analyze_batch(self, batch_text: str, language: str, theme: str,
clean_text: bool, remove_punct: bool, remove_nums: bool):
"""Enhanced batch analysis"""
if not batch_text.strip():
return "Please enter texts (one per line)", None, None, None
# Parse batch input
texts = TextProcessor.parse_batch_input(batch_text)
if len(texts) > config.BATCH_SIZE_LIMIT:
return f"Too many texts. Maximum {config.BATCH_SIZE_LIMIT} allowed.", None, None, None
if not texts:
return "No valid texts found", None, None, None
# Map display names to language codes
language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()}
language_code = language_map.get(language, 'auto')
preprocessing_options = {
'clean_text': clean_text,
'remove_punctuation': remove_punct,
'remove_numbers': remove_nums
}
with memory_cleanup():
results = self.engine.analyze_batch(texts, language_code, preprocessing_options)
# Add to history
batch_entries = []
for result in results:
if 'error' not in result:
entry = {
'text': result['text'],
'full_text': result['full_text'],
'sentiment': result['sentiment'],
'confidence': result['confidence'],
'pos_prob': result.get('pos_prob', 0),
'neg_prob': result.get('neg_prob', 0),
'neu_prob': result.get('neu_prob', 0),
'language': result['language'],
'keywords': result['keywords'],
'word_count': result['word_count'],
'analysis_type': 'batch',
'batch_index': result['batch_index']
}
batch_entries.append(entry)
self.history.add_batch(batch_entries)
# Create visualizations
theme_ctx = ThemeContext(theme)
summary_fig = PlotlyVisualizer.create_batch_summary(results, theme_ctx)
confidence_fig = PlotlyVisualizer.create_confidence_distribution(results)
# Create results DataFrame
df_data = []
for result in results:
if 'error' in result:
df_data.append({
'Index': result['batch_index'] + 1,
'Text': result['text'],
'Sentiment': 'Error',
'Confidence': 0.0,
'Language': 'Unknown',
'Error': result['error']
})
else:
keywords_str = ', '.join([word for word, _ in result['keywords'][:3]])
df_data.append({
'Index': result['batch_index'] + 1,
'Text': result['text'],
'Sentiment': result['sentiment'],
'Confidence': f"{result['confidence']:.3f}",
'Language': result['language'].upper(),
'Keywords': keywords_str
})
df = pd.DataFrame(df_data)
# Create summary text
successful_results = [r for r in results if 'error' not in r]
error_count = len(results) - len(successful_results)
if successful_results:
sentiment_counts = Counter([r['sentiment'] for r in successful_results])
avg_confidence = np.mean([r['confidence'] for r in successful_results])
languages = Counter([r['language'] for r in successful_results])
summary_text = f"""
**Batch Analysis Summary:**
- **Total Texts:** {len(texts)}
- **Successful:** {len(successful_results)}
- **Errors:** {error_count}
- **Average Confidence:** {avg_confidence:.3f}
- **Sentiments:** {dict(sentiment_counts)}
- **Languages Detected:** {dict(languages)}
"""
else:
summary_text = f"All {len(texts)} texts failed to analyze."
return summary_text, df, summary_fig, confidence_fig
@handle_errors(default_return=(None, "No history available"))
def plot_history(self, theme: str = 'default'):
"""Plot comprehensive history analysis"""
history = self.history.get_all()
if len(history) < 2:
return None, f"Need at least 2 analyses for trends. Current: {len(history)}"
theme_ctx = ThemeContext(theme)
with memory_cleanup():
fig = PlotlyVisualizer.create_history_dashboard(history, theme_ctx)
stats = self.history.get_stats()
stats_text = f"""
**History Statistics:**
- **Total Analyses:** {stats.get('total_analyses', 0)}
- **Positive:** {stats.get('positive_count', 0)}
- **Negative:** {stats.get('negative_count', 0)}
- **Neutral:** {stats.get('neutral_count', 0)}
- **Average Confidence:** {stats.get('avg_confidence', 0):.3f}
- **Languages:** {stats.get('languages_detected', 0)}
- **Most Common Language:** {stats.get('most_common_language', 'N/A').upper()}
"""
return fig, stats_text
@handle_errors(default_return=("No data available",))
def get_history_status(self):
"""Get current history status"""
stats = self.history.get_stats()
if not stats:
return "No analyses performed yet"
return f"""
**Current Status:**
- **Total Analyses:** {stats['total_analyses']}
- **Recent Sentiment Distribution:**
* Positive: {stats['positive_count']}
* Negative: {stats['negative_count']}
* Neutral: {stats['neutral_count']}
- **Average Confidence:** {stats['avg_confidence']:.3f}
- **Languages Detected:** {stats['languages_detected']}
"""
# Gradio Interface
def create_interface():
"""Create comprehensive Gradio interface"""
app = SentimentApp()
with gr.Blocks(theme=gr.themes.Soft(), title="Multilingual Sentiment Analyzer") as demo:
gr.Markdown("# 🌍 Advanced Multilingual Sentiment Analyzer")
gr.Markdown("AI-powered sentiment analysis with support for multiple languages, advanced visualizations, and explainable AI features")
with gr.Tab("Single Analysis"):
with gr.Row():
with gr.Column():
text_input = gr.Textbox(
label="Enter Text for Analysis",
placeholder="Enter your text in any supported language...",
lines=5
)
with gr.Row():
language_selector = gr.Dropdown(
choices=list(config.SUPPORTED_LANGUAGES.values()),
value="Auto Detect",
label="Language"
)
theme_selector = gr.Dropdown(
choices=list(config.THEMES.keys()),
value="default",
label="Theme"
)
with gr.Row():
clean_text_cb = gr.Checkbox(label="Clean Text", value=False)
remove_punct_cb = gr.Checkbox(label="Remove Punctuation", value=False)
remove_nums_cb = gr.Checkbox(label="Remove Numbers", value=False)
analyze_btn = gr.Button("Analyze", variant="primary", size="lg")
gr.Examples(
examples=app.examples,
inputs=text_input,
cache_examples=False
)
with gr.Column():
result_output = gr.Textbox(label="Analysis Results", lines=8)
with gr.Row():
gauge_plot = gr.Plot(label="Sentiment Gauge")
probability_plot = gr.Plot(label="Probability Distribution")
with gr.Row():
keyword_plot = gr.Plot(label="Key Contributing Words")
with gr.Tab("Batch Analysis"):
with gr.Row():
with gr.Column():
file_upload = gr.File(
label="Upload File (CSV/TXT)",
file_types=[".csv", ".txt"]
)
batch_input = gr.Textbox(
label="Batch Input (one text per line)",
placeholder="Enter multiple texts, one per line...",
lines=10
)
with gr.Row():
batch_language = gr.Dropdown(
choices=list(config.SUPPORTED_LANGUAGES.values()),
value="Auto Detect",
label="Language"
)
batch_theme = gr.Dropdown(
choices=list(config.THEMES.keys()),
value="default",
label="Theme"
)
with gr.Row():
batch_clean_cb = gr.Checkbox(label="Clean Text", value=False)
batch_punct_cb = gr.Checkbox(label="Remove Punctuation", value=False)
batch_nums_cb = gr.Checkbox(label="Remove Numbers", value=False)
with gr.Row():
load_file_btn = gr.Button("Load File")
analyze_batch_btn = gr.Button("Analyze Batch", variant="primary")
with gr.Column():
batch_summary = gr.Textbox(label="Batch Summary", lines=8)
batch_results_df = gr.Dataframe(
label="Detailed Results",
headers=["Index", "Text", "Sentiment", "Confidence", "Language", "Keywords"],
datatype=["number", "str", "str", "str", "str", "str"]
)
with gr.Row():
batch_plot = gr.Plot(label="Batch Analysis Summary")
confidence_dist_plot = gr.Plot(label="Confidence Distribution")
with gr.Tab("History & Analytics"):
with gr.Row():
with gr.Column():
with gr.Row():
refresh_history_btn = gr.Button("Refresh History")
clear_history_btn = gr.Button("Clear History", variant="stop")
status_btn = gr.Button("Get Status")
history_theme = gr.Dropdown(
choices=list(config.THEMES.keys()),
value="default",
label="Dashboard Theme"
)
with gr.Row():
export_csv_btn = gr.Button("Export CSV")
export_json_btn = gr.Button("Export JSON")
with gr.Column():
history_status = gr.Textbox(label="History Status", lines=8)
history_dashboard = gr.Plot(label="History Analytics Dashboard")
with gr.Row():
csv_download = gr.File(label="CSV Download", visible=True)
json_download = gr.File(label="JSON Download", visible=True)
# Event Handlers
analyze_btn.click(
app.analyze_single,
inputs=[text_input, language_selector, theme_selector,
clean_text_cb, remove_punct_cb, remove_nums_cb],
outputs=[result_output, gauge_plot, probability_plot, keyword_plot]
)
load_file_btn.click(
app.data_handler.process_file,
inputs=file_upload,
outputs=batch_input
)
analyze_batch_btn.click(
app.analyze_batch,
inputs=[batch_input, batch_language, batch_theme,
batch_clean_cb, batch_punct_cb, batch_nums_cb],
outputs=[batch_summary, batch_results_df, batch_plot, confidence_dist_plot]
)
refresh_history_btn.click(
app.plot_history,
inputs=history_theme,
outputs=[history_dashboard, history_status]
)
clear_history_btn.click(
lambda: f"Cleared {app.history.clear()} entries",
outputs=history_status
)
status_btn.click(
app.get_history_status,
outputs=history_status
)
export_csv_btn.click(
lambda: app.data_handler.export_data(app.history.get_all(), 'csv'),
outputs=[csv_download, history_status]
)
export_json_btn.click(
lambda: app.data_handler.export_data(app.history.get_all(), 'json'),
outputs=[json_download, history_status]
)
return demo
# Application Entry Point
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
try:
demo = create_interface()
demo.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)
except Exception as e:
logger.error(f"Failed to launch application: {e}")
raise