Spaces:
Sleeping
Sleeping
""" | |
Topic modeling processor for comparing text responses.
""" | |
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer | |
from sklearn.decomposition import LatentDirichletAllocation, NMF | |
import numpy as np | |
import nltk | |
from nltk.corpus import stopwords | |
import re | |
def preprocess_text(text):
    """
    Normalize a raw document into a space-separated string of content words.

    The text is lowercased, every character that is not an ASCII letter or
    whitespace is deleted, and the remainder is tokenized with NLTK. English
    stopwords and tokens of three characters or fewer are then discarded.

    Args:
        text (str): Text to preprocess

    Returns:
        str: Surviving tokens joined by single spaces
    """
    # Lowercase, then delete anything that is not a letter or whitespace
    letters_only = re.sub(r'[^a-zA-Z\s]', '', text.lower())

    words = nltk.word_tokenize(letters_only)
    english_stopwords = set(stopwords.words('english'))

    kept = []
    for word in words:
        # Drop tokens of three characters or fewer
        if len(word) <= 3:
            continue
        # Drop English stopwords
        if word in english_stopwords:
            continue
        kept.append(word)

    return ' '.join(kept)
def get_top_words_per_topic(model, feature_names, n_top_words=10):
    """
    Summarize each topic of a fitted model by its highest-weighted words.

    Args:
        model: Fitted topic model exposing a ``components_`` array with one
            row of per-word weights for each topic (LDA or NMF)
        feature_names (list): Words, indexed like the columns of
            ``model.components_``
        n_top_words (int): Number of top words to include per topic

    Returns:
        list: One dict per topic with keys "id" (topic index), "words"
        (top words, heaviest first) and "weights" (matching weights)
    """
    def describe(topic_id, term_weights):
        # argsort is ascending: reverse it, then keep the leading entries
        ranked = term_weights.argsort()[::-1][:n_top_words]
        return {
            "id": topic_id,
            "words": [feature_names[position] for position in ranked],
            "weights": term_weights[ranked].tolist(),
        }

    return [
        describe(topic_id, term_weights)
        for topic_id, term_weights in enumerate(model.components_)
    ]
def extract_topics(texts, n_topics=3, n_top_words=10, method="lda"):
    """
    Extract topics from a list of texts

    Each text is run through ``preprocess_text``, vectorized (TF-IDF for NMF,
    raw counts for LDA), and factorized into ``n_topics`` topics.

    Args:
        texts (list): List of text documents
        n_topics (int): Number of topics to extract
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method; 'nmf' selects NMF, any other
            value selects LDA

    Returns:
        dict: Topic modeling results with keys "method", "n_topics",
        "topics" (output of ``get_top_words_per_topic``) and
        "document_topics" (one {"document_id", "distribution"} entry per
        input text, with the distribution scaled to sum to 1 when its sum
        is positive)

    Raises:
        ValueError: Propagated from the vectorizer, e.g. when no terms
            remain after preprocessing and pruning
    """
    result = {
        "method": method,
        "n_topics": n_topics,
        "topics": [],
        "document_topics": []
    }

    # Preprocess texts
    preprocessed_texts = [preprocess_text(text) for text in texts]

    # Default vocabulary pruning: keep terms that appear in at least 2
    # documents and in at most 85% of them. With one or two documents these
    # bounds contradict each other (0.85 * 2 = 1.7 < 2) and the vectorizer
    # rejects them with a ValueError, so fall back to keeping every term.
    n_docs = len(preprocessed_texts)
    min_df, max_df = 2, 0.85
    if max_df * n_docs < min_df:
        min_df, max_df = 1, 1.0

    if method == "nmf":
        # NMF works on TF-IDF weights
        vectorizer = TfidfVectorizer(max_features=1000, min_df=min_df, max_df=max_df)
        model = NMF(n_components=n_topics, random_state=42, max_iter=1000)
    else:
        # LDA works on raw term counts
        vectorizer = CountVectorizer(max_features=1000, min_df=min_df, max_df=max_df)
        model = LatentDirichletAllocation(n_components=n_topics, random_state=42, max_iter=20)

    # Create document-term matrix
    X = vectorizer.fit_transform(preprocessed_texts)
    feature_names = vectorizer.get_feature_names_out()

    # Apply topic modeling
    topic_distribution = model.fit_transform(X)

    # Get top words for each topic
    result["topics"] = get_top_words_per_topic(model, feature_names, n_top_words)

    # Get topic distribution for each document
    for doc_id, dist in enumerate(topic_distribution):
        # Normalize for easier comparison; an all-zero row is left as is
        total = np.sum(dist)
        normalized_dist = dist / total if total > 0 else dist
        result["document_topics"].append({
            "document_id": doc_id,
            "distribution": normalized_dist.tolist()
        })

    return result
