"""Analysis request handler: dispatches text-comparison analyses between two model responses."""
# Standard library imports
import json
import logging
import traceback

# Third-party imports
import gradio as gr

# Local application imports
from processors.bias_detection import compare_bias
from processors.bow_analysis import compare_bow
from processors.ngram_analysis import compare_ngrams
from processors.text_classifiers import (
    classify_complexity,
    classify_formality,
    classify_sentiment,
    compare_classifications,
)
from processors.topic_modeling import compare_topics
from visualization.bow_visualizer import process_and_visualize_analysis
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('analysis_handler')
def _get_int_param(parameters, key, default):
    """Fetch *key* from *parameters*, coercing string values to int.

    A non-numeric string raises ValueError, which is caught by the
    caller's top-level error handler (matching the original behavior of
    the inline ``int()`` conversions).
    """
    value = parameters.get(key, default)
    return int(value) if isinstance(value, str) else value


def _run_bow_analysis(responses, names, parameters):
    """Run the Bag of Words comparison and return its result dict."""
    top_n = _get_int_param(parameters, "bow_top", 25)
    logger.info(f"Running Bag of Words analysis with top_n={top_n}")
    return compare_bow(responses, names, top_n=top_n)


def _run_ngram_analysis(responses, names, parameters):
    """Run the N-gram comparison and return its result dict."""
    ngram_size = _get_int_param(parameters, "ngram_n", 2)
    top_n = _get_int_param(parameters, "ngram_top", 15)
    logger.info(f"Running N-gram analysis with n={ngram_size}, top_n={top_n}")
    # Uses the module-level import of processors.ngram_analysis.compare_ngrams
    # (the original re-imported it locally under an alias, redundantly).
    return compare_ngrams(responses, names, n=ngram_size, top_n=top_n)


def _run_topic_modeling(responses, names, parameters):
    """Run topic modeling, preferring the improved implementation when present.

    Returns the topic-modeling result dict; on failure returns an error dict
    with "error", "message", and "stack_trace" keys instead of raising.
    """
    topic_count = _get_int_param(parameters, "topic_count", 3)
    logger.info(f"Running Topic Modeling analysis with n_topics={topic_count}")
    try:
        try:
            # Optional drop-in replacement module; fall back when absent.
            from improved_topic_modeling import compare_topics as topic_fn
            logger.info("Using improved topic modeling implementation")
        except ImportError:
            logger.info("Using original topic modeling implementation")
            topic_fn = compare_topics  # processors.topic_modeling
        topic_results = topic_fn(
            texts_set_1=[responses[0]],
            texts_set_2=[responses[1]],
            n_topics=topic_count,
            model_names=list(names),
        )
        # Ensure the topic modeling results contain the necessary fields.
        if not topic_results.get("topics"):
            logger.warning("No topics found in topic modeling results")
            topic_results["message"] = "No significant topics were discovered in the text. Try a different analysis method or adjust parameters."
        if not topic_results.get("model_topics"):
            logger.warning("No model topics found in topic modeling results")
            topic_results.setdefault("message", "Could not calculate topic distributions for the models.")
        return topic_results
    except Exception as e:
        logger.error(f"Topic modeling error: {str(e)}\n{traceback.format_exc()}")
        return {
            "models": list(names),
            "error": str(e),
            "message": "Topic modeling failed. Please try with longer text or different parameters.",
            "stack_trace": traceback.format_exc(),
        }


def _run_classifier_analysis(responses, names):
    """Classify both responses (formality/sentiment/complexity) and diff them."""
    logger.info("Running Classifier analysis")
    return {
        "models": list(names),
        "classifications": {
            name: {
                "formality": classify_formality(text),
                "sentiment": classify_sentiment(text),
                "complexity": classify_complexity(text),
            }
            for name, text in zip(names, responses)
        },
        "differences": compare_classifications(responses[0], responses[1]),
    }


