import gradio as gr
import json
from visualization.bow_visualizer import process_and_visualize_analysis

# Import analysis modules
from processors.topic_modeling import compare_topics
from processors.ngram_analysis import compare_ngrams
from processors.bow_analysis import compare_bow
from processors.text_classifiers import classify_formality, classify_sentiment, classify_complexity, compare_classifications
from processors.bias_detection import compare_bias
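
# This module wires the Gradio analysis screen to the processors package: it
# defines small text-analysis helpers, a dispatcher (process_analysis_request)
# that routes the selected analysis type to the matching processor, and the UI
# builder (create_analysis_screen) consumed by app.py.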
# Helper functions used by the analysis dispatcher below
def extract_important_words(text, top_n=20):
    """
    Extract the most important words from a text.

    Args:
        text (str): Input text
        top_n (int): Number of top words to return

    Returns:
        list: List of dicts with the top words and their counts
    """
    from collections import Counter
    import nltk
    from nltk.corpus import stopwords
    from nltk.tokenize import word_tokenize

    # Make sure the required NLTK resources are available; download on first use
    try:
        stop_words = set(stopwords.words('english'))
    except LookupError:
        nltk.download('stopwords')
        stop_words = set(stopwords.words('english'))
    try:
        tokens = word_tokenize(text.lower())
    except LookupError:
        nltk.download('punkt')
        tokens = word_tokenize(text.lower())

    # Keep alphabetic tokens longer than 2 characters that are not stopwords
    filtered_tokens = [word for word in tokens
                       if word.isalpha() and word not in stop_words and len(word) > 2]

    # Count word frequencies and keep the top N
    word_counts = Counter(filtered_tokens)
    top_words = word_counts.most_common(top_n)

    # Format the result
    return [{"word": word, "count": count} for word, count in top_words]
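
# Illustrative example (output values depend on the input text):
#   extract_important_words("The cat sat on the mat because the cat was tired")
#   -> [{"word": "cat", "count": 2}, {"word": "sat", "count": 1},
#       {"word": "mat", "count": 1}, {"word": "tired", "count": 1}]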
def calculate_text_similarity(text1, text2):
    """
    Calculate similarity metrics between two texts.

    Args:
        text1 (str): First text
        text2 (str): Second text

    Returns:
        dict: Similarity metrics
    """
    from processors.metrics import calculate_similarity

    # Calculate similarity using the metrics module
    metrics = calculate_similarity(text1, text2)

    # Augment the metrics with a common-word count
    import nltk
    from nltk.corpus import stopwords

    # Make sure the required NLTK resources are available; download on first use
    try:
        stop_words = set(stopwords.words('english'))
    except LookupError:
        nltk.download('stopwords')
        stop_words = set(stopwords.words('english'))

    # Simple tokenization and stopword filtering
    words1 = set(w.lower() for w in nltk.word_tokenize(text1)
                 if w.isalpha() and w.lower() not in stop_words)
    words2 = set(w.lower() for w in nltk.word_tokenize(text2)
                 if w.isalpha() and w.lower() not in stop_words)

    # Count words that appear in both texts
    metrics["common_word_count"] = len(words1.intersection(words2))
    return metrics
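
# The remaining keys of the returned dict are whatever
# processors.metrics.calculate_similarity produces; this module only
# guarantees the "common_word_count" key added above.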
def extract_ngrams(text, n=2, top_n=10):
    """
    Extract the most common n-grams from text.

    Args:
        text (str): Input text
        n (int or str): Size of n-grams
        top_n (int): Number of top n-grams to return

    Returns:
        list: List of dicts with the top n-grams and their counts
    """
    import nltk
    from nltk.util import ngrams
    from collections import Counter

    # Convert n to int if it arrives as a string (e.g., from a Radio component)
    if isinstance(n, str):
        n = int(n)

    # Make sure the required NLTK resources are available; download on first use
    try:
        tokens = nltk.word_tokenize(text.lower())
    except LookupError:
        nltk.download('punkt')
        tokens = nltk.word_tokenize(text.lower())

    # Generate n-grams and join each tuple into a single string for easier handling
    n_gram_strings = [' '.join(gram) for gram in ngrams(tokens, n)]

    # Count n-gram frequencies and keep the top N
    n_gram_counts = Counter(n_gram_strings)
    top_n_grams = n_gram_counts.most_common(top_n)

    # Format the result
    return [{"ngram": ngram, "count": count} for ngram, count in top_n_grams]
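
# Illustrative example (bigrams):
#   extract_ngrams("the cat sat on the mat", n=2, top_n=2)
#   -> [{"ngram": "the cat", "count": 1}, {"ngram": "cat sat", "count": 1}]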
def compare_ngrams(text1, text2, n=2):
    """
    Compare n-grams between two texts.

    Note: this definition shadows the compare_ngrams imported from
    processors.ngram_analysis at the top of the module; the dispatcher below
    re-imports the processor version under the alias ngram_processor.

    Args:
        text1 (str or list): First text
        text2 (str or list): Second text
        n (int or str): Size of n-grams

    Returns:
        dict: Comparison metrics
    """
    import nltk
    from nltk.util import ngrams

    # Convert n to int if it arrives as a string
    if isinstance(n, str):
        n = int(n)

    # Handle list inputs by converting them to strings
    if isinstance(text1, list):
        text1 = ' '.join(str(item) for item in text1)
    if isinstance(text2, list):
        text2 = ' '.join(str(item) for item in text2)

    # Make sure the required NLTK resources are available; download on first use
    try:
        tokens1 = nltk.word_tokenize(text1.lower())
        tokens2 = nltk.word_tokenize(text2.lower())
    except LookupError:
        nltk.download('punkt')
        tokens1 = nltk.word_tokenize(text1.lower())
        tokens2 = nltk.word_tokenize(text2.lower())

    # Generate the set of n-grams for each text
    n_grams1 = set(' '.join(gram) for gram in ngrams(tokens1, n))
    n_grams2 = set(' '.join(gram) for gram in ngrams(tokens2, n))

    # Return the number of n-grams the two texts share
    return {
        "common_ngram_count": len(n_grams1.intersection(n_grams2))
    }
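
# Illustrative example:
#   compare_ngrams("the cat sat", "the cat ran", n=2)
#   -> {"common_ngram_count": 1}   # only "the cat" is shared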
def perform_topic_modeling(texts, model_names, n_topics=3):
    """
    Perform topic modeling on a pair of texts, one per model.

    Args:
        texts (list): List of text documents, one per model
        model_names (list): Names of the models
        n_topics (int): Number of topics to extract

    Returns:
        dict: Topic modeling results
    """
    from processors.topic_modeling import compare_topics

    # compare_topics is called with texts_set_1/texts_set_2 keyword arguments
    # elsewhere in this module; this wrapper assumes texts holds exactly two
    # documents, one per model, and wraps each in a single-element list.
    return compare_topics(
        texts_set_1=[texts[0]],
        texts_set_2=[texts[1]],
        n_topics=n_topics,
        model_names=model_names,
    )
def process_analysis_request(dataset, selected_analysis, parameters):
    """
    Process the analysis request based on the selected options.

    Args:
        dataset (dict): The input dataset; expects an "entries" list whose
            items carry "model", "prompt", and "response" fields
        selected_analysis (str): The selected analysis type
        parameters (dict): Additional parameters for the analysis
            (e.g. "bow_top", "ngram_n", "ngram_top", "topic_count")

    Returns:
        tuple: A tuple containing (analysis_results, visualization_data)
    """
    if not dataset or "entries" not in dataset or not dataset["entries"]:
        return {}, None
    # Two entries (one response per model) are required below
    if len(dataset["entries"]) < 2:
        return {"error": "Dataset must contain responses from two models"}, None

    # Initialize the results structure
    results = {"analyses": {}}

    # Get the prompt text from the first entry
    prompt_text = dataset["entries"][0].get("prompt", "")
    if not prompt_text:
        return {"error": "No prompt found in dataset"}, None

    # Initialize the analysis container for this prompt
    results["analyses"][prompt_text] = {}

    # Get model names and responses
    model1_name = dataset["entries"][0].get("model", "Model 1")
    model2_name = dataset["entries"][1].get("model", "Model 2")
    model1_response = dataset["entries"][0].get("response", "")
    model2_response = dataset["entries"][1].get("response", "")
    # Process based on the selected analysis type
    if selected_analysis == "Bag of Words":
        # Get the top_n parameter and ensure it's an integer
        top_n = parameters.get("bow_top", 25)
        if isinstance(top_n, str):
            top_n = int(top_n)
        print(f"Using top_n value: {top_n}")  # Debug print

        # Perform Bag of Words analysis using the processor
        from processors.bow_analysis import compare_bow
        bow_results = compare_bow(
            [model1_response, model2_response],
            [model1_name, model2_name],
            top_n=top_n
        )
        results["analyses"][prompt_text]["bag_of_words"] = bow_results
elif selected_analysis == "N-gram Analysis": | |
# Perform N-gram analysis | |
ngram_size = parameters.get("ngram_n", 2) | |
if isinstance(ngram_size, str): | |
ngram_size = int(ngram_size) | |
top_n = parameters.get("ngram_top", 10) # Using default 10 | |
if isinstance(top_n, str): | |
top_n = int(top_n) | |
# Use the processor from the dedicated ngram_analysis module | |
from processors.ngram_analysis import compare_ngrams as ngram_processor | |
ngram_results = ngram_processor( | |
[model1_response, model2_response], | |
[model1_name, model2_name], | |
n=ngram_size, | |
top_n=top_n | |
) | |
results["analyses"][prompt_text]["ngram_analysis"] = ngram_results | |
elif selected_analysis == "Topic Modeling": | |
# Perform topic modeling analysis | |
topic_count = parameters.get("topic_count", 3) | |
if isinstance(topic_count, str): | |
topic_count = int(topic_count) | |
try: | |
# Import the enhanced topic modeling function | |
from processors.topic_modeling import compare_topics, load_all_datasets_for_topic_modeling | |
print("Starting topic modeling analysis...") | |
# Get all responses from dataset directory | |
all_model1_responses, all_model2_responses, dataset_model_names = load_all_datasets_for_topic_modeling() | |
# Add current responses to the collection if they're not empty | |
if model1_response.strip(): | |
all_model1_responses.append(model1_response) | |
print(f"Added current model1 response ({len(model1_response.split())} words)") | |
if model2_response.strip(): | |
all_model2_responses.append(model2_response) | |
print(f"Added current model2 response ({len(model2_response.split())} words)") | |
# Ensure we're using all loaded responses | |
print(f"Using {len(all_model1_responses)} model1 responses and {len(all_model2_responses)} model2 responses") | |
# If we have data, perform topic modeling with all available responses | |
if all_model1_responses and all_model2_responses: | |
# Calculate total word count for diagnostics | |
total_words_model1 = sum(len(text.split()) for text in all_model1_responses) | |
total_words_model2 = sum(len(text.split()) for text in all_model2_responses) | |
print(f"Total words: Model1={total_words_model1}, Model2={total_words_model2}") | |
topic_results = compare_topics( | |
texts_set_1=all_model1_responses, | |
texts_set_2=all_model2_responses, | |
n_topics=topic_count, | |
model_names=[model1_name, model2_name]) # Keep original model names for output | |
results["analyses"][prompt_text]["topic_modeling"] = topic_results | |
# Add helpful message about using all datasets | |
results["analyses"][prompt_text]["topic_modeling"]["info"] = f"Topic modeling performed using {len(all_model1_responses)} responses from model 1 and {len(all_model2_responses)} responses from model 2 for better results." | |
# Add corpus details to help users understand the analysis | |
results["analyses"][prompt_text]["topic_modeling"]["corpus_stats"] = { | |
"model1_documents": len(all_model1_responses), | |
"model2_documents": len(all_model2_responses), | |
"model1_total_words": total_words_model1, | |
"model2_total_words": total_words_model2 | |
} | |
else: | |
# Fallback to original implementation if no data found | |
print("No dataset responses loaded, falling back to current responses only") | |
topic_results = compare_topics( | |
texts_set_1=[model1_response], | |
texts_set_2=[model2_response], | |
n_topics=topic_count, | |
model_names=[model1_name, model2_name]) | |
results["analyses"][prompt_text]["topic_modeling"] = topic_results | |
# Add helpful message if text is very short | |
if (len(model1_response.split()) < 50 or len(model2_response.split()) < 50): | |
if "error" not in topic_results: | |
# Add a warning message about short text | |
results["analyses"][prompt_text]["topic_modeling"]["warning"] = "One or both texts are relatively short. Topic modeling works best with longer texts." | |
except Exception as e: | |
import traceback | |
error_trace = traceback.format_exc() | |
print(f"Topic modeling error: {str(e)}\n{error_trace}") | |
results["analyses"][prompt_text]["topic_modeling"] = { | |
"models": [model1_name, model2_name], | |
"error": str(e), | |
"message": "Topic modeling failed. Try with longer text or different parameters." | |
} | |
elif selected_analysis == "Classifier": | |
# Perform classifier analysis | |
from processors.text_classifiers import classify_formality, classify_sentiment, classify_complexity, compare_classifications | |
results["analyses"][prompt_text]["classifier"] = { | |
"models": [model1_name, model2_name], | |
"classifications": { | |
model1_name: { | |
"formality": classify_formality(model1_response), | |
"sentiment": classify_sentiment(model1_response), | |
"complexity": classify_complexity(model1_response) | |
}, | |
model2_name: { | |
"formality": classify_formality(model2_response), | |
"sentiment": classify_sentiment(model2_response), | |
"complexity": classify_complexity(model2_response) | |
} | |
}, | |
"differences": compare_classifications(model1_response, model2_response) | |
} | |
elif selected_analysis == "Bias Detection": | |
try: | |
# Perform bias detection analysis, always focusing on partisan leaning | |
from processors.bias_detection import compare_bias | |
bias_results = compare_bias( | |
model1_response, | |
model2_response, | |
model_names=[model1_name, model2_name] | |
) | |
results["analyses"][prompt_text]["bias_detection"] = bias_results | |
except Exception as e: | |
import traceback | |
print(f"Bias detection error: {str(e)}\n{traceback.format_exc()}") | |
results["analyses"][prompt_text]["bias_detection"] = { | |
"models": [model1_name, model2_name], | |
"error": str(e), | |
"message": "Bias detection failed. Try with different parameters." | |
} | |
    else:
        # Unknown analysis type
        results["analyses"][prompt_text]["message"] = "Please select a valid analysis type."

    # Return both the analysis results and a placeholder for visualization data
    return results, None
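
# Illustrative input for process_analysis_request (field names taken from the
# lookups above; real datasets may carry additional fields):
# {
#     "entries": [
#         {"model": "Model A", "prompt": "Explain gravity.", "response": "..."},
#         {"model": "Model B", "prompt": "Explain gravity.", "response": "..."},
#     ]
# }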
def create_analysis_screen():
    """
    Create the analysis options screen with enhanced topic modeling options.

    Returns:
        tuple: (analysis_options, analysis_params, run_analysis_btn,
                analysis_output, ngram_n, topic_count)
    """
    with gr.Column() as analysis_screen:
        gr.Markdown("## Analysis Options")
        gr.Markdown("Select which analysis you want to run on the LLM responses.")

        # Radio (rather than CheckboxGroup) so exactly one analysis is selected
        with gr.Group():
            analysis_options = gr.Radio(
                choices=[
                    "Bag of Words",
                    "N-gram Analysis",
                    "Topic Modeling",  # Wired to the topic modeling branch in process_analysis_request
                    "Bias Detection",
                    "Classifier"
                ],
                value="Bag of Words",  # Default selection
                label="Select Analysis Type"
            )

        # N-gram size parameter, created at top level so app.py can read it;
        # Radio values are strings and are coerced to int by the dispatcher
        ngram_n = gr.Radio(
            choices=["1", "2", "3"], value="2",
            label="N-gram Size",
            visible=False
        )

        # Topic count parameter, also created at top level
        topic_count = gr.Slider(
            minimum=2, maximum=10, value=3, step=1,
            label="Number of Topics",
            info="Choose fewer topics for shorter texts, more topics for longer texts",
            visible=False
        )
        # Parameters for each analysis type
        with gr.Group() as analysis_params:
            # Topic modeling parameters with enhanced options
            with gr.Group(visible=False) as topic_params:
                gr.Markdown("### Topic Modeling Parameters")
                gr.Markdown("""
                Topic modeling extracts thematic patterns from text.
                For best results:
                - Use longer text samples (100+ words)
                - Adjust topic count based on text length
                - For political content, 3-5 topics usually works well
                """)
                # The topic_count slider defined above supplies the topic count

            # N-gram parameters group (uses the ngram_n radio defined above)
            with gr.Group(visible=False) as ngram_params:
                gr.Markdown("### N-gram Parameters")

            # Bias detection parameters
            with gr.Group(visible=False) as bias_params:
                gr.Markdown("### Bias Detection Parameters")
                gr.Markdown("Analysis will focus on detecting partisan leaning.")

            # Classifier parameters
            with gr.Group(visible=False) as classifier_params:
                gr.Markdown("### Classifier Parameters")
                gr.Markdown("Classifies responses based on formality, sentiment, and complexity")
        # Function to update parameter visibility based on the selected analysis
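        # Gradio supports returning a dict keyed by output components from an
        # event handler; only the components listed in `outputs` are updated.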
        def update_params_visibility(selected):
            return {
                topic_params: gr.update(visible=selected == "Topic Modeling"),
                ngram_params: gr.update(visible=selected == "N-gram Analysis"),
                bias_params: gr.update(visible=selected == "Bias Detection"),
                classifier_params: gr.update(visible=selected == "Classifier"),
                ngram_n: gr.update(visible=selected == "N-gram Analysis"),
                topic_count: gr.update(visible=selected == "Topic Modeling")
            }
        # Set up the event handler for analysis selection
        analysis_options.change(
            fn=update_params_visibility,
            inputs=[analysis_options],
            outputs=[
                topic_params,
                ngram_params,
                bias_params,
                classifier_params,
                ngram_n,
                topic_count
            ]
        )

        # Run analysis button
        run_analysis_btn = gr.Button("Run Analysis", variant="primary", size="large")

        # Analysis output area - hidden JSON component to store raw results
        analysis_output = gr.JSON(label="Analysis Results", visible=False)

    # Return the components needed by app.py
    return analysis_options, analysis_params, run_analysis_btn, analysis_output, ngram_n, topic_count
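
# Minimal smoke test (illustrative; assumes the processors package is
# importable and that NLTK data can be downloaded on first use):
if __name__ == "__main__":
    sample_dataset = {
        "entries": [
            {"model": "Model A", "prompt": "Explain gravity.",
             "response": "Gravity is the force by which a planet draws objects toward its center."},
            {"model": "Model B", "prompt": "Explain gravity.",
             "response": "Gravity pulls objects together; the more mass, the stronger the pull."},
        ]
    }
    analysis_results, _ = process_analysis_request(sample_dataset, "Bag of Words", {"bow_top": 10})
    print(json.dumps(analysis_results, indent=2))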