Spaces:
Sleeping
Sleeping
import gradio as gr | |
from ui.dataset_input import create_dataset_input, load_example_dataset | |
from ui.analysis_screen import create_analysis_screen, process_analysis_request | |
from visualization.bow_visualizer import process_and_visualize_analysis | |
import nltk | |
import os | |
import json | |
# Download the NLTK resources the app requires
def _nltk_resource_present(resource):
    """Return True when *resource* is found under any of the probed NLTK locations."""
    # Different resources live in different category directories in NLTK,
    # so every candidate location is probed in turn.
    candidate_locations = (
        f'tokenizers/{resource}',
        f'corpora/{resource}',
        f'taggers/{resource}',
        f'{resource}',
    )
    for location in candidate_locations:
        try:
            nltk.data.find(location)
        except LookupError:
            continue
        return True
    return False


def download_nltk_resources():
    """Download required NLTK resources if not already downloaded.

    Each resource is looked up first and only fetched when missing. Downloads
    go into ``~/nltk_data``, the same directory that is registered on NLTK's
    search path. Problems are printed rather than raised so that a missing
    resource never prevents the caller from continuing.
    """
    try:
        # Create nltk_data directory in the user's home directory if it doesn't exist
        nltk_data_path = os.path.expanduser("~/nltk_data")
        os.makedirs(nltk_data_path, exist_ok=True)

        # Register the directory only once; repeated calls must not keep
        # growing NLTK's search path with duplicates.
        if nltk_data_path not in nltk.data.path:
            nltk.data.path.append(nltk_data_path)

        resources = ['punkt', 'wordnet', 'stopwords', 'punkt_tab']
        for resource in resources:
            try:
                if _nltk_resource_present(resource):
                    print(f"Resource {resource} already downloaded")
                    continue

                print(f"Downloading {resource}...")
                # nltk.download() reports failure through its return value
                # instead of raising, so the result has to be checked.
                downloaded = nltk.download(
                    resource, download_dir=nltk_data_path, quiet=True
                )
                if not downloaded:
                    print(f"Error with resource {resource}: download failed")
            except Exception as e:
                # Best effort: one broken resource must not block the others.
                print(f"Error with resource {resource}: {e}")

        print("NLTK resources check completed")
    except Exception as e:
        print(f"Error downloading NLTK resources: {e}")
def _status_only_response(results, message, show_raw_output=False):
    """Build the 14 outputs that hide every result panel and show one status message.

    Args:
        results: Value stored in ``analysis_results_state``.
        message: Markdown text shown in the status area.
        show_raw_output: When True, reveal ``analysis_output`` with *results*
            so the raw payload can be inspected for debugging.

    Returns:
        tuple: Values in the order of the ``run_analysis_btn.click`` outputs.
    """
    if show_raw_output:
        # NOTE(review): assumes analysis_output can display a dict (JSON-like
        # component) - confirm against create_analysis_screen.
        raw_output = gr.update(visible=True, value=results)
    else:
        raw_output = gr.update(visible=False)

    # analysis_title, prompt_title, models_compared, model1_title, model1_words,
    # model2_title, model2_words, similarity_metrics_title, similarity_metrics
    hidden_panels = [gr.update(visible=False) for _ in range(9)]

    return (
        results,                                 # analysis_results_state
        raw_output,                              # analysis_output
        False,                                   # visualization_area_visible
        *hidden_panels,
        True,                                    # status_message_visible
        gr.update(visible=True, value=message),  # status_message
    )


def _panel_update(panels, key):
    """Return a Gradio update for one panel; a missing key means "hidden"."""
    value = panels.get(key)
    if value is None:
        return gr.update(visible=False, value="")
    return gr.update(visible=True, value=value)


def _prompt_heading(prompt_text, limit=100):
    """Return the prompt heading, adding an ellipsis only if the prompt was cut."""
    snippet = prompt_text[:limit]
    if len(prompt_text) > limit:
        snippet += "..."
    return f"## Analysis of Prompt: \"{snippet}\""


