""" Chapter-level structural analysis for Tibetan legal manuscripts. Enhanced with Juxta/CollateX-inspired advanced alignment algorithms. """ import difflib import re import logging from ..pipeline.advanced_alignment import enhanced_structural_analysis logger = logging.getLogger(__name__) def detect_structural_changes(text1: str, text2: str, min_change_length: int = 5, context_window: int = 10) -> dict: """ Detect structural changes between two Tibetan text chapters. Args: text1: First text chapter text2: Second text chapter min_change_length: Minimum length of change to report context_window: Number of characters to include as context Returns: Dictionary with detected changes: insertions, deletions, modifications """ # Clean texts for comparison def clean_text(text): # Remove extra whitespace and normalize text = re.sub(r'\s+', ' ', text.strip()) return text clean1 = clean_text(text1) clean2 = clean_text(text2) # Use difflib to detect changes differ = difflib.Differ() diff = list(differ.compare(clean1.split(), clean2.split())) changes = { 'insertions': [], 'deletions': [], 'modifications': [], 'unchanged': [] } # Track current position in both texts pos1 = 0 pos2 = 0 for i, line in enumerate(diff): if line.startswith(' '): # Unchanged word = line[2:] changes['unchanged'].append({ 'word': word, 'position1': pos1, 'position2': pos2, 'length': len(word) }) pos1 += len(word) + 1 pos2 += len(word) + 1 elif line.startswith('- '): # Deletion word = line[2:] if len(word) >= min_change_length: changes['deletions'].append({ 'word': word, 'position': pos1, 'length': len(word), 'context': get_context(clean1, pos1, context_window) }) pos1 += len(word) + 1 elif line.startswith('+ '): # Insertion word = line[2:] if len(word) >= min_change_length: changes['insertions'].append({ 'word': word, 'position': pos2, 'length': len(word), 'context': get_context(clean2, pos2, context_window) }) pos2 += len(word) + 1 # Detect modifications (adjacent deletions and insertions) modifications = detect_modifications(changes['deletions'], changes['insertions']) changes['modifications'] = modifications return changes def get_context(text: str, position: int, window: int) -> str: """Get context around a position in text.""" start = max(0, position - window) end = min(len(text), position + window) return text[start:end] def detect_modifications(deletions: list[dict], insertions: list[dict]) -> list[dict]: """Detect modifications by pairing nearby deletions and insertions.""" modifications = [] for deletion in deletions[:]: # Copy to avoid modification during iteration for insertion in insertions[:]: # If deletion and insertion are close (within 5 positions) if abs(deletion['position'] - insertion['position']) <= 5: modifications.append({ 'original': deletion['word'], 'replacement': insertion['word'], 'position': deletion['position'], 'deletion_context': deletion['context'], 'insertion_context': insertion['context'] }) # Remove from original lists to avoid duplicates if deletion in deletions: deletions.remove(deletion) if insertion in insertions: insertions.remove(insertion) break return modifications def generate_structural_alignment(text1: str, text2: str) -> dict[str, list]: """ Generate enhanced structural alignment using advanced algorithms. Returns: Dictionary with Juxta/CollateX-inspired alignment information """ try: # Use enhanced alignment from advanced_alignment module result = enhanced_structural_analysis(text1, text2) # Convert to legacy format for backward compatibility alignment = { 'matches': [], 'gaps': [], 'mismatches': [], 'segments1': [], 'segments2': [] } # Process alignment segments for segment in result.get('alignment_segments', []): if segment['type'] == 'match': alignment['matches'].append({ 'segments1': [segment['content1']], 'segments2': [segment['content2']], 'type': 'match', 'confidence': segment['confidence'] }) elif segment['type'] == 'insertion': alignment['gaps'].append({ 'segments': [segment['content2']], 'type': 'insertion', 'position': 'text2', 'confidence': segment['confidence'] }) elif segment['type'] == 'deletion': alignment['gaps'].append({ 'segments': [segment['content1']], 'type': 'deletion', 'position': 'text1', 'confidence': segment['confidence'] }) elif segment['type'] in ['mismatch', 'modification']: alignment['mismatches'].append({ 'original': [segment['content1']], 'replacement': [segment['content2']], 'type': 'modification', 'confidence': segment['confidence'] }) return alignment except Exception as e: logger.warning(f"Enhanced alignment failed, falling back to basic: {e}") # Fallback to basic alignment for robustness def split_into_segments(text): segments = re.split(r'[།༎༏༐༑༔]', text) return [seg.strip() for seg in segments if seg.strip()] segments1 = split_into_segments(text1) segments2 = split_into_segments(text2) matcher = difflib.SequenceMatcher(None, segments1, segments2) alignment = { 'matches': [], 'gaps': [], 'mismatches': [], 'segments1': segments1, 'segments2': segments2 } for tag, i1, i2, j1, j2 in matcher.get_opcodes(): if tag == 'equal': alignment['matches'].append({ 'segments1': segments1[i1:i2], 'segments2': segments2[j1:j2], 'type': 'match' }) elif tag == 'delete': alignment['gaps'].append({ 'segments': segments1[i1:i2], 'type': 'deletion', 'position': 'text1' }) elif tag == 'insert': alignment['gaps'].append({ 'segments': segments2[j1:j2], 'type': 'insertion', 'position': 'text2' }) elif tag == 'replace': alignment['mismatches'].append({ 'original': segments1[i1:i2], 'replacement': segments2[j1:j2], 'type': 'modification' }) return alignment def calculate_structural_similarity_score(text1: str, text2: str) -> dict[str, float]: """ Calculate various structural similarity scores between two texts. Returns: Dictionary with multiple similarity metrics """ changes = detect_structural_changes(text1, text2) alignment = generate_structural_alignment(text1, text2) # Calculate scores total_changes = len(changes['insertions']) + len(changes['deletions']) + len(changes['modifications']) # Structural similarity score (inverse of changes) text_length = max(len(text1.split()), len(text2.split())) structural_score = max(0, 1 - (total_changes / text_length)) if text_length > 0 else 0 # Alignment-based score total_segments = len(alignment['segments1']) + len(alignment['segments2']) matches = len(alignment['matches']) alignment_score = matches / (total_segments / 2) if total_segments > 0 else 0 return { 'structural_similarity': structural_score, 'alignment_score': alignment_score, 'insertions': len(changes['insertions']), 'deletions': len(changes['deletions']), 'modifications': len(changes['modifications']), 'total_changes': total_changes } def generate_differential_report(text1: str, text2: str, file1_name: str = "Text 1", file2_name: str = "Text 2") -> dict[str, any]: """ Generate a comprehensive differential report for two text chapters. Returns: Complete report with changes, alignment, and recommendations """ changes = detect_structural_changes(text1, text2) alignment = generate_structural_alignment(text1, text2) scores = calculate_structural_similarity_score(text1, text2) report = { 'file1': file1_name, 'file2': file2_name, 'changes': changes, 'alignment': alignment, 'scores': scores, 'summary': { 'significant_differences': len([c for c in changes['modifications'] if len(c['original']) > 10 or len(c['replacement']) > 10]), 'minor_variants': len([c for c in changes['modifications'] if len(c['original']) <= 5 and len(c['replacement']) <= 5]), 'structural_preservation': scores['alignment_score'] > 0.8, 'recommendation': 'Manuscripts are structurally similar' if scores['alignment_score'] > 0.7 else 'Significant structural differences detected' } } return report