Spaces:
Sleeping
Sleeping
""" | |
Enhanced topic modeling processor for comparing text responses with better error handling | |
and more robust algorithm configuration | |
""" | |
import logging
import re
from functools import lru_cache

import nltk
import numpy as np
from nltk.corpus import stopwords
from scipy.spatial import distance
from sklearn.decomposition import LatentDirichletAllocation, NMF
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
# Set up logging.
# basicConfig() is a no-op when the root logger already has handlers, so an
# application that configures logging before importing this module keeps its
# own configuration.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# Module-wide logger shared by every function in this file.
logger = logging.getLogger('topic_modeling')
@lru_cache(maxsize=1)
def _english_stopwords():
    """Return NLTK's English stopword list as a frozenset, loaded only once."""
    return frozenset(stopwords.words('english'))


def preprocess_text(text):
    """
    Preprocess text for topic modeling.

    The text is lowercased, every character that is not an ASCII letter or
    whitespace is replaced by a space, the result is tokenized, and English
    stopwords as well as tokens of three characters or fewer are dropped.

    Args:
        text (str): Text to preprocess

    Returns:
        str: The remaining tokens joined by single spaces. An empty string is
            returned when ``text`` is not a string or contains no letters.
            If tokenization fails unexpectedly, the cleaned (lowercased,
            letters-only) text is returned unfiltered as a best effort.
    """
    # Callers filter on ``.strip()``, so always hand back a string.
    if not isinstance(text, str):
        return ""

    # Substitute a space rather than deleting the character: deleting fuses
    # neighbouring words ("models.However" -> "modelshowever").
    cleaned = re.sub(r'[^a-zA-Z\s]', ' ', text.lower())
    if not cleaned.strip():
        return ""

    try:
        tokens = nltk.word_tokenize(cleaned)
        stop_words = _english_stopwords()
    except LookupError as e:
        # NLTK data (tokenizer models / stopword corpus) is not installed.
        # Degrade to whitespace tokens; the length filter below still applies.
        logger.warning(f"NLTK resource missing in preprocess_text, using whitespace tokens: {str(e)}")
        tokens = cleaned.split()
        stop_words = frozenset()
    except Exception as e:
        logger.error(f"Error in preprocess_text: {str(e)}")
        # Best effort: return the cleaned text if tokenization fails
        return cleaned

    return ' '.join(
        token for token in tokens
        if len(token) > 3 and token not in stop_words
    )
def get_top_words_per_topic(model, feature_names, n_top_words=10):
    """
    Get the top words for each topic in the model.

    Args:
        model: Fitted topic model exposing ``components_`` (one row of word
            weights per topic), e.g. LDA or NMF
        feature_names (list): Feature names (words), indexed like the columns
            of ``model.components_``
        n_top_words (int): Number of top words to include per topic. Numeric
            strings are accepted; values below zero are treated as zero.

    Returns:
        list: One dict per topic with keys ``"id"`` (topic index), ``"words"``
            (highest-weighted first) and ``"weights"`` (the matching weights).
    """
    # A negative count would turn the slice below into "everything except the
    # first few", so clamp it.
    n_top_words = max(0, int(n_top_words))

    topics = []
    for topic_idx, topic in enumerate(model.components_):
        weights = np.asarray(topic)
        # argsort is ascending; reverse it and keep the first n entries.
        top_words_idx = weights.argsort()[::-1][:n_top_words]
        topics.append({
            "id": topic_idx,
            "words": [feature_names[i] for i in top_words_idx],
            "weights": weights[top_words_idx].tolist(),
        })
    return topics
