"""Analysis screen: Gradio controls plus dispatch of the selected analysis.

The module builds the "Analysis Options" UI and runs one comparison
(bag of words, n-grams, topic modeling, bias detection or classifiers)
over the first two entries of a dataset.
"""
import json
import logging
import traceback
from collections import Counter

import gradio as gr

from visualization.bow_visualizer import process_and_visualize_analysis
# Import analysis modules
from processors.topic_modeling import compare_topics
# Aliased on import: this module defines its own, differently-shaped
# `compare_ngrams(text1, text2, n)` further down, which would otherwise
# silently rebind the processor imported here.
from processors.ngram_analysis import compare_ngrams as ngram_processor
from processors.bow_analysis import compare_bow
from processors.text_classifiers import (
    classify_formality,
    classify_sentiment,
    classify_complexity,
    compare_classifications,
)
from processors.bias_detection import compare_bias

# Set up logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('analysis_screen')


def create_analysis_screen():
    """
    Create the analysis options screen.

    Builds a radio selector for the analysis type, the per-analysis
    parameter groups (hidden until their analysis is selected), the
    "Run Analysis" button and a hidden JSON component for raw results.

    Returns:
        tuple: (analysis_options, analysis_params, run_analysis_btn,
                analysis_output, ngram_n, topic_count)
    """
    # NOTE(review): the nesting below was reconstructed from a
    # whitespace-collapsed copy of this file -- confirm that ngram_n and
    # topic_count belong directly in the Column (not inside a Group).
    with gr.Column() as analysis_screen:
        gr.Markdown("## Analysis Options")
        gr.Markdown("Select which analysis you want to run on the LLM responses.")

        # Radio (single choice) rather than CheckboxGroup for analysis selection
        with gr.Group():
            analysis_options = gr.Radio(
                choices=[
                    "Bag of Words",
                    "N-gram Analysis",
                    "Topic Modeling",
                    "Bias Detection",
                    "Classifier",
                ],
                value="Bag of Words",  # Default selection
                label="Select Analysis Type",
            )

        # N-gram size selector, created here so it can be returned to the caller
        ngram_n = gr.Radio(
            choices=["1", "2", "3"],
            value="2",
            label="N-gram Size",
            visible=False,
        )

        # Topic count selector, created here so it can be returned to the caller
        topic_count = gr.Slider(
            minimum=2,
            maximum=10,
            value=3,
            step=1,
            label="Number of Topics",
            visible=False,
        )

        # Parameters for each analysis type
        with gr.Group() as analysis_params:
            # Topic modeling parameters (the control itself is topic_count above)
            with gr.Group(visible=False) as topic_params:
                gr.Markdown("### Topic Modeling Parameters")

            # N-gram parameters (the control itself is ngram_n above)
            with gr.Group(visible=False) as ngram_params:
                gr.Markdown("### N-gram Parameters")

            # Bias detection parameters - informational text only, no inputs
            with gr.Group(visible=False) as bias_params:
                gr.Markdown("### Bias Detection Parameters")
                gr.Markdown("Using partisan leaning bias detection and sentiment analysis")
                gr.Markdown("This analysis detects sentiment bias, partisan leaning, and issue framing patterns.")

            # Classifier parameters - informational text only, no inputs
            with gr.Group(visible=False) as classifier_params:
                gr.Markdown("### Classifier Parameters")
                gr.Markdown("Classifies responses based on formality, sentiment, and complexity")

        def update_params_visibility(selected):
            """Show only the parameter widgets that match the selected analysis."""
            return {
                topic_params: gr.update(visible=selected == "Topic Modeling"),
                ngram_params: gr.update(visible=selected == "N-gram Analysis"),
                bias_params: gr.update(visible=selected == "Bias Detection"),
                classifier_params: gr.update(visible=selected == "Classifier"),
                ngram_n: gr.update(visible=selected == "N-gram Analysis"),
                topic_count: gr.update(visible=selected == "Topic Modeling"),
            }

        # Re-evaluate visibility whenever the selected analysis changes
        analysis_options.change(
            fn=update_params_visibility,
            inputs=[analysis_options],
            outputs=[
                topic_params,
                ngram_params,
                bias_params,
                classifier_params,
                ngram_n,
                topic_count,
            ],
        )

        # Run analysis button
        run_analysis_btn = gr.Button("Run Analysis", variant="primary", size="large")

        # Analysis output area - hidden JSON component to store raw results
        analysis_output = gr.JSON(label="Analysis Results", visible=False)

    # Return the components needed by the caller
    return analysis_options, analysis_params, run_analysis_btn, analysis_output, ngram_n, topic_count


