Spaces:
Sleeping
Sleeping
File size: 11,905 Bytes
14bac19 f533950 14bac19 f533950 14bac19 f533950 14bac19 f533950 14bac19 f533950 14bac19 f533950 14bac19 f533950 14bac19 f533950 14bac19 f533950 1b72959 b559aef 1b72959 b559aef 1b72959 f533950 b559aef f533950 1b72959 f533950 1b72959 f533950 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 |
"""
Enhanced topic modeling processor for comparing text responses with better error handling
and more robust algorithm configuration
"""
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation, NMF
import numpy as np
import nltk
from nltk.corpus import stopwords
import re
from scipy.spatial import distance
import logging
# Configure process-wide logging at import time, then expose the logger
# shared by every function in this module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('topic_modeling')
def preprocess_text(text):
    """
    Preprocess text for topic modeling

    Lowercases the text, replaces every character that is not an ASCII letter
    or whitespace with a space, tokenizes, and keeps only tokens longer than
    three characters that are not English stopwords.

    Args:
        text (str): Text to preprocess. None is treated as an empty string and
            any other non-string value is converted with str().

    Returns:
        str: Space-joined surviving tokens. If an unexpected error occurs, the
            text in whatever state it had reached is returned instead.
    """
    # Callers invoke .strip() on the result, so always hand back a string.
    if text is None:
        return ""
    if not isinstance(text, str):
        text = str(text)
    try:
        # Convert to lowercase
        text = text.lower()
        # Replace (rather than delete) special characters and digits so that
        # words separated only by punctuation, e.g. "hello,world", stay apart.
        text = re.sub(r'[^a-zA-Z\s]', ' ', text)
        # Tokenize; the text now holds only letters and whitespace, so a plain
        # split is a reasonable stand-in when the NLTK tokenizer data is absent.
        try:
            tokens = nltk.word_tokenize(text)
        except LookupError:
            logger.warning("NLTK tokenizer data not found; falling back to whitespace tokenization")
            tokens = text.split()
        # Remove stopwords; without the corpus only the length filter applies.
        try:
            stop_words = set(stopwords.words('english'))
        except LookupError:
            logger.warning("NLTK stopwords corpus not found; skipping stopword removal")
            stop_words = set()
        return ' '.join(
            token for token in tokens
            if len(token) > 3 and token not in stop_words
        )
    except Exception as e:
        logger.error(f"Error in preprocess_text: {str(e)}")
        # Best effort: return the text as far as it was cleaned before the failure
        return text
def get_top_words_per_topic(model, feature_names, n_top_words=10):
    """
    Get the top words for each topic in the model

    Args:
        model: Fitted topic model exposing a components_ array with one row of
            word weights per topic (LDA or NMF)
        feature_names (list): Feature names (words), indexed like the columns
            of model.components_
        n_top_words (int): Number of top words to include per topic. Values
            below 1 yield empty word lists; values above the vocabulary size
            return every word.

    Returns:
        list: One dict per topic with keys "id" (topic index), "words" (top
            words, heaviest first) and "weights" (the matching weights)
    """
    # Clamp and coerce the count: a negative value would otherwise turn the
    # slice into a wrap-around that returns almost the whole vocabulary, and a
    # float (common from UI sliders) is not a valid slice bound.
    n_top_words = max(0, int(n_top_words))
    topics = []
    for topic_idx, topic in enumerate(model.components_):
        # Ascending argsort reversed gives indices from heaviest to lightest
        top_words_idx = topic.argsort()[::-1][:n_top_words]
        top_words = [feature_names[i] for i in top_words_idx]
        topics.append({
            "id": topic_idx,
            "words": top_words,
            "weights": topic[top_words_idx].tolist()
        })
    return topics
