import gradio as gr
from ui.dataset_input import create_dataset_input, load_example_dataset
from ui.analysis_screen import create_analysis_screen, process_analysis_request
from ui.roberta_screen import create_roberta_screen, process_roberta_request
from visualization.bow_visualizer import process_and_visualize_analysis
from visualization.roberta_visualizer import process_and_visualize_sentiment_analysis
import nltk
import os
import json
import matplotlib.pyplot as plt
import io
import base64
import datetime
from PIL import Image
# Helper to download required NLTK resources at startup
def download_nltk_resources():
"""Download required NLTK resources if not already downloaded"""
try:
# Create nltk_data directory in the user's home directory if it doesn't exist
nltk_data_path = os.path.expanduser("~/nltk_data")
os.makedirs(nltk_data_path, exist_ok=True)
# Add this path to NLTK's data path
nltk.data.path.append(nltk_data_path)
# Download required resources
resources = ['punkt', 'wordnet', 'stopwords', 'punkt_tab']
for resource in resources:
try:
# Different resources can be in different directories in NLTK
locations = [
f'tokenizers/{resource}',
f'corpora/{resource}',
f'taggers/{resource}',
f'{resource}'
]
found = False
for location in locations:
try:
nltk.data.find(location)
print(f"Resource {resource} already downloaded")
found = True
break
except LookupError:
continue
if not found:
print(f"Downloading {resource}...")
nltk.download(resource, quiet=True)
except Exception as e:
print(f"Error with resource {resource}: {e}")
print("NLTK resources check completed")
except Exception as e:
print(f"Error downloading NLTK resources: {e}")
def create_app():
"""
Create a streamlined Gradio app for dataset input and analysis.
Returns:
gr.Blocks: The Gradio application
"""
with gr.Blocks(title="LLM Response Comparator") as app:
# Application state to share data between tabs
dataset_state = gr.State({})
analysis_results_state = gr.State({})
roberta_results_state = gr.State({})
# Add a state for storing user dataset analysis results
user_analysis_log = gr.State({})
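        # Log structure: {prompt: {analysis_type: {"timestamp": str, "result": dict}}}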
# Dataset Input Tab
with gr.Tab("Dataset Input"):
            # List dataset files, excluding generated summary files (prefixed "summary-")
dataset_files = [f for f in os.listdir("dataset")
if not f.startswith("summary-") and os.path.isfile(os.path.join("dataset", f))]
dataset_inputs, example_dropdown, load_example_btn, create_btn, prompt, response1, model1, response2, model2 = create_dataset_input()
# Add status indicator to show when dataset is created
dataset_status = gr.Markdown("*No dataset loaded*")
# Load example dataset
load_example_btn.click(
fn=load_example_dataset,
inputs=[example_dropdown],
outputs=[prompt, response1, model1, response2, model2] # Update all field values
)
# Save dataset to state and update status
def create_dataset(p, r1, m1, r2, m2):
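                """Build a two-entry dataset dict from the form fields and return it with a status message."""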
if not p or not r1 or not r2:
return {}, "❌ **Error:** Please fill in at least the prompt and both responses"
dataset = {
"entries": [
{"prompt": p, "response": r1, "model": m1 or "Model 1"},
{"prompt": p, "response": r2, "model": m2 or "Model 2"}
]
}
return dataset, "✅ **Dataset created successfully!** You can now go to the Analysis tab"
create_btn.click(
fn=create_dataset,
inputs=[prompt, response1, model1, response2, model2],
outputs=[dataset_state, dataset_status]
)
# Analysis Tab
with gr.Tab("Analysis"):
# Use create_analysis_screen to get UI components including visualization container
analysis_options, analysis_params, run_analysis_btn, analysis_output, ngram_n, topic_count = create_analysis_screen()
# Pre-create visualization components (initially hidden)
visualization_area_visible = gr.Checkbox(value=False, visible=False, label="Visualization Visible")
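            # This hidden checkbox is a boolean flag; run_analysis returns a plain bool
            # for it (and for status_message_visible below) rather than a gr.update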
analysis_title = gr.Markdown("## Analysis Results", visible=False)
prompt_title = gr.Markdown(visible=False)
models_compared = gr.Markdown(visible=False)
# Container for model 1 words
model1_title = gr.Markdown(visible=False)
model1_words = gr.Markdown(visible=False)
# Container for model 2 words
model2_title = gr.Markdown(visible=False)
model2_words = gr.Markdown(visible=False)
# Similarity metrics
similarity_metrics_title = gr.Markdown("### Similarity Metrics", visible=False)
similarity_metrics = gr.Markdown(visible=False)
# Status or error message area
status_message_visible = gr.Checkbox(value=False, visible=False, label="Status Message Visible")
status_message = gr.Markdown(visible=False)
# Define a helper function to extract parameter values and run the analysis
def run_analysis(dataset, selected_analysis, ngram_n, topic_count, user_analysis_log, *args):
"""
Run the analysis with the selected parameters
Args:
dataset (dict): The dataset state
selected_analysis (str): The selected analysis type
ngram_n (str or int): N value for n-gram analysis
topic_count (str or int): Number of topics for topic modeling
user_analysis_log (dict): Log of user analysis results
*args: Additional arguments that might be passed by Gradio
Returns:
tuple: Analysis results and UI component updates
"""
try:
if not dataset or "entries" not in dataset or not dataset["entries"]:
return (
{}, # analysis_results_state
user_analysis_log, # user_analysis_log (unchanged)
False, # analysis_output visibility
False, # visualization_area_visible
gr.update(visible=False), # analysis_title
gr.update(visible=False), # prompt_title
gr.update(visible=False), # models_compared
gr.update(visible=False), # model1_title
gr.update(visible=False), # model1_words
gr.update(visible=False), # model2_title
gr.update(visible=False), # model2_words
gr.update(visible=False), # similarity_metrics_title
gr.update(visible=False), # similarity_metrics
True, # status_message_visible
gr.update(visible=True, value="**Error:** No dataset loaded. Please create or load a dataset first.") # status_message
)
parameters = {
"bow_top": 25, # Default fixed value for Bag of Words
"ngram_n": ngram_n,
"ngram_top": 10, # Default fixed value for N-gram analysis
"topic_count": topic_count,
"bias_methods": ["partisan"] # Default to partisan leaning only
}
print(f"Running analysis with selected type: {selected_analysis}")
print("Parameters:", parameters)
# Process the analysis request - passing selected_analysis as a string
analysis_results, _ = process_analysis_request(dataset, selected_analysis, parameters)
# If there's an error or no results
if not analysis_results or "analyses" not in analysis_results or not analysis_results["analyses"]:
return (
analysis_results,
user_analysis_log, # user_analysis_log (unchanged)
False,
False,
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
True,
gr.update(visible=True, value="**No results found.** Try a different analysis option.")
)
# Extract information to display in components
prompt = list(analysis_results["analyses"].keys())[0]
analyses = analysis_results["analyses"][prompt]
# Initialize visualization components visibilities and contents
visualization_area_visible = False
prompt_title_visible = False
prompt_title_value = ""
models_compared_visible = False
models_compared_value = ""
model1_title_visible = False
model1_title_value = ""
model1_words_visible = False
model1_words_value = ""
model2_title_visible = False
model2_title_value = ""
model2_words_visible = False
model2_words_value = ""
similarity_title_visible = False
similarity_metrics_visible = False
similarity_metrics_value = ""
# Update the user analysis log with the new results
updated_log = user_analysis_log.copy() if user_analysis_log else {}
# Initialize this prompt in the log if it doesn't exist
if prompt not in updated_log:
updated_log[prompt] = {}
# Store the analysis results in the log
                    # Map the display name to the key used in analysis_results["analyses"]
                    # (a plain replace/lower would turn "N-gram Analysis" into
                    # "n-gram_analysis" and never match the "ngram_analysis" key)
                    analysis_key_map = {
                        "Bag of Words": "bag_of_words",
                        "N-gram Analysis": "ngram_analysis",
                        "Classifier": "classifier",
                        "Bias Detection": "bias_detection",
                        "Topic Modeling": "topic_modeling",
                    }
                    key = analysis_key_map.get(selected_analysis)
                    if key and key in analyses:
                        updated_log[prompt][selected_analysis] = {
                            "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                            "result": analyses[key]
                        }
# Check for messages from placeholder analyses
if "message" in analyses:
return (
analysis_results,
updated_log, # Return updated log
False,
False,
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
True,
gr.update(visible=True, value=f"**{analyses['message']}**") # status_message
)
# Process based on the selected analysis type
if selected_analysis == "Bag of Words" and "bag_of_words" in analyses:
visualization_area_visible = True
bow_results = analyses["bag_of_words"]
models = bow_results.get("models", [])
if len(models) >= 2:
prompt_title_visible = True
                            prompt_title_value = f"## Analysis of Prompt: \"{prompt[:100]}{'...' if len(prompt) > 100 else ''}\""
models_compared_visible = True
models_compared_value = f"### Comparing responses from {models[0]} and {models[1]}"
# Extract and format information for display
model1_name = models[0]
model2_name = models[1]
# Format important words for each model
important_words = bow_results.get("important_words", {})
if model1_name in important_words:
model1_title_visible = True
model1_title_value = f"#### Top Words Used by {model1_name}"
word_list = [f"**{item['word']}** ({item['count']})" for item in important_words[model1_name][:10]]
model1_words_visible = True
model1_words_value = ", ".join(word_list)
if model2_name in important_words:
model2_title_visible = True
model2_title_value = f"#### Top Words Used by {model2_name}"
word_list = [f"**{item['word']}** ({item['count']})" for item in important_words[model2_name][:10]]
model2_words_visible = True
model2_words_value = ", ".join(word_list)
# Format similarity metrics
comparisons = bow_results.get("comparisons", {})
comparison_key = f"{model1_name} vs {model2_name}"
if comparison_key in comparisons:
metrics = comparisons[comparison_key]
cosine = metrics.get("cosine_similarity", 0)
jaccard = metrics.get("jaccard_similarity", 0)
semantic = metrics.get("semantic_similarity", 0)
common_words = metrics.get("common_word_count", 0)
similarity_title_visible = True
similarity_metrics_visible = True
similarity_metrics_value = f"""
- **Cosine Similarity**: {cosine:.2f} (higher means more similar word frequency patterns)
- **Jaccard Similarity**: {jaccard:.2f} (higher means more word overlap)
- **Semantic Similarity**: {semantic:.2f} (higher means more similar meaning)
- **Common Words**: {common_words} words appear in both responses
"""
# Check for N-gram analysis
elif selected_analysis == "N-gram Analysis" and "ngram_analysis" in analyses:
visualization_area_visible = True
ngram_results = analyses["ngram_analysis"]
models = ngram_results.get("models", [])
ngram_size = ngram_results.get("ngram_size", 2)
size_name = "Unigrams" if ngram_size == 1 else f"{ngram_size}-grams"
if len(models) >= 2:
prompt_title_visible = True
                            prompt_title_value = f"## Analysis of Prompt: \"{prompt[:100]}{'...' if len(prompt) > 100 else ''}\""
models_compared_visible = True
models_compared_value = f"### {size_name} Analysis: Comparing responses from {models[0]} and {models[1]}"
# Extract and format information for display
model1_name = models[0]
model2_name = models[1]
# Format important n-grams for each model
important_ngrams = ngram_results.get("important_ngrams", {})
if model1_name in important_ngrams:
model1_title_visible = True
model1_title_value = f"#### Top {size_name} Used by {model1_name}"
# Create a better formatted list of n-grams
ngram_list = []
for item in important_ngrams[model1_name][:10]:
ngram_text = item['ngram']
ngram_count = item['count']
ngram_list.append(f"**{ngram_text}** ({ngram_count})")
model1_words_visible = True
model1_words_value = ", ".join(ngram_list)
if model2_name in important_ngrams:
model2_title_visible = True
model2_title_value = f"#### Top {size_name} Used by {model2_name}"
# Create a better formatted list of n-grams
ngram_list = []
for item in important_ngrams[model2_name][:10]:
ngram_text = item['ngram']
ngram_count = item['count']
ngram_list.append(f"**{ngram_text}** ({ngram_count})")
model2_words_visible = True
model2_words_value = ", ".join(ngram_list)
# Format similarity metrics if available
if "comparisons" in ngram_results:
comparison_key = f"{model1_name} vs {model2_name}"
if comparison_key in ngram_results["comparisons"]:
metrics = ngram_results["comparisons"][comparison_key]
common_count = metrics.get("common_ngram_count", 0)
similarity_title_visible = True
similarity_metrics_visible = True
similarity_metrics_value = f"""
- **Common {size_name}**: {common_count} {size_name.lower()} appear in both responses
"""
# Create a new function to generate N-gram visualizations
def generate_ngram_visualization(important_ngrams, model1_name, model2_name):
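                                """Plot side-by-side horizontal bar charts of each model's top n-grams.

                                Uses `size_name` from the enclosing scope and returns a PIL Image.
                                """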
plt.figure(figsize=(12, 6))
# Process data for model 1
model1_data = {}
if model1_name in important_ngrams:
for item in important_ngrams[model1_name][:10]:
model1_data[item['ngram']] = item['count']
# Process data for model 2
model2_data = {}
if model2_name in important_ngrams:
for item in important_ngrams[model2_name][:10]:
model2_data[item['ngram']] = item['count']
# Plot for the first model
plt.subplot(1, 2, 1)
sorted_data1 = sorted(model1_data.items(), key=lambda x: x[1], reverse=True)[:10]
terms1, counts1 = zip(*sorted_data1) if sorted_data1 else ([], [])
# Create horizontal bar chart
plt.barh([t[:20] + '...' if len(t) > 20 else t for t in terms1[::-1]], counts1[::-1])
plt.xlabel('Frequency')
plt.title(f'Top {size_name} Used by {model1_name}')
plt.tight_layout()
# Plot for the second model
plt.subplot(1, 2, 2)
sorted_data2 = sorted(model2_data.items(), key=lambda x: x[1], reverse=True)[:10]
terms2, counts2 = zip(*sorted_data2) if sorted_data2 else ([], [])
# Create horizontal bar chart
plt.barh([t[:20] + '...' if len(t) > 20 else t for t in terms2[::-1]], counts2[::-1])
plt.xlabel('Frequency')
plt.title(f'Top {size_name} Used by {model2_name}')
plt.tight_layout()
# Save the plot to a bytes buffer
buf = io.BytesIO()
                                plt.savefig(buf, format='png', dpi=100)
                                plt.close()  # release the figure to avoid leaking memory across runs
                                buf.seek(0)
# Convert to PIL Image
image = Image.open(buf)
return image
# Create the visualization
try:
viz_image = generate_ngram_visualization(important_ngrams, model1_name, model2_name)
# Convert the image to a base64 string for embedding
buffered = io.BytesIO()
viz_image.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode()
                                # Embed the chart in the metrics markdown as a base64 data URI
                                similarity_metrics_value += f"""
<img src="data:image/png;base64,{img_str}" alt="N-gram frequency comparison"/>
"""
similarity_metrics_visible = True
except Exception as viz_error:
print(f"Visualization error: {viz_error}")
# Handle the error gracefully - continue without the visualization
# Check for Topic Modeling analysis
elif selected_analysis == "Topic Modeling" and "topic_modeling" in analyses:
visualization_area_visible = True
topic_results = analyses["topic_modeling"]
models = topic_results.get("models", [])
method = topic_results.get("method", "lda").upper()
n_topics = topic_results.get("n_topics", 3)
if len(models) >= 2:
prompt_title_visible = True
                            prompt_title_value = f"## Analysis of Prompt: \"{prompt[:100]}{'...' if len(prompt) > 100 else ''}\""
models_compared_visible = True
models_compared_value = f"### Topic Modeling Analysis ({method}, {n_topics} topics)"
# Extract and format topic information
topics = topic_results.get("topics", [])
if topics:
# Format topic info for display
topic_info = []
for topic in topics[:3]: # Show first 3 topics
topic_id = topic.get("id", 0)
words = topic.get("words", [])[:5] # Top 5 words per topic
if words:
topic_info.append(f"**Topic {topic_id+1}**: {', '.join(words)}")
if topic_info:
model1_title_visible = True
model1_title_value = "#### Discovered Topics"
model1_words_visible = True
model1_words_value = "\n".join(topic_info)
# Get topic distributions for models
model_topics = topic_results.get("model_topics", {})
if model_topics:
model1_name = models[0]
model2_name = models[1]
# Format topic distribution info
if model1_name in model_topics and model2_name in model_topics:
model2_title_visible = True
model2_title_value = "#### Topic Distribution"
model2_words_visible = True
# Simple distribution display
dist1 = model_topics[model1_name]
dist2 = model_topics[model2_name]
model2_words_value = f"""
**{model1_name}**: {', '.join([f"Topic {i+1}: {v:.2f}" for i, v in enumerate(dist1[:3])])}
**{model2_name}**: {', '.join([f"Topic {i+1}: {v:.2f}" for i, v in enumerate(dist2[:3])])}
"""
# Add similarity metrics if available
comparisons = topic_results.get("comparisons", {})
if comparisons:
comparison_key = f"{model1_name} vs {model2_name}"
if comparison_key in comparisons:
metrics = comparisons[comparison_key]
js_div = metrics.get("js_divergence", 0)
similarity_title_visible = True
similarity_metrics_visible = True
similarity_metrics_value = f"""
- **Topic Distribution Divergence**: {js_div:.4f} (lower means more similar topic distributions)
"""
# Check for Classifier analysis
elif selected_analysis == "Classifier" and "classifier" in analyses:
visualization_area_visible = True
classifier_results = analyses["classifier"]
models = classifier_results.get("models", [])
if len(models) >= 2:
prompt_title_visible = True
                            prompt_title_value = f"## Analysis of Prompt: \"{prompt[:100]}{'...' if len(prompt) > 100 else ''}\""
models_compared_visible = True
models_compared_value = f"### Classifier Analysis for {models[0]} and {models[1]}"
# Extract and format classifier information
model1_name = models[0]
model2_name = models[1]
# Display classifications for each model
classifications = classifier_results.get("classifications", {})
if classifications:
model1_title_visible = True
model1_title_value = f"#### Classification Results"
model1_words_visible = True
model1_results = classifications.get(model1_name, {})
model2_results = classifications.get(model2_name, {})
model1_words_value = f"""
**{model1_name}**:
- Formality: {model1_results.get('formality', 'N/A')}
- Sentiment: {model1_results.get('sentiment', 'N/A')}
- Complexity: {model1_results.get('complexity', 'N/A')}
**{model2_name}**:
- Formality: {model2_results.get('formality', 'N/A')}
- Sentiment: {model2_results.get('sentiment', 'N/A')}
- Complexity: {model2_results.get('complexity', 'N/A')}
"""
# Show comparison
model2_title_visible = True
model2_title_value = f"#### Classification Comparison"
model2_words_visible = True
differences = classifier_results.get("differences", {})
model2_words_value = "\n".join([
f"- **{category}**: {diff}"
for category, diff in differences.items()
])
# Create visualization using matplotlib
try:
# Define metrics and mappings
metrics = ['Formality', 'Sentiment', 'Complexity']
mapping = {
'Formality': {'Informal': 1, 'Neutral': 2, 'Formal': 3},
'Sentiment': {'Negative': 1, 'Neutral': 2, 'Positive': 3},
'Complexity': {'Simple': 1, 'Average': 2, 'Complex': 3}
}
# Get values for each model
model1_vals = []
model2_vals = []
# Get formality value for model1
formality1 = model1_results.get('formality', 'Neutral')
if formality1 in mapping['Formality']:
model1_vals.append(mapping['Formality'][formality1])
else:
model1_vals.append(2) # Default to neutral
# Get sentiment value for model1
sentiment1 = model1_results.get('sentiment', 'Neutral')
if sentiment1 in mapping['Sentiment']:
model1_vals.append(mapping['Sentiment'][sentiment1])
else:
model1_vals.append(2) # Default to neutral
# Get complexity value for model1
complexity1 = model1_results.get('complexity', 'Average')
if complexity1 in mapping['Complexity']:
model1_vals.append(mapping['Complexity'][complexity1])
else:
model1_vals.append(2) # Default to average
# Get formality value for model2
formality2 = model2_results.get('formality', 'Neutral')
if formality2 in mapping['Formality']:
model2_vals.append(mapping['Formality'][formality2])
else:
model2_vals.append(2) # Default to neutral
# Get sentiment value for model2
sentiment2 = model2_results.get('sentiment', 'Neutral')
if sentiment2 in mapping['Sentiment']:
model2_vals.append(mapping['Sentiment'][sentiment2])
else:
model2_vals.append(2) # Default to neutral
# Get complexity value for model2
complexity2 = model2_results.get('complexity', 'Average')
if complexity2 in mapping['Complexity']:
model2_vals.append(mapping['Complexity'][complexity2])
else:
model2_vals.append(2) # Default to average
# Plot grouped bar chart
plt.figure(figsize=(10, 6))
x = range(len(metrics))
width = 0.35
plt.bar([p - width/2 for p in x], model1_vals, width=width, label=model1_name)
plt.bar([p + width/2 for p in x], model2_vals, width=width, label=model2_name)
plt.xticks(x, metrics)
plt.yticks([1, 2, 3], ['Low', 'Medium', 'High'])
plt.ylim(0, 3.5)
plt.ylabel('Level')
plt.title('Comparison of Model Characteristics')
plt.legend()
plt.tight_layout()
# Save the plot to a bytes buffer
buf = io.BytesIO()
                                plt.savefig(buf, format='png', dpi=100)
                                plt.close()  # release the figure to avoid leaking memory across runs
                                buf.seek(0)
# Convert to PIL Image
viz_image = Image.open(buf)
# Convert the image to a base64 string for embedding
buffered = io.BytesIO()
viz_image.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode()
                                # Embed the chart in the metrics markdown as a base64 data URI
                                similarity_title_visible = True
                                similarity_metrics_visible = True
                                similarity_metrics_value = f"""
<img src="data:image/png;base64,{img_str}" alt="Model characteristics comparison"/>
"""
except Exception as viz_error:
print(f"Classifier visualization error: {viz_error}")
# Check for Bias Detection analysis
elif selected_analysis == "Bias Detection" and "bias_detection" in analyses:
visualization_area_visible = True
bias_results = analyses["bias_detection"]
models = bias_results.get("models", [])
if len(models) >= 2:
prompt_title_visible = True
                            prompt_title_value = f"## Analysis of Prompt: \"{prompt[:100]}{'...' if len(prompt) > 100 else ''}\""
models_compared_visible = True
models_compared_value = f"### Bias Analysis: Comparing responses from {models[0]} and {models[1]}"
# Display comparative bias results
model1_name = models[0]
model2_name = models[1]
if "comparative" in bias_results:
comparative = bias_results["comparative"]
# Format summary for display
model1_title_visible = True
model1_title_value = "#### Bias Detection Summary"
model1_words_visible = True
summary_parts = []
# Add partisan comparison (focus on partisan leaning)
if "partisan" in comparative:
part = comparative["partisan"]
is_significant = part.get("significant", False)
summary_parts.append(
f"**Partisan Leaning**: {model1_name} appears {part.get(model1_name, 'N/A')}, " +
f"while {model2_name} appears {part.get(model2_name, 'N/A')}. " +
f"({'Significant' if is_significant else 'Minor'} difference)"
)
# Add overall assessment
if "overall" in comparative:
overall = comparative["overall"]
significant = overall.get("significant_bias_difference", False)
summary_parts.append(
f"**Overall Assessment**: " +
f"Analysis shows a {overall.get('difference', 0):.2f}/1.0 difference in bias patterns. " +
f"({'Significant' if significant else 'Minor'} overall bias difference)"
)
# Combine all parts
model1_words_value = "\n\n".join(summary_parts)
# Format detailed term analysis
if (model1_name in bias_results and "partisan" in bias_results[model1_name] and
model2_name in bias_results and "partisan" in bias_results[model2_name]):
model2_title_visible = True
model2_title_value = "#### Partisan Term Analysis"
model2_words_visible = True
m1_lib = bias_results[model1_name]["partisan"].get("liberal_terms", [])
m1_con = bias_results[model1_name]["partisan"].get("conservative_terms", [])
m2_lib = bias_results[model2_name]["partisan"].get("liberal_terms", [])
m2_con = bias_results[model2_name]["partisan"].get("conservative_terms", [])
model2_words_value = f"""
**{model1_name}**:
- Liberal terms: {', '.join(m1_lib) if m1_lib else 'None detected'}
- Conservative terms: {', '.join(m1_con) if m1_con else 'None detected'}
**{model2_name}**:
- Liberal terms: {', '.join(m2_lib) if m2_lib else 'None detected'}
- Conservative terms: {', '.join(m2_con) if m2_con else 'None detected'}
"""
# If we don't have visualization data from any analysis
                    if not visualization_area_visible:
                        return (
                            analysis_results,
                            updated_log,  # Return updated log
                            False,  # analysis_output visibility
                            False,  # visualization_area_visible
                            gr.update(visible=False),  # analysis_title
                            gr.update(visible=False),  # prompt_title
                            gr.update(visible=False),  # models_compared
                            gr.update(visible=False),  # model1_title
                            gr.update(visible=False),  # model1_words
                            gr.update(visible=False),  # model2_title
                            gr.update(visible=False),  # model2_words
                            gr.update(visible=False),  # similarity_metrics_title
                            gr.update(visible=False),  # similarity_metrics
                            True,  # status_message_visible
                            gr.update(visible=True, value="**No visualization data found.** Make sure to select a valid analysis option.")
                        )
# Return all updated component values
return (
analysis_results, # analysis_results_state
updated_log, # user_analysis_log (updated with new results)
False, # analysis_output visibility
True, # visualization_area_visible
gr.update(visible=True), # analysis_title
gr.update(visible=prompt_title_visible, value=prompt_title_value), # prompt_title
gr.update(visible=models_compared_visible, value=models_compared_value), # models_compared
gr.update(visible=model1_title_visible, value=model1_title_value), # model1_title
gr.update(visible=model1_words_visible, value=model1_words_value), # model1_words
gr.update(visible=model2_title_visible, value=model2_title_value), # model2_title
gr.update(visible=model2_words_visible, value=model2_words_value), # model2_words
gr.update(visible=similarity_title_visible), # similarity_metrics_title
gr.update(visible=similarity_metrics_visible, value=similarity_metrics_value), # similarity_metrics
False, # status_message_visible
gr.update(visible=False) # status_message
)
except Exception as e:
import traceback
error_msg = f"Error in analysis: {str(e)}\n{traceback.format_exc()}"
print(error_msg)
return (
{"error": error_msg}, # analysis_results_state
user_analysis_log, # Return unchanged log
True, # analysis_output visibility (show raw JSON for debugging)
False, # visualization_area_visible
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=False),
True, # status_message_visible
gr.update(visible=True, value=f"**Error during analysis:**\n\n```\n{str(e)}\n```") # status_message
)
# RoBERTa Sentiment Analysis Tab
with gr.Tab("RoBERTa Sentiment"):
# Create the RoBERTa analysis UI components
run_roberta_btn, roberta_output, visualization_container, roberta_status = create_roberta_screen()
# Create a container for visualization results
with gr.Column() as roberta_viz_container:
# create placeholder components to update
roberta_viz_title = gr.Markdown("## RoBERTa Sentiment Analysis Results", visible=False)
roberta_viz_content = gr.HTML("", visible=False)
# Function to run RoBERTa sentiment analysis
def run_roberta_analysis(dataset, existing_log):
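            """Run RoBERTa sentiment analysis on the dataset and build the HTML view.

            Returns a 6-tuple matching the outputs list of run_roberta_btn.click.
            """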
try:
print("Starting run_roberta_analysis function")
if not dataset or "entries" not in dataset or not dataset["entries"]:
return (
{}, # roberta_results_state
existing_log, # no change to user_analysis_log
gr.update(visible=True, value="**Error:** No dataset loaded. Please create or load a dataset first."), # roberta_status
gr.update(visible=False), # roberta_output
gr.update(visible=False), # roberta_viz_title
gr.update(visible=False) # roberta_viz_content
)
print(f"Running RoBERTa sentiment analysis with sentence-level, style=")
# Process the analysis request
roberta_results = process_roberta_request(dataset)
print(f"RoBERTa results obtained. Size: {len(str(roberta_results))} characters")
# NEW: Update the user analysis log with RoBERTa results
updated_log = existing_log.copy() if existing_log else {}
# Get the prompt text
prompt_text = None
if "analyses" in roberta_results:
prompt_text = list(roberta_results["analyses"].keys())[0] if roberta_results["analyses"] else None
if prompt_text:
# Initialize this prompt in the log if it doesn't exist
if prompt_text not in updated_log:
updated_log[prompt_text] = {}
# Store the RoBERTa results
if "analyses" in roberta_results and prompt_text in roberta_results["analyses"]:
if "roberta_sentiment" in roberta_results["analyses"][prompt_text]:
updated_log[prompt_text]["RoBERTa Sentiment"] = {
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"result": roberta_results["analyses"][prompt_text]["roberta_sentiment"]
}
# Check if we have results
if "error" in roberta_results:
return (
roberta_results, # Store in state anyway for debugging
updated_log, # Return updated log
gr.update(visible=True, value=f"**Error:** {roberta_results['error']}"), # roberta_status
gr.update(visible=False), # Hide raw output
gr.update(visible=False), # roberta_viz_title
gr.update(visible=False) # roberta_viz_content
)
print("About to process visualization components")
viz_components = process_and_visualize_sentiment_analysis(roberta_results)
print(f"Visualization components generated: {len(viz_components)}")
print("Starting HTML conversion of visualization components")
# Convert the visualization components to HTML - OPTIMIZED VERSION
print("Starting HTML conversion of visualization components")
html_content = ""
html_content += "
Sentiment Analysis Results
"
if "analyses" in roberta_results:
for prompt, analyses in roberta_results["analyses"].items():
if "roberta_sentiment" in analyses:
sentiment_result = analyses["roberta_sentiment"]
models = sentiment_result.get("models", [])
if len(models) >= 2:
# Add overall comparison
if "comparison" in sentiment_result:
comparison = sentiment_result["comparison"]
html_content += f"
"
html_content += f"
{comparison.get('difference_direction', 'Models have different sentiment patterns')}
"
html_content += f"
"
# Add individual model results
sentiment_analysis = sentiment_result.get("sentiment_analysis", {})
for model in models:
if model in sentiment_analysis:
model_result = sentiment_analysis[model]
score = model_result.get("sentiment_score", 0)
label = model_result.get("label", "neutral")
html_content += f"
"
html_content += f"
{model}
"
html_content += f"
Sentiment: {label} (Score: {score:.2f})
"
html_content += f"
"
html_content += "
"
print("HTML conversion completed")
# Return updated values
return (
roberta_results, # roberta_results_state
updated_log, # Return updated log
gr.update(visible=False), # roberta_status (hide status message)
gr.update(visible=False), # roberta_output (hide raw output)
gr.update(visible=True), # roberta_viz_title (show title)
gr.update(visible=True, value=html_content) # roberta_viz_content (show content)
)
except Exception as e:
import traceback
error_msg = f"Error in RoBERTa analysis: {str(e)}\n{traceback.format_exc()}"
print(error_msg)
return (
{"error": error_msg}, # roberta_results_state
existing_log, # Return unchanged log
gr.update(visible=True, value=f"**Error during RoBERTa analysis:**\n\n```\n{str(e)}\n```"), # roberta_status
gr.update(visible=False), # Hide raw output
gr.update(visible=False), # roberta_viz_title
gr.update(visible=False) # roberta_viz_content
)
# Connect the run button to the analysis function
run_roberta_btn.click(
fn=run_roberta_analysis,
inputs=[dataset_state, user_analysis_log],
outputs=[
roberta_results_state,
user_analysis_log,
roberta_status,
roberta_output,
roberta_viz_title,
roberta_viz_content
]
)
# Add a Summary tab
with gr.Tab("Summary"):
gr.Markdown("## Analysis Summaries")
with gr.Row():
with gr.Column(scale=1):
# Get summary files from dataset directory
summary_files = [f for f in os.listdir("dataset") if f.startswith("summary-") and f.endswith(".txt")]
# Dropdown for selecting summary file
summary_dropdown = gr.Dropdown(
choices=["YOUR DATASET RESULTS"] + summary_files,
label="Select Summary",
info="Choose a summary to display",
value="YOUR DATASET RESULTS"
)
load_summary_btn = gr.Button("Load Summary", variant="primary")
summary_assistant_prompt = gr.Textbox(
value="Attached are the results from various NLP based comparisons between two LLM responses on the same prompt. Give your interpretation of the results.",
label="Analysis Assistant Prompt",
lines=3,
interactive=True,
)
with gr.Column(scale=3):
summary_content = gr.Textbox(
label="Summary Content",
lines=25,
max_lines=50,
interactive=False
)
summary_status = gr.Markdown("*No summary loaded*")
# Function to load summary content from file or user analysis
def load_summary_content(file_name, user_log):
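                """Load a built-in summary file, or format the user's own analysis log
                when "YOUR DATASET RESULTS" is selected. Returns (content, status markdown).
                """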
if not file_name:
return "", "*No summary selected*"
# Handle the special "YOUR DATASET RESULTS" option
if file_name == "YOUR DATASET RESULTS":
if not user_log or not any(user_log.values()):
return "", "**No analysis results available.** Run some analyses in the Analysis tab first."
# Format the user analysis log as text
content = "# YOUR DATASET ANALYSIS RESULTS\n\n"
for prompt, analyses in user_log.items():
content += f"## Analysis of Prompt: \"{prompt[:100]}{'...' if len(prompt) > 100 else ''}\"\n\n"
if not analyses:
content += "_No analyses run for this prompt._\n\n"
continue
# Order the analyses in a specific sequence
                        analysis_order = ["Bag of Words", "N-gram Analysis", "Topic Modeling", "Classifier", "Bias Detection", "RoBERTa Sentiment"]
for analysis_type in analysis_order:
if analysis_type in analyses:
analysis_data = analyses[analysis_type]
timestamp = analysis_data.get("timestamp", "")
result = analysis_data.get("result", {})
content += f"### {analysis_type} ({timestamp})\n\n"
# Format based on analysis type
if analysis_type == "Bag of Words":
models = result.get("models", [])
if len(models) >= 2:
content += f"Comparing responses from {models[0]} and {models[1]}\n\n"
# Add important words for each model
important_words = result.get("important_words", {})
for model_name in models:
if model_name in important_words:
content += f"Top Words Used by {model_name}\n"
word_list = [f"{item['word']} ({item['count']})" for item in important_words[model_name][:10]]
content += ", ".join(word_list) + "\n\n"
# Add similarity metrics
comparisons = result.get("comparisons", {})
comparison_key = f"{models[0]} vs {models[1]}"
if comparison_key in comparisons:
metrics = comparisons[comparison_key]
content += "Similarity Metrics\n"
content += f"Cosine Similarity: {metrics.get('cosine_similarity', 0):.2f} (higher means more similar word frequency patterns)\n"
content += f"Jaccard Similarity: {metrics.get('jaccard_similarity', 0):.2f} (higher means more word overlap)\n"
content += f"Semantic Similarity: {metrics.get('semantic_similarity', 0):.2f} (higher means more similar meaning)\n"
content += f"Common Words: {metrics.get('common_word_count', 0)} words appear in both responses\n\n"
elif analysis_type == "N-gram Analysis":
models = result.get("models", [])
ngram_size = result.get("ngram_size", 2)
size_name = "Unigrams" if ngram_size == 1 else f"{ngram_size}-grams"
if len(models) >= 2:
content += f"{size_name} Analysis: Comparing responses from {models[0]} and {models[1]}\n\n"
# Add important n-grams for each model
important_ngrams = result.get("important_ngrams", {})
for model_name in models:
if model_name in important_ngrams:
content += f"Top {size_name} Used by {model_name}\n"
ngram_list = [f"{item['ngram']} ({item['count']})" for item in important_ngrams[model_name][:10]]
content += ", ".join(ngram_list) + "\n\n"
# Add similarity metrics
if "comparisons" in result:
comparison_key = f"{models[0]} vs {models[1]}"
if comparison_key in result["comparisons"]:
metrics = result["comparisons"][comparison_key]
content += "Similarity Metrics\n"
content += f"Common {size_name}: {metrics.get('common_ngram_count', 0)} {size_name.lower()} appear in both responses\n\n"
elif analysis_type == "Classifier":
models = result.get("models", [])
if len(models) >= 2:
content += f"Classifier Analysis for {models[0]} and {models[1]}\n\n"
# Add classification results
classifications = result.get("classifications", {})
if classifications:
content += "Classification Results\n"
for model_name in models:
if model_name in classifications:
model_results = classifications[model_name]
content += f"{model_name}:\n"
content += f"- Formality: {model_results.get('formality', 'N/A')}\n"
content += f"- Sentiment: {model_results.get('sentiment', 'N/A')}\n"
content += f"- Complexity: {model_results.get('complexity', 'N/A')}\n\n"
# Add differences
differences = result.get("differences", {})
if differences:
content += "Classification Comparison\n"
for category, diff in differences.items():
content += f"- {category}: {diff}\n"
content += "\n"
elif analysis_type == "Bias Detection":
models = result.get("models", [])
if len(models) >= 2:
content += f"Bias Analysis: Comparing responses from {models[0]} and {models[1]}\n\n"
# Add comparative results
if "comparative" in result:
comparative = result["comparative"]
content += "Bias Detection Summary\n"
if "partisan" in comparative:
part = comparative["partisan"]
is_significant = part.get("significant", False)
content += f"Partisan Leaning: {models[0]} appears {part.get(models[0], 'N/A')}, "
content += f"while {models[1]} appears {part.get(models[1], 'N/A')}. "
content += f"({'Significant' if is_significant else 'Minor'} difference)\n\n"
if "overall" in comparative:
overall = comparative["overall"]
significant = overall.get("significant_bias_difference", False)
content += f"Overall Assessment: "
content += f"Analysis shows a {overall.get('difference', 0):.2f}/1.0 difference in bias patterns. "
content += f"({'Significant' if significant else 'Minor'} overall bias difference)\n\n"
# Add partisan terms
content += "Partisan Term Analysis\n"
for model_name in models:
if model_name in result and "partisan" in result[model_name]:
partisan = result[model_name]["partisan"]
content += f"{model_name}:\n"
lib_terms = partisan.get("liberal_terms", [])
con_terms = partisan.get("conservative_terms", [])
content += f"- Liberal terms: {', '.join(lib_terms) if lib_terms else 'None detected'}\n"
content += f"- Conservative terms: {', '.join(con_terms) if con_terms else 'None detected'}\n\n"
elif analysis_type == "RoBERTa Sentiment":
models = result.get("models", [])
if len(models) >= 2:
content += "Sentiment Analysis Results\n"
# Add comparison info
if "comparison" in result:
comparison = result["comparison"]
if "difference_direction" in comparison:
content += f"{comparison['difference_direction']}\n\n"
# Add individual model results
sentiment_analysis = result.get("sentiment_analysis", {})
for model_name in models:
if model_name in sentiment_analysis:
model_result = sentiment_analysis[model_name]
score = model_result.get("sentiment_score", 0)
label = model_result.get("label", "neutral")
content += f"{model_name}\n"
content += f"Sentiment: {label} (Score: {score:.2f})\n\n"
return content, f"**Loaded user analysis results**"
# Regular file loading for built-in summaries
file_path = os.path.join("dataset", file_name)
if os.path.exists(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return content, f"**Loaded summary**: {file_name}"
except Exception as e:
return "", f"**Error loading summary**: {str(e)}"
else:
return "", f"**File not found**: {file_path}"
def update_summary_dropdown(user_log):
"""Update summary dropdown options based on user log state"""
choices = ["YOUR DATASET RESULTS"]
choices.extend([f for f in os.listdir("dataset") if f.startswith("summary-") and f.endswith(".txt")])
return gr.update(choices=choices, value="YOUR DATASET RESULTS")
# Connect the load button to the function
load_summary_btn.click(
fn=load_summary_content,
inputs=[summary_dropdown, user_analysis_log],
outputs=[summary_content, summary_status]
)
# Also load summary when dropdown changes
summary_dropdown.change(
fn=load_summary_content,
inputs=[summary_dropdown, user_analysis_log],
outputs=[summary_content, summary_status]
)
# Add a Visuals tab for plotting graphs
with gr.Tab("Visuals"):
gr.Markdown("## Visualization Graphs")
with gr.Row():
with gr.Column(scale=1):
# Dropdown for selecting visualization type
viz_type = gr.Dropdown(
choices=["N-gram Comparison", "Word Frequency", "Sentiment Analysis"],
label="Visualization Type",
info="Select the type of visualization to display",
value="N-gram Comparison"
)
# Button to generate visualization
generate_viz_btn = gr.Button("Generate Visualization", variant="primary")
with gr.Column(scale=3):
# Image component to display the plot
viz_output = gr.Image(
label="Visualization",
type="pil",
height=500
)
viz_status = gr.Markdown("*No visualization generated*")
# Function to generate and display visualizations
            def generate_visualization(viz_type, dataset, analysis_results, roberta_results):
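                """Generate a comparison chart for the selected visualization type.

                Falls back to built-in example data when no matching analysis results
                are available. Returns (PIL image or None, status markdown).
                """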
try:
if not dataset or "entries" not in dataset or not dataset["entries"]:
return None, "❌ **Error:** No dataset loaded. Please create or load a dataset first."
                    # Example data (fallback when no real analysis results are available)
                    example_data1 = {
'attorney general': 3,
'social justice': 3,
'centrist approach': 2,
'climate change': 2,
'criminal justice': 2,
'gun control': 2,
'human rights': 2,
'justice issues': 2,
'measures like': 2,
'middle class': 2
}
                    example_data2 = {
'political views': 3,
'vice president': 3,
'criminal justice': 2,
'democratic party': 2,
'foreign policy': 2,
'harris advocated': 2,
'lgbtq rights': 2,
'president harris': 2,
'social issues': 2,
'2019 proposed': 1
}
# Use real data if available in analysis_results
model1_data = {}
model2_data = {}
model1_name = "Model 1"
model2_name = "Model 2"
# Extract actual model names from dataset
if dataset and "entries" in dataset and len(dataset["entries"]) >= 2:
model1_name = dataset["entries"][0].get("model", "Model 1")
model2_name = dataset["entries"][1].get("model", "Model 2")
# Try to get real data from analysis_results
if analysis_results and "analyses" in analysis_results:
for prompt, analyses in analysis_results["analyses"].items():
if viz_type == "N-gram Comparison" and "ngram_analysis" in analyses:
ngram_results = analyses["ngram_analysis"]
important_ngrams = ngram_results.get("important_ngrams", {})
if model1_name in important_ngrams:
model1_data = {item["ngram"]: item["count"] for item in important_ngrams[model1_name]}
if model2_name in important_ngrams:
model2_data = {item["ngram"]: item["count"] for item in important_ngrams[model2_name]}
elif viz_type == "Word Frequency" and "bag_of_words" in analyses:
bow_results = analyses["bag_of_words"]
important_words = bow_results.get("important_words", {})
if model1_name in important_words:
model1_data = {item["word"]: item["count"] for item in important_words[model1_name]}
if model2_name in important_words:
model2_data = {item["word"]: item["count"] for item in important_words[model2_name]}
                    # If we couldn't get real data, fall back to the example data
                    if not model1_data:
                        model1_data = example_data1
                    if not model2_data:
                        model2_data = example_data2
# Create the visualization
plt.figure(figsize=(10, 6))
if viz_type == "N-gram Comparison" or viz_type == "Word Frequency":
# Plot for the first model
plt.subplot(1, 2, 1)
sorted_data1 = sorted(model1_data.items(), key=lambda x: x[1], reverse=True)[:10] # Top 10
terms1, counts1 = zip(*sorted_data1) if sorted_data1 else ([], [])
# Create horizontal bar chart
plt.barh([t[:20] + '...' if len(t) > 20 else t for t in terms1[::-1]], counts1[::-1])
plt.xlabel('Frequency')
                        plt.title(f'Top {viz_type.split()[0]}s Used by {model1_name}')
plt.tight_layout()
# Plot for the second model
plt.subplot(1, 2, 2)
sorted_data2 = sorted(model2_data.items(), key=lambda x: x[1], reverse=True)[:10] # Top 10
terms2, counts2 = zip(*sorted_data2) if sorted_data2 else ([], [])
# Create horizontal bar chart
plt.barh([t[:20] + '...' if len(t) > 20 else t for t in terms2[::-1]], counts2[::-1])
plt.xlabel('Frequency')
                        plt.title(f'Top {viz_type.split()[0]}s Used by {model2_name}')
plt.tight_layout()
elif viz_type == "Sentiment Analysis":
# Generate sentiment comparison visualization
# This would be populated with real data when available
sentiment_scores = {
model1_name: 0.75, # Example score
model2_name: 0.25 # Example score
}
                        # Extract real sentiment scores from the RoBERTa results state, if available
                        # (the previous lookup keyed into analysis_results and could never match)
                        if roberta_results and "analyses" in roberta_results:
                            for prompt, analyses in roberta_results["analyses"].items():
                                if "roberta_sentiment" in analyses:
                                    sentiment_result = analyses["roberta_sentiment"]
                                    sentiment_analysis = sentiment_result.get("sentiment_analysis", {})
                                    if model1_name in sentiment_analysis:
                                        sentiment_scores[model1_name] = sentiment_analysis[model1_name].get("sentiment_score", 0)
                                    if model2_name in sentiment_analysis:
                                        sentiment_scores[model2_name] = sentiment_analysis[model2_name].get("sentiment_score", 0)
# Create sentiment bar chart
plt.bar(list(sentiment_scores.keys()), list(sentiment_scores.values()))
plt.ylim(-1, 1)
                        plt.ylabel('Sentiment Score (-1 to 1)')
                        plt.title('Sentiment Analysis Comparison')
plt.axhline(y=0, color='r', linestyle='-', alpha=0.3) # Add a zero line
# Save the plot to a bytes buffer
                    buf = io.BytesIO()
                    plt.savefig(buf, format='png')
                    plt.close()  # release the figure to avoid leaking memory across runs
                    buf.seek(0)
                    # Convert plot to PIL Image (Image is imported at module level)
                    image = Image.open(buf)
return image, f"**Generated {viz_type} visualization**"
except Exception as e:
import traceback
error_msg = f"Error generating visualization: {str(e)}\n{traceback.format_exc()}"
print(error_msg)
return None, f"**Error:** {str(e)}"
# Connect the generate button to the function
            generate_viz_btn.click(
                fn=generate_visualization,
                inputs=[viz_type, dataset_state, analysis_results_state, roberta_results_state],
                outputs=[viz_output, viz_status]
            )
# Run analysis with proper parameters
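        # NOTE: run_analysis must return exactly 15 values, in the order of this outputs list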
run_analysis_btn.click(
fn=run_analysis,
inputs=[dataset_state, analysis_options, ngram_n, topic_count, user_analysis_log],
outputs=[
analysis_results_state,
user_analysis_log,
analysis_output,
visualization_area_visible,
analysis_title,
prompt_title,
models_compared,
model1_title,
model1_words,
model2_title,
model2_words,
similarity_metrics_title,
similarity_metrics,
status_message_visible,
status_message
]
)
        # Optional startup hook (currently disabled): refresh the summary dropdown
        # and pre-load the user's results when the app loads.
        '''
        app.load(
            fn=lambda log: (
                update_summary_dropdown(log),
                *load_summary_content("YOUR DATASET RESULTS", log)
            ),
            inputs=[user_analysis_log],
            outputs=[summary_dropdown, summary_content, summary_status]
        )
        '''
return app
if __name__ == "__main__":
# Download required NLTK resources before launching the app
download_nltk_resources()
app = create_app()
app.launch()