import gradio as gr from ui.dataset_input import create_dataset_input, load_example_dataset from ui.analysis_screen import create_analysis_screen, process_analysis_request from ui.roberta_screen import create_roberta_screen, process_roberta_request from visualization.bow_visualizer import process_and_visualize_analysis from visualization.roberta_visualizer import process_and_visualize_sentiment_analysis import nltk import os import json import matplotlib.pyplot as plt import io import base64 import datetime from PIL import Image # Download necessary NLTK resources function remains unchanged def download_nltk_resources(): """Download required NLTK resources if not already downloaded""" try: # Create nltk_data directory in the user's home directory if it doesn't exist nltk_data_path = os.path.expanduser("~/nltk_data") os.makedirs(nltk_data_path, exist_ok=True) # Add this path to NLTK's data path nltk.data.path.append(nltk_data_path) # Download required resources resources = ['punkt', 'wordnet', 'stopwords', 'punkt_tab'] for resource in resources: try: # Different resources can be in different directories in NLTK locations = [ f'tokenizers/{resource}', f'corpora/{resource}', f'taggers/{resource}', f'{resource}' ] found = False for location in locations: try: nltk.data.find(location) print(f"Resource {resource} already downloaded") found = True break except LookupError: continue if not found: print(f"Downloading {resource}...") nltk.download(resource, quiet=True) except Exception as e: print(f"Error with resource {resource}: {e}") print("NLTK resources check completed") except Exception as e: print(f"Error downloading NLTK resources: {e}") def create_app(): """ Create a streamlined Gradio app for dataset input and analysis. Returns: gr.Blocks: The Gradio application """ with gr.Blocks(title="LLM Response Comparator") as app: # Application state to share data between tabs dataset_state = gr.State({}) analysis_results_state = gr.State({}) roberta_results_state = gr.State({}) # Add a state for storing user dataset analysis results user_analysis_log = gr.State({}) # Dataset Input Tab with gr.Tab("Dataset Input"): # Filter out files that start with 'summary' for the Dataset Input tab dataset_files = [f for f in os.listdir("dataset") if not f.startswith("summary-") and os.path.isfile(os.path.join("dataset", f))] dataset_inputs, example_dropdown, load_example_btn, create_btn, prompt, response1, model1, response2, model2 = create_dataset_input() # Add status indicator to show when dataset is created dataset_status = gr.Markdown("*No dataset loaded*") # Load example dataset load_example_btn.click( fn=load_example_dataset, inputs=[example_dropdown], outputs=[prompt, response1, model1, response2, model2] # Update all field values ) # Save dataset to state and update status def create_dataset(p, r1, m1, r2, m2): if not p or not r1 or not r2: return {}, "❌ **Error:** Please fill in at least the prompt and both responses" dataset = { "entries": [ {"prompt": p, "response": r1, "model": m1 or "Model 1"}, {"prompt": p, "response": r2, "model": m2 or "Model 2"} ] } return dataset, "✅ **Dataset created successfully!** You can now go to the Analysis tab" create_btn.click( fn=create_dataset, inputs=[prompt, response1, model1, response2, model2], outputs=[dataset_state, dataset_status] ) # Analysis Tab with gr.Tab("Analysis"): # Use create_analysis_screen to get UI components including visualization container analysis_options, analysis_params, run_analysis_btn, analysis_output, ngram_n, topic_count = create_analysis_screen() # Pre-create visualization components (initially hidden) visualization_area_visible = gr.Checkbox(value=False, visible=False, label="Visualization Visible") analysis_title = gr.Markdown("## Analysis Results", visible=False) prompt_title = gr.Markdown(visible=False) models_compared = gr.Markdown(visible=False) # Container for model 1 words model1_title = gr.Markdown(visible=False) model1_words = gr.Markdown(visible=False) # Container for model 2 words model2_title = gr.Markdown(visible=False) model2_words = gr.Markdown(visible=False) # Similarity metrics similarity_metrics_title = gr.Markdown("### Similarity Metrics", visible=False) similarity_metrics = gr.Markdown(visible=False) # Status or error message area status_message_visible = gr.Checkbox(value=False, visible=False, label="Status Message Visible") status_message = gr.Markdown(visible=False) # Define a helper function to extract parameter values and run the analysis def run_analysis(dataset, selected_analysis, ngram_n, topic_count, user_analysis_log, *args): """ Run the analysis with the selected parameters Args: dataset (dict): The dataset state selected_analysis (str): The selected analysis type ngram_n (str or int): N value for n-gram analysis topic_count (str or int): Number of topics for topic modeling user_analysis_log (dict): Log of user analysis results *args: Additional arguments that might be passed by Gradio Returns: tuple: Analysis results and UI component updates """ try: if not dataset or "entries" not in dataset or not dataset["entries"]: return ( {}, # analysis_results_state user_analysis_log, # user_analysis_log (unchanged) False, # analysis_output visibility False, # visualization_area_visible gr.update(visible=False), # analysis_title gr.update(visible=False), # prompt_title gr.update(visible=False), # models_compared gr.update(visible=False), # model1_title gr.update(visible=False), # model1_words gr.update(visible=False), # model2_title gr.update(visible=False), # model2_words gr.update(visible=False), # similarity_metrics_title gr.update(visible=False), # similarity_metrics True, # status_message_visible gr.update(visible=True, value="**Error:** No dataset loaded. Please create or load a dataset first.") # status_message ) parameters = { "bow_top": 25, # Default fixed value for Bag of Words "ngram_n": ngram_n, "ngram_top": 10, # Default fixed value for N-gram analysis "topic_count": topic_count, "bias_methods": ["partisan"] # Default to partisan leaning only } print(f"Running analysis with selected type: {selected_analysis}") print("Parameters:", parameters) # Process the analysis request - passing selected_analysis as a string analysis_results, _ = process_analysis_request(dataset, selected_analysis, parameters) # If there's an error or no results if not analysis_results or "analyses" not in analysis_results or not analysis_results["analyses"]: return ( analysis_results, user_analysis_log, # user_analysis_log (unchanged) False, False, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), True, gr.update(visible=True, value="**No results found.** Try a different analysis option.") ) # Extract information to display in components prompt = list(analysis_results["analyses"].keys())[0] analyses = analysis_results["analyses"][prompt] # Initialize visualization components visibilities and contents visualization_area_visible = False prompt_title_visible = False prompt_title_value = "" models_compared_visible = False models_compared_value = "" model1_title_visible = False model1_title_value = "" model1_words_visible = False model1_words_value = "" model2_title_visible = False model2_title_value = "" model2_words_visible = False model2_words_value = "" similarity_title_visible = False similarity_metrics_visible = False similarity_metrics_value = "" # Update the user analysis log with the new results updated_log = user_analysis_log.copy() if user_analysis_log else {} # Initialize this prompt in the log if it doesn't exist if prompt not in updated_log: updated_log[prompt] = {} # Store the analysis results in the log if selected_analysis in ["Bag of Words", "N-gram Analysis", "Classifier", "Bias Detection", "Topic Modeling"]: key = selected_analysis.replace(" ", "_").lower() if key in analyses: updated_log[prompt][selected_analysis] = { "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "result": analyses[key] } # Check for messages from placeholder analyses if "message" in analyses: return ( analysis_results, updated_log, # Return updated log False, False, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), True, gr.update(visible=True, value=f"**{analyses['message']}**") # status_message ) # Process based on the selected analysis type if selected_analysis == "Bag of Words" and "bag_of_words" in analyses: visualization_area_visible = True bow_results = analyses["bag_of_words"] models = bow_results.get("models", []) if len(models) >= 2: prompt_title_visible = True prompt_title_value = f"## Analysis of Prompt: \"{prompt[:100]}...\"" models_compared_visible = True models_compared_value = f"### Comparing responses from {models[0]} and {models[1]}" # Extract and format information for display model1_name = models[0] model2_name = models[1] # Format important words for each model important_words = bow_results.get("important_words", {}) if model1_name in important_words: model1_title_visible = True model1_title_value = f"#### Top Words Used by {model1_name}" word_list = [f"**{item['word']}** ({item['count']})" for item in important_words[model1_name][:10]] model1_words_visible = True model1_words_value = ", ".join(word_list) if model2_name in important_words: model2_title_visible = True model2_title_value = f"#### Top Words Used by {model2_name}" word_list = [f"**{item['word']}** ({item['count']})" for item in important_words[model2_name][:10]] model2_words_visible = True model2_words_value = ", ".join(word_list) # Format similarity metrics comparisons = bow_results.get("comparisons", {}) comparison_key = f"{model1_name} vs {model2_name}" if comparison_key in comparisons: metrics = comparisons[comparison_key] cosine = metrics.get("cosine_similarity", 0) jaccard = metrics.get("jaccard_similarity", 0) semantic = metrics.get("semantic_similarity", 0) common_words = metrics.get("common_word_count", 0) similarity_title_visible = True similarity_metrics_visible = True similarity_metrics_value = f""" - **Cosine Similarity**: {cosine:.2f} (higher means more similar word frequency patterns) - **Jaccard Similarity**: {jaccard:.2f} (higher means more word overlap) - **Semantic Similarity**: {semantic:.2f} (higher means more similar meaning) - **Common Words**: {common_words} words appear in both responses """ # Check for N-gram analysis elif selected_analysis == "N-gram Analysis" and "ngram_analysis" in analyses: visualization_area_visible = True ngram_results = analyses["ngram_analysis"] models = ngram_results.get("models", []) ngram_size = ngram_results.get("ngram_size", 2) size_name = "Unigrams" if ngram_size == 1 else f"{ngram_size}-grams" if len(models) >= 2: prompt_title_visible = True prompt_title_value = f"## Analysis of Prompt: \"{prompt[:100]}...\"" models_compared_visible = True models_compared_value = f"### {size_name} Analysis: Comparing responses from {models[0]} and {models[1]}" # Extract and format information for display model1_name = models[0] model2_name = models[1] # Format important n-grams for each model important_ngrams = ngram_results.get("important_ngrams", {}) if model1_name in important_ngrams: model1_title_visible = True model1_title_value = f"#### Top {size_name} Used by {model1_name}" # Create a better formatted list of n-grams ngram_list = [] for item in important_ngrams[model1_name][:10]: ngram_text = item['ngram'] ngram_count = item['count'] ngram_list.append(f"**{ngram_text}** ({ngram_count})") model1_words_visible = True model1_words_value = ", ".join(ngram_list) if model2_name in important_ngrams: model2_title_visible = True model2_title_value = f"#### Top {size_name} Used by {model2_name}" # Create a better formatted list of n-grams ngram_list = [] for item in important_ngrams[model2_name][:10]: ngram_text = item['ngram'] ngram_count = item['count'] ngram_list.append(f"**{ngram_text}** ({ngram_count})") model2_words_visible = True model2_words_value = ", ".join(ngram_list) # Format similarity metrics if available if "comparisons" in ngram_results: comparison_key = f"{model1_name} vs {model2_name}" if comparison_key in ngram_results["comparisons"]: metrics = ngram_results["comparisons"][comparison_key] common_count = metrics.get("common_ngram_count", 0) similarity_title_visible = True similarity_metrics_visible = True similarity_metrics_value = f""" - **Common {size_name}**: {common_count} {size_name.lower()} appear in both responses """ # Create a new function to generate N-gram visualizations def generate_ngram_visualization(important_ngrams, model1_name, model2_name): plt.figure(figsize=(12, 6)) # Process data for model 1 model1_data = {} if model1_name in important_ngrams: for item in important_ngrams[model1_name][:10]: model1_data[item['ngram']] = item['count'] # Process data for model 2 model2_data = {} if model2_name in important_ngrams: for item in important_ngrams[model2_name][:10]: model2_data[item['ngram']] = item['count'] # Plot for the first model plt.subplot(1, 2, 1) sorted_data1 = sorted(model1_data.items(), key=lambda x: x[1], reverse=True)[:10] terms1, counts1 = zip(*sorted_data1) if sorted_data1 else ([], []) # Create horizontal bar chart plt.barh([t[:20] + '...' if len(t) > 20 else t for t in terms1[::-1]], counts1[::-1]) plt.xlabel('Frequency') plt.title(f'Top {size_name} Used by {model1_name}') plt.tight_layout() # Plot for the second model plt.subplot(1, 2, 2) sorted_data2 = sorted(model2_data.items(), key=lambda x: x[1], reverse=True)[:10] terms2, counts2 = zip(*sorted_data2) if sorted_data2 else ([], []) # Create horizontal bar chart plt.barh([t[:20] + '...' if len(t) > 20 else t for t in terms2[::-1]], counts2[::-1]) plt.xlabel('Frequency') plt.title(f'Top {size_name} Used by {model2_name}') plt.tight_layout() # Save the plot to a bytes buffer buf = io.BytesIO() plt.savefig(buf, format='png', dpi=100) buf.seek(0) # Convert to PIL Image image = Image.open(buf) return image # Create the visualization try: viz_image = generate_ngram_visualization(important_ngrams, model1_name, model2_name) # Convert the image to a base64 string for embedding buffered = io.BytesIO() viz_image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() # Append the image to the metrics_value similarity_metrics_value += f"""
N-gram visualization
""" similarity_metrics_visible = True except Exception as viz_error: print(f"Visualization error: {viz_error}") # Handle the error gracefully - continue without the visualization # Check for Topic Modeling analysis elif selected_analysis == "Topic Modeling" and "topic_modeling" in analyses: visualization_area_visible = True topic_results = analyses["topic_modeling"] models = topic_results.get("models", []) method = topic_results.get("method", "lda").upper() n_topics = topic_results.get("n_topics", 3) if len(models) >= 2: prompt_title_visible = True prompt_title_value = f"## Analysis of Prompt: \"{prompt[:100]}...\"" models_compared_visible = True models_compared_value = f"### Topic Modeling Analysis ({method}, {n_topics} topics)" # Extract and format topic information topics = topic_results.get("topics", []) if topics: # Format topic info for display topic_info = [] for topic in topics[:3]: # Show first 3 topics topic_id = topic.get("id", 0) words = topic.get("words", [])[:5] # Top 5 words per topic if words: topic_info.append(f"**Topic {topic_id+1}**: {', '.join(words)}") if topic_info: model1_title_visible = True model1_title_value = "#### Discovered Topics" model1_words_visible = True model1_words_value = "\n".join(topic_info) # Get topic distributions for models model_topics = topic_results.get("model_topics", {}) if model_topics: model1_name = models[0] model2_name = models[1] # Format topic distribution info if model1_name in model_topics and model2_name in model_topics: model2_title_visible = True model2_title_value = "#### Topic Distribution" model2_words_visible = True # Simple distribution display dist1 = model_topics[model1_name] dist2 = model_topics[model2_name] model2_words_value = f""" **{model1_name}**: {', '.join([f"Topic {i+1}: {v:.2f}" for i, v in enumerate(dist1[:3])])} **{model2_name}**: {', '.join([f"Topic {i+1}: {v:.2f}" for i, v in enumerate(dist2[:3])])} """ # Add similarity metrics if available comparisons = topic_results.get("comparisons", {}) if comparisons: comparison_key = f"{model1_name} vs {model2_name}" if comparison_key in comparisons: metrics = comparisons[comparison_key] js_div = metrics.get("js_divergence", 0) similarity_title_visible = True similarity_metrics_visible = True similarity_metrics_value = f""" - **Topic Distribution Divergence**: {js_div:.4f} (lower means more similar topic distributions) """ # Check for Classifier analysis elif selected_analysis == "Classifier" and "classifier" in analyses: visualization_area_visible = True classifier_results = analyses["classifier"] models = classifier_results.get("models", []) if len(models) >= 2: prompt_title_visible = True prompt_title_value = f"## Analysis of Prompt: \"{prompt[:100]}...\"" models_compared_visible = True models_compared_value = f"### Classifier Analysis for {models[0]} and {models[1]}" # Extract and format classifier information model1_name = models[0] model2_name = models[1] # Display classifications for each model classifications = classifier_results.get("classifications", {}) if classifications: model1_title_visible = True model1_title_value = f"#### Classification Results" model1_words_visible = True model1_results = classifications.get(model1_name, {}) model2_results = classifications.get(model2_name, {}) model1_words_value = f""" **{model1_name}**: - Formality: {model1_results.get('formality', 'N/A')} - Sentiment: {model1_results.get('sentiment', 'N/A')} - Complexity: {model1_results.get('complexity', 'N/A')} **{model2_name}**: - Formality: {model2_results.get('formality', 'N/A')} - Sentiment: {model2_results.get('sentiment', 'N/A')} - Complexity: {model2_results.get('complexity', 'N/A')} """ # Show comparison model2_title_visible = True model2_title_value = f"#### Classification Comparison" model2_words_visible = True differences = classifier_results.get("differences", {}) model2_words_value = "\n".join([ f"- **{category}**: {diff}" for category, diff in differences.items() ]) # Create visualization using matplotlib try: # Define metrics and mappings metrics = ['Formality', 'Sentiment', 'Complexity'] mapping = { 'Formality': {'Informal': 1, 'Neutral': 2, 'Formal': 3}, 'Sentiment': {'Negative': 1, 'Neutral': 2, 'Positive': 3}, 'Complexity': {'Simple': 1, 'Average': 2, 'Complex': 3} } # Get values for each model model1_vals = [] model2_vals = [] # Get formality value for model1 formality1 = model1_results.get('formality', 'Neutral') if formality1 in mapping['Formality']: model1_vals.append(mapping['Formality'][formality1]) else: model1_vals.append(2) # Default to neutral # Get sentiment value for model1 sentiment1 = model1_results.get('sentiment', 'Neutral') if sentiment1 in mapping['Sentiment']: model1_vals.append(mapping['Sentiment'][sentiment1]) else: model1_vals.append(2) # Default to neutral # Get complexity value for model1 complexity1 = model1_results.get('complexity', 'Average') if complexity1 in mapping['Complexity']: model1_vals.append(mapping['Complexity'][complexity1]) else: model1_vals.append(2) # Default to average # Get formality value for model2 formality2 = model2_results.get('formality', 'Neutral') if formality2 in mapping['Formality']: model2_vals.append(mapping['Formality'][formality2]) else: model2_vals.append(2) # Default to neutral # Get sentiment value for model2 sentiment2 = model2_results.get('sentiment', 'Neutral') if sentiment2 in mapping['Sentiment']: model2_vals.append(mapping['Sentiment'][sentiment2]) else: model2_vals.append(2) # Default to neutral # Get complexity value for model2 complexity2 = model2_results.get('complexity', 'Average') if complexity2 in mapping['Complexity']: model2_vals.append(mapping['Complexity'][complexity2]) else: model2_vals.append(2) # Default to average # Plot grouped bar chart plt.figure(figsize=(10, 6)) x = range(len(metrics)) width = 0.35 plt.bar([p - width/2 for p in x], model1_vals, width=width, label=model1_name) plt.bar([p + width/2 for p in x], model2_vals, width=width, label=model2_name) plt.xticks(x, metrics) plt.yticks([1, 2, 3], ['Low', 'Medium', 'High']) plt.ylim(0, 3.5) plt.ylabel('Level') plt.title('Comparison of Model Characteristics') plt.legend() plt.tight_layout() # Save the plot to a bytes buffer buf = io.BytesIO() plt.savefig(buf, format='png', dpi=100) buf.seek(0) # Convert to PIL Image viz_image = Image.open(buf) # Convert the image to a base64 string for embedding buffered = io.BytesIO() viz_image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() # Append the image to the metrics_value similarity_title_visible = True similarity_metrics_visible = True similarity_metrics_value = f"""
Classifier visualization
""" except Exception as viz_error: print(f"Classifier visualization error: {viz_error}") # Check for Bias Detection analysis elif selected_analysis == "Bias Detection" and "bias_detection" in analyses: visualization_area_visible = True bias_results = analyses["bias_detection"] models = bias_results.get("models", []) if len(models) >= 2: prompt_title_visible = True prompt_title_value = f"## Analysis of Prompt: \"{prompt[:100]}...\"" models_compared_visible = True models_compared_value = f"### Bias Analysis: Comparing responses from {models[0]} and {models[1]}" # Display comparative bias results model1_name = models[0] model2_name = models[1] if "comparative" in bias_results: comparative = bias_results["comparative"] # Format summary for display model1_title_visible = True model1_title_value = "#### Bias Detection Summary" model1_words_visible = True summary_parts = [] # Add partisan comparison (focus on partisan leaning) if "partisan" in comparative: part = comparative["partisan"] is_significant = part.get("significant", False) summary_parts.append( f"**Partisan Leaning**: {model1_name} appears {part.get(model1_name, 'N/A')}, " + f"while {model2_name} appears {part.get(model2_name, 'N/A')}. " + f"({'Significant' if is_significant else 'Minor'} difference)" ) # Add overall assessment if "overall" in comparative: overall = comparative["overall"] significant = overall.get("significant_bias_difference", False) summary_parts.append( f"**Overall Assessment**: " + f"Analysis shows a {overall.get('difference', 0):.2f}/1.0 difference in bias patterns. " + f"({'Significant' if significant else 'Minor'} overall bias difference)" ) # Combine all parts model1_words_value = "\n\n".join(summary_parts) # Format detailed term analysis if (model1_name in bias_results and "partisan" in bias_results[model1_name] and model2_name in bias_results and "partisan" in bias_results[model2_name]): model2_title_visible = True model2_title_value = "#### Partisan Term Analysis" model2_words_visible = True m1_lib = bias_results[model1_name]["partisan"].get("liberal_terms", []) m1_con = bias_results[model1_name]["partisan"].get("conservative_terms", []) m2_lib = bias_results[model2_name]["partisan"].get("liberal_terms", []) m2_con = bias_results[model2_name]["partisan"].get("conservative_terms", []) model2_words_value = f""" **{model1_name}**: - Liberal terms: {', '.join(m1_lib) if m1_lib else 'None detected'} - Conservative terms: {', '.join(m1_con) if m1_con else 'None detected'} **{model2_name}**: - Liberal terms: {', '.join(m2_lib) if m2_lib else 'None detected'} - Conservative terms: {', '.join(m2_con) if m2_con else 'None detected'} """ # If we don't have visualization data from any analysis if not visualization_area_visible: return ( analysis_results, updated_log, # Return updated log False, False, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), True, # status_message_visible gr.update(visible=True, value="**No visualization data found.** Make sure to select a valid analysis option.") ) # Return all updated component values return ( analysis_results, # analysis_results_state updated_log, # user_analysis_log (updated with new results) False, # analysis_output visibility True, # visualization_area_visible gr.update(visible=True), # analysis_title gr.update(visible=prompt_title_visible, value=prompt_title_value), # prompt_title gr.update(visible=models_compared_visible, value=models_compared_value), # models_compared gr.update(visible=model1_title_visible, value=model1_title_value), # model1_title gr.update(visible=model1_words_visible, value=model1_words_value), # model1_words gr.update(visible=model2_title_visible, value=model2_title_value), # model2_title gr.update(visible=model2_words_visible, value=model2_words_value), # model2_words gr.update(visible=similarity_title_visible), # similarity_metrics_title gr.update(visible=similarity_metrics_visible, value=similarity_metrics_value), # similarity_metrics False, # status_message_visible gr.update(visible=False) # status_message ) except Exception as e: import traceback error_msg = f"Error in analysis: {str(e)}\n{traceback.format_exc()}" print(error_msg) return ( {"error": error_msg}, # analysis_results_state user_analysis_log, # Return unchanged log True, # analysis_output visibility (show raw JSON for debugging) False, # visualization_area_visible gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), True, # status_message_visible gr.update(visible=True, value=f"**Error during analysis:**\n\n```\n{str(e)}\n```") # status_message ) # RoBERTa Sentiment Analysis Tab with gr.Tab("RoBERTa Sentiment"): # Create the RoBERTa analysis UI components run_roberta_btn, roberta_output, visualization_container, roberta_status = create_roberta_screen() # Create a container for visualization results with gr.Column() as roberta_viz_container: # create placeholder components to update roberta_viz_title = gr.Markdown("## RoBERTa Sentiment Analysis Results", visible=False) roberta_viz_content = gr.HTML("", visible=False) # Function to run RoBERTa sentiment analysis def run_roberta_analysis(dataset, existing_log): try: print("Starting run_roberta_analysis function") if not dataset or "entries" not in dataset or not dataset["entries"]: return ( {}, # roberta_results_state existing_log, # no change to user_analysis_log gr.update(visible=True, value="**Error:** No dataset loaded. Please create or load a dataset first."), # roberta_status gr.update(visible=False), # roberta_output gr.update(visible=False), # roberta_viz_title gr.update(visible=False) # roberta_viz_content ) print(f"Running RoBERTa sentiment analysis with sentence-level, style=") # Process the analysis request roberta_results = process_roberta_request(dataset) print(f"RoBERTa results obtained. Size: {len(str(roberta_results))} characters") # NEW: Update the user analysis log with RoBERTa results updated_log = existing_log.copy() if existing_log else {} # Get the prompt text prompt_text = None if "analyses" in roberta_results: prompt_text = list(roberta_results["analyses"].keys())[0] if roberta_results["analyses"] else None if prompt_text: # Initialize this prompt in the log if it doesn't exist if prompt_text not in updated_log: updated_log[prompt_text] = {} # Store the RoBERTa results if "analyses" in roberta_results and prompt_text in roberta_results["analyses"]: if "roberta_sentiment" in roberta_results["analyses"][prompt_text]: updated_log[prompt_text]["RoBERTa Sentiment"] = { "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "result": roberta_results["analyses"][prompt_text]["roberta_sentiment"] } # Check if we have results if "error" in roberta_results: return ( roberta_results, # Store in state anyway for debugging updated_log, # Return updated log gr.update(visible=True, value=f"**Error:** {roberta_results['error']}"), # roberta_status gr.update(visible=False), # Hide raw output gr.update(visible=False), # roberta_viz_title gr.update(visible=False) # roberta_viz_content ) print("About to process visualization components") viz_components = process_and_visualize_sentiment_analysis(roberta_results) print(f"Visualization components generated: {len(viz_components)}") print("Starting HTML conversion of visualization components") # Convert the visualization components to HTML - OPTIMIZED VERSION print("Starting HTML conversion of visualization components") html_content = "
" html_content += "

Sentiment Analysis Results

" if "analyses" in roberta_results: for prompt, analyses in roberta_results["analyses"].items(): if "roberta_sentiment" in analyses: sentiment_result = analyses["roberta_sentiment"] models = sentiment_result.get("models", []) if len(models) >= 2: # Add overall comparison if "comparison" in sentiment_result: comparison = sentiment_result["comparison"] html_content += f"
" html_content += f"

{comparison.get('difference_direction', 'Models have different sentiment patterns')}

" html_content += f"
" # Add individual model results sentiment_analysis = sentiment_result.get("sentiment_analysis", {}) for model in models: if model in sentiment_analysis: model_result = sentiment_analysis[model] score = model_result.get("sentiment_score", 0) label = model_result.get("label", "neutral") html_content += f"
" html_content += f"

{model}

" html_content += f"

Sentiment: {label} (Score: {score:.2f})

" html_content += f"
" html_content += "
" print("HTML conversion completed") # Return updated values return ( roberta_results, # roberta_results_state updated_log, # Return updated log gr.update(visible=False), # roberta_status (hide status message) gr.update(visible=False), # roberta_output (hide raw output) gr.update(visible=True), # roberta_viz_title (show title) gr.update(visible=True, value=html_content) # roberta_viz_content (show content) ) except Exception as e: import traceback error_msg = f"Error in RoBERTa analysis: {str(e)}\n{traceback.format_exc()}" print(error_msg) return ( {"error": error_msg}, # roberta_results_state existing_log, # Return unchanged log gr.update(visible=True, value=f"**Error during RoBERTa analysis:**\n\n```\n{str(e)}\n```"), # roberta_status gr.update(visible=False), # Hide raw output gr.update(visible=False), # roberta_viz_title gr.update(visible=False) # roberta_viz_content ) # Connect the run button to the analysis function run_roberta_btn.click( fn=run_roberta_analysis, inputs=[dataset_state, user_analysis_log], outputs=[ roberta_results_state, user_analysis_log, roberta_status, roberta_output, roberta_viz_title, roberta_viz_content ] ) # Add a Summary tab with gr.Tab("Summary"): gr.Markdown("## Analysis Summaries") with gr.Row(): with gr.Column(scale=1): # Get summary files from dataset directory summary_files = [f for f in os.listdir("dataset") if f.startswith("summary-") and f.endswith(".txt")] # Dropdown for selecting summary file summary_dropdown = gr.Dropdown( choices=["YOUR DATASET RESULTS"] + summary_files, label="Select Summary", info="Choose a summary to display", value="YOUR DATASET RESULTS" ) load_summary_btn = gr.Button("Load Summary", variant="primary") summary_assistant_prompt = gr.Textbox( value="Attached are the results from various NLP based comparisons between two LLM responses on the same prompt. Give your interpretation of the results.", label="Analysis Assistant Prompt", lines=3, interactive=True, ) with gr.Column(scale=3): summary_content = gr.Textbox( label="Summary Content", lines=25, max_lines=50, interactive=False ) summary_status = gr.Markdown("*No summary loaded*") # Function to load summary content from file or user analysis def load_summary_content(file_name, user_log): if not file_name: return "", "*No summary selected*" # Handle the special "YOUR DATASET RESULTS" option if file_name == "YOUR DATASET RESULTS": if not user_log or not any(user_log.values()): return "", "**No analysis results available.** Run some analyses in the Analysis tab first." # Format the user analysis log as text content = "# YOUR DATASET ANALYSIS RESULTS\n\n" for prompt, analyses in user_log.items(): content += f"## Analysis of Prompt: \"{prompt[:100]}{'...' if len(prompt) > 100 else ''}\"\n\n" if not analyses: content += "_No analyses run for this prompt._\n\n" continue # Order the analyses in a specific sequence analysis_order = ["Bag of Words", "N-gram Analysis", "Classifier", "Bias Detection", "RoBERTa Sentiment"] for analysis_type in analysis_order: if analysis_type in analyses: analysis_data = analyses[analysis_type] timestamp = analysis_data.get("timestamp", "") result = analysis_data.get("result", {}) content += f"### {analysis_type} ({timestamp})\n\n" # Format based on analysis type if analysis_type == "Bag of Words": models = result.get("models", []) if len(models) >= 2: content += f"Comparing responses from {models[0]} and {models[1]}\n\n" # Add important words for each model important_words = result.get("important_words", {}) for model_name in models: if model_name in important_words: content += f"Top Words Used by {model_name}\n" word_list = [f"{item['word']} ({item['count']})" for item in important_words[model_name][:10]] content += ", ".join(word_list) + "\n\n" # Add similarity metrics comparisons = result.get("comparisons", {}) comparison_key = f"{models[0]} vs {models[1]}" if comparison_key in comparisons: metrics = comparisons[comparison_key] content += "Similarity Metrics\n" content += f"Cosine Similarity: {metrics.get('cosine_similarity', 0):.2f} (higher means more similar word frequency patterns)\n" content += f"Jaccard Similarity: {metrics.get('jaccard_similarity', 0):.2f} (higher means more word overlap)\n" content += f"Semantic Similarity: {metrics.get('semantic_similarity', 0):.2f} (higher means more similar meaning)\n" content += f"Common Words: {metrics.get('common_word_count', 0)} words appear in both responses\n\n" elif analysis_type == "N-gram Analysis": models = result.get("models", []) ngram_size = result.get("ngram_size", 2) size_name = "Unigrams" if ngram_size == 1 else f"{ngram_size}-grams" if len(models) >= 2: content += f"{size_name} Analysis: Comparing responses from {models[0]} and {models[1]}\n\n" # Add important n-grams for each model important_ngrams = result.get("important_ngrams", {}) for model_name in models: if model_name in important_ngrams: content += f"Top {size_name} Used by {model_name}\n" ngram_list = [f"{item['ngram']} ({item['count']})" for item in important_ngrams[model_name][:10]] content += ", ".join(ngram_list) + "\n\n" # Add similarity metrics if "comparisons" in result: comparison_key = f"{models[0]} vs {models[1]}" if comparison_key in result["comparisons"]: metrics = result["comparisons"][comparison_key] content += "Similarity Metrics\n" content += f"Common {size_name}: {metrics.get('common_ngram_count', 0)} {size_name.lower()} appear in both responses\n\n" elif analysis_type == "Classifier": models = result.get("models", []) if len(models) >= 2: content += f"Classifier Analysis for {models[0]} and {models[1]}\n\n" # Add classification results classifications = result.get("classifications", {}) if classifications: content += "Classification Results\n" for model_name in models: if model_name in classifications: model_results = classifications[model_name] content += f"{model_name}:\n" content += f"- Formality: {model_results.get('formality', 'N/A')}\n" content += f"- Sentiment: {model_results.get('sentiment', 'N/A')}\n" content += f"- Complexity: {model_results.get('complexity', 'N/A')}\n\n" # Add differences differences = result.get("differences", {}) if differences: content += "Classification Comparison\n" for category, diff in differences.items(): content += f"- {category}: {diff}\n" content += "\n" elif analysis_type == "Bias Detection": models = result.get("models", []) if len(models) >= 2: content += f"Bias Analysis: Comparing responses from {models[0]} and {models[1]}\n\n" # Add comparative results if "comparative" in result: comparative = result["comparative"] content += "Bias Detection Summary\n" if "partisan" in comparative: part = comparative["partisan"] is_significant = part.get("significant", False) content += f"Partisan Leaning: {models[0]} appears {part.get(models[0], 'N/A')}, " content += f"while {models[1]} appears {part.get(models[1], 'N/A')}. " content += f"({'Significant' if is_significant else 'Minor'} difference)\n\n" if "overall" in comparative: overall = comparative["overall"] significant = overall.get("significant_bias_difference", False) content += f"Overall Assessment: " content += f"Analysis shows a {overall.get('difference', 0):.2f}/1.0 difference in bias patterns. " content += f"({'Significant' if significant else 'Minor'} overall bias difference)\n\n" # Add partisan terms content += "Partisan Term Analysis\n" for model_name in models: if model_name in result and "partisan" in result[model_name]: partisan = result[model_name]["partisan"] content += f"{model_name}:\n" lib_terms = partisan.get("liberal_terms", []) con_terms = partisan.get("conservative_terms", []) content += f"- Liberal terms: {', '.join(lib_terms) if lib_terms else 'None detected'}\n" content += f"- Conservative terms: {', '.join(con_terms) if con_terms else 'None detected'}\n\n" elif analysis_type == "RoBERTa Sentiment": models = result.get("models", []) if len(models) >= 2: content += "Sentiment Analysis Results\n" # Add comparison info if "comparison" in result: comparison = result["comparison"] if "difference_direction" in comparison: content += f"{comparison['difference_direction']}\n\n" # Add individual model results sentiment_analysis = result.get("sentiment_analysis", {}) for model_name in models: if model_name in sentiment_analysis: model_result = sentiment_analysis[model_name] score = model_result.get("sentiment_score", 0) label = model_result.get("label", "neutral") content += f"{model_name}\n" content += f"Sentiment: {label} (Score: {score:.2f})\n\n" return content, f"**Loaded user analysis results**" # Regular file loading for built-in summaries file_path = os.path.join("dataset", file_name) if os.path.exists(file_path): try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() return content, f"**Loaded summary**: {file_name}" except Exception as e: return "", f"**Error loading summary**: {str(e)}" else: return "", f"**File not found**: {file_path}" def update_summary_dropdown(user_log): """Update summary dropdown options based on user log state""" choices = ["YOUR DATASET RESULTS"] choices.extend([f for f in os.listdir("dataset") if f.startswith("summary-") and f.endswith(".txt")]) return gr.update(choices=choices, value="YOUR DATASET RESULTS") # Connect the load button to the function load_summary_btn.click( fn=load_summary_content, inputs=[summary_dropdown, user_analysis_log], outputs=[summary_content, summary_status] ) # Also load summary when dropdown changes summary_dropdown.change( fn=load_summary_content, inputs=[summary_dropdown, user_analysis_log], outputs=[summary_content, summary_status] ) # Add a Visuals tab for plotting graphs with gr.Tab("Visuals"): gr.Markdown("## Visualization Graphs") with gr.Row(): with gr.Column(scale=1): # Dropdown for selecting visualization type viz_type = gr.Dropdown( choices=["N-gram Comparison", "Word Frequency", "Sentiment Analysis"], label="Visualization Type", info="Select the type of visualization to display", value="N-gram Comparison" ) # Button to generate visualization generate_viz_btn = gr.Button("Generate Visualization", variant="primary") with gr.Column(scale=3): # Image component to display the plot viz_output = gr.Image( label="Visualization", type="pil", height=500 ) viz_status = gr.Markdown("*No visualization generated*") # Function to generate and display visualizations def generate_visualization(viz_type, dataset, analysis_results): try: if not dataset or "entries" not in dataset or not dataset["entries"]: return None, "❌ **Error:** No dataset loaded. Please create or load a dataset first." # Example data (fallback when no real data is available) ex_data = { 'attorney general': 3, 'social justice': 3, 'centrist approach': 2, 'climate change': 2, 'criminal justice': 2, 'gun control': 2, 'human rights': 2, 'justice issues': 2, 'measures like': 2, 'middle class': 2 } gran_data = { 'political views': 3, 'vice president': 3, 'criminal justice': 2, 'democratic party': 2, 'foreign policy': 2, 'harris advocated': 2, 'lgbtq rights': 2, 'president harris': 2, 'social issues': 2, '2019 proposed': 1 } # Use real data if available in analysis_results model1_data = {} model2_data = {} model1_name = "Model 1" model2_name = "Model 2" # Extract actual model names from dataset if dataset and "entries" in dataset and len(dataset["entries"]) >= 2: model1_name = dataset["entries"][0].get("model", "Model 1") model2_name = dataset["entries"][1].get("model", "Model 2") # Try to get real data from analysis_results if analysis_results and "analyses" in analysis_results: for prompt, analyses in analysis_results["analyses"].items(): if viz_type == "N-gram Comparison" and "ngram_analysis" in analyses: ngram_results = analyses["ngram_analysis"] important_ngrams = ngram_results.get("important_ngrams", {}) if model1_name in important_ngrams: model1_data = {item["ngram"]: item["count"] for item in important_ngrams[model1_name]} if model2_name in important_ngrams: model2_data = {item["ngram"]: item["count"] for item in important_ngrams[model2_name]} elif viz_type == "Word Frequency" and "bag_of_words" in analyses: bow_results = analyses["bag_of_words"] important_words = bow_results.get("important_words", {}) if model1_name in important_words: model1_data = {item["word"]: item["count"] for item in important_words[model1_name]} if model2_name in important_words: model2_data = {item["word"]: item["count"] for item in important_words[model2_name]} # If we couldn't get real data, use example data if not model1_data: model1_data = ex_data if not model2_data: model2_data = gran_data # Create the visualization plt.figure(figsize=(10, 6)) if viz_type == "N-gram Comparison" or viz_type == "Word Frequency": # Plot for the first model plt.subplot(1, 2, 1) sorted_data1 = sorted(model1_data.items(), key=lambda x: x[1], reverse=True)[:10] # Top 10 terms1, counts1 = zip(*sorted_data1) if sorted_data1 else ([], []) # Create horizontal bar chart plt.barh([t[:20] + '...' if len(t) > 20 else t for t in terms1[::-1]], counts1[::-1]) plt.xlabel('Frequency') plt.title(f'Harris, Top {viz_type.split()[0]}s Used by {model1_name}') plt.tight_layout() # Plot for the second model plt.subplot(1, 2, 2) sorted_data2 = sorted(model2_data.items(), key=lambda x: x[1], reverse=True)[:10] # Top 10 terms2, counts2 = zip(*sorted_data2) if sorted_data2 else ([], []) # Create horizontal bar chart plt.barh([t[:20] + '...' if len(t) > 20 else t for t in terms2[::-1]], counts2[::-1]) plt.xlabel('Frequency') plt.title(f'Harris, Top {viz_type.split()[0]}s Used by {model2_name}') plt.tight_layout() elif viz_type == "Sentiment Analysis": # Generate sentiment comparison visualization # This would be populated with real data when available sentiment_scores = { model1_name: 0.75, # Example score model2_name: 0.25 # Example score } # Extract real sentiment scores if available if "roberta_results_state" in analysis_results: roberta_results = analysis_results["roberta_results_state"] if "analyses" in roberta_results: for prompt, analyses in roberta_results["analyses"].items(): if "roberta_sentiment" in analyses: sentiment_result = analyses["roberta_sentiment"] sentiment_analysis = sentiment_result.get("sentiment_analysis", {}) if model1_name in sentiment_analysis: sentiment_scores[model1_name] = sentiment_analysis[model1_name].get("sentiment_score", 0) if model2_name in sentiment_analysis: sentiment_scores[model2_name] = sentiment_analysis[model2_name].get("sentiment_score", 0) # Create sentiment bar chart plt.bar(list(sentiment_scores.keys()), list(sentiment_scores.values())) plt.ylim(-1, 1) plt.ylabel('Harris Sentiment Score (-1 to 1)') plt.title('Harris Sentiment Analysis Comparison') plt.axhline(y=0, color='r', linestyle='-', alpha=0.3) # Add a zero line # Save the plot to a bytes buffer buf = io.BytesIO() plt.savefig(buf, format='png') buf.seek(0) # Convert plot to PIL Image from PIL import Image image = Image.open(buf) return image, f"**Generated {viz_type} visualization**" except Exception as e: import traceback error_msg = f"Error generating visualization: {str(e)}\n{traceback.format_exc()}" print(error_msg) return None, f"**Error:** {str(e)}" # Connect the generate button to the function generate_viz_btn.click( fn=generate_visualization, inputs=[viz_type, dataset_state, analysis_results_state], outputs=[viz_output, viz_status] ) # Run analysis with proper parameters run_analysis_btn.click( fn=run_analysis, inputs=[dataset_state, analysis_options, ngram_n, topic_count, user_analysis_log], outputs=[ analysis_results_state, user_analysis_log, analysis_output, visualization_area_visible, analysis_title, prompt_title, models_compared, model1_title, model1_words, model2_title, model2_words, similarity_metrics_title, similarity_metrics, status_message_visible, status_message ] ) ''' app.load( fn=lambda log: ( update_summary_dropdown(log), load_summary_content("YOUR DATASET RESULTS", log) ), inputs=[user_analysis_log], outputs=[summary_dropdown, summary_content, summary_status] ) ''' return app if __name__ == "__main__": # Download required NLTK resources before launching the app download_nltk_resources() app = create_app() app.launch()