def extract_topics(texts, n_topics=3, n_top_words=10, method="lda"):
    """
    Extract topics from a list of texts

    Args:
        texts (list): List of text documents
        n_topics (int): Number of topics to extract; numeric strings and floats
            are converted to int and the value is raised to at least 2
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method; 'nmf' selects NMF on TF-IDF
            features, any other value selects LDA on raw counts

    Returns:
        dict: Topic modeling results with keys "method", "n_topics" (the number
            actually used), "topics" and "document_topics". Each entry of
            "document_topics" carries "document_id", the index of the document
            in the input `texts`, and its normalized topic "distribution".
            Documents that are empty after preprocessing are left out. An
            "error" key is added when modeling fails.

    Raises:
        ValueError: If n_topics cannot be converted to an integer
    """
    # UI controls often deliver numbers as strings or floats; scikit-learn
    # rejects a non-integer n_components, so coerce before anything else.
    n_topics = max(2, int(n_topics))
    logger.info(f"Starting topic modeling with method={method}, n_topics={n_topics}")
    result = {
        "method": method,
        "n_topics": n_topics,
        "topics": [],
        "document_topics": []
    }
    try:
        n_top_words = int(n_top_words)
        # Preprocess texts
        logger.info("Preprocessing texts")
        preprocessed_texts = [preprocess_text(text) for text in texts]
        # Drop documents that are empty after preprocessing, but remember each
        # survivor's original position so document_id still refers to `texts`.
        surviving = [
            (idx, doc) for idx, doc in enumerate(preprocessed_texts)
            if isinstance(doc, str) and doc.strip()
        ]
        if not surviving:
            logger.warning("All texts are empty after preprocessing")
            return result
        document_ids = [idx for idx, _ in surviving]
        documents = [doc for _, doc in surviving]
        # Create document-term matrix
        logger.info(f"Creating document-term matrix using {method}")
        # With a single document, max_df=0.95 corresponds to fewer documents
        # than min_df=1 and the vectorizer raises ValueError, so disable the
        # upper document-frequency cut-off in that case.
        max_df = 0.95 if len(documents) > 1 else 1.0
        # NMF works on TF-IDF weights, LDA on raw term counts
        vectorizer_class = TfidfVectorizer if method == "nmf" else CountVectorizer
        vectorizer = vectorizer_class(max_features=1000, min_df=1, max_df=max_df, stop_words='english')
        try:
            X = vectorizer.fit_transform(documents)
            feature_names = vectorizer.get_feature_names_out()
            # Check if we have enough features
            if X.shape[1] < n_topics:
                logger.warning(f"Only {X.shape[1]} features found, reducing n_topics from {n_topics}")
                n_topics = max(2, X.shape[1] - 1)
                result["n_topics"] = n_topics
            # Apply topic modeling
            logger.info(f"Applying {method.upper()} with {n_topics} topics")
            if method == "nmf":
                # Non-negative Matrix Factorization
                model = NMF(n_components=n_topics, random_state=42, max_iter=1000)
            else:
                # Latent Dirichlet Allocation
                model = LatentDirichletAllocation(
                    n_components=n_topics,
                    random_state=42,
                    max_iter=20,
                    learning_method='online'
                )
            topic_distribution = model.fit_transform(X)
            # Get top words for each topic
            logger.info("Extracting top words for each topic")
            result["topics"] = get_top_words_per_topic(model, feature_names, n_top_words)
            # Get topic distribution for each document
            logger.info("Calculating topic distributions for documents")
            for doc_id, dist in zip(document_ids, topic_distribution):
                # Normalize for easier comparison; an all-zero row is kept as is
                total = np.sum(dist)
                normalized_dist = dist / total if total > 0 else dist
                result["document_topics"].append({
                    "document_id": doc_id,
                    "distribution": normalized_dist.tolist()
                })
            logger.info("Topic modeling completed successfully")
        except Exception as e:
            logger.error(f"Error in vectorization or modeling: {str(e)}")
            result["error"] = f"Topic modeling failed: {str(e)}"
    except Exception as e:
        logger.error(f"General error in extract_topics: {str(e)}")
        result["error"] = f"Topic modeling failed: {str(e)}"
    return result
def calculate_jensen_shannon_divergence(p, q):
    """
    Calculate Jensen-Shannon divergence between two probability distributions

    Computes JSD(p, q) = 0.5 * KL(p || m) + 0.5 * KL(q || m) with
    m = 0.5 * (p + q), using natural logarithms, so the result lies in
    [0, ln 2]: 0 for identical distributions, ln 2 for disjoint ones.

    Args:
        p (array): First probability distribution (non-negative weights; it is
            rescaled to sum to 1)
        q (array): Second probability distribution, same length as p

    Returns:
        float: Jensen-Shannon divergence, or NaN when either input is empty or
            has no positive mass

    Raises:
        ValueError: If p and q do not have the same number of entries
    """
    # Ensure inputs are flat float arrays
    p = np.asarray(p, dtype=float).ravel()
    q = np.asarray(q, dtype=float).ravel()
    if p.shape != q.shape:
        raise ValueError(
            f"Distributions must have the same length, got {p.size} and {q.size}"
        )
    p_total = p.sum()
    q_total = q.sum()
    # A vector without positive mass cannot be turned into a distribution
    if p.size == 0 or not p_total > 0 or not q_total > 0:
        return float("nan")
    # Always normalize; dividing an already normalized vector by ~1.0 is harmless
    p = p / p_total
    q = q / q_total
    m = 0.5 * (p + q)

    def _kl_to_mixture(a):
        # Terms with a == 0 contribute nothing; m > 0 wherever a > 0
        support = a > 0
        return np.sum(a[support] * np.log(a[support] / m[support]))

    # Note: scipy's distance.jensenshannon returns the square root of this
    # quantity (the JS *distance*), so it is not used directly here.
    js_divergence = 0.5 * (_kl_to_mixture(p) + _kl_to_mixture(q))
    # Guard against tiny negative values caused by floating-point rounding
    return float(max(js_divergence, 0.0))