def extract_topics(texts, n_topics=3, n_top_words=10, method="lda"):
    """
    Extract topics from a list of texts.

    Args:
        texts (list): List of text documents
        n_topics (int): Number of topics to extract (numeric strings are
            accepted; values below 2 are raised to 2)
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method ('lda' or 'nmf', case-insensitive;
            anything other than 'nmf' selects LDA)

    Returns:
        dict: Topic modeling results with keys ``"method"``, ``"n_topics"``
            (the value actually used), ``"topics"`` and ``"document_topics"``.
            Each ``document_topics`` entry carries ``"document_id"`` (the
            index of the document in ``texts``) and its normalized
            ``"distribution"``. Documents that are empty after preprocessing
            are skipped. On failure an ``"error"`` message is added instead
            of raising.
    """
    result = {
        "method": method,
        "n_topics": n_topics,
        "topics": [],
        "document_topics": []
    }

    # Validate the numeric parameters up front so bad values are reported
    # through the same "error" key as every other failure.
    try:
        n_topics = max(2, int(n_topics))
        n_top_words = int(n_top_words)
    except (TypeError, ValueError) as e:
        logger.error(f"Invalid parameters in extract_topics: {str(e)}")
        result["error"] = f"Topic modeling failed: {str(e)}"
        return result
    result["n_topics"] = n_topics

    use_nmf = str(method).lower() == "nmf"
    method_label = "NMF" if use_nmf else "LDA"
    logger.info(f"Starting topic modeling with method={method}, n_topics={n_topics}")

    try:
        # Preprocess texts, remembering each document's position in `texts`
        # so the reported document_id still refers to the caller's list.
        logger.info("Preprocessing texts")
        documents = []
        for index, text in enumerate(texts):
            processed = preprocess_text(text)
            if isinstance(processed, str) and processed.strip():
                documents.append((index, processed))

        if not documents:
            logger.warning("All texts are empty after preprocessing")
            return result

        # Create document-term matrix
        logger.info(f"Creating document-term matrix using {method}")
        # With a single document, max_df=0.95 means "at most 0.95 documents",
        # which is below min_df=1 and makes scikit-learn raise ValueError.
        max_df = 0.95 if len(documents) > 1 else 1.0
        # NMF works on TF-IDF weights, LDA on raw term counts.
        vectorizer_cls = TfidfVectorizer if use_nmf else CountVectorizer
        vectorizer = vectorizer_cls(
            max_features=1000, min_df=1, max_df=max_df, stop_words='english'
        )
        X = vectorizer.fit_transform([processed for _, processed in documents])
        feature_names = vectorizer.get_feature_names_out()

        # Check if we have enough features
        if X.shape[1] < n_topics:
            logger.warning(f"Only {X.shape[1]} features found, reducing n_topics from {n_topics}")
            n_topics = max(2, X.shape[1] - 1)
            result["n_topics"] = n_topics

        # Apply topic modeling
        logger.info(f"Applying {method_label} with {n_topics} topics")
        if use_nmf:
            # Non-negative Matrix Factorization
            model = NMF(n_components=n_topics, random_state=42, max_iter=1000)
        else:
            # Latent Dirichlet Allocation
            model = LatentDirichletAllocation(
                n_components=n_topics,
                random_state=42,
                max_iter=20,
                learning_method='online'
            )
        topic_distribution = model.fit_transform(X)

        # Get top words for each topic
        logger.info("Extracting top words for each topic")
        result["topics"] = get_top_words_per_topic(model, feature_names, n_top_words)

        # Get topic distribution for each document
        logger.info("Calculating topic distributions for documents")
        for (index, _), dist in zip(documents, topic_distribution):
            # Normalize for easier comparison; an all-zero row is kept as is.
            total = np.sum(dist)
            normalized_dist = dist / total if total > 0 else dist
            result["document_topics"].append({
                "document_id": index,
                "distribution": normalized_dist.tolist()
            })
        logger.info("Topic modeling completed successfully")
    except Exception as e:
        # Boundary handler: callers expect an "error" entry, not an exception.
        logger.exception(f"Error in extract_topics: {str(e)}")
        result["error"] = f"Topic modeling failed: {str(e)}"
    return result
def calculate_jensen_shannon_divergence(p, q, base=None):
    """
    Calculate Jensen-Shannon divergence between two probability distributions.

    Args:
        p (array): First distribution (1-D, non-negative; need not sum to 1)
        q (array): Second distribution, same length as ``p``
        base (float, optional): Logarithm base. ``None`` uses the natural
            logarithm, so the result lies in [0, ln 2]; ``base=2`` gives a
            value in [0, 1].

    Returns:
        float: Jensen-Shannon divergence (0 for identical distributions).
            NaN when either input has no positive total mass, because it
            cannot be normalized into a distribution.
    """
    # Ensure inputs are numpy arrays
    p = np.asarray(p, dtype=float)
    q = np.asarray(q, dtype=float)

    # A zero (or empty) vector cannot be normalized; report NaN explicitly
    # instead of letting a division by zero produce it with warnings.
    if np.sum(p) <= 0 or np.sum(q) <= 0:
        return float('nan')

    # scipy normalizes both inputs, builds the mixture m = (p + q) / 2 itself
    # and returns the Jensen-Shannon *distance*, i.e. the square root of the
    # divergence, so squaring recovers the divergence.
    js_distance = distance.jensenshannon(p, q, base=base)
    return float(js_distance ** 2)
def _usable_documents(texts):
    """Return the texts that are strings and still have content after preprocessing."""
    usable = []
    for text in texts or []:
        if not isinstance(text, str):
            continue
        processed = preprocess_text(text)
        if isinstance(processed, str) and processed.strip():
            usable.append(text)
    return usable


