"""
Enhanced topic modeling processor for comparing text responses
with better error handling and more robust algorithm configuration
"""
import logging
import re
from functools import lru_cache

import nltk
import numpy as np
from nltk.corpus import stopwords
from scipy.spatial import distance
from sklearn.decomposition import LatentDirichletAllocation, NMF
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer

# Set up logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('topic_modeling')

# Compiled once: strips everything that is not an ASCII letter or whitespace.
_NON_LETTER_RE = re.compile(r'[^a-zA-Z\s]')


@lru_cache(maxsize=1)
def _english_stopwords():
    """Return the NLTK English stop-word set, loaded only once per process."""
    return frozenset(stopwords.words('english'))


def preprocess_text(text):
    """
    Preprocess text for topic modeling

    Lowercases the text, strips non-letter characters, tokenizes it with
    NLTK and drops English stop words and tokens of three characters or less.

    Args:
        text (str): Text to preprocess. ``None`` is treated as an empty
            document; other non-string values are converted with ``str()``.

    Returns:
        str: Preprocessed text (space-joined tokens). If tokenization or the
            stop-word lookup fails, the text as cleaned so far is returned.
    """
    if text is None:
        # An empty string lets callers filter the document out instead of
        # crashing on a later ``.strip()``.
        return ""
    if not isinstance(text, str):
        text = str(text)

    try:
        # Convert to lowercase, then remove special characters and digits
        text = _NON_LETTER_RE.sub('', text.lower())

        # Tokenize
        tokens = nltk.word_tokenize(text)

        # Remove stopwords and very short tokens
        stop_words = _english_stopwords()
        return ' '.join(
            token for token in tokens
            if len(token) > 3 and token not in stop_words
        )
    except Exception as e:
        # Deliberate best effort (e.g. NLTK data not downloaded): keep going
        # with whatever cleaning already succeeded.
        logger.error(f"Error in preprocess_text: {str(e)}")
        return text


def get_top_words_per_topic(model, feature_names, n_top_words=10):
    """
    Get the top words for each topic in the model

    Args:
        model: Fitted topic model exposing ``components_`` (LDA or NMF)
        feature_names (list): Feature names (words), indexed like the
            columns of ``model.components_``
        n_top_words (int): Number of top words to include per topic

    Returns:
        list: One dict per topic with keys ``id``, ``words`` and ``weights``,
            words ordered from heaviest to lightest
    """
    n_top_words = max(0, int(n_top_words))

    topics = []
    for topic_idx, topic in enumerate(model.components_):
        # argsort is ascending: reverse it and keep the heaviest entries
        top_words_idx = topic.argsort()[::-1][:n_top_words]
        topics.append({
            "id": topic_idx,
            "words": [str(feature_names[i]) for i in top_words_idx],
            "weights": topic[top_words_idx].tolist(),
        })
    return topics


