Spaces:
Sleeping
Sleeping
File size: 7,212 Bytes
14bac19 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
"""
Topic modeling processor for comparing text responses
"""
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation, NMF
import numpy as np
import nltk
from nltk.corpus import stopwords
import re
def preprocess_text(text):
    """
    Preprocess text for topic modeling

    Lowercases the text, strips everything that is not an ASCII letter or
    whitespace, tokenizes with NLTK, and drops English stopwords as well as
    tokens of three characters or fewer.

    Args:
        text (str): Text to preprocess. ``None`` is treated as an empty document.

    Returns:
        str: Preprocessed text, with the surviving tokens joined by single spaces
    """
    # A missing response is handled like an empty one instead of crashing on .lower()
    if text is None:
        return ""

    # Convert to lowercase
    text = text.lower()

    # Replace special characters and digits with a space. Deleting them outright
    # would glue together words separated only by punctuation
    # (e.g. "state-of-the-art" -> "stateoftheart", "end.Next" -> "endnext").
    text = re.sub(r'[^a-zA-Z\s]', ' ', text)

    # Tokenize
    tokens = nltk.word_tokenize(text)

    # Remove stopwords and very short tokens (length <= 3)
    stop_words = set(stopwords.words('english'))
    tokens = [token for token in tokens if token not in stop_words and len(token) > 3]

    return ' '.join(tokens)
def get_top_words_per_topic(model, feature_names, n_top_words=10):
    """
    Summarize every topic of a fitted model by its highest-weighted words

    Args:
        model: Fitted topic model exposing a ``components_`` matrix (LDA or NMF)
        feature_names (list): Vocabulary, indexed like the columns of ``components_``
        n_top_words (int): How many of the heaviest words to keep per topic

    Returns:
        list: One dict per topic with keys "id", "words" and "weights",
            words and weights ordered from heaviest to lightest
    """
    # Reverse-slice bound that selects the last n_top_words entries of an
    # ascending argsort, i.e. the heaviest words first.
    cutoff = -n_top_words - 1

    def summarize(topic_id, word_weights):
        ranked = word_weights.argsort()[:cutoff:-1]
        return {
            "id": topic_id,
            "words": [feature_names[position] for position in ranked],
            "weights": word_weights[ranked].tolist()
        }

    return [
        summarize(topic_id, word_weights)
        for topic_id, word_weights in enumerate(model.components_)
    ]
def _build_document_term_matrix(documents, method):
    """
    Vectorize preprocessed documents for the chosen topic model

    Args:
        documents (list): Preprocessed documents (space-joined tokens)
        method (str): 'nmf' selects TF-IDF weights; any other value selects raw counts

    Returns:
        tuple: (document-term matrix, array of feature names)

    Raises:
        ValueError: If no vocabulary can be built even without frequency pruning
            (e.g. no documents, or every document is empty after preprocessing)
    """
    # For NMF, use TF-IDF vectorization; for LDA, use raw term counts
    vectorizer_cls = TfidfVectorizer if method == "nmf" else CountVectorizer

    try:
        vectorizer = vectorizer_cls(max_features=1000, min_df=2, max_df=0.85)
        matrix = vectorizer.fit_transform(documents)
    except ValueError:
        # The pruning thresholds are only satisfiable on larger corpora:
        # scikit-learn raises when max_df * n_docs < min_df (always the case for
        # two documents: 0.85 * 2 < 2) or when pruning removes every term.
        # Retry without document-frequency pruning; a genuinely empty vocabulary
        # still raises ValueError from this second attempt.
        vectorizer = vectorizer_cls(max_features=1000, min_df=1, max_df=1.0)
        matrix = vectorizer.fit_transform(documents)

    return matrix, vectorizer.get_feature_names_out()


