525GradioApp / processors /topic_modeling.py
Ryan
update
f533950
raw
history blame
11.9 kB
"""
Enhanced topic modeling processor for comparing text responses with better error handling
and more robust algorithm configuration
"""
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation, NMF
import numpy as np
import nltk
from nltk.corpus import stopwords
import re
from scipy.spatial import distance
import logging
# Set up logging: configure the root logger (INFO level, timestamped format)
# at import time, then create the named logger used by the functions below.
# NOTE: logging.basicConfig does nothing if the root logger already has
# handlers, so an application that configured logging earlier keeps its setup.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('topic_modeling')
def preprocess_text(text):
    """
    Normalize a document into a space-separated string of content words.

    The text is lowercased, every character that is not an ASCII letter or
    whitespace is deleted, the remainder is tokenized with NLTK, and tokens
    that are English stopwords or have three or fewer characters are dropped.

    Args:
        text (str): Text to preprocess

    Returns:
        str: The surviving tokens joined by single spaces. If any step raises,
        the error is logged and the text is returned in whatever state it had
        reached at that point (unchanged if lowercasing itself failed).
    """
    try:
        # Lowercase first so the stopword comparison below is case-insensitive
        text = text.lower()
        # Delete (not replace) anything that is not a letter or whitespace
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        words = nltk.word_tokenize(text)
        english_stopwords = set(stopwords.words('english'))
        kept_words = []
        for word in words:
            if word in english_stopwords:
                continue
            if len(word) > 3:
                kept_words.append(word)
        return ' '.join(kept_words)
    except Exception as e:
        logger.error(f"Error in preprocess_text: {str(e)}")
        # Best effort: hand back the text as far as it was processed
        return text
def get_top_words_per_topic(model, feature_names, n_top_words=10):
    """
    Summarize each topic of a fitted model by its highest-weighted words.

    Args:
        model: Topic model (LDA or NMF) exposing ``components_``, iterated as
            one row of per-word weights per topic
        feature_names (list): Feature names (words); position ``i`` names
            weight ``i`` of every topic row
        n_top_words (int): Number of top words to include per topic

    Returns:
        list: One dict per topic with keys ``"id"`` (topic row index),
        ``"words"`` (top words, heaviest first) and ``"weights"`` (the
        matching weights, in the same order)
    """
    def describe_topic(topic_id, weights):
        # argsort is ascending, so slice backwards from the end to take the
        # n_top_words largest weights in descending order.
        ranked = weights.argsort()[:-n_top_words - 1:-1]
        return {
            "id": topic_id,
            "words": [feature_names[column] for column in ranked],
            "weights": weights[ranked].tolist(),
        }

    return [
        describe_topic(topic_id, weights)
        for topic_id, weights in enumerate(model.components_)
    ]