def extract_topics(texts, n_topics=3, n_top_words=10, method="lda"):
    """
    Extract topics from a list of texts

    Args:
        texts (list): List of text documents
        n_topics (int): Number of topics to extract (values below 2 are
            raised to 2; numeric strings are accepted)
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method ('nmf' selects NMF on TF-IDF
            features; any other value selects LDA on raw counts)

    Returns:
        dict: Topic modeling results with keys ``method``, ``n_topics``
            (the number actually used), ``topics`` and ``document_topics``.
            Each ``document_topics`` entry carries the ``document_id`` of the
            document in the *input* ``texts`` list, so documents that were
            empty after preprocessing are simply absent. On failure an
            ``error`` key holds the message.
    """
    # Ensure n_topics is an integer of at least 2
    n_topics = max(2, int(n_topics))

    logger.info(f"Starting topic modeling with method={method}, n_topics={n_topics}")

    result = {
        "method": method,
        "n_topics": n_topics,
        "topics": [],
        "document_topics": []
    }

    try:
        # Preprocess texts
        logger.info("Preprocessing texts")
        preprocessed_texts = [preprocess_text(text) for text in texts]

        # Drop documents that are empty after preprocessing, but remember
        # where each surviving document came from so ids stay meaningful.
        kept = [
            (doc_id, doc) for doc_id, doc in enumerate(preprocessed_texts)
            if doc.strip()
        ]
        if not kept:
            logger.warning("All texts are empty after preprocessing")
            return result

        doc_ids = [doc_id for doc_id, _ in kept]
        documents = [doc for _, doc in kept]

        # Create document-term matrix
        logger.info(f"Creating document-term matrix using {method}")
        # NMF works on TF-IDF weights, LDA on raw counts.
        vectorizer_cls = TfidfVectorizer if method == "nmf" else CountVectorizer
        # With a single document, max_df=0.95 means "fewer than one
        # document", which is below min_df=1 and makes scikit-learn raise.
        max_df = 0.95 if len(documents) > 1 else 1.0
        vectorizer = vectorizer_cls(
            max_features=1000, min_df=1, max_df=max_df, stop_words='english'
        )

        try:
            X = vectorizer.fit_transform(documents)
            feature_names = vectorizer.get_feature_names_out()

            # Check if we have enough features
            if X.shape[1] < n_topics:
                logger.warning(f"Only {X.shape[1]} features found, reducing n_topics from {n_topics}")
                n_topics = max(2, X.shape[1] - 1)
                result["n_topics"] = n_topics

            # Apply topic modeling
            logger.info(f"Applying {method.upper()} with {n_topics} topics")
            if method == "nmf":
                # Non-negative Matrix Factorization
                model = NMF(n_components=n_topics, random_state=42, max_iter=1000)
            else:
                # Latent Dirichlet Allocation
                model = LatentDirichletAllocation(
                    n_components=n_topics,
                    random_state=42,
                    max_iter=20,
                    learning_method='online'
                )

            topic_distribution = model.fit_transform(X)

            # Get top words for each topic
            logger.info("Extracting top words for each topic")
            result["topics"] = get_top_words_per_topic(model, feature_names, n_top_words)

            # Get topic distribution for each document
            logger.info("Calculating topic distributions for documents")
            for doc_id, dist in zip(doc_ids, topic_distribution):
                # Normalize for easier comparison (NMF rows do not sum to 1)
                total = np.sum(dist)
                normalized_dist = dist / total if total > 0 else dist
                result["document_topics"].append({
                    "document_id": doc_id,
                    "distribution": normalized_dist.tolist()
                })

            logger.info("Topic modeling completed successfully")
        except Exception as e:
            logger.error(f"Error in vectorization or modeling: {str(e)}")
            result["error"] = f"Topic modeling failed: {str(e)}"

    except Exception as e:
        logger.error(f"General error in extract_topics: {str(e)}")
        result["error"] = f"Topic modeling failed: {str(e)}"

    return result


def calculate_jensen_shannon_divergence(p, q):
    """
    Calculate Jensen-Shannon divergence between two probability distributions

    Args:
        p (array): First probability distribution (normalized if needed)
        q (array): Second probability distribution (normalized if needed)

    Returns:
        float: Jensen-Shannon divergence (natural logarithm), or NaN when
            either input has no positive mass and so cannot be normalized
    """
    # Ensure inputs are numpy arrays
    p = np.asarray(p, dtype=float)
    q = np.asarray(q, dtype=float)

    p_total = np.sum(p)
    q_total = np.sum(q)
    # ``not (x > 0)`` is also true for NaN totals
    if not (p_total > 0 and q_total > 0):
        logger.warning("Cannot compute JS divergence for a distribution with no mass")
        return float("nan")

    # scipy's jensenshannon returns the JS *distance*, i.e. the square root
    # of the divergence, so it has to be squared.
    js_distance = distance.jensenshannon(p / p_total, q / q_total)
    return float(js_distance ** 2)