def compare_topics(texts_set_1, texts_set_2, n_topics=3, n_top_words=10, method="lda", model_names=None):
    """
    Compare topics between two sets of texts.

    One topic model is fitted on both sets together, so every document is
    described in the same topic space. Each set's document distributions are
    then averaged and the two averages are compared.

    Args:
        texts_set_1 (list): First list of text documents
        texts_set_2 (list): Second list of text documents
        n_topics (int): Number of topics to extract
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method ('lda' or 'nmf')
        model_names (list, optional): Names of the models being compared;
            missing entries default to "Model 1" / "Model 2"

    Returns:
        dict: Comparison results with keys ``"method"``, ``"n_topics"``,
            ``"models"``, ``"model_topics"`` (average topic distribution per
            model name), ``"topics"`` and ``"comparisons"`` (keyed
            "<name 1> vs <name 2>", holding ``"js_divergence"``). An
            ``"error"`` message is added on failure instead of raising.
    """
    logger.info(f"Starting topic comparison with n_topics={n_topics}, method={method}")

    # Set default model names if not provided, and pad a too-short list so
    # both names can always be indexed below.
    default_names = ["Model 1", "Model 2"]
    if model_names:
        model_names = list(model_names)
        model_names.extend(default_names[len(model_names):])
    else:
        model_names = default_names

    # Initialize the result structure
    result = {
        "method": method,
        "n_topics": n_topics,
        "models": model_names,
        "model_topics": {},
        "topics": [],
        "comparisons": {}
    }
    try:
        # Drop documents that extract_topics would discard as empty, so the
        # distributions it returns line up one-to-one with these lists.
        docs_1 = _usable_documents(texts_set_1)
        docs_2 = _usable_documents(texts_set_2)

        # Process all texts together to find common topics
        all_texts = docs_1 + docs_2
        logger.info(f"Processing {len(all_texts)} total texts")

        # Extract topics from combined corpus
        combined_result = extract_topics(all_texts, n_topics, n_top_words, method)

        # Check for errors
        if "error" in combined_result:
            logger.warning(f"Error in combined topic extraction: {combined_result['error']}")
            result["error"] = combined_result["error"]
            return result

        # Store topics from combined analysis, and the topic count that was
        # actually used (extract_topics may coerce or adjust it).
        result["topics"] = combined_result["topics"]
        result["n_topics"] = combined_result.get("n_topics", n_topics)

        # Now split the document distributions back into the two sets
        model1_doc_topics = []
        model2_doc_topics = []
        doc_topics = combined_result.get("document_topics", [])
        if len(doc_topics) == len(all_texts):
            distributions = [doc["distribution"] for doc in doc_topics]
            model1_doc_topics = distributions[:len(docs_1)]
            model2_doc_topics = distributions[len(docs_1):]
        else:
            # Fallback: run separate topic modeling for each model.
            # NOTE: the two models are fitted independently, so their topics
            # are not guaranteed to correspond; treat the divergence computed
            # from this branch as approximate.
            logger.info("Using separate topic modeling for each model")
            model1_result = extract_topics([" ".join(docs_1)], n_topics, n_top_words, method)
            model2_result = extract_topics([" ".join(docs_2)], n_topics, n_top_words, method)
            model1_doc_topics = [doc["distribution"] for doc in model1_result.get("document_topics", [])]
            model2_doc_topics = [doc["distribution"] for doc in model2_result.get("document_topics", [])]

        # Calculate average topic distribution for each model
        if model1_doc_topics:
            result["model_topics"][model_names[0]] = np.mean(model1_doc_topics, axis=0).tolist()
        if model2_doc_topics:
            result["model_topics"][model_names[1]] = np.mean(model2_doc_topics, axis=0).tolist()

        # Calculate similarity between models' topic distributions
        if model_names[0] in result["model_topics"] and model_names[1] in result["model_topics"]:
            comparison_key = f"{model_names[0]} vs {model_names[1]}"
            dist1 = result["model_topics"][model_names[0]]
            dist2 = result["model_topics"][model_names[1]]
            # Calculate Jensen-Shannon divergence (smaller means more similar)
            js_div = calculate_jensen_shannon_divergence(dist1, dist2)
            # Create comparison result
            result["comparisons"][comparison_key] = {
                "js_divergence": js_div
            }
            logger.info(f"Topic comparison completed successfully. JS divergence: {js_div:.4f}")
        else:
            logger.warning("Could not calculate model comparisons due to missing topic distributions")
    except Exception as e:
        # Boundary handler: callers expect an "error" entry, not an exception.
        logger.error(f"Error in compare_topics: {str(e)}")
        result["error"] = f"Topic comparison failed: {str(e)}"
    return result