def _english_stopwords():
    """Return NLTK's English stopwords as a set, downloading the corpus if missing."""
    import nltk
    from nltk.corpus import stopwords

    try:
        return set(stopwords.words('english'))
    except LookupError:
        # NLTK raises LookupError when a data resource is not installed.
        nltk.download('stopwords')
        return set(stopwords.words('english'))


def _tokenize(text):
    """Tokenize ``text`` with NLTK, downloading the tokenizer data if missing."""
    import nltk

    try:
        return nltk.word_tokenize(text)
    except LookupError:
        # Older NLTK releases load 'punkt'; newer ones look for 'punkt_tab'.
        # Fetch both so the retry works with either.
        nltk.download('punkt')
        nltk.download('punkt_tab')
        return nltk.word_tokenize(text)


def _content_words(text, stop_words):
    """Return the set of lower-cased alphabetic, non-stopword tokens in ``text``."""
    return {
        token.lower()
        for token in _tokenize(text)
        if token.isalpha() and token.lower() not in stop_words
    }


def extract_important_words(text, top_n=20):
    """
    Extract the most important words from a text.

    Words are lower-cased; stopwords, non-alphabetic tokens and tokens of
    two characters or fewer are dropped before counting.

    Args:
        text (str): Input text
        top_n (int): Number of top words to return

    Returns:
        list: ``{"word": ..., "count": ...}`` dicts, most frequent first
    """
    stop_words = _english_stopwords()
    tokens = _tokenize(text.lower())

    # Remove stopwords and non-alphabetic tokens
    filtered_tokens = [
        word for word in tokens
        if word.isalpha() and word not in stop_words and len(word) > 2
    ]

    top_words = Counter(filtered_tokens).most_common(top_n)
    return [{"word": word, "count": count} for word, count in top_words]


def calculate_text_similarity(text1, text2):
    """
    Calculate similarity metrics between two texts.

    Delegates to ``processors.metrics.calculate_similarity`` and adds a
    ``"common_word_count"`` entry: the number of distinct non-stopword,
    alphabetic words (case-insensitive) present in both texts.

    Args:
        text1 (str): First text
        text2 (str): Second text

    Returns:
        dict: Similarity metrics
    """
    # Imported lazily, as in the original implementation.
    from processors.metrics import calculate_similarity

    metrics = calculate_similarity(text1, text2)

    stop_words = _english_stopwords()
    common_words = _content_words(text1, stop_words) & _content_words(text2, stop_words)

    metrics["common_word_count"] = len(common_words)
    return metrics


def extract_ngrams(text, n=2, top_n=10):
    """
    Extract the most common n-grams from text.

    Args:
        text (str): Input text
        n (int or str): Size of n-grams
        top_n (int): Number of top n-grams to return

    Returns:
        list: ``{"ngram": ..., "count": ...}`` dicts, most frequent first
    """
    from nltk.util import ngrams

    # The UI radio delivers the size as a string
    if isinstance(n, str):
        n = int(n)

    tokens = _tokenize(text.lower())

    # Join each n-gram tuple into a single space-separated string
    n_gram_strings = [' '.join(gram) for gram in ngrams(tokens, n)]

    top_n_grams = Counter(n_gram_strings).most_common(top_n)
    return [{"ngram": ngram, "count": count} for ngram, count in top_n_grams]


