import nltk
import logging
import spacy
from nltk.corpus import stopwords
from nltk.util import ngrams
from collections import Counter
import re
from tqdm import tqdm

# Logging setup
logging.basicConfig(level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


class NgramProcessor:
    def __init__(self, models=None):
        try:
            nltk.data.find('corpora/stopwords')
        except LookupError:
            nltk.download('stopwords')

        self.stop_words = set(stopwords.words('english'))

        # Default to the standard model if none is specified
        if models is None:
            models = ["en_core_web_trf"]

        # Load the specified models
        self.models = {}
        for model_name in models:
            try:
                self.models[model_name] = spacy.load(model_name)
                tqdm.write(f"[NgramProcessor] Loaded model: {model_name}")
            except IOError:
                tqdm.write(f"[NgramProcessor] Error: Model '{model_name}' not found. Please install it with:")
                tqdm.write(f"python -m spacy download {model_name}")
            except Exception as e:
                tqdm.write(f"[NgramProcessor] Error loading model '{model_name}': {str(e)}")

        # Set the primary NLP model used by other processing steps
        if "en_core_web_trf" in self.models:
            self.nlp = self.models["en_core_web_trf"]
        elif len(self.models) > 0:
            # Use the first available model as primary if the preferred one isn't available
            self.nlp = next(iter(self.models.values()))
        else:
            raise ValueError("No spaCy model was successfully loaded")

        # Add custom entity patterns for numerical ranges to the primary model
        if "entity_ruler" not in self.nlp.pipe_names:
            ruler = self.nlp.add_pipe("entity_ruler", before="ner")
            patterns = [
                {"label": "CARDINAL", "pattern": [{"TEXT": {"REGEX": r"\d+-\d+"}}]},  # Ranges like "7-10"
                {"label": "PERCENT", "pattern": [{"TEXT": {"REGEX": r"\d+%"}}]},      # Percentages
            ]
            ruler.add_patterns(patterns)

        # Special pattern for numerical ranges
        self.number_range_pattern = re.compile(r'\b(\d+(?:-\d+)+)\b')

        tqdm.write("[NgramProcessor] Initialized with stopwords, spaCy NLP model, and numerical range detection.")
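
    # ----------------------------------------------------------------------
    # Illustrative note (not part of the original pipeline): the processor can
    # be built with more than one spaCy pipeline, e.g. a general-purpose model
    # plus a smaller fallback. The exact model list is an assumption here; any
    # installed spaCy package name would work:
    #
    #     processor = NgramProcessor(models=["en_core_web_trf", "en_core_web_sm"])
    #
    # Entities from every loaded model are pooled in extract_named_entities,
    # while self.nlp (the primary model) drives sentence tokenization.
    # ----------------------------------------------------------------------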
    def remove_stopwords(self, text):
        words = re.findall(r'\w+', text.lower())
        filtered_words = [word for word in words if word not in self.stop_words]
        return ' '.join(filtered_words)

    def extract_number_ranges(self, sentences):
        """Extract numerical ranges like '7-10' from sentences."""
        tqdm.write("[NgramProcessor] Extracting numerical ranges...")
        number_ranges = []
        range_counts = Counter()

        for sentence in sentences:
            # Find all numerical ranges in the sentence
            matches = self.number_range_pattern.findall(sentence)
            for match in matches:
                range_counts[match] += 1

        # Keep every range that appears at least once (threshold for ranges)
        for range_text, count in range_counts.items():
            if count >= 1:
                number_ranges.append(range_text)

        tqdm.write(f"[NgramProcessor] Found {len(number_ranges)} numerical ranges: {number_ranges}")
        return number_ranges

    def extract_standalone_numbers(self, sentences):
        """Extract standalone numerical values from sentences."""
        tqdm.write("[NgramProcessor] Extracting standalone numbers...")

        # Two patterns: one for percentages, one for regular numbers.
        # No trailing \b after '%': '%' is a non-word character, so a trailing
        # boundary would only match when a letter or digit follows the sign.
        percentage_pattern = re.compile(r'\b\d+%')  # Only matches numbers with a % sign
        number_pattern = re.compile(r'\b\d+\b')     # Only matches standalone numbers

        percentage_counts = Counter()
        number_counts = Counter()
        percentage_values = set()  # Store the numeric part of percentages for cross-reference

        # First pass: find all percentages
        for sentence in sentences:
            percentage_matches = percentage_pattern.findall(sentence)
            for match in percentage_matches:
                percentage_counts[match] += 1
                # Store the numeric part for later comparison
                numeric_part = match.rstrip('%')
                percentage_values.add(numeric_part)

        # Second pass: find standalone numbers
        for sentence in sentences:
            number_matches = number_pattern.findall(sentence)
            for match in number_matches:
                # Avoid double counting numbers already counted as percentages
                if match not in percentage_values:
                    number_counts[match] += 1

        # Process percentages first (they have priority)
        threshold = max(1, int(len(sentences) * 1.0))
        standalone_numbers = []

        # Add percentages that meet the threshold
        for num, count in percentage_counts.items():
            if count >= threshold:
                standalone_numbers.append(num)  # Already has the % sign

        # Then add standalone numbers, converting to percentage format if needed
        for num, count in number_counts.items():
            if count >= threshold:
                # If this number also appeared as part of a percentage, use the percentage format
                if num in percentage_values:
                    standalone_numbers.append(f"{num}%")
                else:
                    standalone_numbers.append(num)

        tqdm.write(f"[NgramProcessor] Found {len(standalone_numbers)} standalone numbers: {standalone_numbers}")
        return standalone_numbers
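
    # ----------------------------------------------------------------------
    # Illustrative note (not part of the original pipeline), using two
    # hypothetical input sentences:
    #
    #     sentences = ["Coverage rose to 45% in 2021.",
    #                  "The survey reported 45% coverage across 2021."]
    #     processor.extract_standalone_numbers(sentences)
    #     # -> ['45%', '2021']
    #
    # Percentages keep their % sign, and "45" is not double-counted because it
    # already appeared as "45%".
    # ----------------------------------------------------------------------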
    def extract_regex_subsequences(self, sentences):
        """Extract potential subsequences using regex patterns before applying NLP."""
        tqdm.write("[NgramProcessor] Extracting regex subsequences...")

        # Find potential multi-word subsequences (2-6 words) that occur across sentences
        potential_subsequences = set()

        # Process each sentence to find multi-word phrases
        for sentence in sentences:
            # First, clean the sentence by removing punctuation and converting to lowercase
            # (the hyphen sits at the end of the character class so it is literal, not a range)
            clean_sentence = re.sub(r'[^\w\s&./\'()[\]$€£¥+%-]', ' ', sentence.lower())

            # Extract sequences of 2-6 words
            for i in range(2, 7):
                pattern = r'\b(\w+(?:[-&\s./\'()[\]$€£¥+%]+\w+){' + str(i - 1) + r'})\b'
                matches = re.findall(pattern, clean_sentence)
                potential_subsequences.update(matches)

        # Check whether a word is a number, percentage, or numerical range
        def is_numeric(word):
            return bool(re.match(r'^\d+(\.\d+)?%?$|^\d+-\d+$', word))

        # Filter out sequences that consist only of stopwords (but preserve numbers)
        filtered_subsequences = []
        for subseq in potential_subsequences:
            words = re.split(r'[\s-]+', subseq)  # Split on spaces or hyphens

            # Skip if ALL words are stopwords and none are numeric
            if all((word in self.stop_words and not is_numeric(word)) for word in words):
                tqdm.write(f"[NgramProcessor] Skipping all-stopword phrase: {subseq}")
                continue

            # Keep if the sequence has significant words (not just stopwords)
            # OR if it contains numbers/percentages
            if len(words) > 1 and (
                any(word not in self.stop_words and (len(word) > 2 or is_numeric(word)) for word in words)
            ):
                # Additional check to reject a standalone "the" or other common stopword
                if not (len(words) == 1 and words[0] in self.stop_words and not is_numeric(words[0])):
                    filtered_subsequences.append(subseq)

        # Count occurrences across all sentences
        subseq_counts = Counter()
        for subseq in filtered_subsequences:
            for sentence in sentences:
                if re.search(r'\b' + re.escape(subseq) + r'\b', sentence.lower()):
                    subseq_counts[subseq] += 1

        # Keep only subsequences that appear in multiple sentences
        threshold = max(2, int(len(sentences) * 1.0))  # threshold to catch all patterns
        regex_candidates = [subseq for subseq, count in subseq_counts.items() if count >= threshold]

        tqdm.write(f"[NgramProcessor] Found {len(regex_candidates)} regex subsequences")
        return regex_candidates

    def filter_standalone_stopwords(self, ngrams_dict):
        """Remove standalone stopwords and very short terms from the ngrams dictionary."""
        filtered_dict = {}
        for sentence, sentence_ngrams in ngrams_dict.items():
            filtered_dict[sentence] = {}
            for ngram, indices in sentence_ngrams.items():
                words = ngram.split()

                # Skip single stopwords and very short terms UNLESS they are numbers
                if len(words) == 1 and (words[0] in self.stop_words or len(words[0]) < 3):
                    # Exception for numbers
                    if re.match(r'^\d+$', words[0]):
                        filtered_dict[sentence][ngram] = indices
                    continue

                # Skip if ALL words are stopwords
                if all(word in self.stop_words for word in words):
                    continue

                filtered_dict[sentence][ngram] = indices

        return filtered_dict

    def extract_named_entities(self, sentences):
        entity_counter = Counter()

        # Process each sentence with each model
        for model_name, nlp_model in self.models.items():
            tqdm.write(f"[NgramProcessor] Extracting entities with model: {model_name}")
            docs = list(nlp_model.pipe(sentences))

            # Process each sentence
            for doc in docs:
                for ent in doc.ents:
                    # Include entity types relevant to this model.
                    # This is a comprehensive list - some models may not use all these types.
                    if ent.label_ in {
                        # People, organizations, locations
                        "PERSON", "ORG", "GPE", "LOC", "NORP",
                        # Facilities and products
                        "FAC", "PRODUCT", "WORK_OF_ART", "EVENT",
                        # Numeric entities
                        "DATE", "TIME", "MONEY", "QUANTITY", "PERCENT", "CARDINAL", "ORDINAL",
                        # Others
                        "LAW", "LANGUAGE",
                        # Scientific entities
                        "SCIENTIFIC", "SUBSTANCE", "CHEMICAL", "TECHNOLOGY",
                        # Medical entities
                        "DISEASE", "MEDICAL", "CLINICAL", "TREATMENT", "SYMPTOM",
                        "DIAGNOSTIC", "ANATOMICAL", "BIOLOGY", "GENE", "PROTEIN", "DRUG",
                        # Legal entities
                        "LEGAL", "COURT", "STATUTE", "PROVISION", "CASE_CITATION",
                        "JUDGE", "LEGAL_ROLE", "REGULATION", "CONTRACT",
                    }:
                        # Handle possessive forms by stripping 's
                        clean_entity = re.sub(r"'s\b", "", ent.text.lower()).strip()
                        # Counts are pooled across all loaded models
                        entity_counter[clean_entity] += 1

        threshold = max(1, int(len(sentences) * 1.0))  # Adjusted threshold for entities
        return [ent for ent, count in entity_counter.items() if count >= threshold]

    def extract_domain_specific_entities(self, text):
        """Extract entities from all models and categorize them by domain."""
        domain_entities = {}
        for model_name, nlp_model in self.models.items():
            doc = nlp_model(text)
            domain_entities[model_name] = [(ent.text, ent.label_) for ent in doc.ents]
        return domain_entities

    def is_substring_of_any(self, ngram, common_ngrams):
        for other_ngram in common_ngrams:
            if ngram != other_ngram and ngram in other_ngram:
                return True
        return False
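
    # ----------------------------------------------------------------------
    # Illustrative note (not part of the original pipeline): shorter phrases
    # contained in an already-kept longer phrase are treated as redundant, e.g.
    #
    #     processor.is_substring_of_any("new york", ["new york city", "7-10"])
    #     # -> True, so "new york" would be dropped in favour of the longer span
    # ----------------------------------------------------------------------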
    def find_filtered_ngrams(self, sentences):
        tqdm.write("[NgramProcessor] Processing...")

        # Step 1: Extract numerical ranges and standalone numbers first (special priority)
        number_ranges = self.extract_number_ranges(sentences)
        standalone_numbers = self.extract_standalone_numbers(sentences)

        # Step 2: Use regex to find common subsequences
        regex_subsequences = self.extract_regex_subsequences(sentences)
        tqdm.write(f"[NgramProcessor] Regex Subsequences: {regex_subsequences}")

        # Step 3: Apply spaCy to detect named entities
        named_entities = self.extract_named_entities(sentences)

        # Make sure percentage values have the proper format
        for i, entity in enumerate(named_entities):
            if re.match(r'\d+$', entity) and any(f"{entity}%" in sentence for sentence in sentences):
                # Replace a standalone digit with a percentage if it appears as a percentage in the text
                named_entities[i] = f"{entity}%"
        tqdm.write(f"[NgramProcessor] Named Entities: {named_entities}")

        # Step 4: Consolidate and filter all detected patterns
        # Collect all patterns in one list
        all_patterns = number_ranges + regex_subsequences + named_entities + standalone_numbers

        # Sort by length (longer first) to prioritize more specific patterns
        all_patterns.sort(key=len, reverse=True)

        # Remove duplicates while preserving order
        unique_patterns = []
        seen = set()
        for pattern in all_patterns:
            if pattern not in seen:
                # Check if this pattern is a substring of any already selected pattern
                is_substring = False
                for selected_pattern in unique_patterns:
                    if pattern in selected_pattern and pattern != selected_pattern:
                        is_substring = True
                        break
                if not is_substring:
                    unique_patterns.append(pattern)
                    seen.add(pattern)

        # Re-index sequentially
        indexed_patterns = [(i + 1, pattern) for i, pattern in enumerate(unique_patterns)]
        self.indexed_patterns = indexed_patterns
        non_melting_points = [pattern for _, pattern in indexed_patterns]

        tqdm.write(f"[NgramProcessor] Filtered non_melting_points: {non_melting_points}")
        tqdm.write(f"[NgramProcessor] Filtered non-melting points: {len(non_melting_points)}")

        # Filter out patterns that are substrings of longer patterns, unless they are standalone numbers
        standalone_numbers_set = set(standalone_numbers)
        non_melting_points = []
        for pattern in unique_patterns:
            is_substring = False
            for longer_pattern in non_melting_points:
                # Check if the pattern is contained within a longer pattern
                if pattern in longer_pattern:
                    is_substring = True
                    break
            if not is_substring or pattern in standalone_numbers_set:
                non_melting_points.append(pattern)

        # For remaining cases that might have been missed, apply NLTK n-gram extraction,
        # but only on cleaned sentences (less computationally expensive now)
        clean_to_original = {}
        sentences_cleaned = []

        # Process sentences with spaCy to preserve entity information
        docs = list(self.nlp.pipe(sentences))
        for i, doc in enumerate(docs):
            original_sentence = sentences[i]
            entity_texts = {ent.text.lower() for ent in doc.ents if len(ent.text.split()) > 1}

            # Tokenize while preserving entities and numerical ranges
            tokens = []
            j = 0
            words = [token.text for token in doc]
            while j < len(words):
                # First check for numerical ranges
                current_word = words[j].lower()
                if self.number_range_pattern.match(current_word):
                    tokens.append(current_word)
                    j += 1
                    continue

                # Then check for entities
                matched_entity = None
                for ent in sorted(entity_texts, key=len, reverse=True):
                    ent_words = ent.split()
                    if j + len(ent_words) <= len(words) and [w.lower() for w in words[j:j + len(ent_words)]] == ent_words:
                        matched_entity = " ".join(words[j:j + len(ent_words)])
                        tokens.append(matched_entity.lower())  # preserve the full entity
                        j += len(ent_words)
                        break

                if not matched_entity:
                    word = words[j].lower()
                    if word not in self.stop_words and re.match(r'\w+', word):
                        tokens.append(word)
                    j += 1

            cleaned = " ".join(tokens)
            sentences_cleaned.append(cleaned)
            clean_to_original[cleaned] = original_sentence
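
        # ------------------------------------------------------------------
        # Illustrative note (not part of the original pipeline): the loop above
        # is meant to reduce each sentence to a lowercase, stopword-free token
        # string, appending multi-word entities and hyphenated ranges as whole
        # strings, so that the NLTK n-gram pass below only has to cover the
        # gaps between already-detected patterns.
        # ------------------------------------------------------------------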
        # Step 5: Only run n-gram extraction on gaps not covered by regex and named entities
        ngram_lengths = [4, 3, 2, 1]  # Consider shorter n-grams now since we already have longer phrases
        all_ngrams_by_length = {}
        for n in ngram_lengths:
            all_ngrams = []
            for sentence in sentences_cleaned:
                tokens = sentence.split()
                if len(tokens) >= n:
                    sent_ngrams = list(ngrams(tokens, n))
                    all_ngrams.extend(sent_ngrams)
            all_ngrams_by_length[n] = Counter(all_ngrams)

        # Step 6: Add additional n-grams that are frequent but weren't caught by regex or named entities
        threshold_factor = 1.0  # threshold since we're focusing on gaps
        for n_size in sorted(ngram_lengths, reverse=True):
            ngram_counts = all_ngrams_by_length[n_size]
            threshold = max(2, int(len(sentences) * threshold_factor))

            # Sort by count for efficiency
            for ngram, count in ngram_counts.most_common():
                if count >= threshold:
                    ngram_str = ' '.join(ngram)
                    # Skip if it is a substring of existing n-grams or already in our collection
                    if ngram_str not in non_melting_points and not self.is_substring_of_any(ngram_str, non_melting_points):
                        non_melting_points.append(ngram_str)

        # Create a sorted version for efficient lookup
        final_non_melting_points = non_melting_points.copy()
        sorted_non_melting_points = sorted(final_non_melting_points, key=len, reverse=True)
        final_indexed_patterns = [(i + 1, pattern) for i, pattern in enumerate(sorted_non_melting_points)]

        # Filter out n-grams that consist entirely of stop words
        filtered_patterns = []
        for idx, pattern in final_indexed_patterns:
            words = pattern.lower().split()

            # Check if the pattern is a number or contains a number
            has_number = any(re.match(r'.*\d+.*', word) for word in words)

            # If the pattern has a number OR has any non-stop word, keep it
            if has_number or any(word not in self.stop_words for word in words):
                filtered_patterns.append((idx, pattern))
            else:
                tqdm.write(f"[NgramProcessor] Removing n-gram with all stop words: {pattern}")

        # Reassign the filtered patterns with reindexed values
        self.indexed_patterns = [(i + 1, pattern) for i, (_, pattern) in enumerate(filtered_patterns)]

        # Generate the results with more efficient regex matching
        result = {}
        for sentence in sentences:
            sentence_result = {}
            for _, ngram in self.indexed_patterns:  # Use the filtered patterns
                # Skip single-word stopwords and short terms
                words = ngram.split()
                if len(words) == 1 and (words[0] in self.stop_words or len(words[0]) < 3):
                    continue

                # Handle numerical ranges differently - they need exact matching
                if self.number_range_pattern.match(ngram):
                    pattern = re.compile(r'\b' + re.escape(ngram) + r'\b', re.IGNORECASE)
                else:
                    # Compile the regex pattern once per n-gram - modified to handle special characters
                    pattern = re.compile(r'(?
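
# ----------------------------------------------------------------------------
# Hypothetical usage sketch (not part of the original file). The source is
# truncated above inside find_filtered_ngrams, so this only guesses at the
# intended call pattern; the sentence list and variable names are invented.
#
#     if __name__ == "__main__":
#         processor = NgramProcessor(models=["en_core_web_trf"])
#         sentences = [
#             "Delivery usually takes 7-10 business days.",
#             "Standard delivery takes 7-10 business days in most regions.",
#         ]
#         processor.find_filtered_ngrams(sentences)
#         print(processor.indexed_patterns)
# ----------------------------------------------------------------------------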