def extract_topics(texts, n_topics=3, n_top_words=10, method="lda"):
    """
    Extract topics from a list of texts.

    Every text is passed through ``preprocess_text``; texts that come back
    empty (or not as a string) are dropped. The remaining documents are
    vectorized (TF-IDF for 'nmf', raw counts otherwise) and factorized with
    NMF or LDA.

    Args:
        texts (list): List of text documents
        n_topics (int): Number of topics to extract. Coerced with ``int()``
            (so numeric strings and floats are accepted) and raised to at
            least 2.
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method; 'nmf' selects NMF, any other
            value selects LDA

    Returns:
        dict: Topic modeling results with keys ``"method"``, ``"n_topics"``
        (the topic count actually used), ``"topics"`` (output of
        ``get_top_words_per_topic``) and ``"document_topics"`` (one entry per
        surviving document, holding ``"document_id"`` -- the document's index
        in ``texts`` -- and its sum-normalized ``"distribution"``). An
        ``"error"`` key is added when preprocessing, vectorization or
        modeling fails. If every text is empty after preprocessing, the lists
        are returned empty and no error is set.

    Raises:
        ValueError, TypeError: If ``n_topics`` cannot be converted to an int
            (the conversion happens before any error handling).
    """
    # Ensure n_topics is an integer of at least 2
    n_topics = max(2, int(n_topics))
    logger.info(f"Starting topic modeling with method={method}, n_topics={n_topics}")
    result = {
        "method": method,
        "n_topics": n_topics,
        "topics": [],
        "document_topics": []
    }
    try:
        logger.info("Preprocessing texts")
        # Remember each surviving document's position in `texts` so that
        # document_id keeps identifying the caller's input even after empty
        # documents have been dropped.
        doc_ids = []
        preprocessed_texts = []
        for position, raw_text in enumerate(texts):
            cleaned = preprocess_text(raw_text)
            # preprocess_text hands back its input on failure, which may not
            # be a string (e.g. None); skip those instead of crashing.
            if isinstance(cleaned, str) and cleaned.strip():
                doc_ids.append(position)
                preprocessed_texts.append(cleaned)
        if not preprocessed_texts:
            logger.warning("All texts are empty after preprocessing")
            return result

        logger.info(f"Creating document-term matrix using {method}")
        # max_df=0.95 is a proportion of documents. With a single document it
        # resolves to 0.95 documents, which is below min_df=1 and makes the
        # vectorizer raise "max_df corresponds to < documents than min_df".
        # Disable the upper cut-off in that case.
        max_df = 0.95 if len(preprocessed_texts) > 1 else 1.0
        # NMF works on TF-IDF weights, LDA on raw term counts
        vectorizer_class = TfidfVectorizer if method == "nmf" else CountVectorizer
        vectorizer = vectorizer_class(max_features=1000, min_df=1, max_df=max_df, stop_words='english')
        try:
            X = vectorizer.fit_transform(preprocessed_texts)
            feature_names = vectorizer.get_feature_names_out()
            # With fewer vocabulary terms than requested topics, shrink the
            # topic count (never below 2) and report the value actually used.
            if X.shape[1] < n_topics:
                logger.warning(f"Only {X.shape[1]} features found, reducing n_topics from {n_topics}")
                n_topics = max(2, X.shape[1] - 1)
                result["n_topics"] = n_topics

            logger.info(f"Applying {method.upper()} with {n_topics} topics")
            if method == "nmf":
                # Non-negative Matrix Factorization
                model = NMF(n_components=n_topics, random_state=42, max_iter=1000)
            else:
                # Latent Dirichlet Allocation
                model = LatentDirichletAllocation(
                    n_components=n_topics,
                    random_state=42,
                    max_iter=20,
                    learning_method='online'
                )
            topic_distribution = model.fit_transform(X)

            logger.info("Extracting top words for each topic")
            result["topics"] = get_top_words_per_topic(model, feature_names, n_top_words)

            logger.info("Calculating topic distributions for documents")
            for doc_id, dist in zip(doc_ids, topic_distribution):
                # Normalize to sum 1 for easier comparison; an all-zero row
                # is left untouched to avoid dividing by zero.
                total = np.sum(dist)
                normalized_dist = dist / total if total > 0 else dist
                result["document_topics"].append({
                    "document_id": doc_id,
                    "distribution": normalized_dist.tolist()
                })
            logger.info("Topic modeling completed successfully")
        except Exception as e:
            logger.error(f"Error in vectorization or modeling: {str(e)}")
            result["error"] = f"Topic modeling failed: {str(e)}"
    except Exception as e:
        logger.error(f"General error in extract_topics: {str(e)}")
        result["error"] = f"Topic modeling failed: {str(e)}"
    return result
def calculate_jensen_shannon_divergence(p, q):
    """
    Calculate Jensen-Shannon divergence between two probability distributions.

    The divergence is ``0.5 * KL(p || m) + 0.5 * KL(q || m)`` with
    ``m = 0.5 * (p + q)``, using the natural logarithm.

    Args:
        p (array): First probability distribution (normalized here if needed)
        q (array): Second probability distribution (normalized here if needed)

    Returns:
        float: Jensen-Shannon divergence in nats, between 0 (identical) and
        ln 2 (disjoint support). ``nan`` if either input does not have a
        positive sum, since it cannot be normalized into a distribution.
    """
    p = np.asarray(p, dtype=float)
    q = np.asarray(q, dtype=float)
    p_total = np.sum(p)
    q_total = np.sum(q)
    # Written as a negated conjunction so that nan sums are rejected too
    if not (p_total > 0 and q_total > 0):
        return float("nan")
    # scipy's jensenshannon builds the mixture m itself and returns the
    # Jensen-Shannon *distance*, i.e. the square root of the divergence.
    # Squaring recovers the divergence.
    js_distance = distance.jensenshannon(p / p_total, q / q_total)
    return float(js_distance ** 2)
