# ttm-webapp-hf — pipeline/structural_analysis.py
# Author: daniel-wojahn — "cleanup and expansion of structural analysis" (commit edd4b9d)
"""
Chapter-level structural analysis for Tibetan legal manuscripts.
Enhanced with Juxta/CollateX-inspired advanced alignment algorithms.
"""
import difflib
import logging
import re
from typing import Any

from ..pipeline.advanced_alignment import enhanced_structural_analysis
logger = logging.getLogger(__name__)
def detect_structural_changes(text1: str, text2: str,
                              min_change_length: int = 5,
                              context_window: int = 10) -> dict:
    """
    Detect structural changes between two Tibetan text chapters.

    Args:
        text1: First text chapter.
        text2: Second text chapter.
        min_change_length: Minimum length (in characters) a changed word
            must have to be reported.
        context_window: Number of characters of surrounding context to
            attach to each reported change.

    Returns:
        Dictionary with keys 'insertions', 'deletions', 'modifications'
        and 'unchanged', each holding a list of change records.
    """
    def _normalize(raw: str) -> str:
        # Collapse runs of whitespace into single spaces.
        return re.sub(r'\s+', ' ', raw.strip())

    norm1 = _normalize(text1)
    norm2 = _normalize(text2)

    changes = {
        'insertions': [],
        'deletions': [],
        'modifications': [],
        'unchanged': [],
    }

    # Character offsets into the normalized texts; each consumed word
    # advances the offset by its length plus one separating space.
    offset1 = 0
    offset2 = 0

    for entry in difflib.Differ().compare(norm1.split(), norm2.split()):
        marker, token = entry[:2], entry[2:]
        step = len(token) + 1
        if marker == '  ':  # word present in both texts
            changes['unchanged'].append({
                'word': token,
                'position1': offset1,
                'position2': offset2,
                'length': len(token),
            })
            offset1 += step
            offset2 += step
        elif marker == '- ':  # word only in text1
            if len(token) >= min_change_length:
                changes['deletions'].append({
                    'word': token,
                    'position': offset1,
                    'length': len(token),
                    'context': get_context(norm1, offset1, context_window),
                })
            offset1 += step
        elif marker == '+ ':  # word only in text2
            if len(token) >= min_change_length:
                changes['insertions'].append({
                    'word': token,
                    'position': offset2,
                    'length': len(token),
                    'context': get_context(norm2, offset2, context_window),
                })
            offset2 += step
        # '? ' hint lines emitted by Differ carry no words and are skipped.

    # Pair nearby deletions/insertions into modifications. Note that
    # detect_modifications removes paired entries from the 'deletions'
    # and 'insertions' lists in place.
    changes['modifications'] = detect_modifications(
        changes['deletions'], changes['insertions'])
    return changes
def get_context(text: str, position: int, window: int) -> str:
    """Return up to *window* characters on either side of *position* in *text*."""
    lo = position - window
    hi = position + window
    # Clamp both ends so the slice never leaves the string.
    return text[max(0, lo):min(len(text), hi)]
def detect_modifications(deletions: list[dict], insertions: list[dict]) -> list[dict]:
    """
    Pair nearby deletions and insertions into modification records.

    A deletion and an insertion whose positions differ by at most 5
    characters are treated as one modification. Paired entries are
    removed from *deletions* and *insertions* IN PLACE, so the caller is
    left holding only the unpaired changes.
    """
    paired = []
    # Iterate over snapshots: the underlying lists shrink as we pair.
    for removed in list(deletions):
        for added in list(insertions):
            if abs(removed['position'] - added['position']) > 5:
                continue
            paired.append({
                'original': removed['word'],
                'replacement': added['word'],
                'position': removed['position'],
                'deletion_context': removed['context'],
                'insertion_context': added['context'],
            })
            # Guarded removal: the entry may already have been taken out.
            if removed in deletions:
                deletions.remove(removed)
            if added in insertions:
                insertions.remove(added)
            break  # each deletion pairs with at most one insertion
    return paired