def _counted_list(items, label_key, limit=10):
    """Format the first *limit* ``{label_key, count}`` records as bold Markdown."""
    return ", ".join(
        f"**{item[label_key]}** ({item['count']})" for item in items[:limit]
    )


def _distribution_summary(weights, limit=3):
    """Format the first *limit* topic weights as ``Topic N: 0.12`` entries."""
    return ", ".join(
        f"Topic {index + 1}: {weight:.2f}"
        for index, weight in enumerate(weights[:limit])
    )


def _bow_panels(prompt_text, bow_results):
    """Build the panel texts for a Bag of Words result."""
    panels = {}
    models = bow_results.get("models", [])
    if len(models) < 2:
        return panels

    model1_name, model2_name = models[0], models[1]
    panels["prompt_title"] = _prompt_heading(prompt_text)
    panels["models_compared"] = (
        f"### Comparing responses from {model1_name} and {model2_name}"
    )

    # Top words for each model
    important_words = bow_results.get("important_words", {})
    for slot, model_name in (("model1", model1_name), ("model2", model2_name)):
        if model_name in important_words:
            panels[f"{slot}_title"] = f"#### Top Words Used by {model_name}"
            panels[f"{slot}_words"] = _counted_list(
                important_words[model_name], "word"
            )

    # Similarity metrics
    comparisons = bow_results.get("comparisons", {})
    comparison_key = f"{model1_name} vs {model2_name}"
    if comparison_key in comparisons:
        metrics = comparisons[comparison_key]
        cosine = metrics.get("cosine_similarity", 0)
        jaccard = metrics.get("jaccard_similarity", 0)
        semantic = metrics.get("semantic_similarity", 0)
        common_words = metrics.get("common_word_count", 0)
        panels["similarity_metrics"] = (
            "\n"
            f"- **Cosine Similarity**: {cosine:.2f} (higher means more similar word frequency patterns)\n"
            f"- **Jaccard Similarity**: {jaccard:.2f} (higher means more word overlap)\n"
            f"- **Semantic Similarity**: {semantic:.2f} (higher means more similar meaning)\n"
            f"- **Common Words**: {common_words} words appear in both responses\n"
        )
    return panels


def _ngram_panels(prompt_text, ngram_results):
    """Build the panel texts for an N-gram result."""
    panels = {}
    models = ngram_results.get("models", [])
    ngram_size = ngram_results.get("ngram_size", 2)
    size_name = "Unigrams" if ngram_size == 1 else f"{ngram_size}-grams"
    if len(models) < 2:
        return panels

    model1_name, model2_name = models[0], models[1]
    panels["prompt_title"] = _prompt_heading(prompt_text)
    panels["models_compared"] = (
        f"### {size_name} Analysis: Comparing responses from {model1_name} and {model2_name}"
    )

    # Top n-grams for each model
    important_ngrams = ngram_results.get("important_ngrams", {})
    for slot, model_name in (("model1", model1_name), ("model2", model2_name)):
        if model_name in important_ngrams:
            panels[f"{slot}_title"] = f"#### Top {size_name} Used by {model_name}"
            panels[f"{slot}_words"] = _counted_list(
                important_ngrams[model_name], "ngram"
            )

    # Similarity metrics if available
    if "comparisons" in ngram_results:
        comparison_key = f"{model1_name} vs {model2_name}"
        if comparison_key in ngram_results["comparisons"]:
            metrics = ngram_results["comparisons"][comparison_key]
            common_count = metrics.get("common_ngram_count", 0)
            panels["similarity_metrics"] = (
                "\n"
                f"- **Common {size_name}**: {common_count} {size_name.lower()} appear in both responses\n"
            )
    return panels


