Spaces:
Sleeping
Sleeping
""" | |
Chapter-level structural analysis for Tibetan legal manuscripts. | |
Enhanced with Juxta/CollateX-inspired advanced alignment algorithms. | |
""" | |
import difflib
import logging
import re
from typing import Any

from ..pipeline.advanced_alignment import enhanced_structural_analysis
logger = logging.getLogger(__name__) | |
def detect_structural_changes(text1: str, text2: str,
                              min_change_length: int = 5,
                              context_window: int = 10) -> dict:
    """
    Classify the word-level differences between two Tibetan text chapters.

    Both texts are whitespace-normalized (ends trimmed, every run of
    whitespace collapsed to a single space), split on whitespace, and the
    resulting word lists are compared with ``difflib.Differ``.

    Args:
        text1: First text chapter
        text2: Second text chapter
        min_change_length: Inserted or deleted words shorter than this many
            characters are not reported
        context_window: Number of characters of context requested on each
            side of a reported change

    Returns:
        Dictionary with four lists:
            'unchanged': records with 'word', 'position1', 'position2', 'length'
            'deletions': records with 'word', 'position', 'length', 'context'
            'insertions': records with 'word', 'position', 'length', 'context'
            'modifications': whatever detect_modifications builds from the
                deletion and insertion lists
        Positions are character offsets into the normalized form of the
        respective text.
    """
    # After normalization each word is followed by exactly one separator,
    # which is what lets the running offsets below stay exact.
    normalized1 = re.sub(r'\s+', ' ', text1.strip())
    normalized2 = re.sub(r'\s+', ' ', text2.strip())

    report = {
        'insertions': [],
        'deletions': [],
        'modifications': [],
        'unchanged': [],
    }

    offset1 = 0  # running offset into normalized1
    offset2 = 0  # running offset into normalized2

    for entry in difflib.Differ().compare(normalized1.split(), normalized2.split()):
        token = entry[2:]            # strip the two-character Differ prefix
        advance = len(token) + 1     # the word plus its trailing separator
        reportable = len(token) >= min_change_length

        if entry.startswith(' '):
            # Word present in both texts: both offsets move forward.
            report['unchanged'].append({
                'word': token,
                'position1': offset1,
                'position2': offset2,
                'length': len(token)
            })
            offset1 += advance
            offset2 += advance
        elif entry.startswith('- '):
            # Word only in the first text. Short words are skipped in the
            # report but still consume space in the text.
            if reportable:
                report['deletions'].append({
                    'word': token,
                    'position': offset1,
                    'length': len(token),
                    'context': get_context(normalized1, offset1, context_window)
                })
            offset1 += advance
        elif entry.startswith('+ '):
            # Word only in the second text.
            if reportable:
                report['insertions'].append({
                    'word': token,
                    'position': offset2,
                    'length': len(token),
                    'context': get_context(normalized2, offset2, context_window)
                })
            offset2 += advance
        # Any other Differ line (the '? ' hint lines) carries no word and is ignored.

    # Pair up deletions and insertions that sit close together.
    report['modifications'] = detect_modifications(report['deletions'], report['insertions'])
    return report
def get_context(text: str, position: int, window: int) -> str:
    """
    Return the part of *text* lying within *window* characters of *position*.

    Args:
        text: Text to take the snippet from
        position: Character offset the snippet is centred on
        window: Number of characters to include on each side of *position*

    Returns:
        ``text[position - window:position + window]`` with both bounds
        clamped to the text, so an out-of-range position yields an empty
        string.
    """
    size = len(text)
    # Clamp both bounds into [0, size]. Without the lower clamp on the end
    # bound, a negative value would be read by slicing as "count from the
    # end" and return an unrelated chunk of the text.
    start = min(max(0, position - window), size)
    end = min(max(0, position + window), size)
    return text[start:end]