def _run_bias_detection(responses, names):
    """Run bias detection; on failure returns an error dict instead of raising."""
    logger.info("Running Bias Detection analysis")
    try:
        logger.info(f"Starting bias detection for {names[0]} and {names[1]}")
        logger.info(f"Text lengths - Text1: {len(responses[0])}, Text2: {len(responses[1])}")
        bias_results = compare_bias(
            responses[0],
            responses[1],
            model_names=list(names),
        )
        logger.info(f"Bias detection complete. Result has keys: {bias_results.keys() if bias_results else 'None'}")
        return bias_results
    except Exception as e:
        logger.error(f"Bias detection error: {str(e)}\n{traceback.format_exc()}")
        return {
            "models": list(names),
            "error": str(e),
            "message": "Bias detection failed. Try with different parameters.",
            "stack_trace": traceback.format_exc(),
        }


def process_analysis_request(dataset, selected_analysis, parameters):
    """
    Process the analysis request based on the selected options.

    Args:
        dataset (dict): The input dataset. Must contain an "entries" list
            with at least two entries (one per model), each providing
            "model" and "response" keys; the first entry also supplies
            the "prompt".
        selected_analysis (str): The selected analysis type — one of
            "Bag of Words", "N-gram Analysis", "Topic Modeling",
            "Classifier", or "Bias Detection".
        parameters (dict): Additional parameters for the analysis
            (e.g. "bow_top", "ngram_n", "ngram_top", "topic_count";
            values may be ints or numeric strings).

    Returns:
        tuple: (analysis_results, visualization_data). visualization_data
        is currently always None (placeholder).
    """
    logger.info(f"Processing analysis request: {selected_analysis}")

    if not dataset or "entries" not in dataset or not dataset["entries"]:
        logger.warning("No valid dataset provided for analysis")
        return {}, None

    # Bug fix: the original indexed entries[1] unconditionally and raised
    # IndexError when only one model response was present.
    if len(dataset["entries"]) < 2:
        logger.warning("Dataset must contain at least two entries to compare")
        return {"error": "At least two model responses are required for comparison"}, None

    # Get the prompt text from the first entry.
    prompt_text = dataset["entries"][0].get("prompt", "")
    if not prompt_text:
        logger.warning("No prompt found in dataset")
        return {"error": "No prompt found in dataset"}, None

    # Initialize the results structure for this prompt.
    results = {"analyses": {prompt_text: {}}}

    # Get model names and responses.
    names = [
        dataset["entries"][0].get("model", "Model 1"),
        dataset["entries"][1].get("model", "Model 2"),
    ]
    responses = [
        dataset["entries"][0].get("response", ""),
        dataset["entries"][1].get("response", ""),
    ]
    logger.info(f"Comparing responses from {names[0]} and {names[1]}")

    try:
        slot = results["analyses"][prompt_text]
        # Dispatch on the selected analysis type.
        if selected_analysis == "Bag of Words":
            slot["bag_of_words"] = _run_bow_analysis(responses, names, parameters)
        elif selected_analysis == "N-gram Analysis":
            slot["ngram_analysis"] = _run_ngram_analysis(responses, names, parameters)
        elif selected_analysis == "Topic Modeling":
            slot["topic_modeling"] = _run_topic_modeling(responses, names, parameters)
        elif selected_analysis == "Classifier":
            slot["classifier"] = _run_classifier_analysis(responses, names)
        elif selected_analysis == "Bias Detection":
            slot["bias_detection"] = _run_bias_detection(responses, names)
        else:
            logger.warning(f"Unknown analysis type: {selected_analysis}")
            slot["message"] = "Please select a valid analysis type."
    except Exception as e:
        # Top-level boundary: log with full trace and return an error payload.
        logger.error(f"Error processing analysis request: {str(e)}\n{traceback.format_exc()}")
        results = {
            "error": str(e),
            "stack_trace": traceback.format_exc(),
            "analyses": {
                prompt_text: {
                    "message": f"Analysis failed: {str(e)}"
                }
            }
        }

    # Return both the analysis results and a placeholder for visualization data.
    return results, None