# peccavi/utils/non_melting_point.py
import nltk
import logging
import spacy
from nltk.corpus import stopwords
from nltk.util import ngrams
from collections import Counter
import re
from tqdm import tqdm
# Logging setup
logging.basicConfig(level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
class NgramProcessor:
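    """Detects 'non-melting points': recurring phrases, named entities, and
    numeric values (ranges, percentages) shared across a set of sentences,
    using regex heuristics, spaCy NER, and NLTK n-gram counting."""
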
    def __init__(self, models=None):
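        """Load stopwords, the requested spaCy model(s), and the numeric-range
        detector. `models` is a list of spaCy model names; defaults to
        ["en_core_web_trf"]."""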
        try:
            nltk.data.find('corpora/stopwords')
        except LookupError:
            nltk.download('stopwords')
        self.stop_words = set(stopwords.words('english'))
        # Default to standard model if none specified
        if models is None:
            models = ["en_core_web_trf"]
        # Load the specified models
        self.models = {}
        for model_name in models:
            try:
                self.models[model_name] = spacy.load(model_name)
                tqdm.write(f"[NgramProcessor] Loaded model: {model_name}")
            except IOError:
                tqdm.write(f"[NgramProcessor] Error: Model '{model_name}' not found. Please install it with:")
                tqdm.write(f"python -m spacy download {model_name}")
            except Exception as e:
                tqdm.write(f"[NgramProcessor] Error loading model '{model_name}': {str(e)}")
        # Set the primary NLP model used by the other processing steps
        if "en_core_web_trf" in self.models:
            self.nlp = self.models["en_core_web_trf"]
        elif len(self.models) > 0:
            # Use the first available model as primary if the preferred one isn't available
            self.nlp = next(iter(self.models.values()))
        else:
            raise ValueError("No spaCy model was successfully loaded")
        # Add custom entity patterns for numerical ranges to the primary model
        if "entity_ruler" not in self.nlp.pipe_names:
            ruler = self.nlp.add_pipe("entity_ruler", before="ner")
            patterns = [
                {"label": "CARDINAL", "pattern": [{"TEXT": {"REGEX": "\\d+-\\d+"}}]},  # Pattern for ranges like "7-10"
                {"label": "PERCENT", "pattern": [{"TEXT": {"REGEX": "\\d+%"}}]}  # Pattern for percentages
            ]
            ruler.add_patterns(patterns)
        # Create special pattern for numerical ranges
        self.number_range_pattern = re.compile(r'\b(\d+(?:-\d+)+)\b')
        tqdm.write("[NgramProcessor] Initialized with stopwords, spaCy NLP model, and numerical range detection.")

    def remove_stopwords(self, text):
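        """Lowercase the text, drop English stopwords, and return the remaining
        words joined by single spaces."""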
        words = re.findall(r'\w+', text.lower())
        filtered_words = [word for word in words if word not in self.stop_words]
        return ' '.join(filtered_words)

    def extract_number_ranges(self, sentences):
        """Extract numerical ranges like '7-10' from sentences"""
        tqdm.write("[NgramProcessor] Extracting numerical ranges...")
        number_ranges = []
        range_counts = Counter()
        for sentence in sentences:
            # Find all numerical ranges in the sentence
            matches = self.number_range_pattern.findall(sentence)
            for match in matches:
                range_counts[match] += 1
        # Keep every range that occurs at least once across the sentences
        for range_text, count in range_counts.items():
            if count >= 1:
                number_ranges.append(range_text)
        tqdm.write(f"[NgramProcessor] Found {len(number_ranges)} numerical ranges: {number_ranges}")
        return number_ranges

    def extract_standalone_numbers(self, sentences):
        """Extract standalone numerical values from sentences"""
        tqdm.write("[NgramProcessor] Extracting standalone numbers...")
        # Two patterns: one for percentages, one for regular numbers
        # (no trailing \b after '%': since '%' is a non-word character, \b there
        # would block matches when the percentage is followed by a space or punctuation)
        percentage_pattern = re.compile(r'\b\d+%')  # Only matches numbers with % sign
        number_pattern = re.compile(r'\b\d+\b')  # Only matches standalone numbers
        percentage_counts = Counter()
        number_counts = Counter()
        percentage_values = set()  # Store the numeric part of percentages for cross-reference
        # First pass: find all percentages
        for sentence in sentences:
            percentage_matches = percentage_pattern.findall(sentence)
            for match in percentage_matches:
                percentage_counts[match] += 1
                # Store the numeric part for later comparison
                numeric_part = match.rstrip('%')
                percentage_values.add(numeric_part)
        # Second pass: find standalone numbers
        for sentence in sentences:
            number_matches = number_pattern.findall(sentence)
            for match in number_matches:
                # Avoid double counting numbers that were already counted as percentages
                if match not in percentage_values:
                    number_counts[match] += 1
        # Process percentages first (they have priority)
        # Threshold: the value must appear in every sentence
        threshold = max(1, int(len(sentences) * 1.0))
        standalone_numbers = []
        # Add percentages that meet the threshold
        for num, count in percentage_counts.items():
            if count >= threshold:
                standalone_numbers.append(num)  # Already has the % sign
        # Then add standalone numbers, converting to percentage format if needed
        for num, count in number_counts.items():
            if count >= threshold:
                # If this number also appeared as part of a percentage, use the percentage format
                if num in percentage_values:
                    standalone_numbers.append(f"{num}%")
                else:
                    standalone_numbers.append(num)
        tqdm.write(f"[NgramProcessor] Found {len(standalone_numbers)} standalone numbers: {standalone_numbers}")
        return standalone_numbers

    def extract_regex_subsequences(self, sentences):
        """Extract potential subsequences using regex patterns before applying NLP"""
        tqdm.write("[NgramProcessor] Extracting regex subsequences...")
        # Find potential multi-word subsequences (2-6 words) that occur across sentences
        potential_subsequences = set()
        # Process each sentence to find multi-word phrases
        for sentence in sentences:
            # First, clean the sentence by removing punctuation and converting to lowercase
            clean_sentence = re.sub(r'[^\w\s&-./\'()[\]$€£¥+%]', ' ', sentence.lower())
            # Extract sequences of 2-6 words
            for i in range(2, 7):
                pattern = r'\b(\w+(?:[-&\s./\'()[\]$€£¥+%]+\w+){' + str(i-1) + r'})\b'
                matches = re.findall(pattern, clean_sentence)
                potential_subsequences.update(matches)
        # Filter out sequences that consist only of stopwords (but preserve numbers)
        filtered_subsequences = []
        for subseq in potential_subsequences:
            words = re.split(r'[\s-]+', subseq)  # Split on spaces or hyphens

            # Check whether a word is a number, percentage, or numeric range
            def is_numeric(word):
                return bool(re.match(r'^\d+(\.\d+)?%?$|^\d+-\d+$', word))

            # Skip if ALL words are stopwords and none are numeric
            if all((word in self.stop_words and not is_numeric(word)) for word in words):
                tqdm.write(f"[NgramProcessor] Skipping all-stopword phrase: {subseq}")
                continue
            # Keep multi-word sequences containing at least one significant word
            # (a non-stopword longer than two characters) or a number/percentage
            if len(words) > 1 and any(
                word not in self.stop_words and (len(word) > 2 or is_numeric(word)) for word in words
            ):
                filtered_subsequences.append(subseq)
        # Count occurrences across all sentences
        subseq_counts = Counter()
        for subseq in filtered_subsequences:
            for sentence in sentences:
                if re.search(r'\b' + re.escape(subseq) + r'\b', sentence.lower()):
                    subseq_counts[subseq] += 1
        # Keep only subsequences that appear in every sentence (and at least twice)
        threshold = max(2, int(len(sentences) * 1.0))
        regex_candidates = [subseq for subseq, count in subseq_counts.items()
                            if count >= threshold]
        tqdm.write(f"[NgramProcessor] Found {len(regex_candidates)} regex subsequences")
        return regex_candidates

    def filter_standalone_stopwords(self, ngrams_dict):
        """Remove standalone stopwords and very short terms from the ngrams dictionary"""
        filtered_dict = {}
        for sentence, ngrams in ngrams_dict.items():
            filtered_dict[sentence] = {}
            for ngram, indices in ngrams.items():
                words = ngram.split()
                # Skip single stopwords and very short terms UNLESS they are numbers
                if len(words) == 1 and (words[0] in self.stop_words or len(words[0]) < 3):
                    # Exception for numbers
                    if re.match(r'^\d+$', words[0]):
                        filtered_dict[sentence][ngram] = indices
                    continue
                # Skip if ALL words are stopwords
                if all(word in self.stop_words for word in words):
                    continue
                filtered_dict[sentence][ngram] = indices
        return filtered_dict

    def extract_named_entities(self, sentences):
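        """Run every loaded spaCy model over the sentences and return entity
        strings (lowercased, with possessive "'s" stripped) whose total count
        across models and sentences meets the sentence-count threshold."""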
        entity_counter = Counter()
        # Process each sentence with each model
        for model_name, nlp_model in self.models.items():
            tqdm.write(f"[NgramProcessor] Extracting entities with model: {model_name}")
            docs = list(nlp_model.pipe(sentences))
            # Process each sentence
            for doc in docs:
                for ent in doc.ents:
                    # Include entity types relevant to this model
                    # This is a comprehensive list - some models may not use all these types
                    if ent.label_ in {
                        # People, organizations, locations
                        "PERSON", "ORG", "GPE", "LOC", "NORP",
                        # Facilities and products
                        "FAC", "PRODUCT", "WORK_OF_ART", "EVENT",
                        # Numeric entities
                        "DATE", "TIME", "MONEY", "QUANTITY", "PERCENT", "CARDINAL", "ORDINAL",
                        # Others
                        "LAW", "LANGUAGE",
                        # Scientific entities
                        "SCIENTIFIC", "SUBSTANCE", "CHEMICAL", "TECHNOLOGY",
                        # Medical entities
                        "DISEASE", "MEDICAL", "CLINICAL", "TREATMENT", "SYMPTOM", "DIAGNOSTIC",
                        "ANATOMICAL", "BIOLOGY", "GENE", "PROTEIN", "DRUG",
                        # Legal entities
                        "LEGAL", "COURT", "STATUTE", "PROVISION", "CASE_CITATION", "JUDGE",
                        "LEGAL_ROLE", "REGULATION", "CONTRACT"
                    }:
                        # Handle possessive forms by stripping 's
                        clean_entity = re.sub(r"'s\b", "", ent.text.lower()).strip()
                        # Counts from all models are pooled in a single counter
                        entity_counter[clean_entity] += 1
        # Keep entities whose count reaches the number of sentences
        threshold = max(1, len(sentences) * 1.0)
        return [ent for ent, count in entity_counter.items() if count >= threshold]

    def extract_domain_specific_entities(self, text):
        """Extract entities from all models and categorize by domain"""
        domain_entities = {}
        for model_name, nlp_model in self.models.items():
            doc = nlp_model(text)
            domain_entities[model_name] = [(ent.text, ent.label_) for ent in doc.ents]
        return domain_entities

    def is_substring_of_any(self, ngram, common_ngrams):
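        """Return True if `ngram` occurs as a substring of any other n-gram in
        `common_ngrams`."""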
        for other_ngram in common_ngrams:
            if ngram != other_ngram and ngram in other_ngram:
                return True
        return False

    def find_filtered_ngrams(self, sentences):
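        """Build the list of non-melting points for a set of sentences.

        Combines numerical ranges, standalone numbers, regex subsequences,
        named entities, and frequent NLTK n-grams; removes duplicates,
        substrings, and all-stopword patterns; then returns a dict mapping
        each sentence to {ngram: [(start_word_idx, end_word_idx), ...]}
        together with a dict mapping pattern index to pattern string."""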
tqdm.write("[NgramProcessor] Processing...")
# Step 1: First extract numerical ranges or standalone numbers (special priority)
number_ranges = self.extract_number_ranges(sentences)
standalone_numbers = self.extract_standalone_numbers(sentences)
# Step 2: Use regex to find common subsequences
regex_subsequences = self.extract_regex_subsequences(sentences)
tqdm.write(f"[NgramProcessor] Regex Subsequences: {regex_subsequences}")
# Step 3: Then apply spaCy to detect named entities
named_entities = self.extract_named_entities(sentences)
# Make sure percentage values have proper format
for i, entity in enumerate(named_entities):
if re.match(r'\d+$', entity) and any(f"{entity}%" in sentence for sentence in sentences):
# Replace standalone digit with percentage if it appears as percentage in text
named_entities[i] = f"{entity}%"
tqdm.write(f"[NgramProcessor] Named Entities: {named_entities}")
# Step 4: Consolidate and filter all detected patterns
# Collect all patterns in one list
all_patterns = number_ranges + regex_subsequences + named_entities + standalone_numbers
# Sort by length (longer first) to prioritize more specific patterns
all_patterns.sort(key=len, reverse=True)
# Remove duplicates while preserving order
unique_patterns = []
seen = set()
for pattern in all_patterns:
if pattern not in seen:
# Check if this pattern is a substring of any already selected pattern
is_substring = False
for selected_pattern in unique_patterns:
if pattern in selected_pattern and pattern != selected_pattern:
is_substring = True
break
if not is_substring:
unique_patterns.append(pattern)
seen.add(pattern)
# Re-index sequentially
indexed_patterns = [(i+1, pattern) for i, pattern in enumerate(unique_patterns)]
self.indexed_patterns = indexed_patterns
non_melting_points = [pattern for _, pattern in indexed_patterns]
tqdm.write(f"[NgramProcessor] Filtered non_melting_points: {non_melting_points}")
tqdm.write(f"[NgramProcessor] Filtered non-melting points: {len(non_melting_points)}")
# Filter out patterns that are substrings of longer patterns or standalone numbers
standalone_numbers_set = set(standalone_numbers)
non_melting_points = []
for pattern in unique_patterns:
is_substring = False
for longer_pattern in non_melting_points:
# Check if pattern is contained within a longer pattern
if pattern in longer_pattern:
is_substring = True
break
if not is_substring or pattern in standalone_numbers_set:
non_melting_points.append(pattern)
        # For remaining cases that might have been missed, apply NLTK n-gram extraction
        # only on cleaned sentences (less computationally expensive now)
        clean_to_original = {}
        sentences_cleaned = []
        # Process sentences with spaCy to preserve entity information
        docs = list(self.nlp.pipe(sentences))
        for i, doc in enumerate(docs):
            original_sentence = sentences[i]
            entity_texts = {ent.text.lower() for ent in doc.ents if len(ent.text.split()) > 1}
            # Tokenize while preserving entities and numerical ranges
            tokens = []
            j = 0
            words = [token.text for token in doc]
            while j < len(words):
                # First check for numerical ranges
                current_word = words[j].lower()
                if self.number_range_pattern.match(current_word):
                    tokens.append(current_word)
                    j += 1
                    continue
                # Then check for entities
                matched_entity = None
                for ent in sorted(entity_texts, key=len, reverse=True):
                    ent_words = ent.split()
                    if j + len(ent_words) <= len(words) and [w.lower() for w in words[j:j + len(ent_words)]] == ent_words:
                        matched_entity = " ".join(words[j:j + len(ent_words)])
                        tokens.append(matched_entity.lower())  # Preserve the full entity
                        j += len(ent_words)
                        break
                if not matched_entity:
                    word = words[j].lower()
                    if word not in self.stop_words and re.match(r'\w+', word):
                        tokens.append(word)
                    j += 1
            cleaned = " ".join(tokens)
            sentences_cleaned.append(cleaned)
            clean_to_original[cleaned] = original_sentence
        # Step 5: Only run n-gram extraction on gaps not covered by regex and named entities
        ngram_lengths = [4, 3, 2, 1]  # Consider shorter n-grams now since we already have longer phrases
        all_ngrams_by_length = {}
        for n in ngram_lengths:
            all_ngrams = []
            for sentence in sentences_cleaned:
                tokens = sentence.split()
                if len(tokens) >= n:
                    sent_ngrams = list(ngrams(tokens, n))
                    all_ngrams.extend(sent_ngrams)
            all_ngrams_by_length[n] = Counter(all_ngrams)
        # Step 6: Add additional n-grams that are frequent but weren't caught by regex or named entities
        threshold_factor = 1.0  # Require an n-gram to appear in every cleaned sentence
        for n_size in sorted(ngram_lengths, reverse=True):
            ngram_counts = all_ngrams_by_length[n_size]
            threshold = max(2, int(len(sentences) * threshold_factor))
            # Sort by count for efficiency
            for ngram, count in ngram_counts.most_common():
                if count >= threshold:
                    ngram_str = ' '.join(ngram)
                    # Skip if it is a substring of existing n-grams or already in our collection
                    if ngram_str not in non_melting_points and not self.is_substring_of_any(ngram_str, non_melting_points):
                        non_melting_points.append(ngram_str)
        # Create a sorted version for efficient lookup
        final_non_melting_points = non_melting_points.copy()
        sorted_non_melting_points = sorted(final_non_melting_points, key=len, reverse=True)
        final_indexed_patterns = [(i + 1, pattern) for i, pattern in enumerate(sorted_non_melting_points)]
        # Filter out n-grams that consist entirely of stop words
        filtered_patterns = []
        for idx, pattern in final_indexed_patterns:
            words = pattern.lower().split()
            # Check if the pattern is a number or contains a number
            has_number = any(re.match(r'.*\d+.*', word) for word in words)
            # If the pattern has a number OR has any non-stop word, keep it
            if has_number or any(word not in self.stop_words for word in words):
                filtered_patterns.append((idx, pattern))
            else:
                tqdm.write(f"[NgramProcessor] Removing n-gram with all stop words: {pattern}")
        # Reassign filtered patterns with reindexed values
        self.indexed_patterns = [(i + 1, pattern) for i, (_, pattern) in enumerate(filtered_patterns)]
        # Generate the results with more efficient regex matching
        result = {}
        for sentence in sentences:
            sentence_result = {}
            for _, ngram in self.indexed_patterns:  # Use the filtered patterns
                # Skip single-word stopwords and short terms
                words = ngram.split()
                if len(words) == 1 and (words[0] in self.stop_words or len(words[0]) < 3):
                    continue
                # Handle numerical ranges differently - they need exact matching
                if self.number_range_pattern.match(ngram):
                    pattern = re.compile(r'\b' + re.escape(ngram) + r'\b', re.IGNORECASE)
                else:
                    # Compile the regex pattern once per n-gram - modified to handle special characters
                    pattern = re.compile(r'(?<!\w)' + re.escape(ngram) + r'(?!\w)', re.IGNORECASE)
                matches = list(pattern.finditer(sentence))
                if matches:
                    indices = []
                    for match in matches:
                        # Calculate word indices with improved handling for hyphenated terms
                        start_pos = match.start()
                        text_before = sentence[:start_pos]
                        # Word counting that treats hyphenated terms as single words
                        start_idx = len(re.findall(r'\s+', text_before)) + (0 if text_before.strip() == "" else 1)
                        # Count words in the matched n-gram (handling hyphens as single terms)
                        if self.number_range_pattern.match(ngram):
                            # Numerical ranges count as one term
                            ngram_word_count = 1
                        else:
                            ngram_word_count = len(re.findall(r'\S+', ngram))
                        end_idx = start_idx + ngram_word_count - 1
                        indices.append((start_idx, end_idx))
                    if indices:  # Only add if we found valid indices
                        sentence_result[ngram] = indices
            result[sentence] = sentence_result
        # Apply the stopword filter before returning
        result = self.filter_standalone_stopwords(result)
        return result, dict(self.indexed_patterns)

    def find_relative_order(self, sentence, common_ngrams):
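        """Locate each common n-gram in `sentence`, resolve overlaps in favour
        of longer matches, and return (order, sentence) where order is a list
        of (index, ngram) pairs sorted by position of appearance."""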
        # First, identify all possible matches without modifying the sentence
        all_matches = []
        for ngram in common_ngrams:
            # N-grams containing special characters (hyphens, slashes, currency symbols,
            # percent signs, etc.); lookarounds are used instead of \b so that trailing
            # symbols such as '%' still match before spaces or punctuation
            if any(char in ngram for char in '&-/.\'()[]$€£¥+%'):
                pattern = re.compile(r'(?<!\w)' + re.escape(ngram) + r'(?!\w)', re.IGNORECASE)
            # Handle numerical ranges
            elif self.number_range_pattern.match(ngram):
                pattern = re.compile(r'\b' + re.escape(ngram) + r'\b', re.IGNORECASE)
            else:
                pattern = re.compile(r'(?<!\w)' + re.escape(ngram) + r"(?:'s)?(?!\w)", re.IGNORECASE)
            for match in pattern.finditer(sentence):
                start, end = match.span()
                # Store character position range, ngram text, and token count
                all_matches.append((start, end, ngram, len(ngram.split())))
        # Pre-process: identify all word spans in the original sentence
        words = []
        word_spans = []
        for match in re.finditer(r'\S+', sentence):
            words.append(match.group())
            word_spans.append((match.start(), match.end()))
        # Create a mapping from character positions to word indices
        char_to_word_idx = {}
        for i, (start, end) in enumerate(word_spans):
            for pos in range(start, end + 1):
                char_to_word_idx[pos] = i
        # Sort by length in characters first, then by word count, then by position
        all_matches.sort(key=lambda x: (-len(x[2]), -x[3], x[0]))
        # Filter out ngrams that overlap with already claimed ranges
        filtered_matches = []
        claimed_ranges = []
        for start, end, ngram, length in all_matches:
            # Check if this match overlaps with any existing claimed range
            is_overlapping = False
            for c_start, c_end in claimed_ranges:
                if max(start, c_start) < min(end, c_end):
                    is_overlapping = True
                    break
            if not is_overlapping:
                # Add this ngram to our filtered list and claim its range
                filtered_matches.append((start, end, ngram, length))
                claimed_ranges.append((start, end))
        # Sort filtered matches by position for final ordering
        filtered_matches.sort(key=lambda x: x[0])
        # Create word-level indices for the final matches
        word_level_matches = []
        for start, end, ngram, _ in filtered_matches:
            # Find the word index for the start and end positions
            try:
                start_word_idx = char_to_word_idx.get(start, char_to_word_idx.get(start + 1))
                end_word_idx = char_to_word_idx.get(end - 1, char_to_word_idx.get(end - 2))
                if start_word_idx is not None and end_word_idx is not None:
                    word_level_matches.append((start_word_idx, end_word_idx, ngram))
            except (KeyError, IndexError):
                # Skip this match if we can't determine word indices
                continue
        # Create the final order with 1-based indexing
        ngram_to_index = {pattern: idx for idx, pattern in self.indexed_patterns}
        relative_order = [(ngram_to_index.get(ngram, i + 1), ngram) for i, (_, _, ngram) in enumerate(word_level_matches)]
        return relative_order, sentence

# Example usage
if __name__ == "__main__":
    # Test with NBA Play-In Tournament example
    sentences = [
        "The NBA Play-In Tournament tips off tonight as the No. 7-10 teams in each conference battle for a spot in the playoffs. Here's everything you need to know as the action unfolds.",
        "Tonight the NBA Play-In Tournament begins with No. 7-10 teams from each conference competing for playoff spots. Here's your guide to following all the action.",
        "The NBA Play-In Tournament kicks off this evening featuring the No. 7-10 teams across both conferences fighting for playoff positions. Here's what you should know about the upcoming games.",
        "Starting tonight, the NBA Play-In Tournament will showcase the No. 7-10 teams from each conference as they compete for remaining playoff berths. Here's your complete guide to the action.",
        "The NBA Play-In Tournament begins tonight with the No. 7-10 teams in both conferences battling for playoff spots. Here's everything you need to know about the upcoming games.",
        "Tonight marks the start of the NBA Play-In Tournament where No. 7-10 teams in each conference compete for playoff positions. Here's your essential guide to following the action.",
        "The NBA Play-In Tournament tips off tonight, featuring No. 7-10 teams from both conferences fighting for playoff berths. Here's what you need to know about the tournament.",
        "Beginning tonight, the NBA Play-In Tournament will pit the No. 7-10 teams in each conference against each other for playoff spots. Here's everything you should know about the games.",
        "The NBA Play-In Tournament starts tonight with No. 7-10 teams across both conferences competing for playoff positions. Here's your complete guide to all the action.",
        "Tonight is the tip-off of the NBA Play-In Tournament where the No. 7-10 teams from each conference battle for remaining playoff spots. Here's what you need to know as the games unfold."
    ]
    # Initialize the processor (a list of spaCy model names can be passed)
    processor = NgramProcessor(models=["en_core_web_trf"])
    # Process the sentences with all loaded models combined
    common_ngrams, indexed_ngrams = processor.find_filtered_ngrams(sentences)
    # Print results
    print("Common n-grams with indices per sentence:")
    for sentence in sentences:
        order, updated_sentence = processor.find_relative_order(sentence, common_ngrams[sentence])
        print(f"Sentence: {sentence}")
        print(f"Order: {order}")
        print()
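
    # Illustrative extra step (a sketch, assuming the model above loaded successfully):
    # extract_domain_specific_entities returns {model_name: [(entity_text, label), ...]}
    # for a single text, which is handy for inspecting what each loaded model contributes.
    domain_entities = processor.extract_domain_specific_entities(sentences[0])
    print("Per-model entities for the first sentence:")
    for model_name, ents in domain_entities.items():
        print(f"  {model_name}: {ents}")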