def detect_modifications(deletions: list[dict], insertions: list[dict],
                         max_distance: int = 5) -> list[dict]:
    """
    Detect modifications by pairing nearby deletions and insertions.

    Each deletion, in order, is paired with the first still-unpaired
    insertion whose 'position' differs from the deletion's 'position' by at
    most *max_distance*. Every insertion is used at most once.

    Args:
        deletions: Deletion records; each needs 'word', 'position' and
            'context'. Paired records are removed from this list in place.
        insertions: Insertion records with the same keys. Paired records are
            removed from this list in place.
        max_distance: Largest allowed difference between the two positions
            for a pair to count as one modification

    Returns:
        One record per pair with 'original', 'replacement', 'position'
        (the deletion's position), 'deletion_context' and 'insertion_context'.
    """
    modifications = []
    unpaired_deletions = []
    remaining_insertions = list(insertions)

    for deletion in deletions:
        # Index of the first remaining insertion close enough to this deletion.
        partner_index = next(
            (index for index, candidate in enumerate(remaining_insertions)
             if abs(deletion['position'] - candidate['position']) <= max_distance),
            None,
        )
        if partner_index is None:
            unpaired_deletions.append(deletion)
            continue

        insertion = remaining_insertions.pop(partner_index)
        modifications.append({
            'original': deletion['word'],
            'replacement': insertion['word'],
            'position': deletion['position'],
            'deletion_context': deletion['context'],
            'insertion_context': insertion['context']
        })

    # Callers pass in their own lists and expect paired entries to vanish
    # from them, so update the lists in place and do not rebind them.
    deletions[:] = unpaired_deletions
    insertions[:] = remaining_insertions
    return modifications
def generate_structural_alignment(text1: str, text2: str) -> dict[str, list]:
    """
    Generate a structural alignment of two texts in the legacy format.

    The enhanced analysis from the advanced_alignment module is tried first
    and its segments are converted to the legacy layout. If anything in that
    path raises, a warning is logged and a basic alignment is computed by
    splitting both texts on Tibetan punctuation marks and comparing the
    segment lists with ``difflib.SequenceMatcher``.

    Returns:
        Dictionary with keys:
            'matches': records with 'segments1', 'segments2', 'type'
            'gaps': insertion/deletion records with 'segments', 'type', 'position'
            'mismatches': records with 'original', 'replacement', 'type'
            'segments1', 'segments2': the aligned units of each text
        Records built from the enhanced analysis also carry 'confidence'.
    """
    try:
        # Use enhanced alignment from advanced_alignment module.
        # NOTE(review): assumes the result is a dict whose 'alignment_segments'
        # entries carry 'type', 'content1', 'content2' and 'confidence', as
        # accessed below - confirm against that module.
        result = enhanced_structural_analysis(text1, text2)

        # Convert to legacy format for backward compatibility
        alignment = {
            'matches': [],
            'gaps': [],
            'mismatches': [],
            'segments1': [],
            'segments2': []
        }

        for segment in result.get('alignment_segments', []):
            kind = segment['type']
            if kind == 'match':
                alignment['matches'].append({
                    'segments1': [segment['content1']],
                    'segments2': [segment['content2']],
                    'type': 'match',
                    'confidence': segment['confidence']
                })
                # Record the unit on both sides so 'segments1'/'segments2'
                # are populated like in the basic path; consumers divide by
                # their lengths.
                alignment['segments1'].append(segment['content1'])
                alignment['segments2'].append(segment['content2'])
            elif kind == 'insertion':
                alignment['gaps'].append({
                    'segments': [segment['content2']],
                    'type': 'insertion',
                    'position': 'text2',
                    'confidence': segment['confidence']
                })
                alignment['segments2'].append(segment['content2'])
            elif kind == 'deletion':
                alignment['gaps'].append({
                    'segments': [segment['content1']],
                    'type': 'deletion',
                    'position': 'text1',
                    'confidence': segment['confidence']
                })
                alignment['segments1'].append(segment['content1'])
            elif kind in ['mismatch', 'modification']:
                alignment['mismatches'].append({
                    'original': [segment['content1']],
                    'replacement': [segment['content2']],
                    'type': 'modification',
                    'confidence': segment['confidence']
                })
                alignment['segments1'].append(segment['content1'])
                alignment['segments2'].append(segment['content2'])
        return alignment
    except Exception as e:
        # Deliberate best-effort: any failure in the enhanced path (including
        # an unexpected segment shape) degrades to the basic alignment below.
        logger.warning(f"Enhanced alignment failed, falling back to basic: {e}")

    # Fallback to basic alignment for robustness
    def split_into_segments(text):
        # Split on the listed Tibetan punctuation marks, dropping empty pieces.
        segments = re.split(r'[།༎༏༐༑༔]', text)
        return [seg.strip() for seg in segments if seg.strip()]

    segments1 = split_into_segments(text1)
    segments2 = split_into_segments(text2)
    matcher = difflib.SequenceMatcher(None, segments1, segments2)

    alignment = {
        'matches': [],
        'gaps': [],
        'mismatches': [],
        'segments1': segments1,
        'segments2': segments2
    }

    # Each opcode covers a run of segments, so one record may hold several.
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == 'equal':
            alignment['matches'].append({
                'segments1': segments1[i1:i2],
                'segments2': segments2[j1:j2],
                'type': 'match'
            })
        elif tag == 'delete':
            alignment['gaps'].append({
                'segments': segments1[i1:i2],
                'type': 'deletion',
                'position': 'text1'
            })
        elif tag == 'insert':
            alignment['gaps'].append({
                'segments': segments2[j1:j2],
                'type': 'insertion',
                'position': 'text2'
            })
        elif tag == 'replace':
            alignment['mismatches'].append({
                'original': segments1[i1:i2],
                'replacement': segments2[j1:j2],
                'type': 'modification'
            })
    return alignment