def compare_ngrams(text1, text2, n=2):
    """
    Compare n-grams between two texts.

    This is the module's own two-text helper; the model-aware processor
    from ``processors.ngram_analysis`` is available as ``ngram_processor``.

    Args:
        text1 (str or list): First text
        text2 (str or list): Second text
        n (int or str): Size of n-grams

    Returns:
        dict: ``{"common_ngram_count": <distinct n-grams shared by both texts>}``
    """
    from nltk.util import ngrams

    # The UI radio delivers the size as a string
    if isinstance(n, str):
        n = int(n)

    # Handle list inputs by converting to strings
    if isinstance(text1, list):
        text1 = ' '.join(str(item) for item in text1)
    if isinstance(text2, list):
        text2 = ' '.join(str(item) for item in text2)

    tokens1 = _tokenize(text1.lower())
    tokens2 = _tokenize(text2.lower())

    n_grams1 = {' '.join(gram) for gram in ngrams(tokens1, n)}
    n_grams2 = {' '.join(gram) for gram in ngrams(tokens2, n)}

    return {
        "common_ngram_count": len(n_grams1 & n_grams2)
    }


def perform_topic_modeling(texts, model_names, n_topics=3):
    """
    Perform topic modeling on a list of texts.

    Args:
        texts (list): List of text documents
        model_names (list): Names of the models
        n_topics (int): Number of topics to extract

    Returns:
        dict: Topic modeling results
    """
    # NOTE(review): process_analysis_request calls compare_topics with the
    # keywords texts_set_1/texts_set_2/model_names, while this call passes
    # (texts, model_names) positionally -- confirm against the signature of
    # processors.topic_modeling.compare_topics.
    return compare_topics(texts, model_names, n_topics=n_topics)