def compare_topics(response_texts, model_names, n_topics=3, n_top_words=10, method="lda"):
    """
    Fit one topic model over all responses and compare the models pairwise.

    The i-th response is attributed to the i-th model name. For every pair
    of models with usable distributions, the Jensen-Shannon divergence
    (natural log) and the per-topic absolute weight differences are stored.

    Args:
        response_texts (list): List of response texts to compare
        model_names (list): Names of models corresponding to responses
        n_topics (int): Number of topics to extract
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method ('lda' or 'nmf')

    Returns:
        dict: Keys "models", "method", "n_topics", "topics", "model_topics"
        (model name -> topic distribution) and "comparisons"
        ("<model1> vs <model2>" -> {"js_divergence", "topic_differences"},
        with the differences sorted from largest to smallest)
    """
    # Fit the shared topic model
    topic_model = extract_topics(response_texts, n_topics, n_top_words, method)

    # Pair each model name with the distribution of the document at the same
    # position; zip stops at the shorter of the two sequences
    model_topics = {}
    for name, doc_entry in zip(model_names, topic_model["document_topics"]):
        model_topics[name] = doc_entry["distribution"]

    comparisons = {}
    result = {
        "models": model_names,
        "method": method,
        "n_topics": n_topics,
        "topics": topic_model["topics"],
        "model_topics": model_topics,
        "comparisons": comparisons
    }

    # Small constant added to every weight so the logarithms below never
    # see a zero
    epsilon = 1e-10
    model_count = len(model_names)

    for i, model1 in enumerate(model_names):
        for j in range(i + 1, model_count):
            model2 = model_names[j]

            raw1 = model_topics.get(model1, [])
            raw2 = model_topics.get(model2, [])

            # Only compare non-empty distributions of equal length
            if not (raw1 and raw2 and len(raw1) == len(raw2)):
                continue

            # Smooth, then renormalize so each distribution sums to 1 again
            p = np.array(raw1) + epsilon
            q = np.array(raw2) + epsilon
            p = p / np.sum(p)
            q = q / np.sum(q)

            # Jensen-Shannon divergence: mean of the two KL divergences
            # measured against the midpoint distribution
            midpoint = (p + q) / 2
            kl_p = np.sum(p * np.log(p / midpoint))
            kl_q = np.sum(q * np.log(q / midpoint))
            js_div = (kl_p + kl_q) / 2

            # Per-topic weight gap, largest first
            topic_diffs = sorted(
                [
                    {
                        "topic_id": topic_id,
                        "model1_weight": float(weight1),
                        "model2_weight": float(weight2),
                        "diff": float(abs(weight1 - weight2))
                    }
                    for topic_id, (weight1, weight2) in enumerate(zip(p, q))
                ],
                key=lambda entry: entry["diff"],
                reverse=True,
            )

            comparisons[f"{model1} vs {model2}"] = {
                "js_divergence": float(js_div),
                "topic_differences": topic_diffs
            }

    return result