# utils/non_melting_point.py
import nltk
import logging
import spacy
from nltk.corpus import stopwords
from nltk.util import ngrams
from collections import Counter
import re
from tqdm import tqdm
# Logging setup
logging.basicConfig(level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
class NgramProcessor:
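# NgramProcessor finds "non-melting points": spans that recur across paraphrased
# sentences, such as named entities, numerical ranges (e.g. "7-10"), percentages,
# and frequent multi-word n-grams. Detected patterns are indexed and later located
# word-by-word inside each sentence.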
def __init__(self, models=None):
try:
nltk.data.find('corpora/stopwords')
except LookupError:
nltk.download('stopwords')
self.stop_words = set(stopwords.words('english'))
# Default to standard model if none specified
if models is None:
models = ["en_core_web_trf"]
# Load specified model
self.models = {}
for model_name in models:
try:
self.models[model_name] = spacy.load(model_name)
tqdm.write(f"[NgramProcessor] Loaded model: {model_name}")
except IOError:
tqdm.write(f"[NgramProcessor] Error: Model '{model_name}' not found. Please install it with:")
tqdm.write(f"python -m spacy download {model_name}")
except Exception as e:
tqdm.write(f"[NgramProcessor] Error loading model '{model_name}': {str(e)}")
# Set primary NLP model for other processes
if "en_core_web_trf" in self.models:
self.nlp = self.models["en_core_web_trf"]
elif len(self.models) > 0:
# Use first available model as primary if preferred one isn't available
self.nlp = next(iter(self.models.values()))
else:
raise ValueError("No spaCy model was successfully loaded")
# Add custom entity patterns for numerical ranges to primary model
if "entity_ruler" not in self.nlp.pipe_names:
ruler = self.nlp.add_pipe("entity_ruler", before="ner")
patterns = [
{"label": "CARDINAL", "pattern": [{"TEXT": {"REGEX": "\\d+-\\d+"}}]}, # Pattern for ranges like "7-10"
{"label": "PERCENT", "pattern": [{"TEXT": {"REGEX": "\\d+%"}}]} # Pattern for percentages
]
ruler.add_patterns(patterns)
# Create special pattern for numerical ranges
self.number_range_pattern = re.compile(r'\b(\d+(?:-\d+)+)\b')
tqdm.write("[NgramProcessor] Initialized with stopwords, spaCy NLP model, and numerical range detection.")
def remove_stopwords(self, text):
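# Lowercase the text, keep only word tokens, and drop English stopwords.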
words = re.findall(r'\w+', text.lower())
filtered_words = [word for word in words if word not in self.stop_words]
return ' '.join(filtered_words)
def extract_number_ranges(self, sentences):
"""Extract numerical ranges like '7-10' from sentences"""
tqdm.write("[NgramProcessor] Extracting numerical ranges...")
number_ranges = []
range_counts = Counter()
for sentence in sentences:
# Find all numerical ranges in the sentence
matches = self.number_range_pattern.findall(sentence)
for match in matches:
range_counts[match] += 1
# Keep every numerical range that appears at least once across the sentences
for range_text, count in range_counts.items():
if count >= 1:
number_ranges.append(range_text)
tqdm.write(f"[NgramProcessor] Found {len(number_ranges)} numerical ranges: {number_ranges}")
return number_ranges
def extract_standalone_numbers(self, sentences):
"""Extract standalone numerical values from sentences"""
tqdm.write("[NgramProcessor] Extracting standalone numbers...")
# Two patterns: one for percentages, one for regular numbers
percentage_pattern = re.compile(r'\b\d+%(?!\w)') # Only matches numbers with a % sign ('%' is not a word character, so a trailing \b would never match before whitespace)
number_pattern = re.compile(r'\b\d+\b') # Only matches standalone numbers
percentage_counts = Counter()
number_counts = Counter()
percentage_values = set() # Store the numeric part of percentages for cross-reference
# First pass: Find all percentages
for sentence in sentences:
# Extract all percentages first
percentage_matches = percentage_pattern.findall(sentence)
for match in percentage_matches:
percentage_counts[match] += 1
# Store the numeric part for later comparison
numeric_part = match.rstrip('%')
percentage_values.add(numeric_part)
# Second pass: Find standalone numbers
for sentence in sentences:
# Only look for standalone numbers now
number_matches = number_pattern.findall(sentence)
for match in number_matches:
# Avoid double counting numbers that we already counted as percentages
if match not in percentage_values:
number_counts[match] += 1
# Process percentages first (they have priority)
threshold = max(1, int(len(sentences) * 1.0))
standalone_numbers = []
# Add percentages that meet the threshold
for num, count in percentage_counts.items():
if count >= threshold:
standalone_numbers.append(num) # Already has % sign
# Then add standalone numbers, converting to percentage format if needed
for num, count in number_counts.items():
if count >= threshold:
# If this number also appeared as part of a percentage, use the percentage format
if num in percentage_values:
standalone_numbers.append(f"{num}%")
else:
standalone_numbers.append(num)
tqdm.write(f"[NgramProcessor] Found {len(standalone_numbers)} standalone numbers: {standalone_numbers}")
return standalone_numbers
def extract_regex_subsequences(self, sentences):
"""Extract potential subsequences using regex patterns before applying NLP"""
tqdm.write("[NgramProcessor] Extracting regex subsequences...")
# Find potential multi-word subsequences (2-5 words) that occur across sentences
potential_subsequences = set()
# Process each sentence to find multi-word phrases
for sentence in sentences:
# First, clean the sentence by removing punctuation and converting to lowercase
clean_sentence = re.sub(r'[^\w\s&-./\'()[\]$€£¥+%]', ' ', sentence.lower())
# Extract sequences of 2-6 words
for i in range(2, 7): # Try sequences of length 2-6 words
pattern = r'\b(\w+(?:[-&\s./\'()[\]$€£¥+%]+\w+){' + str(i-1) + r'})\b'
matches = re.findall(pattern, clean_sentence)
potential_subsequences.update(matches)
# Filter out sequences that consist only of stopwords (but preserve numbers)
filtered_subsequences = []
for subseq in potential_subsequences:
words = re.split(r'[\s-]+', subseq) # Split on spaces or hyphens
# Function to check if a word is a number or percentage
def is_numeric(word):
return bool(re.match(r'^\d+(\.\d+)?%?$|^\d+-\d+$', word))
# Skip if ALL words are stopwords and none are numeric
if all((word in self.stop_words and not is_numeric(word)) for word in words):
tqdm.write(f"[NgramProcessor] Skipping all-stopword phrase: {subseq}")
continue
# Keep if sequence has significant words (not just stopwords)
# OR if it contains numbers/percentages
if len(words) > 1 and (
any(word not in self.stop_words and (len(word) > 2 or is_numeric(word)) for word in words)
):
# Additional check to reject if standalone "the" or other common stopwords
if not (len(words) == 1 and words[0] in self.stop_words and not is_numeric(words[0])):
filtered_subsequences.append(subseq)
# Count occurrences across all sentences
subseq_counts = Counter()
for subseq in filtered_subsequences:
for sentence in sentences:
if re.search(r'\b' + re.escape(subseq) + r'\b', sentence.lower()):
subseq_counts[subseq] += 1
# Keep only subsequences that appear in multiple sentences
threshold = max(2, int(len(sentences) * 1.0)) # require the subsequence to appear in essentially every sentence (minimum of 2)
regex_candidates = [subseq for subseq, count in subseq_counts.items()
if count >= threshold]
tqdm.write(f"[NgramProcessor] Found {len(regex_candidates)} regex subsequences")
return regex_candidates
def filter_standalone_stopwords(self, ngrams_dict):
"""Remove standalone stopwords and very short terms from the ngrams dictionary"""
filtered_dict = {}
for sentence, ngrams in ngrams_dict.items():
filtered_dict[sentence] = {}
for ngram, indices in ngrams.items():
words = ngram.split()
# Skip single stopwords and very short terms UNLESS they are numbers
if (len(words) == 1 and (words[0] in self.stop_words or len(words[0]) < 3)):
# Exception for numbers
if len(words) == 1 and re.match(r'^\d+$', words[0]):
filtered_dict[sentence][ngram] = indices
continue
else:
continue
# Skip if ALL words are stopwords
if all(word in self.stop_words for word in words):
continue
filtered_dict[sentence][ngram] = indices
return filtered_dict
def extract_named_entities(self, sentences):
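# Run every loaded spaCy model over the sentences, collect entities of the label
# types listed below, strip possessive forms, and return entities whose count
# meets the per-sentence threshold.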
entity_counter = Counter()
# Process each sentence with each model
for model_name, nlp_model in self.models.items():
tqdm.write(f"[NgramProcessor] Extracting entities with model: {model_name}")
docs = list(nlp_model.pipe(sentences))
# Process each sentence
for doc in docs:
for ent in doc.ents:
# Include entity types relevant to this model
# This is a comprehensive list - some models may not use all these types
if ent.label_ in {
# People, organizations, locations
"PERSON", "ORG", "GPE", "LOC", "NORP",
# Facilities and products
"FAC", "PRODUCT", "WORK_OF_ART", "EVENT",
# Numeric entities
"DATE", "TIME", "MONEY", "QUANTITY", "PERCENT", "CARDINAL", "ORDINAL",
# Others
"LAW", "LANGUAGE",
# Scientific entities
"SCIENTIFIC", "SUBSTANCE", "CHEMICAL", "TECHNOLOGY",
# Medical entities
"DISEASE", "MEDICAL", "CLINICAL", "TREATMENT", "SYMPTOM", "DIAGNOSTIC",
"ANATOMICAL", "BIOLOGY", "GENE", "PROTEIN", "DRUG",
# Legal entities
"LEGAL", "COURT", "STATUTE", "PROVISION", "CASE_CITATION", "JUDGE",
"LEGAL_ROLE", "REGULATION", "CONTRACT"
}:
# Handle possessive forms by stripping 's
clean_entity = re.sub(r"'s\b", "", ent.text.lower()).strip()
# Aggregate counts from all models into a single counter (no per-model prefix is kept)
entity_counter[clean_entity] += 1
threshold = max(1, len(sentences) * 1.0) # entity must appear in every sentence
return [ent for ent, count in entity_counter.items() if count >= threshold]
def extract_domain_specific_entities(self, text):
"""Extract entities from all models and categorize by domain"""
domain_entities = {}
for model_name, nlp_model in self.models.items():
doc = nlp_model(text)
domain_entities[model_name] = [(ent.text, ent.label_) for ent in doc.ents]
return domain_entities
def is_substring_of_any(self, ngram, common_ngrams):
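# Return True if `ngram` is strictly contained in any other n-gram already kept.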
for other_ngram in common_ngrams:
if ngram != other_ngram and ngram in other_ngram:
return True
return False
def find_filtered_ngrams(self, sentences):
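# Full pipeline: (1) numerical ranges and standalone numbers, (2) regex
# subsequences, (3) named entities, (4) de-duplication and substring filtering,
# (5) NLTK n-gram extraction over entity-preserving cleaned sentences, and
# (6) final stopword filtering. Returns a dict mapping each sentence to
# {ngram: [(start_word_idx, end_word_idx), ...]} plus a dict of indexed patterns.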
tqdm.write("[NgramProcessor] Processing...")
# Step 1: First extract numerical ranges or standalone numbers (special priority)
number_ranges = self.extract_number_ranges(sentences)
standalone_numbers = self.extract_standalone_numbers(sentences)
# Step 2: Use regex to find common subsequences
regex_subsequences = self.extract_regex_subsequences(sentences)
tqdm.write(f"[NgramProcessor] Regex Subsequences: {regex_subsequences}")
# Step 3: Then apply spaCy to detect named entities
named_entities = self.extract_named_entities(sentences)
# Make sure percentage values have proper format
for i, entity in enumerate(named_entities):
if re.match(r'\d+$', entity) and any(f"{entity}%" in sentence for sentence in sentences):
# Replace standalone digit with percentage if it appears as percentage in text
named_entities[i] = f"{entity}%"
tqdm.write(f"[NgramProcessor] Named Entities: {named_entities}")
# Step 4: Consolidate and filter all detected patterns
# Collect all patterns in one list
all_patterns = number_ranges + regex_subsequences + named_entities + standalone_numbers
# Sort by length (longer first) to prioritize more specific patterns
all_patterns.sort(key=len, reverse=True)
# Remove duplicates while preserving order
unique_patterns = []
seen = set()
for pattern in all_patterns:
if pattern not in seen:
# Check if this pattern is a substring of any already selected pattern
is_substring = False
for selected_pattern in unique_patterns:
if pattern in selected_pattern and pattern != selected_pattern:
is_substring = True
break
if not is_substring:
unique_patterns.append(pattern)
seen.add(pattern)
# Re-index sequentially
indexed_patterns = [(i+1, pattern) for i, pattern in enumerate(unique_patterns)]
self.indexed_patterns = indexed_patterns
non_melting_points = [pattern for _, pattern in indexed_patterns]
tqdm.write(f"[NgramProcessor] Filtered non_melting_points: {non_melting_points}")
tqdm.write(f"[NgramProcessor] Filtered non-melting points: {len(non_melting_points)}")
# Drop patterns that are contained in a longer pattern, but always keep standalone numbers
standalone_numbers_set = set(standalone_numbers)
non_melting_points = []
for pattern in unique_patterns:
is_substring = False
for longer_pattern in non_melting_points:
# Check if pattern is contained within a longer pattern
if pattern in longer_pattern:
is_substring = True
break
if not is_substring or pattern in standalone_numbers_set:
non_melting_points.append(pattern)
# For remaining cases that might have been missed, apply NLTK n-gram extraction
# Only on cleaned sentences (less computationally expensive now)
clean_to_original = {}
sentences_cleaned = []
# Process sentences with spaCy to preserve entity information
docs = list(self.nlp.pipe(sentences))
for i, doc in enumerate(docs):
original_sentence = sentences[i]
entity_texts = {ent.text.lower() for ent in doc.ents if len(ent.text.split()) > 1}
# Tokenize while preserving entities and numerical ranges
tokens = []
j = 0
words = [token.text for token in doc]
while j < len(words):
# First check for numerical ranges
current_word = words[j].lower()
if self.number_range_pattern.match(current_word):
tokens.append(current_word)
j += 1
continue
# Then check for entities
matched_entity = None
for ent in sorted(entity_texts, key=len, reverse=True):
ent_words = ent.split()
if j + len(ent_words) <= len(words) and [w.lower() for w in words[j:j+len(ent_words)]] == ent_words:
matched_entity = " ".join(words[j:j+len(ent_words)])
tokens.append(matched_entity.lower()) # preserve full entity
j += len(ent_words)
break
if not matched_entity:
word = words[j].lower()
if word not in self.stop_words and re.match(r'\w+', word):
tokens.append(word)
j += 1
cleaned = " ".join(tokens)
sentences_cleaned.append(cleaned)
clean_to_original[cleaned] = original_sentence
# Step 5: Only run n-gram extraction on gaps not covered by regex and named entities
ngram_lengths = [4, 3, 2, 1] # Consider shorter n-grams now since we already have longer phrases
all_ngrams_by_length = {}
for n in ngram_lengths:
all_ngrams = []
for sentence in sentences_cleaned:
tokens = sentence.split()
if len(tokens) >= n:
sent_ngrams = list(ngrams(tokens, n))
all_ngrams.extend(sent_ngrams)
all_ngrams_by_length[n] = Counter(all_ngrams)
# Step 6: Add additional n-grams that are frequent but weren't caught by regex or named entities
threshold_factor = 1.0 # require additional n-grams to appear in every sentence
for n_size in sorted(ngram_lengths, reverse=True):
ngram_counts = all_ngrams_by_length[n_size]
threshold = max(2, int(len(sentences) * threshold_factor))
# Sort by count for efficiency
for ngram, count in ngram_counts.most_common():
if count >= threshold:
ngram_str = ' '.join(ngram)
# Skip if it is a substring of an existing pattern or already in our collection
if ngram_str not in non_melting_points and not self.is_substring_of_any(ngram_str, non_melting_points):
non_melting_points.append(ngram_str)
# Create sorted version for efficient lookup
final_non_melting_points = non_melting_points.copy()
sorted_non_melting_points = sorted(final_non_melting_points, key=len, reverse=True)
final_indexed_patterns = [(i+1, pattern) for i, pattern in enumerate(sorted_non_melting_points)]
# Filter out n-grams that consist entirely of stop words
filtered_patterns = []
for idx, pattern in final_indexed_patterns:
words = pattern.lower().split()
# Check if the pattern is a number or contains a number
has_number = any(re.match(r'.*\d+.*', word) for word in words)
# If the pattern has a number OR has any non-stop word, keep it
if has_number or any(word not in self.stop_words for word in words):
filtered_patterns.append((idx, pattern))
else:
tqdm.write(f"[NgramProcessor] Removing n-gram with all stop words: {pattern}")
# Reassign filtered patterns with reindexed values
self.indexed_patterns = [(i+1, pattern) for i, (_, pattern) in enumerate(filtered_patterns)]
# Generate the results with more efficient regex matching
result = {}
for sentence in sentences:
sentence_result = {}
for _, ngram in self.indexed_patterns: # Use the filtered patterns
# Skip single word stopwords and short terms
words = ngram.split()
if len(words) == 1 and (words[0] in self.stop_words or len(words[0]) < 3):
continue
# Handle numerical ranges differently - need exact matching
if self.number_range_pattern.match(ngram):
pattern = re.compile(r'\b' + re.escape(ngram) + r'\b', re.IGNORECASE)
else:
# Compile the regex pattern once per n-gram - modified to handle special characters
pattern = re.compile(r'(?<!\w)' + re.escape(ngram) + r'(?!\w)', re.IGNORECASE)
matches = list(pattern.finditer(sentence))
if matches:
indices = []
for match in matches:
# Calculate word indices with improved handling for hyphenated terms
start_pos = match.start()
text_before = sentence[:start_pos]
# More accurate word counting that handles hyphenated terms
start_idx = len(re.findall(r'\s+', text_before)) + (0 if text_before.strip() == "" else 1)
# Count words in the matched n-gram (handling hyphens as single terms)
if self.number_range_pattern.match(ngram):
# Numerical ranges count as one term
ngram_word_count = 1
else:
ngram_word_count = len(re.findall(r'\S+', ngram))
end_idx = start_idx + ngram_word_count - 1
indices.append((start_idx, end_idx))
if indices: # Only add if we found valid indices
sentence_result[ngram] = indices
result[sentence] = sentence_result
# Apply the stopword filter before returning
result = self.filter_standalone_stopwords(result)
return result, dict(self.indexed_patterns)
def find_relative_order(self, sentence, common_ngrams):
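# Locate each common n-gram in the sentence, resolve overlaps by preferring
# longer matches, and return (pattern_index, ngram) pairs in order of appearance
# along with the original sentence.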
# First, identify all possible matches without modifying the sentence
all_matches = []
for ngram in common_ngrams:
# Special handling for n-grams containing punctuation, currency, or percent characters
if any(char in ngram for char in '&-/.\'()[]$€£¥+%'):
pattern = re.compile(r'\b' + re.escape(ngram) + r'\b', re.IGNORECASE)
# Handle numerical ranges
elif self.number_range_pattern.match(ngram):
pattern = re.compile(r'\b' + re.escape(ngram) + r'\b', re.IGNORECASE)
else:
pattern = re.compile(r'(?<!\w)' + re.escape(ngram) + r"(?:'s)?(?!\w)", re.IGNORECASE)
for match in pattern.finditer(sentence):
start, end = match.span()
# Store character position range, ngram text, and token count
all_matches.append((start, end, ngram, len(ngram.split())))
# Pre-process: identify all word spans in the original sentence
words = []
word_spans = []
for match in re.finditer(r'\S+', sentence):
words.append(match.group())
word_spans.append((match.start(), match.end()))
# Create a mapping from character positions to word indices
char_to_word_idx = {}
for i, (start, end) in enumerate(word_spans):
for pos in range(start, end + 1):
char_to_word_idx[pos] = i
# Sort by character length (desc), then word count (desc), then position in the sentence
all_matches.sort(key=lambda x: (-len(x[2]), -x[3], x[0]))
# Filter out ngrams that overlap with already claimed ranges
filtered_matches = []
claimed_ranges = []
for start, end, ngram, length in all_matches:
# Check if this match overlaps with any existing claimed range
is_overlapping = False
for c_start, c_end in claimed_ranges:
# Check for any overlap
if max(start, c_start) < min(end, c_end):
is_overlapping = True
break
if not is_overlapping:
# Add this ngram to our filtered list
filtered_matches.append((start, end, ngram, length))
# Claim its range
claimed_ranges.append((start, end))
# Sort filtered matches by position for final ordering
filtered_matches.sort(key=lambda x: x[0])
# Create word-level indices for the final matches
word_level_matches = []
for start, end, ngram, _ in filtered_matches:
# Find the word index for the start and end positions
try:
start_word_idx = char_to_word_idx.get(start, char_to_word_idx.get(start+1))
end_word_idx = char_to_word_idx.get(end-1, char_to_word_idx.get(end-2))
if start_word_idx is not None and end_word_idx is not None:
word_level_matches.append((start_word_idx, end_word_idx, ngram))
except (KeyError, IndexError):
# Skip this match if we can't determine word indices
continue
# Map each matched n-gram back to its global pattern index (falling back to a 1-based positional index)
ngram_to_index = {pattern: idx for idx, pattern in self.indexed_patterns}
relative_order = [(ngram_to_index.get(ngram, i+1), ngram) for i, (_, _, ngram) in enumerate(word_level_matches)]
return relative_order, sentence
# Example usage
if __name__ == "__main__":
# Test with NBA Play-In Tournament example
sentences = [
"The NBA Play-In Tournament tips off tonight as the No. 7-10 teams in each conference battle for a spot in the playoffs. Here's everything you need to know as the action unfolds.",
"Tonight the NBA Play-In Tournament begins with No. 7-10 teams from each conference competing for playoff spots. Here's your guide to following all the action.",
"The NBA Play-In Tournament kicks off this evening featuring the No. 7-10 teams across both conferences fighting for playoff positions. Here's what you should know about the upcoming games.",
"Starting tonight, the NBA Play-In Tournament will showcase the No. 7-10 teams from each conference as they compete for remaining playoff berths. Here's your complete guide to the action.",
"The NBA Play-In Tournament begins tonight with the No. 7-10 teams in both conferences battling for playoff spots. Here's everything you need to know about the upcoming games.",
"Tonight marks the start of the NBA Play-In Tournament where No. 7-10 teams in each conference compete for playoff positions. Here's your essential guide to following the action.",
"The NBA Play-In Tournament tips off tonight, featuring No. 7-10 teams from both conferences fighting for playoff berths. Here's what you need to know about the tournament.",
"Beginning tonight, the NBA Play-In Tournament will pit the No. 7-10 teams in each conference against each other for playoff spots. Here's everything you should know about the games.",
"The NBA Play-In Tournament starts tonight with No. 7-10 teams across both conferences competing for playoff positions. Here's your complete guide to all the action.",
"Tonight is the tip-off of the NBA Play-In Tournament where the No. 7-10 teams from each conference battle for remaining playoff spots. Here's what you need to know as the games unfold."
]
# Initialize with multiple models
processor = NgramProcessor(models=["en_core_web_trf"])
# Process with all models combined
common_ngrams, indexed_ngrams = processor.find_filtered_ngrams(sentences)
# Print results
print("Common n-grams with indices per sentence:")
for sentence in sentences:
order, updated_sentence = processor.find_relative_order(sentence, common_ngrams[sentence])
print(f"Sentence: {sentence}")
print(f"Order: {order}")
print()
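# Each Order entry is a (pattern_index, ngram) tuple; the exact n-grams and
# indices depend on which spaCy model(s) are installed and loaded.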