def process_analysis_request(dataset, selected_analysis, parameters):
    """
    Process the analysis request based on the selected options.

    Only the first two entries of the dataset are compared; the prompt is
    taken from the first entry.

    Args:
        dataset (dict): The input dataset; ``dataset["entries"]`` is a list
            of dicts read via the keys "prompt", "model" and "response"
        selected_analysis (str): The selected analysis type
        parameters (dict): Additional parameters for the analysis
            ("ngram_n", "ngram_top", "topic_count"); ``None`` means defaults

    Returns:
        tuple: (analysis_results, visualization_data). The second item is
        always ``None``. ``analysis_results`` is ``{}`` for a missing/empty
        dataset, ``{"error": ...}`` for an unusable one, and otherwise
        ``{"analyses": {prompt: {...}}}``.
    """
    logger.info("Processing analysis request: %s", selected_analysis)

    if not dataset or "entries" not in dataset or not dataset["entries"]:
        logger.warning("No valid dataset provided for analysis")
        return {}, None

    entries = dataset["entries"]
    # Tolerate a missing parameters dict; every lookup below has a default.
    parameters = parameters or {}

    # Get the prompt text from the first entry
    prompt_text = entries[0].get("prompt", "")
    if not prompt_text:
        logger.warning("No prompt found in dataset")
        return {"error": "No prompt found in dataset"}, None

    # A comparison needs two responses; without this guard the entries[1]
    # lookup below raised an uncaught IndexError.
    if len(entries) < 2:
        logger.warning("Dataset needs at least two entries to compare responses")
        return {"error": "At least two responses are required for comparison"}, None

    # Initialize the results structure and the container for this prompt
    results = {"analyses": {prompt_text: {}}}
    prompt_results = results["analyses"][prompt_text]

    # Get model names and responses
    model1_name = entries[0].get("model", "Model 1")
    model2_name = entries[1].get("model", "Model 2")
    model1_response = entries[0].get("response", "")
    model2_response = entries[1].get("response", "")

    logger.info("Comparing responses from %s and %s", model1_name, model2_name)

    try:
        # Process based on the selected analysis type
        if selected_analysis == "Bag of Words":
            # Use fixed default value of 25 for top_n
            top_n = 25
            logger.info("Running Bag of Words analysis with top_n=%s", top_n)

            prompt_results["bag_of_words"] = compare_bow(
                [model1_response, model2_response],
                [model1_name, model2_name],
                top_n=top_n,
            )

        elif selected_analysis == "N-gram Analysis":
            ngram_size = parameters.get("ngram_n", 2)
            if isinstance(ngram_size, str):
                ngram_size = int(ngram_size)

            top_n = parameters.get("ngram_top", 15)
            if isinstance(top_n, str):
                top_n = int(top_n)

            logger.info("Running N-gram analysis with n=%s, top_n=%s", ngram_size, top_n)

            # Use the processor from the dedicated ngram_analysis module,
            # not the local two-text compare_ngrams helper.
            prompt_results["ngram_analysis"] = ngram_processor(
                [model1_response, model2_response],
                [model1_name, model2_name],
                n=ngram_size,
                top_n=top_n,
            )

        elif selected_analysis == "Topic Modeling":
            topic_count = parameters.get("topic_count", 3)
            if isinstance(topic_count, str):
                topic_count = int(topic_count)

            logger.info("Running Topic Modeling analysis with n_topics=%s", topic_count)

            try:
                prompt_results["topic_modeling"] = compare_topics(
                    texts_set_1=[model1_response],
                    texts_set_2=[model2_response],
                    n_topics=topic_count,
                    model_names=[model1_name, model2_name])
            except Exception as e:
                # Report the failure inside the result instead of aborting
                error_msg = f"Topic modeling error: {str(e)}\n{traceback.format_exc()}"
                logger.error(error_msg)
                prompt_results["topic_modeling"] = {
                    "models": [model1_name, model2_name],
                    "error": str(e),
                    "message": "Topic modeling failed. Try with longer text or different parameters."
                }

        elif selected_analysis == "Classifier":
            logger.info("Running Classifier analysis")

            prompt_results["classifier"] = {
                "models": [model1_name, model2_name],
                "classifications": {
                    model1_name: {
                        "formality": classify_formality(model1_response),
                        "sentiment": classify_sentiment(model1_response),
                        "complexity": classify_complexity(model1_response)
                    },
                    model2_name: {
                        "formality": classify_formality(model2_response),
                        "sentiment": classify_sentiment(model2_response),
                        "complexity": classify_complexity(model2_response)
                    }
                },
                "differences": compare_classifications(model1_response, model2_response)
            }

        elif selected_analysis == "Bias Detection":
            logger.info("Running Bias Detection analysis")

            try:
                logger.info("Calling compare_bias with model names: %s, %s",
                            model1_name, model2_name)
                logger.info("Text lengths - Text1: %s, Text2: %s",
                            len(model1_response), len(model2_response))

                bias_results = compare_bias(
                    model1_response,
                    model2_response,
                    model_names=[model1_name, model2_name]
                )

                logger.info("Bias detection complete. Result has keys: %s",
                            bias_results.keys() if bias_results else 'None')
                prompt_results["bias_detection"] = bias_results

            except Exception as e:
                # Report the failure inside the result instead of aborting
                error_msg = f"Bias detection error: {str(e)}\n{traceback.format_exc()}"
                logger.error(error_msg)
                prompt_results["bias_detection"] = {
                    "models": [model1_name, model2_name],
                    "error": str(e),
                    "message": "Bias detection failed. Try with different parameters."
                }

        else:
            # Unknown analysis type
            logger.warning("Unknown analysis type: %s", selected_analysis)
            prompt_results["message"] = "Please select a valid analysis type."

    except Exception as e:
        # Top-level boundary: replace the partial results with an error payload
        error_msg = f"Error in analysis: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        results = {
            "error": error_msg,
            "analyses": {
                prompt_text: {
                    "message": f"Analysis failed: {str(e)}"
                }
            }
        }

    # Return both the analysis results and a placeholder for visualization data
    return results, None