def _topic_panels(prompt_text, topic_results):
    """Build the panel texts for a Topic Modeling result."""
    panels = {}
    models = topic_results.get("models", [])
    method = topic_results.get("method", "lda").upper()
    n_topics = topic_results.get("n_topics", 3)
    if len(models) < 2:
        return panels

    # Bind both names up front: the similarity section below needs them even
    # when no per-model topic distribution is present.
    model1_name, model2_name = models[0], models[1]
    panels["prompt_title"] = _prompt_heading(prompt_text)
    panels["models_compared"] = (
        f"### Topic Modeling Analysis ({method}, {n_topics} topics)"
    )

    # Discovered topics: first 3 topics, top 5 words each
    topic_info = []
    for topic in topic_results.get("topics", [])[:3]:
        topic_id = topic.get("id", 0)
        words = topic.get("words", [])[:5]
        if words:
            topic_info.append(f"**Topic {topic_id+1}**: {', '.join(words)}")
    if topic_info:
        panels["model1_title"] = "#### Discovered Topics"
        panels["model1_words"] = "\n".join(topic_info)

    # Topic distribution per model
    model_topics = topic_results.get("model_topics", {})
    if model_topics and model1_name in model_topics and model2_name in model_topics:
        panels["model2_title"] = "#### Topic Distribution"
        panels["model2_words"] = (
            "\n"
            f"**{model1_name}**: {_distribution_summary(model_topics[model1_name])}\n"
            f"**{model2_name}**: {_distribution_summary(model_topics[model2_name])}\n"
        )

    # Similarity metrics if available
    comparisons = topic_results.get("comparisons", {})
    if comparisons:
        comparison_key = f"{model1_name} vs {model2_name}"
        if comparison_key in comparisons:
            js_div = comparisons[comparison_key].get("js_divergence", 0)
            panels["similarity_metrics"] = (
                "\n"
                f"- **Topic Distribution Divergence**: {js_div:.4f} (lower means more similar topic distributions)\n"
            )
    return panels


def _classification_lines(model_name, results):
    """Return the Markdown lines describing one model's classification."""
    return [
        f"**{model_name}**:",
        f"- Formality: {results.get('formality', 'N/A')}",
        f"- Sentiment: {results.get('sentiment', 'N/A')}",
        f"- Complexity: {results.get('complexity', 'N/A')}",
    ]


def _classifier_panels(prompt_text, classifier_results):
    """Build the panel texts for a Classifier result."""
    panels = {}
    models = classifier_results.get("models", [])
    if len(models) < 2:
        return panels

    model1_name, model2_name = models[0], models[1]
    panels["prompt_title"] = _prompt_heading(prompt_text)
    panels["models_compared"] = (
        f"### Classifier Analysis for {model1_name} and {model2_name}"
    )

    classifications = classifier_results.get("classifications", {})
    if classifications:
        lines = _classification_lines(
            model1_name, classifications.get(model1_name, {})
        ) + _classification_lines(
            model2_name, classifications.get(model2_name, {})
        )
        panels["model1_title"] = "#### Classification Results"
        panels["model1_words"] = "\n" + "\n".join(lines) + "\n"

        # Show comparison
        differences = classifier_results.get("differences", {})
        panels["model2_title"] = "#### Classification Comparison"
        panels["model2_words"] = "\n".join(
            f"- **{category}**: {diff}" for category, diff in differences.items()
        )
    return panels


# (analysis option label, key inside the per-prompt results, panel builder)
_PANEL_BUILDERS = (
    ("Bag of Words", "bag_of_words", _bow_panels),
    ("N-gram Analysis", "ngram_analysis", _ngram_panels),
    ("Topic Modeling", "topic_modeling", _topic_panels),
    ("Classifier", "classifier", _classifier_panels),
)