def extract_topics(texts, n_topics=3, n_top_words=10, method="lda"):
    """
    Extract topics from a list of texts

    Args:
        texts (list): List of text documents
        n_topics (int): Number of topics to extract
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method ('lda' or 'nmf'); any value other
            than 'nmf' is handled as LDA

    Returns:
        dict: Topic modeling results with keys "method", "n_topics", "topics"
            (top words per topic) and "document_topics" (one normalized
            topic distribution per input document)

    Raises:
        ValueError: If the documents yield no usable vocabulary
    """
    result = {
        "method": method,
        "n_topics": n_topics,
        "topics": [],
        "document_topics": []
    }

    # Preprocess texts
    preprocessed_texts = [preprocess_text(text) for text in texts]

    # Create document-term matrix (pruning is relaxed automatically for small corpora)
    X, feature_names = _build_document_term_matrix(preprocessed_texts, method)

    # Apply topic modeling
    if method == "nmf":
        # Non-negative Matrix Factorization
        model = NMF(n_components=n_topics, random_state=42, max_iter=1000)
    else:
        # Latent Dirichlet Allocation
        model = LatentDirichletAllocation(n_components=n_topics, random_state=42, max_iter=20)
    topic_distribution = model.fit_transform(X)

    # Get top words for each topic
    result["topics"] = get_top_words_per_topic(model, feature_names, n_top_words)

    # Get topic distribution for each document
    for i, dist in enumerate(topic_distribution):
        # Normalize to sum to 1 for easier comparison; an all-zero row is kept as is
        total = np.sum(dist)
        normalized_dist = dist / total if total > 0 else dist
        result["document_topics"].append({
            "document_id": i,
            "distribution": normalized_dist.tolist()
        })

    return result
def compare_topics(response_texts, model_names, n_topics=3, n_top_words=10, method="lda"):
    """
    Fit one topic model over all responses and contrast the models pairwise

    Args:
        response_texts (list): Response texts, one per model
        model_names (list): Model names, aligned by position with response_texts
        n_topics (int): Number of topics to extract
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method ('lda' or 'nmf')

    Returns:
        dict: Shared topics, each model's topic distribution, and for every
            pair of models the Jensen-Shannon divergence plus per-topic
            weight differences sorted from largest to smallest
    """
    epsilon = 1e-10

    def smooth(distribution):
        # Shift away from exact zeros so the logarithms below stay finite,
        # then renormalize.
        shifted = np.array(distribution)
        shifted = shifted + epsilon
        return shifted / np.sum(shifted)

    # Topics are extracted once, over the whole set of responses
    topic_model = extract_topics(response_texts, n_topics, n_top_words, method)

    result = {
        "models": model_names,
        "method": method,
        "n_topics": n_topics,
        "topics": topic_model["topics"],
        "model_topics": {},
        "comparisons": {}
    }

    # Attach each document's distribution to the model at the same position;
    # zip stops at the shorter sequence, so surplus names get no entry
    for name, doc_entry in zip(model_names, topic_model["document_topics"]):
        result["model_topics"][name] = doc_entry["distribution"]

    # Visit every unordered pair of models exactly once
    for position, model1 in enumerate(model_names):
        for model2 in model_names[position + 1:]:
            raw1 = result["model_topics"].get(model1, [])
            raw2 = result["model_topics"].get(model2, [])

            # Only comparable when both distributions exist and have equal length
            if raw1 and raw2 and len(raw1) == len(raw2):
                p = smooth(raw1)
                q = smooth(raw2)

                # Jensen-Shannon divergence: mean KL divergence of each
                # distribution from their midpoint
                midpoint = (p + q) / 2
                kl_p = np.sum(p * np.log(p / midpoint))
                kl_q = np.sum(q * np.log(q / midpoint))
                js_div = (kl_p + kl_q) / 2

                # Per-topic weight gaps, largest first
                topic_diffs = sorted(
                    (
                        {
                            "topic_id": topic_id,
                            "model1_weight": float(weight1),
                            "model2_weight": float(weight2),
                            "diff": float(abs(weight1 - weight2))
                        }
                        for topic_id, (weight1, weight2) in enumerate(zip(p, q))
                    ),
                    key=lambda entry: entry["diff"],
                    reverse=True
                )

                result["comparisons"][f"{model1} vs {model2}"] = {
                    "js_divergence": float(js_div),
                    "topic_differences": topic_diffs
                }

    return result
|