525GradioApp / processors /topic_modeling.py
Ryan
update
14bac19
raw
history blame
7.21 kB
"""
Topic modeling processor for comparing text responses
"""
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation, NMF
import numpy as np
import nltk
from nltk.corpus import stopwords
import re
def preprocess_text(text):
    """
    Normalise a raw text into a space-separated string of content words

    The text is lowercased, every character that is not an ASCII letter or
    whitespace is deleted, the remainder is tokenised with NLTK, and tokens
    that are English stopwords or have three or fewer characters are dropped.

    Args:
        text (str): Text to preprocess
    Returns:
        str: Surviving tokens joined by single spaces
    """
    # Lowercase first, then delete (not space-replace) non-letter characters
    letters_only = re.sub(r'[^a-zA-Z\s]', '', text.lower())

    words = nltk.word_tokenize(letters_only)
    english_stopwords = set(stopwords.words('english'))

    def is_informative(word):
        # Keep only words longer than three characters that are not stopwords
        return word not in english_stopwords and len(word) > 3

    return ' '.join(filter(is_informative, words))
def get_top_words_per_topic(model, feature_names, n_top_words=10):
    """
    Summarise every topic of a fitted model by its highest-weighted words

    Args:
        model: Fitted topic model exposing a ``components_`` matrix
            (one row of word weights per topic)
        feature_names (list): Vocabulary, indexed like the columns of
            ``model.components_``
        n_top_words (int): Number of top words to include per topic
    Returns:
        list: One dict per topic with keys "id" (row index), "words"
            (words ordered by descending weight) and "weights"
            (the matching weights as plain Python floats)
    """
    def summarise(topic_id, weights):
        # argsort is ascending: reverse it, then keep the leading entries
        ranked = weights.argsort()[::-1][:n_top_words]
        return {
            "id": topic_id,
            "words": [feature_names[position] for position in ranked],
            "weights": weights[ranked].tolist(),
        }

    return [
        summarise(topic_id, weights)
        for topic_id, weights in enumerate(model.components_)
    ]
def extract_topics(texts, n_topics=3, n_top_words=10, method="lda"):
    """
    Extract topics from a list of texts

    Each text is preprocessed, vectorised (TF-IDF for NMF, raw counts for
    LDA), and decomposed into ``n_topics`` topics.

    Args:
        texts (list): List of text documents
        n_topics (int): Number of topics to extract
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method; 'nmf' selects NMF, any other
            value selects LDA
    Returns:
        dict: Keys "method", "n_topics", "topics" (top words per topic) and
            "document_topics" (one {"document_id", "distribution"} entry per
            input text, distribution normalised to sum to 1 when possible).
            For an empty ``texts`` the two lists are empty.
    Raises:
        ValueError: Propagated from the vectorizer, e.g. when no terms
            remain after preprocessing and pruning
    """
    result = {
        "method": method,
        "n_topics": n_topics,
        "topics": [],
        "document_topics": []
    }

    # Preprocess texts
    preprocessed_texts = [preprocess_text(text) for text in texts]

    # Nothing to model: hand back the empty result instead of failing
    # inside the vectorizer.
    if not preprocessed_texts:
        return result

    # Document-frequency pruning. The defaults (term must occur in at least
    # 2 documents and in at most 85% of them) contradict each other for a
    # corpus of one or two documents (0.85 * 2 < 2), and the vectorizer then
    # raises "max_df corresponds to < documents than min_df". Keep every
    # term in that case.
    min_df, max_df = 2, 0.85
    if max_df * len(preprocessed_texts) < min_df:
        min_df, max_df = 1, 1.0

    # Create document-term matrix: TF-IDF for NMF, raw counts for LDA
    vectorizer_cls = TfidfVectorizer if method == "nmf" else CountVectorizer
    vectorizer = vectorizer_cls(max_features=1000, min_df=min_df, max_df=max_df)
    X = vectorizer.fit_transform(preprocessed_texts)
    feature_names = vectorizer.get_feature_names_out()

    # Apply topic modeling
    if method == "nmf":
        # Non-negative Matrix Factorization
        model = NMF(n_components=n_topics, random_state=42, max_iter=1000)
    else:
        # Latent Dirichlet Allocation
        model = LatentDirichletAllocation(n_components=n_topics, random_state=42, max_iter=20)
    topic_distribution = model.fit_transform(X)

    # Get top words for each topic
    result["topics"] = get_top_words_per_topic(model, feature_names, n_top_words)

    # Get topic distribution for each document
    for i, dist in enumerate(topic_distribution):
        # Normalize for easier comparison; an all-zero row is left untouched
        total = np.sum(dist)
        normalized_dist = dist / total if total > 0 else dist
        result["document_topics"].append({
            "document_id": i,
            "distribution": normalized_dist.tolist()
        })
    return result
def compare_topics(response_texts, model_names, n_topics=3, n_top_words=10, method="lda"):
    """
    Fit one topic model over all responses and contrast the models pairwise

    Args:
        response_texts (list): List of response texts to compare
        model_names (list): Names of models, positionally aligned with
            ``response_texts``
        n_topics (int): Number of topics to extract
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method ('lda' or 'nmf')
    Returns:
        dict: Keys "models", "method", "n_topics", "topics", "model_topics"
            (model name -> topic distribution) and "comparisons"
            ("<name1> vs <name2>" -> {"js_divergence", "topic_differences"},
            the latter sorted by descending absolute weight difference)
    """
    smoothing = 1e-10  # keeps log() and the division away from zero

    def contrast(raw_a, raw_b):
        # Smooth and re-normalise both distributions
        p = np.array(raw_a) + smoothing
        q = np.array(raw_b) + smoothing
        p = p / np.sum(p)
        q = q / np.sum(q)

        # Jensen-Shannon divergence: mean of the two KL divergences
        # measured against the midpoint distribution
        midpoint = (p + q) / 2
        kl_a = np.sum(p * np.log(p / midpoint))
        kl_b = np.sum(q * np.log(q / midpoint))

        per_topic = [
            {
                "topic_id": topic_id,
                "model1_weight": float(weight_a),
                "model2_weight": float(weight_b),
                "diff": float(abs(weight_a - weight_b))
            }
            for topic_id, (weight_a, weight_b) in enumerate(zip(p, q))
        ]
        return {
            "js_divergence": float((kl_a + kl_b) / 2),
            "topic_differences": sorted(
                per_topic, key=lambda entry: entry["diff"], reverse=True
            )
        }

    extracted = extract_topics(response_texts, n_topics, n_top_words, method)

    # Pair names with documents by position; names without a document
    # receive no distribution
    model_topics = {
        name: document["distribution"]
        for name, document in zip(model_names, extracted["document_topics"])
    }

    comparisons = {}
    name_count = len(model_names)
    for first in range(name_count):
        for second in range(first + 1, name_count):
            model1, model2 = model_names[first], model_names[second]
            dist1 = model_topics.get(model1, [])
            dist2 = model_topics.get(model2, [])
            # Only compare when both distributions exist and line up
            if dist1 and dist2 and len(dist1) == len(dist2):
                comparisons[f"{model1} vs {model2}"] = contrast(dist1, dist2)

    return {
        "models": model_names,
        "method": method,
        "n_topics": n_topics,
        "topics": extracted["topics"],
        "model_topics": model_topics,
        "comparisons": comparisons
    }