def calculate_structural_similarity_score(text1: str, text2: str) -> dict[str, float]:
    """
    Calculate various structural similarity scores between two texts.

    Returns:
        Dictionary with:
            'structural_similarity': 1 minus the number of reported changes
                divided by the word count of the longer text, floored at 0
            'alignment_score': matched segments divided by the average
                number of segments per text (0 when there are no segments)
            'insertions', 'deletions', 'modifications': counts of the
                respective reported changes
            'total_changes': sum of the three counts
    """
    changes = detect_structural_changes(text1, text2)
    alignment = generate_structural_alignment(text1, text2)

    insertion_count = len(changes['insertions'])
    deletion_count = len(changes['deletions'])
    modification_count = len(changes['modifications'])
    total_changes = insertion_count + deletion_count + modification_count

    # Structural similarity score (inverse of changes), measured against the
    # whitespace-separated word count of the longer text.
    text_length = max(len(text1.split()), len(text2.split()))
    structural_score = max(0.0, 1 - (total_changes / text_length)) if text_length > 0 else 0.0

    # Alignment-based score. A single match record can span several segments
    # (one 'equal' run from the matcher), so count the segments inside each
    # record, not the records themselves; otherwise two identical texts of
    # N segments would score 1/N.
    total_segments = len(alignment['segments1']) + len(alignment['segments2'])
    matched_segments = sum(len(match['segments1']) for match in alignment['matches'])
    alignment_score = matched_segments / (total_segments / 2) if total_segments > 0 else 0.0

    return {
        'structural_similarity': structural_score,
        'alignment_score': alignment_score,
        'insertions': insertion_count,
        'deletions': deletion_count,
        'modifications': modification_count,
        'total_changes': total_changes
    }
def generate_differential_report(text1: str, text2: str,
                                 file1_name: str = "Text 1",
                                 file2_name: str = "Text 2") -> dict[str, Any]:
    """
    Generate a comprehensive differential report for two text chapters.

    Args:
        text1: First text chapter
        text2: Second text chapter
        file1_name: Label stored in the report for the first text
        file2_name: Label stored in the report for the second text

    Returns:
        Dictionary with 'file1', 'file2', the detected 'changes', the
        'alignment', the similarity 'scores' and a 'summary' holding
        'significant_differences', 'minor_variants',
        'structural_preservation' and a textual 'recommendation'.
    """
    changes = detect_structural_changes(text1, text2)
    alignment = generate_structural_alignment(text1, text2)
    scores = calculate_structural_similarity_score(text1, text2)

    modifications = changes['modifications']
    # A modification is significant when either side is longer than 10
    # characters, and minor when both sides are at most 5 characters.
    significant_differences = sum(
        1 for change in modifications
        if len(change['original']) > 10 or len(change['replacement']) > 10
    )
    minor_variants = sum(
        1 for change in modifications
        if len(change['original']) <= 5 and len(change['replacement']) <= 5
    )

    alignment_score = scores['alignment_score']
    if alignment_score > 0.7:
        recommendation = 'Manuscripts are structurally similar'
    else:
        recommendation = 'Significant structural differences detected'

    return {
        'file1': file1_name,
        'file2': file2_name,
        'changes': changes,
        'alignment': alignment,
        'scores': scores,
        'summary': {
            'significant_differences': significant_differences,
            'minor_variants': minor_variants,
            # Stricter threshold than the one used for the recommendation text.
            'structural_preservation': alignment_score > 0.8,
            'recommendation': recommendation
        }
    }