def generate_structural_alignment(text1: str, text2: str) -> dict[str, list]:
    """
    Generate enhanced structural alignment using advanced algorithms.

    Tries the Juxta/CollateX-inspired aligner first and converts its
    segments to the legacy alignment format; if that fails for any
    reason, falls back to a basic difflib alignment over segments split
    at Tibetan punctuation marks.

    Returns:
        Dictionary with 'matches', 'gaps', 'mismatches', 'segments1'
        and 'segments2' lists.
    """
    try:
        # Use enhanced alignment from advanced_alignment module.
        result = enhanced_structural_analysis(text1, text2)

        # Convert to legacy format for backward compatibility.
        alignment = {
            'matches': [],
            'gaps': [],
            'mismatches': [],
            'segments1': [],
            'segments2': []
        }

        # Process alignment segments. FIX: 'segments1'/'segments2' were
        # previously left empty on this path, which made
        # calculate_structural_similarity_score divide by a zero segment
        # count and report alignment_score = 0 even for well-aligned
        # texts; they are now populated from the segment contents.
        for segment in result.get('alignment_segments', []):
            seg_type = segment['type']
            if seg_type == 'match':
                alignment['matches'].append({
                    'segments1': [segment['content1']],
                    'segments2': [segment['content2']],
                    'type': 'match',
                    'confidence': segment['confidence']
                })
                alignment['segments1'].append(segment['content1'])
                alignment['segments2'].append(segment['content2'])
            elif seg_type == 'insertion':
                alignment['gaps'].append({
                    'segments': [segment['content2']],
                    'type': 'insertion',
                    'position': 'text2',
                    'confidence': segment['confidence']
                })
                alignment['segments2'].append(segment['content2'])
            elif seg_type == 'deletion':
                alignment['gaps'].append({
                    'segments': [segment['content1']],
                    'type': 'deletion',
                    'position': 'text1',
                    'confidence': segment['confidence']
                })
                alignment['segments1'].append(segment['content1'])
            elif seg_type in ('mismatch', 'modification'):
                alignment['mismatches'].append({
                    'original': [segment['content1']],
                    'replacement': [segment['content2']],
                    'type': 'modification',
                    'confidence': segment['confidence']
                })
                alignment['segments1'].append(segment['content1'])
                alignment['segments2'].append(segment['content2'])
        return alignment
    except Exception as e:
        logger.warning(f"Enhanced alignment failed, falling back to basic: {e}")

        # Fallback to basic alignment for robustness: split at Tibetan
        # shad/punctuation marks and align segments with difflib.
        def split_into_segments(text):
            segments = re.split(r'[།༎༏༐༑༔]', text)
            return [seg.strip() for seg in segments if seg.strip()]

        segments1 = split_into_segments(text1)
        segments2 = split_into_segments(text2)
        matcher = difflib.SequenceMatcher(None, segments1, segments2)
        alignment = {
            'matches': [],
            'gaps': [],
            'mismatches': [],
            'segments1': segments1,
            'segments2': segments2
        }
        for tag, i1, i2, j1, j2 in matcher.get_opcodes():
            if tag == 'equal':
                alignment['matches'].append({
                    'segments1': segments1[i1:i2],
                    'segments2': segments2[j1:j2],
                    'type': 'match'
                })
            elif tag == 'delete':
                alignment['gaps'].append({
                    'segments': segments1[i1:i2],
                    'type': 'deletion',
                    'position': 'text1'
                })
            elif tag == 'insert':
                alignment['gaps'].append({
                    'segments': segments2[j1:j2],
                    'type': 'insertion',
                    'position': 'text2'
                })
            elif tag == 'replace':
                alignment['mismatches'].append({
                    'original': segments1[i1:i2],
                    'replacement': segments2[j1:j2],
                    'type': 'modification'
                })
        return alignment
def calculate_structural_similarity_score(text1: str, text2: str) -> dict[str, float]:
    """
    Calculate various structural similarity scores between two texts.

    Returns:
        Dictionary with 'structural_similarity', 'alignment_score' and
        the raw counts of insertions, deletions, modifications and
        total changes.
    """
    changes = detect_structural_changes(text1, text2)
    alignment = generate_structural_alignment(text1, text2)

    n_ins = len(changes['insertions'])
    n_del = len(changes['deletions'])
    n_mod = len(changes['modifications'])
    total_changes = n_ins + n_del + n_mod

    # Similarity falls with the proportion of changed words (clamped at 0).
    word_count = max(len(text1.split()), len(text2.split()))
    if word_count > 0:
        structural_score = max(0, 1 - (total_changes / word_count))
    else:
        structural_score = 0

    # Matches relative to the average segment count of the two texts.
    segment_count = len(alignment['segments1']) + len(alignment['segments2'])
    if segment_count > 0:
        alignment_score = len(alignment['matches']) / (segment_count / 2)
    else:
        alignment_score = 0

    return {
        'structural_similarity': structural_score,
        'alignment_score': alignment_score,
        'insertions': n_ins,
        'deletions': n_del,
        'modifications': n_mod,
        'total_changes': total_changes
    }
def generate_differential_report(text1: str, text2: str,
                                 file1_name: str = "Text 1",
                                 file2_name: str = "Text 2") -> dict[str, Any]:
    """
    Generate a comprehensive differential report for two text chapters.

    Args:
        text1: First text chapter.
        text2: Second text chapter.
        file1_name: Display name for the first text.
        file2_name: Display name for the second text.

    Returns:
        Complete report with changes, alignment, scores, and a summary
        with a human-readable recommendation.
    """
    # FIX: the return annotation previously used the builtin function
    # `any` as a type; it now uses `typing.Any`.
    changes = detect_structural_changes(text1, text2)
    alignment = generate_structural_alignment(text1, text2)
    scores = calculate_structural_similarity_score(text1, text2)

    report = {
        'file1': file1_name,
        'file2': file2_name,
        'changes': changes,
        'alignment': alignment,
        'scores': scores,
        'summary': {
            # Thresholds are character lengths of the replaced words.
            'significant_differences': len([c for c in changes['modifications'] if len(c['original']) > 10 or len(c['replacement']) > 10]),
            'minor_variants': len([c for c in changes['modifications'] if len(c['original']) <= 5 and len(c['replacement']) <= 5]),
            'structural_preservation': scores['alignment_score'] > 0.8,
            'recommendation': 'Manuscripts are structurally similar' if scores['alignment_score'] > 0.7 else 'Significant structural differences detected'
        }
    }
    return report