def compare_topics(texts_set_1, texts_set_2, n_topics=3, n_top_words=10, method="lda", model_names=None):
    """
    Compare topics between two sets of texts.

    One topic model is fitted on both sets together so that both sets are
    described in the same topic space. Each set's per-document topic
    distributions are averaged, and the two averages are compared with
    ``calculate_jensen_shannon_divergence``.

    If the joint model does not return exactly one distribution per input
    text, the per-document results cannot be assigned to a set by position.
    In that case each set is joined into a single document and one model is
    fitted on those two documents, again giving a shared topic space.

    Args:
        texts_set_1 (list): First list of text documents
        texts_set_2 (list): Second list of text documents
        n_topics (int): Number of topics to extract
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method ('lda' or 'nmf')
        model_names (list, optional): Names of the two models being compared;
            defaults to ["Model 1", "Model 2"]

    Returns:
        dict: Comparison results with keys ``"method"``, ``"n_topics"`` (the
        topic count reported by the extraction once it has run), ``"models"``,
        ``"topics"``, ``"model_topics"`` (average topic distribution per
        model name) and ``"comparisons"`` (``"<name 1> vs <name 2>"`` mapped
        to ``{"js_divergence": ...}``). An ``"error"`` key is added when the
        joint extraction fails or an unexpected exception occurs.
    """
    logger.info(f"Starting topic comparison with n_topics={n_topics}, method={method}")
    # Set default model names if not provided
    if model_names is None:
        model_names = ["Model 1", "Model 2"]
    result = {
        "method": method,
        "n_topics": n_topics,
        "models": model_names,
        "model_topics": {},
        "topics": [],
        "comparisons": {}
    }
    try:
        # Fit one model on everything so both sets share the same topics
        all_texts = texts_set_1 + texts_set_2
        logger.info(f"Processing {len(all_texts)} total texts")
        combined_result = extract_topics(all_texts, n_topics, n_top_words, method)
        if "error" in combined_result:
            logger.warning(f"Error in combined topic extraction: {combined_result['error']}")
            result["error"] = combined_result["error"]
            return result
        result["topics"] = combined_result["topics"]
        # Report the topic count the extraction actually used, which can
        # differ from the argument (coerced, or reduced for a tiny vocabulary)
        result["n_topics"] = combined_result.get("n_topics", n_topics)

        model1_doc_topics = []
        model2_doc_topics = []
        doc_topics = combined_result.get("document_topics", [])
        if len(doc_topics) == len(all_texts):
            # One distribution per input text, in input order: the first
            # len(texts_set_1) entries belong to the first model.
            n_docs_model1 = len(texts_set_1)
            distributions = [doc_topic["distribution"] for doc_topic in doc_topics]
            model1_doc_topics = distributions[:n_docs_model1]
            model2_doc_topics = distributions[n_docs_model1:]
        else:
            # Some texts were dropped, so positions no longer identify the
            # set. Collapse each set into one document and model the pair
            # jointly. Fitting two independent models instead would produce
            # topic indices that do not correspond to each other, making any
            # divergence between their distributions meaningless.
            logger.info("Falling back to one combined document per model")
            fallback_result = extract_topics(
                [" ".join(texts_set_1), " ".join(texts_set_2)],
                n_topics, n_top_words, method
            )
            fallback_docs = fallback_result.get("document_topics", [])
            if "error" in fallback_result:
                logger.warning(f"Error in fallback topic extraction: {fallback_result['error']}")
            elif len(fallback_docs) == 2:
                # Keep the reported topics consistent with the distributions
                result["topics"] = fallback_result["topics"]
                result["n_topics"] = fallback_result.get("n_topics", result["n_topics"])
                model1_doc_topics = [fallback_docs[0]["distribution"]]
                model2_doc_topics = [fallback_docs[1]["distribution"]]

        # Calculate average topic distribution for each model
        if model1_doc_topics:
            result["model_topics"][model_names[0]] = np.mean(model1_doc_topics, axis=0).tolist()
        if model2_doc_topics:
            result["model_topics"][model_names[1]] = np.mean(model2_doc_topics, axis=0).tolist()

        # Calculate similarity between models' topic distributions
        if model_names[0] in result["model_topics"] and model_names[1] in result["model_topics"]:
            comparison_key = f"{model_names[0]} vs {model_names[1]}"
            # Jensen-Shannon divergence (smaller means more similar)
            js_div = calculate_jensen_shannon_divergence(
                result["model_topics"][model_names[0]],
                result["model_topics"][model_names[1]]
            )
            result["comparisons"][comparison_key] = {
                "js_divergence": js_div
            }
            logger.info(f"Topic comparison completed successfully. JS divergence: {js_div:.4f}")
        else:
            logger.warning("Could not calculate model comparisons due to missing topic distributions")
    except Exception as e:
        logger.error(f"Error in compare_topics: {str(e)}")
        result["error"] = f"Topic comparison failed: {str(e)}"
    return result