def _texts_surviving_preprocessing(texts):
    """Return the string entries of texts that are non-empty after preprocess_text."""
    kept = []
    for text in texts:
        if not isinstance(text, str):
            continue
        cleaned = preprocess_text(text)
        if isinstance(cleaned, str) and cleaned.strip():
            kept.append(text)
    return kept


def compare_topics(texts_set_1, texts_set_2, n_topics=3, n_top_words=10, method="lda", model_names=None):
    """
    Compare topics between two sets of texts

    A single topic model is fitted on both sets together so that their topic
    distributions live in the same topic space; each set is then summarized by
    the mean distribution of its documents and the two means are compared.

    Args:
        texts_set_1 (list): First list of text documents
        texts_set_2 (list): Second list of text documents
        n_topics (int): Number of topics to extract
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method ('lda' or 'nmf')
        model_names (list, optional): Names of the two models being compared;
            defaults to ["Model 1", "Model 2"]

    Returns:
        dict: Comparison results with keys "method", "n_topics" (the number of
            topics actually used), "models", "model_topics" (mean topic
            distribution per model name), "topics" and "comparisons" (the
            "js_divergence" under the key "<name 1> vs <name 2>"). An "error"
            key is added when the comparison fails.
    """
    logger.info(f"Starting topic comparison with n_topics={n_topics}, method={method}")
    # Set default model names if not provided
    if model_names is None:
        model_names = ["Model 1", "Model 2"]
    # Initialize the result structure
    result = {
        "method": method,
        "n_topics": n_topics,
        "models": model_names,
        "model_topics": {},
        "topics": [],
        "comparisons": {}
    }
    try:
        if len(model_names) < 2:
            raise ValueError("model_names must contain two names")
        name_1, name_2 = model_names[0], model_names[1]
        # extract_topics silently drops documents that are empty after
        # preprocessing. Apply the same filter up front so the boundary between
        # the two sets stays known and every returned distribution can be
        # attributed to the right model.
        docs_1 = _texts_surviving_preprocessing(texts_set_1)
        docs_2 = _texts_surviving_preprocessing(texts_set_2)
        # Process all texts together to find common topics
        all_texts = docs_1 + docs_2
        logger.info(f"Processing {len(all_texts)} total texts")
        # Extract topics from combined corpus
        combined_result = extract_topics(all_texts, n_topics, n_top_words, method)
        # Check for errors
        if "error" in combined_result:
            logger.warning(f"Error in combined topic extraction: {combined_result['error']}")
            result["error"] = combined_result["error"]
            return result
        # Store topics from combined analysis and the topic count actually used
        result["topics"] = combined_result["topics"]
        result["n_topics"] = combined_result.get("n_topics", n_topics)
        # Split the per-document distributions back into the two sets
        distributions = [
            doc_topic["distribution"]
            for doc_topic in combined_result.get("document_topics", [])
        ]
        if len(distributions) == len(all_texts):
            model1_doc_topics = distributions[:len(docs_1)]
            model2_doc_topics = distributions[len(docs_1):]
        else:
            # Fitting a separate model per set would yield unrelated topic
            # spaces whose distributions cannot be compared, so give up instead.
            logger.warning("Document-topic distributions do not line up with the input texts")
            model1_doc_topics = []
            model2_doc_topics = []
        # Calculate average topic distribution for each model
        if model1_doc_topics:
            result["model_topics"][name_1] = np.mean(model1_doc_topics, axis=0).tolist()
        if model2_doc_topics:
            result["model_topics"][name_2] = np.mean(model2_doc_topics, axis=0).tolist()
        # Calculate similarity between models' topic distributions
        if name_1 in result["model_topics"] and name_2 in result["model_topics"]:
            comparison_key = f"{name_1} vs {name_2}"
            dist1 = result["model_topics"][name_1]
            dist2 = result["model_topics"][name_2]
            # Calculate Jensen-Shannon divergence (smaller means more similar)
            js_div = calculate_jensen_shannon_divergence(dist1, dist2)
            # Create comparison result
            result["comparisons"][comparison_key] = {
                "js_divergence": js_div
            }
            logger.info(f"Topic comparison completed successfully. JS divergence: {js_div:.4f}")
        else:
            logger.warning("Could not calculate model comparisons due to missing topic distributions")
    except Exception as e:
        logger.error(f"Error in compare_topics: {str(e)}")
        result["error"] = f"Topic comparison failed: {str(e)}"
    return result
|