def create_app():
    """
    Create the Gradio app for comparing two LLM responses to the same prompt.

    The app has three tabs: "Dataset Input" (builds the two-entry dataset and
    stores it in shared state), "Analysis" (runs the selected analysis and
    renders its results as Markdown panels) and "LLM Analysis" (placeholder).

    Returns:
        gr.Blocks: The Gradio application
    """
    with gr.Blocks(title="LLM Response Comparator") as app:
        # Application state to share data between tabs
        dataset_state = gr.State({})
        analysis_results_state = gr.State({})

        # Dataset Input Tab
        with gr.Tab("Dataset Input"):
            dataset_inputs, example_dropdown, load_example_btn, create_btn, prompt, response1, model1, response2, model2 = create_dataset_input()

            # Status indicator showing whether a dataset has been created
            dataset_status = gr.Markdown("*No dataset loaded*")

            # Load example dataset into all input fields
            load_example_btn.click(
                fn=load_example_dataset,
                inputs=[example_dropdown],
                outputs=[prompt, response1, model1, response2, model2]
            )

            def create_dataset(p, r1, m1, r2, m2):
                """Validate the form and return (dataset, status markdown)."""
                if not p or not r1 or not r2:
                    return {}, "❌ **Error:** Please fill in at least the prompt and both responses"
                dataset = {
                    "entries": [
                        {"prompt": p, "response": r1, "model": m1 or "Model 1"},
                        {"prompt": p, "response": r2, "model": m2 or "Model 2"}
                    ]
                }
                return dataset, "✅ **Dataset created successfully!** You can now go to the Analysis tab"

            # Save dataset to state and update status
            create_btn.click(
                fn=create_dataset,
                inputs=[prompt, response1, model1, response2, model2],
                outputs=[dataset_state, dataset_status]
            )

        # Analysis Tab
        with gr.Tab("Analysis"):
            analysis_options, analysis_params, run_analysis_btn, analysis_output, bow_top_slider, ngram_n, ngram_top, topic_count = create_analysis_screen()

            # Pre-create visualization components (initially hidden)
            visualization_area_visible = gr.Checkbox(value=False, visible=False, label="Visualization Visible")
            analysis_title = gr.Markdown("## Analysis Results", visible=False)
            prompt_title = gr.Markdown(visible=False)
            models_compared = gr.Markdown(visible=False)

            # Container for model 1 words
            model1_title = gr.Markdown(visible=False)
            model1_words = gr.Markdown(visible=False)

            # Container for model 2 words
            model2_title = gr.Markdown(visible=False)
            model2_words = gr.Markdown(visible=False)

            # Similarity metrics
            similarity_metrics_title = gr.Markdown("### Similarity Metrics", visible=False)
            similarity_metrics = gr.Markdown(visible=False)

            # Status or error message area
            status_message_visible = gr.Checkbox(value=False, visible=False, label="Status Message Visible")
            status_message = gr.Markdown(visible=False)

            def run_analysis(dataset, selected_analysis, bow_top, ngram_n, ngram_top, topic_count):
                """Run the selected analysis and return values for all 14 outputs."""
                try:
                    if not dataset or "entries" not in dataset or not dataset["entries"]:
                        return _status_only_response(
                            {},
                            "❌ **Error:** No dataset loaded. Please create or load a dataset first."
                        )

                    parameters = {
                        "bow_top": bow_top,
                        "ngram_n": ngram_n,
                        "ngram_top": ngram_top,
                        "topic_count": topic_count
                    }
                    print(f"Running analysis with selected type: {selected_analysis}")
                    print("Parameters:", parameters)

                    # Process the analysis request - passing selected_analysis as a string
                    analysis_results, _ = process_analysis_request(dataset, selected_analysis, parameters)

                    # If there's an error or no results
                    if not analysis_results or "analyses" not in analysis_results or not analysis_results["analyses"]:
                        return _status_only_response(
                            analysis_results,
                            "❌ **No results found.** Try a different analysis option."
                        )

                    # Results are keyed by prompt; only the first prompt is displayed
                    prompt_text = next(iter(analysis_results["analyses"]))
                    analyses = analysis_results["analyses"][prompt_text]

                    # Messages from placeholder analyses replace the result panels
                    if "message" in analyses:
                        return _status_only_response(
                            analysis_results,
                            f"ℹ️ **{analyses['message']}**"
                        )

                    # Pick the builder matching both the selection and the data returned
                    panels = None
                    for analysis_name, result_key, build_panels in _PANEL_BUILDERS:
                        if selected_analysis == analysis_name and result_key in analyses:
                            panels = build_panels(prompt_text, analyses[result_key])
                            break

                    # If we don't have visualization data from any analysis
                    if panels is None:
                        return _status_only_response(
                            analysis_results,
                            "❌ **No visualization data found.** Make sure to select a valid analysis option."
                        )

                    metrics_visible = panels.get("similarity_metrics") is not None
                    return (
                        analysis_results,                              # analysis_results_state
                        gr.update(visible=False),                      # analysis_output
                        True,                                          # visualization_area_visible
                        gr.update(visible=True),                       # analysis_title
                        _panel_update(panels, "prompt_title"),         # prompt_title
                        _panel_update(panels, "models_compared"),      # models_compared
                        _panel_update(panels, "model1_title"),         # model1_title
                        _panel_update(panels, "model1_words"),         # model1_words
                        _panel_update(panels, "model2_title"),         # model2_title
                        _panel_update(panels, "model2_words"),         # model2_words
                        gr.update(visible=metrics_visible),            # similarity_metrics_title
                        _panel_update(panels, "similarity_metrics"),   # similarity_metrics
                        False,                                         # status_message_visible
                        gr.update(visible=False)                       # status_message
                    )
                except Exception as e:
                    # UI boundary: report the failure in the page instead of crashing the handler
                    import traceback
                    error_msg = f"Error in analysis: {str(e)}\n{traceback.format_exc()}"
                    print(error_msg)
                    return _status_only_response(
                        {"error": error_msg},
                        f"❌ **Error during analysis:**\n\n```\n{str(e)}\n```",
                        show_raw_output=True
                    )

        # LLM Analysis Tab (placeholder)
        with gr.Tab("LLM Analysis"):
            gr.Markdown("## LLM-Based Response Analysis")
            with gr.Row():
                with gr.Column():
                    llm_analysis_type = gr.Radio(
                        choices=["Response Quality", "Response Comparison", "Factual Accuracy"],
                        label="Analysis Type",
                        value="Response Comparison"
                    )
                    llm_model = gr.Dropdown(
                        choices=["OpenAI GPT-4", "Anthropic Claude", "Local LLM"],
                        label="Analysis Model",
                        value="OpenAI GPT-4"
                    )
                    run_llm_analysis_btn = gr.Button("Run LLM Analysis", variant="primary")
                with gr.Column():
                    llm_analysis_prompt = gr.Textbox(
                        label="Custom Analysis Instructions (Optional)",
                        placeholder="Enter any specific instructions for the analysis...",
                        lines=3
                    )
            llm_analysis_status = gr.Markdown("*No analysis has been run*")
            llm_analysis_result = gr.Markdown(visible=False)

            def run_llm_analysis(dataset, analysis_type, model, custom_prompt):
                """Placeholder: validate the dataset and report that the feature is pending."""
                if not dataset or "entries" not in dataset or not dataset["entries"]:
                    return (
                        gr.update(visible=True, value="❌ **Error:** No dataset loaded. Please create or load a dataset first."),
                        gr.update(visible=False)
                    )
                # Placeholder for actual implementation
                return (
                    gr.update(visible=True, value="⏳ **Implementation in progress**\n\nLLM-based analysis will be available in a future update."),
                    gr.update(visible=False)
                )

            # Connect the run button to the analysis function
            run_llm_analysis_btn.click(
                fn=run_llm_analysis,
                inputs=[dataset_state, llm_analysis_type, llm_model, llm_analysis_prompt],
                outputs=[llm_analysis_status, llm_analysis_result]
            )

        # Run analysis with proper parameters; output order must match the
        # tuples returned by run_analysis / _status_only_response.
        run_analysis_btn.click(
            fn=run_analysis,
            inputs=[dataset_state, analysis_options, bow_top_slider, ngram_n, ngram_top, topic_count],
            outputs=[
                analysis_results_state,
                analysis_output,
                visualization_area_visible,
                analysis_title,
                prompt_title,
                models_compared,
                model1_title,
                model1_words,
                model2_title,
                model2_words,
                similarity_metrics_title,
                similarity_metrics,
                status_message_visible,
                status_message
            ]
        )

    return app
if __name__ == "__main__":
    # Fetch the NLTK data first, then build and serve the UI.
    download_nltk_resources()
    app = create_app()
    app.launch()