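"""Non-melting point extraction for paraphrased sentence sets.

NgramProcessor combines regex heuristics, spaCy named-entity recognition, and
NLTK n-gram counting to identify "non-melting points": phrases, numbers, and
numerical ranges (e.g. "7-10", "25%") that recur across a set of sentences,
along with their word-index positions within each sentence.
"""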
import nltk
import logging
import spacy
from nltk.corpus import stopwords
from nltk.util import ngrams
from collections import Counter
import re
from tqdm import tqdm

# Logging setup
logging.basicConfig(level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

class NgramProcessor:
    def __init__(self, models=None):
        try:
            nltk.data.find('corpora/stopwords')
        except LookupError:
            nltk.download('stopwords')

        self.stop_words = set(stopwords.words('english'))

        # Default to the transformer model if none is specified
        if models is None:
            models = ["en_core_web_trf"]

        # Load the specified models
        self.models = {}
        for model_name in models:
            try:
                self.models[model_name] = spacy.load(model_name)
                tqdm.write(f"[NgramProcessor] Loaded model: {model_name}")
            except IOError:
                tqdm.write(f"[NgramProcessor] Error: Model '{model_name}' not found. Please install it with:")
                tqdm.write(f"python -m spacy download {model_name}")
            except Exception as e:
                tqdm.write(f"[NgramProcessor] Error loading model '{model_name}': {str(e)}")

        # Set the primary NLP model for other processes
        if "en_core_web_trf" in self.models:
            self.nlp = self.models["en_core_web_trf"]
        elif len(self.models) > 0:
            # Use the first available model as primary if the preferred one isn't available
            self.nlp = next(iter(self.models.values()))
        else:
            raise ValueError("No spaCy model was successfully loaded")

        # Add custom entity patterns for numerical ranges to the primary model
        if "entity_ruler" not in self.nlp.pipe_names:
            ruler = self.nlp.add_pipe("entity_ruler", before="ner")
            patterns = [
                {"label": "CARDINAL", "pattern": [{"TEXT": {"REGEX": "\\d+-\\d+"}}]},  # Pattern for ranges like "7-10"
                {"label": "PERCENT", "pattern": [{"TEXT": {"REGEX": "\\d+%"}}]}  # Pattern for percentages
            ]
            ruler.add_patterns(patterns)

        # Special pattern for numerical ranges
        self.number_range_pattern = re.compile(r'\b(\d+(?:-\d+)+)\b')
        tqdm.write("[NgramProcessor] Initialized with stopwords, spaCy NLP model, and numerical range detection.")

    def remove_stopwords(self, text):
        words = re.findall(r'\w+', text.lower())
        filtered_words = [word for word in words if word not in self.stop_words]
        return ' '.join(filtered_words)

    def extract_number_ranges(self, sentences):
        """Extract numerical ranges like '7-10' from sentences."""
        tqdm.write("[NgramProcessor] Extracting numerical ranges...")
        number_ranges = []
        range_counts = Counter()
        for sentence in sentences:
            # Find all numerical ranges in the sentence
            matches = self.number_range_pattern.findall(sentence)
            for match in matches:
                range_counts[match] += 1
        # Keep every range that appears at least once (ranges get a low threshold)
        for range_text, count in range_counts.items():
            if count >= 1:
                number_ranges.append(range_text)
        tqdm.write(f"[NgramProcessor] Found {len(number_ranges)} numerical ranges: {number_ranges}")
        return number_ranges

    def extract_standalone_numbers(self, sentences):
        """Extract standalone numerical values from sentences."""
        tqdm.write("[NgramProcessor] Extracting standalone numbers...")
        # Two patterns: one for percentages, one for regular numbers.
        # Note: no trailing \b after '%' - '%' is a non-word character, so a word
        # boundary there would fail for percentages followed by a space.
        percentage_pattern = re.compile(r'\b\d+%')  # Only matches numbers with % sign
        number_pattern = re.compile(r'\b\d+\b')  # Only matches standalone numbers
        percentage_counts = Counter()
        number_counts = Counter()
        percentage_values = set()  # Store the numeric part of percentages for cross-reference

        # First pass: find all percentages
        for sentence in sentences:
            percentage_matches = percentage_pattern.findall(sentence)
            for match in percentage_matches:
                percentage_counts[match] += 1
                # Store the numeric part for later comparison
                numeric_part = match.rstrip('%')
                percentage_values.add(numeric_part)

        # Second pass: find standalone numbers
        for sentence in sentences:
            number_matches = number_pattern.findall(sentence)
            for match in number_matches:
                # Avoid double counting numbers that were already counted as percentages
                if match not in percentage_values:
                    number_counts[match] += 1

        # Process percentages first (they have priority)
        threshold = max(1, int(len(sentences) * 1.0))
        standalone_numbers = []
        # Add percentages that meet the threshold
        for num, count in percentage_counts.items():
            if count >= threshold:
                standalone_numbers.append(num)  # Already has % sign
        # Then add standalone numbers, converting to percentage format if needed
        for num, count in number_counts.items():
            if count >= threshold:
                # If this number also appeared as part of a percentage, use the percentage format
                if num in percentage_values:
                    standalone_numbers.append(f"{num}%")
                else:
                    standalone_numbers.append(num)
        tqdm.write(f"[NgramProcessor] Found {len(standalone_numbers)} standalone numbers: {standalone_numbers}")
        return standalone_numbers

    def extract_regex_subsequences(self, sentences):
        """Extract potential subsequences using regex patterns before applying NLP."""
        tqdm.write("[NgramProcessor] Extracting regex subsequences...")
        # Find potential multi-word subsequences (2-6 words) that occur across sentences
        potential_subsequences = set()
        for sentence in sentences:
            # Clean the sentence: lowercase it and replace punctuation with spaces,
            # keeping word characters, whitespace, and &./'()[]$€£¥+%- (the hyphen is
            # placed last so it is treated literally rather than as a range)
            clean_sentence = re.sub(r'[^\w\s&./\'()\[\]$€£¥+%-]', ' ', sentence.lower())
            # Extract sequences of 2-6 words
            for i in range(2, 7):
                pattern = r'\b(\w+(?:[-&\s./\'()[\]$€£¥+%]+\w+){' + str(i-1) + r'})\b'
                matches = re.findall(pattern, clean_sentence)
                potential_subsequences.update(matches)

        # Filter out sequences that consist only of stopwords (but preserve numbers)
        filtered_subsequences = []
        for subseq in potential_subsequences:
            words = re.split(r'[\s-]+', subseq)  # Split on spaces or hyphens

            # Check whether a word is a number, percentage, or range
            def is_numeric(word):
                return bool(re.match(r'^\d+(\.\d+)?%?$|^\d+-\d+$', word))

            # Skip if ALL words are stopwords and none are numeric
            if all((word in self.stop_words and not is_numeric(word)) for word in words):
                tqdm.write(f"[NgramProcessor] Skipping all-stopword phrase: {subseq}")
                continue
            # Keep if the sequence has significant words (not just stopwords)
            # OR if it contains numbers/percentages
            if len(words) > 1 and (
                any(word not in self.stop_words and (len(word) > 2 or is_numeric(word)) for word in words)
            ):
                # Additional check to reject a standalone "the" or other common stopwords
                if not (len(words) == 1 and words[0] in self.stop_words and not is_numeric(words[0])):
                    filtered_subsequences.append(subseq)

        # Count occurrences across all sentences
        subseq_counts = Counter()
        for subseq in filtered_subsequences:
            for sentence in sentences:
                if re.search(r'\b' + re.escape(subseq) + r'\b', sentence.lower()):
                    subseq_counts[subseq] += 1

        # Keep only subsequences that appear in (nearly) every sentence
        threshold = max(2, int(len(sentences) * 1.0))
        regex_candidates = [subseq for subseq, count in subseq_counts.items()
                            if count >= threshold]
        tqdm.write(f"[NgramProcessor] Found {len(regex_candidates)} regex subsequences")
        return regex_candidates

    def filter_standalone_stopwords(self, ngrams_dict):
        """Remove standalone stopwords and very short terms from the ngrams dictionary."""
        filtered_dict = {}
        for sentence, sentence_ngrams in ngrams_dict.items():
            filtered_dict[sentence] = {}
            for ngram, indices in sentence_ngrams.items():
                words = ngram.split()
                # Skip single stopwords and very short terms UNLESS they are numbers
                if len(words) == 1 and (words[0] in self.stop_words or len(words[0]) < 3):
                    # Exception for numbers
                    if re.match(r'^\d+$', words[0]):
                        filtered_dict[sentence][ngram] = indices
                    continue
                # Skip if ALL words are stopwords
                if all(word in self.stop_words for word in words):
                    continue
                filtered_dict[sentence][ngram] = indices
        return filtered_dict

    def extract_named_entities(self, sentences):
        entity_counter = Counter()
        # Process each sentence with each model
        for model_name, nlp_model in self.models.items():
            tqdm.write(f"[NgramProcessor] Extracting entities with model: {model_name}")
            docs = list(nlp_model.pipe(sentences))
            # Process each sentence
            for doc in docs:
                for ent in doc.ents:
                    # Include entity types relevant to this model.
                    # This is a comprehensive list - some models may not use all these types.
                    if ent.label_ in {
                        # People, organizations, locations
                        "PERSON", "ORG", "GPE", "LOC", "NORP",
                        # Facilities and products
                        "FAC", "PRODUCT", "WORK_OF_ART", "EVENT",
                        # Numeric entities
                        "DATE", "TIME", "MONEY", "QUANTITY", "PERCENT", "CARDINAL", "ORDINAL",
                        # Others
                        "LAW", "LANGUAGE",
                        # Scientific entities
                        "SCIENTIFIC", "SUBSTANCE", "CHEMICAL", "TECHNOLOGY",
                        # Medical entities
                        "DISEASE", "MEDICAL", "CLINICAL", "TREATMENT", "SYMPTOM", "DIAGNOSTIC",
                        "ANATOMICAL", "BIOLOGY", "GENE", "PROTEIN", "DRUG",
                        # Legal entities
                        "LEGAL", "COURT", "STATUTE", "PROVISION", "CASE_CITATION", "JUDGE",
                        "LEGAL_ROLE", "REGULATION", "CONTRACT"
                    }:
                        # Handle possessive forms by stripping 's
                        clean_entity = re.sub(r"'s\b", "", ent.text.lower()).strip()
                        # Counts are pooled across all loaded models
                        entity_counter[clean_entity] += 1
        # Keep entities whose pooled count reaches the number of sentences
        threshold = max(1, len(sentences) * 1.0)
        return [ent for ent, count in entity_counter.items() if count >= threshold]

    def extract_domain_specific_entities(self, text):
        """Extract entities from all models and categorize by domain."""
        domain_entities = {}
        for model_name, nlp_model in self.models.items():
            doc = nlp_model(text)
            domain_entities[model_name] = [(ent.text, ent.label_) for ent in doc.ents]
        return domain_entities

    def is_substring_of_any(self, ngram, common_ngrams):
        for other_ngram in common_ngrams:
            if ngram != other_ngram and ngram in other_ngram:
                return True
        return False

    def find_filtered_ngrams(self, sentences):
        tqdm.write("[NgramProcessor] Processing...")
        # Step 1: Extract numerical ranges and standalone numbers (special priority)
        number_ranges = self.extract_number_ranges(sentences)
        standalone_numbers = self.extract_standalone_numbers(sentences)

        # Step 2: Use regex to find common subsequences
        regex_subsequences = self.extract_regex_subsequences(sentences)
        tqdm.write(f"[NgramProcessor] Regex Subsequences: {regex_subsequences}")

        # Step 3: Apply spaCy to detect named entities
        named_entities = self.extract_named_entities(sentences)
        # Make sure percentage values have the proper format
        for i, entity in enumerate(named_entities):
            if re.match(r'\d+$', entity) and any(f"{entity}%" in sentence for sentence in sentences):
                # Replace a standalone digit with a percentage if it appears as a percentage in the text
                named_entities[i] = f"{entity}%"
        tqdm.write(f"[NgramProcessor] Named Entities: {named_entities}")

        # Step 4: Consolidate and filter all detected patterns
        # Collect all patterns in one list
        all_patterns = number_ranges + regex_subsequences + named_entities + standalone_numbers
        # Sort by length (longer first) to prioritize more specific patterns
        all_patterns.sort(key=len, reverse=True)

        # Remove duplicates while preserving order
        unique_patterns = []
        seen = set()
        for pattern in all_patterns:
            if pattern not in seen:
                # Check if this pattern is a substring of any already selected pattern
                is_substring = False
                for selected_pattern in unique_patterns:
                    if pattern in selected_pattern and pattern != selected_pattern:
                        is_substring = True
                        break
                if not is_substring:
                    unique_patterns.append(pattern)
                    seen.add(pattern)

        # Re-index sequentially
        indexed_patterns = [(i + 1, pattern) for i, pattern in enumerate(unique_patterns)]
        self.indexed_patterns = indexed_patterns
        non_melting_points = [pattern for _, pattern in indexed_patterns]
        tqdm.write(f"[NgramProcessor] Filtered non_melting_points: {non_melting_points}")
        tqdm.write(f"[NgramProcessor] Filtered non-melting points: {len(non_melting_points)}")

        # Rebuild the list, dropping substrings of longer patterns but always keeping standalone numbers
        standalone_numbers_set = set(standalone_numbers)
        non_melting_points = []
        for pattern in unique_patterns:
            is_substring = False
            for longer_pattern in non_melting_points:
                # Check if the pattern is contained within a longer, already accepted pattern
                if pattern in longer_pattern:
                    is_substring = True
                    break
            if not is_substring or pattern in standalone_numbers_set:
                non_melting_points.append(pattern)

        # For remaining cases that might have been missed, apply NLTK n-gram extraction
        # only on cleaned sentences (less computationally expensive now)
        clean_to_original = {}
        sentences_cleaned = []
        # Process sentences with spaCy to preserve entity information
        docs = list(self.nlp.pipe(sentences))
        for i, doc in enumerate(docs):
            original_sentence = sentences[i]
            entity_texts = {ent.text.lower() for ent in doc.ents if len(ent.text.split()) > 1}
            # Tokenize while preserving entities and numerical ranges
            tokens = []
            j = 0
            words = [token.text for token in doc]
            while j < len(words):
                # First check for numerical ranges
                current_word = words[j].lower()
                if self.number_range_pattern.match(current_word):
                    tokens.append(current_word)
                    j += 1
                    continue
                # Then check for entities
                matched_entity = None
                for ent in sorted(entity_texts, key=len, reverse=True):
                    ent_words = ent.split()
                    if j + len(ent_words) <= len(words) and [w.lower() for w in words[j:j + len(ent_words)]] == ent_words:
                        matched_entity = " ".join(words[j:j + len(ent_words)])
                        tokens.append(matched_entity.lower())  # Preserve the full entity
                        j += len(ent_words)
                        break
                if not matched_entity:
                    word = words[j].lower()
                    if word not in self.stop_words and re.match(r'\w+', word):
                        tokens.append(word)
                    j += 1
            cleaned = " ".join(tokens)
            sentences_cleaned.append(cleaned)
            clean_to_original[cleaned] = original_sentence

        # Step 5: Only run n-gram extraction on gaps not covered by regex and named entities
        ngram_lengths = [4, 3, 2, 1]  # Consider shorter n-grams now since we already have longer phrases
        all_ngrams_by_length = {}
        for n in ngram_lengths:
            all_ngrams = []
            for sentence in sentences_cleaned:
                tokens = sentence.split()
                if len(tokens) >= n:
                    sent_ngrams = list(ngrams(tokens, n))
                    all_ngrams.extend(sent_ngrams)
            all_ngrams_by_length[n] = Counter(all_ngrams)

        # Step 6: Add additional n-grams that are frequent but weren't caught by regex or named entities
        threshold_factor = 1.0  # High threshold, since we're only filling gaps
        for n_size in sorted(ngram_lengths, reverse=True):
            ngram_counts = all_ngrams_by_length[n_size]
            threshold = max(2, int(len(sentences) * threshold_factor))
            # Sort by count for efficiency
            for ngram, count in ngram_counts.most_common():
                if count >= threshold:
                    ngram_str = ' '.join(ngram)
                    # Skip if it is a substring of existing n-grams or already in our collection
                    if ngram_str not in non_melting_points and not self.is_substring_of_any(ngram_str, non_melting_points):
                        non_melting_points.append(ngram_str)

        # Create a sorted version for efficient lookup
        final_non_melting_points = non_melting_points.copy()
        sorted_non_melting_points = sorted(final_non_melting_points, key=len, reverse=True)
        final_indexed_patterns = [(i + 1, pattern) for i, pattern in enumerate(sorted_non_melting_points)]

        # Filter out n-grams that consist entirely of stop words
        filtered_patterns = []
        for idx, pattern in final_indexed_patterns:
            words = pattern.lower().split()
            # Check if the pattern is a number or contains a number
            has_number = any(re.match(r'.*\d+.*', word) for word in words)
            # If the pattern has a number OR has any non-stop word, keep it
            if has_number or any(word not in self.stop_words for word in words):
                filtered_patterns.append((idx, pattern))
            else:
                tqdm.write(f"[NgramProcessor] Removing n-gram with all stop words: {pattern}")

        # Reassign filtered patterns with reindexed values
        self.indexed_patterns = [(i + 1, pattern) for i, (_, pattern) in enumerate(filtered_patterns)]

        # Generate the results with more efficient regex matching
        result = {}
        for sentence in sentences:
            sentence_result = {}
            for _, ngram in self.indexed_patterns:  # Use the filtered patterns
                # Skip single word stopwords and short terms
                words = ngram.split()
                if len(words) == 1 and (words[0] in self.stop_words or len(words[0]) < 3):
                    continue
                # Handle numerical ranges differently - they need exact matching
                if self.number_range_pattern.match(ngram):
                    pattern = re.compile(r'\b' + re.escape(ngram) + r'\b', re.IGNORECASE)
                else:
                    # Compile the regex pattern once per n-gram - modified to handle special characters
                    pattern = re.compile(r'(?<!\w)' + re.escape(ngram) + r'(?!\w)', re.IGNORECASE)
                matches = list(pattern.finditer(sentence))
                if matches:
                    indices = []
                    for match in matches:
                        # Calculate word indices with improved handling for hyphenated terms
                        start_pos = match.start()
                        text_before = sentence[:start_pos]
                        # More accurate word counting that handles hyphenated terms
                        start_idx = len(re.findall(r'\s+', text_before)) + (0 if text_before.strip() == "" else 1)
                        # Count words in the matched n-gram (handling hyphens as single terms)
                        if self.number_range_pattern.match(ngram):
                            # Numerical ranges count as one term
                            ngram_word_count = 1
                        else:
                            ngram_word_count = len(re.findall(r'\S+', ngram))
                        end_idx = start_idx + ngram_word_count - 1
                        indices.append((start_idx, end_idx))
                    if indices:  # Only add if we found valid indices
                        sentence_result[ngram] = indices
            result[sentence] = sentence_result

        # Apply the stopword filter before returning
        result = self.filter_standalone_stopwords(result)
        return result, dict(self.indexed_patterns)

    def find_relative_order(self, sentence, common_ngrams):
        # First, identify all possible matches without modifying the sentence
        all_matches = []
        for ngram in common_ngrams:
            # Special handling for n-grams containing special characters (%, -, /, etc.)
            if any(char in ngram for char in '&-/.\'()[]$€£¥+%'):
                pattern = re.compile(r'\b' + re.escape(ngram) + r'\b', re.IGNORECASE)
            # Handle numerical ranges
            elif self.number_range_pattern.match(ngram):
                pattern = re.compile(r'\b' + re.escape(ngram) + r'\b', re.IGNORECASE)
            else:
                pattern = re.compile(r'(?<!\w)' + re.escape(ngram) + r"(?:'s)?(?!\w)", re.IGNORECASE)
            for match in pattern.finditer(sentence):
                start, end = match.span()
                # Store character position range, ngram text, and token count
                all_matches.append((start, end, ngram, len(ngram.split())))

        # Pre-process: identify all word spans in the original sentence
        words = []
        word_spans = []
        for match in re.finditer(r'\S+', sentence):
            words.append(match.group())
            word_spans.append((match.start(), match.end()))

        # Create a mapping from character positions to word indices
        char_to_word_idx = {}
        for i, (start, end) in enumerate(word_spans):
            for pos in range(start, end + 1):
                char_to_word_idx[pos] = i

        # Sort by length in characters first, then by word count
        all_matches.sort(key=lambda x: (-len(x[2]), -x[3], x[0]))

        # Filter out ngrams that overlap with already claimed ranges
        filtered_matches = []
        claimed_ranges = []
        for start, end, ngram, length in all_matches:
            # Check if this match overlaps with any existing claimed range
            is_overlapping = False
            for c_start, c_end in claimed_ranges:
                # Check for any overlap
                if max(start, c_start) < min(end, c_end):
                    is_overlapping = True
                    break
            if not is_overlapping:
                # Add this ngram to our filtered list
                filtered_matches.append((start, end, ngram, length))
                # Claim its range
                claimed_ranges.append((start, end))

        # Sort filtered matches by position for final ordering
        filtered_matches.sort(key=lambda x: x[0])

        # Create word-level indices for the final matches
        word_level_matches = []
        for start, end, ngram, _ in filtered_matches:
            # Find the word index for the start and end positions
            try:
                start_word_idx = char_to_word_idx.get(start, char_to_word_idx.get(start + 1))
                end_word_idx = char_to_word_idx.get(end - 1, char_to_word_idx.get(end - 2))
                if start_word_idx is not None and end_word_idx is not None:
                    word_level_matches.append((start_word_idx, end_word_idx, ngram))
            except (KeyError, IndexError):
                # Skip this match if we can't determine word indices
                continue

        # Create the final order with 1-based indexing
        ngram_to_index = {pattern: idx for idx, pattern in self.indexed_patterns}
        relative_order = [(ngram_to_index.get(ngram, i + 1), ngram) for i, (_, _, ngram) in enumerate(word_level_matches)]
        return relative_order, sentence


# Example usage
if __name__ == "__main__":
    # Test with NBA Play-In Tournament example
    sentences = [
        "The NBA Play-In Tournament tips off tonight as the No. 7-10 teams in each conference battle for a spot in the playoffs. Here's everything you need to know as the action unfolds.",
        "Tonight the NBA Play-In Tournament begins with No. 7-10 teams from each conference competing for playoff spots. Here's your guide to following all the action.",
        "The NBA Play-In Tournament kicks off this evening featuring the No. 7-10 teams across both conferences fighting for playoff positions. Here's what you should know about the upcoming games.",
        "Starting tonight, the NBA Play-In Tournament will showcase the No. 7-10 teams from each conference as they compete for remaining playoff berths. Here's your complete guide to the action.",
        "The NBA Play-In Tournament begins tonight with the No. 7-10 teams in both conferences battling for playoff spots. Here's everything you need to know about the upcoming games.",
        "Tonight marks the start of the NBA Play-In Tournament where No. 7-10 teams in each conference compete for playoff positions. Here's your essential guide to following the action.",
        "The NBA Play-In Tournament tips off tonight, featuring No. 7-10 teams from both conferences fighting for playoff berths. Here's what you need to know about the tournament.",
        "Beginning tonight, the NBA Play-In Tournament will pit the No. 7-10 teams in each conference against each other for playoff spots. Here's everything you should know about the games.",
        "The NBA Play-In Tournament starts tonight with No. 7-10 teams across both conferences competing for playoff positions. Here's your complete guide to all the action.",
        "Tonight is the tip-off of the NBA Play-In Tournament where the No. 7-10 teams from each conference battle for remaining playoff spots. Here's what you need to know as the games unfold."
    ]
    # Initialize with one or more spaCy models
    processor = NgramProcessor(models=["en_core_web_trf"])
    # Process with all models combined
    common_ngrams, indexed_ngrams = processor.find_filtered_ngrams(sentences)
    # Print results
    print("Common n-grams with indices per sentence:")
    for sentence in sentences:
        order, updated_sentence = processor.find_relative_order(sentence, common_ngrams[sentence])
        print(f"Sentence: {sentence}")
        print(f"Order: {order}")
        print()
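
    # Illustrative extra (not part of the original demo): a minimal sketch showing
    # how the extract_domain_specific_entities helper defined above - which the
    # demo does not otherwise exercise - could be used to inspect the raw
    # (text, label) pairs each loaded model reports for a single sentence.
    domain_entities = processor.extract_domain_specific_entities(sentences[0])
    for model_name, entities in domain_entities.items():
        print(f"Entities from {model_name}: {entities}")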