def compare_topics(texts_set_1, texts_set_2, n_topics=3, n_top_words=10, method="lda", model_names=None):
    """
    Compare topics between two sets of texts

    A single topic model is fitted on the union of both sets so that both
    models are described in the same topic space; each model is then
    summarised by the mean topic distribution of its own documents.

    Args:
        texts_set_1 (list): First list of text documents
        texts_set_2 (list): Second list of text documents
        n_topics (int): Number of topics to extract
        n_top_words (int): Number of top words per topic
        method (str): Topic modeling method ('lda' or 'nmf')
        model_names (list, optional): Names of the models being compared

    Returns:
        dict: Comparison results with keys ``method``, ``n_topics``,
            ``models``, ``model_topics`` (average distribution per model),
            ``topics`` and ``comparisons`` (JS divergence, smaller means more
            similar). On failure an ``error`` key holds the message.
    """
    logger.info(f"Starting topic comparison with n_topics={n_topics}, method={method}")

    # Set default model names if not provided
    if model_names is None:
        model_names = ["Model 1", "Model 2"]

    # Initialize the result structure
    result = {
        "method": method,
        "n_topics": n_topics,
        "models": model_names,
        "model_topics": {},
        "topics": [],
        "comparisons": {}
    }

    try:
        docs_model1 = list(texts_set_1)
        docs_model2 = list(texts_set_2)

        # Process all texts together to find common topics
        all_texts = docs_model1 + docs_model2
        logger.info(f"Processing {len(all_texts)} total texts")

        # Extract topics from combined corpus
        combined_result = extract_topics(all_texts, n_topics, n_top_words, method)

        # Check for errors
        if "error" in combined_result:
            logger.warning(f"Error in combined topic extraction: {combined_result['error']}")
            result["error"] = combined_result["error"]
            return result

        # Store topics from combined analysis; report the topic count that
        # was actually used, which may differ from the requested one.
        result["topics"] = combined_result["topics"]
        result["n_topics"] = combined_result["n_topics"]

        # Split the per-document distributions back into the two sets.
        # document_id is the index into all_texts, so documents dropped as
        # empty during preprocessing do not shift the boundary.
        n_docs_model1 = len(docs_model1)
        model1_doc_topics = []
        model2_doc_topics = []
        for doc_topic in combined_result["document_topics"]:
            if doc_topic["document_id"] < n_docs_model1:
                model1_doc_topics.append(doc_topic["distribution"])
            else:
                model2_doc_topics.append(doc_topic["distribution"])

        # NOTE: there is intentionally no per-model refit fallback here.
        # Distributions from independently fitted models live in different
        # topic spaces and cannot be compared index by index.

        # Calculate average topic distribution for each model
        model1_avg_distribution = None
        model2_avg_distribution = None
        if model1_doc_topics:
            model1_avg_distribution = np.mean(model1_doc_topics, axis=0).tolist()
            result["model_topics"][model_names[0]] = model1_avg_distribution

        if model2_doc_topics:
            model2_avg_distribution = np.mean(model2_doc_topics, axis=0).tolist()
            result["model_topics"][model_names[1]] = model2_avg_distribution

        # Calculate similarity between models' topic distributions
        if model1_avg_distribution is not None and model2_avg_distribution is not None:
            comparison_key = f"{model_names[0]} vs {model_names[1]}"

            # Calculate Jensen-Shannon divergence (smaller means more similar)
            js_div = calculate_jensen_shannon_divergence(
                model1_avg_distribution, model2_avg_distribution
            )

            # Create comparison result
            result["comparisons"][comparison_key] = {
                "js_divergence": js_div
            }

            logger.info(f"Topic comparison completed successfully. JS divergence: {js_div:.4f}")
        else:
            logger.warning("Could not calculate model comparisons due to missing topic distributions")

    except Exception as e:
        logger.error(f"Error in compare_topics: {str(e)}")
        result["error"] = f"Topic comparison failed: {str(e)}"

    return result