Spaces:
Running
Running
# /// script | |
# requires-python = ">=3.12" | |
# dependencies = [ | |
# "charset-normalizer==3.4.2", | |
# "great-tables==0.17.0", | |
# "marimo", | |
# "pandas==2.3.0", | |
# ] | |
# /// | |
import marimo | |
__generated_with = "0.14.6" | |
app = marimo.App(width="full", app_title="LLM Text Preprocessing Checker") | |
def _():
    # Cell: import marimo so every other cell can receive `mo`.
    import marimo as mo
    return (mo,)
def _(mo):
    # Cell: render the notebook title and description.
    # Fixed the user-visible typo "Additionaly" -> "Additionally".
    mo.md(
        r"""
    # LLM Text Preprocessing Checker
    Checks two files and provides the diff output as well as metrics on deleted and inserted characters.
    Additionally, provides a breakdown by Unicode character class of deletions and insertions.
    Note that this uses a pure-Python Myers diff algorithm for the comparison and may not be performant for larger diffs.
    """
    )
    return
def _():
    # Cell: all diff/analysis helpers are defined inside this one cell so
    # they share a single marimo namespace; the cell returns the two public
    # entry points at the end.
    import unicodedata
    from typing import List, Dict, Any
    from dataclasses import dataclass
    from enum import IntEnum
    import html as python_html
    from great_tables import GT, loc, style
    import pandas as pd
class Operation(IntEnum):
    """Kind of a single diff edit."""

    DELETE = 0
    INSERT = 1
    EQUAL = 2

@dataclass
class Edit:
    """One atomic edit produced by the Myers diff.

    The rest of the file constructs this positionally, e.g.
    ``Edit(Operation.EQUAL, x, x + 1, y, y + 1, a[x], b[y])`` — without the
    ``@dataclass`` decorator (imported above but previously not applied)
    the class has no ``__init__`` and every such call raises TypeError.
    """

    operation: Operation  # DELETE / INSERT / EQUAL
    old_start: int  # half-open span in the original string
    old_end: int
    new_start: int  # half-open span in the new string
    new_end: int
    old_text: str = ""  # text removed from the original (DELETE/EQUAL)
    new_text: str = ""  # text added to the new string (INSERT/EQUAL)
# Inline CSS fragments used by the HTML diff renderers below.
DEL_STYLE = "background-color:#ffcccc;color:#880000;text-decoration:line-through;"  # deleted text
INS_STYLE = "background-color:#ccffcc;color:#008800;"  # inserted text
EQUAL_STYLE = "color:#666666;"  # unchanged context text
# Monospace wrapper applied to the whole diff output.
CONTAINER_STYLE = (
    "font-family: ui-monospace, monospace; "
    "white-space: pre-wrap; "
    "line-height: 1.6; "
    "padding: 20px; "
    "background-color: #f8f9fa; "
    "border-radius: 8px; "
    "border: 1px solid #dee2e6;"
)
# Map Unicode general categories to readable classifications.
# Hoisted to a constant: the original rebuilt this dict literal on every
# call, and classify_char runs once per character of both inputs.
_UNICODE_CLASS_NAMES = {
    "Ll": "lowercase",
    "Lu": "uppercase",
    "Lt": "titlecase",
    "Lm": "modifier_letter",
    "Lo": "other_letter",
    "Nd": "decimal_digit",
    "Nl": "letter_number",
    "No": "other_number",
    "Pc": "connector_punctuation",
    "Pd": "dash_punctuation",
    "Ps": "open_punctuation",
    "Pe": "close_punctuation",
    "Pi": "initial_punctuation",
    "Pf": "final_punctuation",
    "Po": "other_punctuation",
    "Sm": "math_symbol",
    "Sc": "currency_symbol",
    "Sk": "modifier_symbol",
    "So": "other_symbol",
    "Zs": "space",
    "Zl": "line_separator",
    "Zp": "paragraph_separator",
    "Cc": "control",
    "Cf": "format",
    "Co": "private_use",
    "Cn": "unassigned",
}

def classify_char(char: str) -> str:
    """Classify a character using Unicode categories.

    Args:
        char: A single character ("" is allowed and yields "empty").

    Returns:
        A readable class name such as "lowercase" or "space"; CJK
        ideographs, kana and hangul get dedicated labels; unknown
        categories fall back to the raw two-letter category code.
    """
    if not char:
        return "empty"
    # Special handling for CJK scripts takes precedence over the generic
    # category mapping (they would otherwise all be "other_letter").
    if "\u4e00" <= char <= "\u9fff":
        return "cjk_ideograph"
    elif "\u3040" <= char <= "\u309f":
        return "hiragana"
    elif "\u30a0" <= char <= "\u30ff":
        return "katakana"
    elif "\uac00" <= char <= "\ud7af":
        return "hangul"
    category = unicodedata.category(char)
    return _UNICODE_CLASS_NAMES.get(category, category)
def _myers_backtrack(trace: List[List[int]], a: str, b: str) -> List[Edit]:
    """Back-tracking helper to materialise the edit script.

    Args:
        trace: One saved copy of the frontier array ``v`` per layer ``d``,
            as recorded by myers_diff.
        a: Original string.
        b: New string.

    Returns:
        Per-character Edit records (DELETE / INSERT / EQUAL) in forward
        order, covering all of ``a`` and ``b``.
    """
    edits: List[Edit] = []
    n, m = len(a), len(b)
    x, y = n, m  # start the walk at the bottom-right corner
    offset = len(trace[0]) // 2  # maps diagonal k to an array index
    # Walk the layers backwards
    for d in range(len(trace) - 1, 0, -1):
        v = trace[d]
        k = x - y
        idx = k + offset
        # Determine the predecessor k'
        if k == -d or (k != d and v[idx - 1] < v[idx + 1]):
            k_prev = k + 1  # came from below (insertion)
        else:
            k_prev = k - 1  # came from right (deletion)
        x_prev = trace[d - 1][k_prev + offset]
        y_prev = x_prev - k_prev
        # Emit the matching "snake" (diagonal run of equal characters)
        while x > x_prev and y > y_prev:
            x -= 1
            y -= 1
            edits.append(Edit(Operation.EQUAL, x, x + 1, y, y + 1, a[x], b[y]))
        # Emit the single edit (INSERT or DELETE) that led to the snake
        if x_prev == x:  # insertion
            y -= 1
            edits.append(Edit(Operation.INSERT, x, x, y, y + 1, "", b[y]))
        else:  # deletion
            x -= 1
            edits.append(Edit(Operation.DELETE, x, x + 1, y, y, a[x], ""))
    # Leading snake (d = 0) – everything matched at the start
    while x > 0 and y > 0:
        x -= 1
        y -= 1
        edits.append(Edit(Operation.EQUAL, x, x + 1, y, y + 1, a[x], b[y]))
    # Any remaining leading insertions / deletions
    while x > 0:
        x -= 1
        edits.append(Edit(Operation.DELETE, x, x + 1, y, y, a[x], ""))
    while y > 0:
        y -= 1
        edits.append(Edit(Operation.INSERT, x, x, y, y + 1, "", b[y]))
    edits.reverse()  # edits were appended back-to-front
    return edits
def myers_diff(a: str, b: str) -> List[Edit]:
    """
    Pure-Python character-level Myers diff.

    O((N+M)·D) time. Memory is O(D·(N+M)), not O(N+M): one copy of the
    frontier array is kept per layer so the edit script can be
    backtracked afterwards.
    Returns a list of Edit objects (DELETE / INSERT / EQUAL).
    """
    n, m = len(a), len(b)
    # Trivial cases: one side empty -> a single bulk edit (or nothing).
    if n == 0:
        return [Edit(Operation.INSERT, 0, 0, 0, m, "", b)] if m else []
    if m == 0:
        return [Edit(Operation.DELETE, 0, n, 0, 0, a, "")] if n else []
    max_d = n + m
    offset = max_d  # map k ∈ [-max_d .. +max_d] → index
    v = [0] * (2 * max_d + 1)  # current frontier
    trace = []  # keeps a copy of v for every d
    # Forward phase – build the "trace" that will be backtracked
    for d in range(max_d + 1):
        v_next = v[:]  # copy *once* per layer
        for k in range(-d, d + 1, 2):
            idx = k + offset
            # Choosing the predecessor (insertion vs deletion).
            # Neighbours k±1 have opposite parity, so reading them from the
            # previous layer's array is correct.
            if k == -d or (k != d and v[idx - 1] < v[idx + 1]):
                x = v[idx + 1]  # insertion (move down)
            else:
                x = v[idx - 1] + 1  # deletion (move right)
            y = x - k
            # Greedy snake – march diagonally while chars match
            while x < n and y < m and a[x] == b[y]:
                x += 1
                y += 1
            v_next[idx] = x
            # Reached the end – stop early
            if x >= n and y >= m:
                trace.append(v_next)
                return _myers_backtrack(trace, a, b)
        trace.append(v_next)
        v = v_next  # reuse buffer
    # Should never get here: d = n + m always reaches the corner
    raise RuntimeError("diff failed")
def classify_text(text: str) -> Dict[str, int]:
    """Build a frequency table of Unicode character classes in *text*.

    Returns an empty dict for empty input.
    """
    counts: Dict[str, int] = {}
    if not text:
        return counts
    for ch in text:
        label = classify_char(ch)
        counts[label] = counts.get(label, 0) + 1
    return counts
def classify_edits(edits: List[Edit]) -> Dict[Operation, Dict[str, int]]:
    """
    Classify edit operations by character class.
    Returns a nested dictionary: {operation: {char_class: count}}
    (EQUAL edits are ignored; only DELETE and INSERT appear as keys.)
    """
    # Collect the changed text per operation, skipping EQUAL spans.
    texts_by_op: Dict[Operation, List[str]] = {}
    for edit in edits:
        if edit.operation == Operation.EQUAL:
            continue
        payload = (
            edit.old_text if edit.operation == Operation.DELETE else edit.new_text
        )
        texts_by_op.setdefault(edit.operation, []).append(payload)
    # Classify each operation's combined text in one pass.
    return {
        op: classify_text("".join(chunks)) for op, chunks in texts_by_op.items()
    }
def calculate_change_metrics(
    original: str,
    edits: List[Edit],
    classifications: Dict[Operation, Dict[str, int]],
) -> Dict[str, Any]:
    """Calculate detailed change metrics including percentages.

    Args:
        original: The original text.
        edits: Edit list from myers_diff.
        classifications: Output of classify_edits.

    Returns:
        Dict with overall counts/percentages plus a per-character-class
        breakdown under "char_class_metrics". A class that is inserted but
        did not exist in the original gets insertion_percentage = inf.
    """
    metrics = {
        "total_original_chars": len(original),
        "total_deleted_chars": 0,
        "total_inserted_chars": 0,
        "deletion_percentage": 0.0,
        "insertion_percentage": 0.0,
        "net_change_percentage": 0.0,
        "char_class_metrics": {},
    }
    # Calculate total changes
    for edit in edits:
        if edit.operation == Operation.DELETE:
            metrics["total_deleted_chars"] += len(edit.old_text)
        elif edit.operation == Operation.INSERT:
            metrics["total_inserted_chars"] += len(edit.new_text)
    # Overall percentages (all relative to the original length)
    total = metrics["total_original_chars"]
    if total > 0:
        metrics["deletion_percentage"] = (
            metrics["total_deleted_chars"] / total
        ) * 100
        metrics["insertion_percentage"] = (
            metrics["total_inserted_chars"] / total
        ) * 100
        net_change = (
            metrics["total_inserted_chars"] - metrics["total_deleted_chars"]
        )
        metrics["net_change_percentage"] = (net_change / total) * 100
    # Get character classification of original text
    original_classifications = classify_text(original)
    # Union of every class seen in the original or in any change
    all_char_classes = set(original_classifications)
    for op_classes in classifications.values():
        all_char_classes.update(op_classes)
    # Hoist the per-operation lookups out of the loop
    deleted_by_class = classifications.get(Operation.DELETE, {})
    inserted_by_class = classifications.get(Operation.INSERT, {})
    for char_class in all_char_classes:
        original_count = original_classifications.get(char_class, 0)
        deleted_count = deleted_by_class.get(char_class, 0)
        inserted_count = inserted_by_class.get(char_class, 0)
        class_metrics = {
            "original_count": original_count,
            "deleted_count": deleted_count,
            "inserted_count": inserted_count,
            "deletion_percentage": 0.0,
            "insertion_percentage": 0.0,
        }
        # Both percentages are relative to how many chars of this class
        # existed originally (the original code duplicated this guard).
        if original_count > 0:
            class_metrics["deletion_percentage"] = (
                deleted_count / original_count
            ) * 100
            class_metrics["insertion_percentage"] = (
                inserted_count / original_count
            ) * 100
        elif inserted_count > 0:
            # If there were none originally, show as new
            class_metrics["insertion_percentage"] = float("inf")
        metrics["char_class_metrics"][char_class] = class_metrics
    return metrics
# Whitespace visualisation table, built once instead of on every call
# (escape_html runs once per rendered diff span).
_WS_TRANS = str.maketrans({" ": "·", "\t": "→ ", "\n": "¶\n"})

def escape_html(text: str) -> str:
    """Escape HTML special characters and make whitespace visible.

    Spaces become "·", tabs "→ ", and newlines keep a visible "¶" marker
    while still breaking the line.
    """
    return python_html.escape(text).translate(_WS_TRANS)
def generate_html_diff(
    edits: List[Edit], show_equal: bool = True, max_equal_length: int = 100
) -> str:
    """Generate HTML visualization of the diff with performance optimizations.

    Args:
        edits: Per-character Edit list from myers_diff.
        show_equal: Whether to render unchanged text at all.
        max_equal_length: Unchanged runs longer than this are truncated to
            their first/last halves with an "omitted" marker between.

    Returns:
        A single <div> containing styled <span>s for each run of edits.
    """
    # Pre-allocate list for better performance
    html_parts = []
    # Group consecutive edits of the same type to reduce HTML tags
    grouped_edits = []
    current_group = []
    current_op = None
    for edit in edits:
        if (
            edit.operation == current_op and len(current_group) < 100
        ):  # Batch up to 100 per <span>
            current_group.append(edit)
        else:
            if current_group:
                grouped_edits.append((current_op, current_group))
            current_group = [edit]
            current_op = edit.operation
    if current_group:
        grouped_edits.append((current_op, current_group))
    # Process grouped edits
    for op, group in grouped_edits:
        if op == Operation.DELETE:
            combined_text = "".join(e.old_text for e in group)
            escaped = escape_html(combined_text)
            html_parts.append(
                f'<span style="{DEL_STYLE}" title="Deleted">{escaped}</span>'
            )
        elif op == Operation.INSERT:
            combined_text = "".join(e.new_text for e in group)
            escaped = escape_html(combined_text)
            html_parts.append(
                f'<span style="{INS_STYLE}" title="Added">{escaped}</span>'
            )
        elif op == Operation.EQUAL and show_equal:
            combined_text = "".join(e.old_text for e in group)
            # Truncate very long equal sections
            if len(combined_text) > max_equal_length:
                start = escape_html(combined_text[: max_equal_length // 2])
                end = escape_html(combined_text[-max_equal_length // 2 :])
                omitted = len(combined_text) - max_equal_length
                html_parts.append(
                    f'<span style="{EQUAL_STYLE}">{start}'
                    f"<em>...{omitted} chars omitted...</em>"
                    f"{end}</span>"
                )
            else:
                escaped = escape_html(combined_text)
                html_parts.append(f'<span style="{EQUAL_STYLE}">{escaped}</span>')
    return f'<div style="{CONTAINER_STYLE}">{"".join(html_parts)}</div>'
def generate_side_by_side_html(edits: List[Edit]) -> str:
    """Generate side-by-side HTML diff view.

    Deletions appear only in the left (Original) column, insertions only
    in the right (Processed) column; unchanged text appears unstyled in
    both. Returns a two-column CSS grid as an HTML string.
    """
    old_parts = []
    new_parts = []
    for edit in edits:
        if edit.operation == Operation.DELETE:
            escaped = escape_html(edit.old_text)
            old_parts.append(f'<span style="{DEL_STYLE}">{escaped}</span>')
        elif edit.operation == Operation.INSERT:
            escaped = escape_html(edit.new_text)
            new_parts.append(f'<span style="{INS_STYLE}">{escaped}</span>')
        elif edit.operation == Operation.EQUAL:
            # NOTE: unlike the fast variant, EQUAL text here is an
            # unstyled span rather than EQUAL_STYLE.
            escaped = escape_html(edit.old_text)
            old_parts.append(f"<span>{escaped}</span>")
            new_parts.append(f"<span>{escaped}</span>")
    return f'''
    <div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
        <div>
            <h4 style="margin: 0 0 10px 0;">Original</h4>
            <div style="{CONTAINER_STYLE}">{"".join(old_parts)}</div>
        </div>
        <div>
            <h4 style="margin: 0 0 10px 0;">Processed</h4>
            <div style="{CONTAINER_STYLE}">{"".join(new_parts)}</div>
        </div>
    </div>
    '''
def generate_html_diff_fast(edits: List[Edit], context_chars: int = 5) -> str:
    """
    Ultra-fast HTML diff generation showing only changes with context.

    Args:
        edits: Per-character Edit list from myers_diff.
        context_chars: Number of surrounding edits (not characters) kept
            around each change; nearby changes merge into one range.

    Returns:
        A <div> with the visible ranges, separated by "..." markers.
    """
    html_parts = []
    # Filter to only show changes and surrounding context
    change_indices = [
        i for i, e in enumerate(edits) if e.operation != Operation.EQUAL
    ]
    if not change_indices:
        # BUG FIX: this was a plain string, so "{CONTAINER_STYLE}" was
        # emitted literally into the HTML instead of the CSS.
        return f'<div style="{CONTAINER_STYLE}">No changes found.</div>'
    # Build ranges to show (change + context)
    ranges_to_show = []
    start = max(0, change_indices[0] - context_chars)
    end = min(len(edits), change_indices[0] + context_chars + 1)
    for idx in change_indices[1:]:
        if idx - end <= context_chars * 2:
            # Extend current range
            end = min(len(edits), idx + context_chars + 1)
        else:
            # Save current range and start new one
            ranges_to_show.append((start, end))
            start = max(0, idx - context_chars)
            end = min(len(edits), idx + context_chars + 1)
    ranges_to_show.append((start, end))
    # Generate HTML for ranges
    for i, (start, end) in enumerate(ranges_to_show):
        if i > 0:
            html_parts.append(
                '<div style="color:#999;text-align:center;margin:10px 0;">...</div>'
            )
        for j in range(start, end):
            edit = edits[j]
            if edit.operation == Operation.DELETE:
                escaped = escape_html(edit.old_text)
                html_parts.append(f'<span style="{DEL_STYLE}">{escaped}</span>')
            elif edit.operation == Operation.INSERT:
                escaped = escape_html(edit.new_text)
                html_parts.append(f'<span style="{INS_STYLE}">{escaped}</span>')
            else:  # EQUAL
                escaped = escape_html(edit.old_text)
                html_parts.append(f'<span style="{EQUAL_STYLE}">{escaped}</span>')
    return f'<div style="{CONTAINER_STYLE}">{"".join(html_parts)}</div>'
def generate_side_by_side_html_fast(
    edits: List[Edit], context_chars: int = 5
) -> str:
    """
    Fast side-by-side HTML diff generation showing only changes with context.

    Args:
        edits: Per-character Edit list from myers_diff.
        context_chars: Number of surrounding edits kept around each change.

    Returns:
        A two-column CSS grid (Original / Processed) as an HTML string.
    """
    # Filter to only show changes and surrounding context
    change_indices = [
        i for i, e in enumerate(edits) if e.operation != Operation.EQUAL
    ]
    if not change_indices:
        # BUG FIX: this was a plain (non-f) string, so "{CONTAINER_STYLE}"
        # was emitted literally into the HTML instead of the CSS.
        return f"""
        <div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
            <div>
                <h4 style="margin: 0 0 10px 0;">Original</h4>
                <div style="{CONTAINER_STYLE}">No changes found.</div>
            </div>
            <div>
                <h4 style="margin: 0 0 10px 0;">Processed</h4>
                <div style="{CONTAINER_STYLE}">No changes found.</div>
            </div>
        </div>
        """
    # Build ranges to show (change + context)
    ranges_to_show = []
    start = max(0, change_indices[0] - context_chars)
    end = min(len(edits), change_indices[0] + context_chars + 1)
    for idx in change_indices[1:]:
        if idx - end <= context_chars * 2:
            # Extend current range
            end = min(len(edits), idx + context_chars + 1)
        else:
            # Save current range and start new one
            ranges_to_show.append((start, end))
            start = max(0, idx - context_chars)
            end = min(len(edits), idx + context_chars + 1)
    ranges_to_show.append((start, end))
    # Generate HTML for ranges
    old_parts = []
    new_parts = []
    for i, (start, end) in enumerate(ranges_to_show):
        if i > 0:
            separator = (
                '<div style="color:#999;text-align:center;margin:10px 0;">...</div>'
            )
            old_parts.append(separator)
            new_parts.append(separator)
        for j in range(start, end):
            edit = edits[j]
            if edit.operation == Operation.DELETE:
                escaped = escape_html(edit.old_text)
                old_parts.append(f'<span style="{DEL_STYLE}">{escaped}</span>')
            elif edit.operation == Operation.INSERT:
                escaped = escape_html(edit.new_text)
                new_parts.append(f'<span style="{INS_STYLE}">{escaped}</span>')
            else:  # EQUAL
                escaped = escape_html(edit.old_text)
                old_parts.append(f'<span style="{EQUAL_STYLE}">{escaped}</span>')
                new_parts.append(f'<span style="{EQUAL_STYLE}">{escaped}</span>')
    return f'''
    <div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
        <div>
            <h4 style="margin: 0 0 10px 0;">Original</h4>
            <div style="{CONTAINER_STYLE}">{"".join(old_parts)}</div>
        </div>
        <div>
            <h4 style="margin: 0 0 10px 0;">Processed</h4>
            <div style="{CONTAINER_STYLE}">{"".join(new_parts)}</div>
        </div>
    </div>
    '''
def operation_to_past(op: Operation) -> str:
    """Return the past-tense English name of *op*, e.g. "deleted".

    BUG FIX: the original fell back to ``str(op) + "d"``, which yields
    "0d" on Python >= 3.11 (IntEnum.__str__ returns the value) and
    "Operation.DELETEd" on older versions — never "deleted".
    """
    return {
        Operation.DELETE: "deleted",
        Operation.INSERT: "inserted",
        Operation.EQUAL: "equaled",
    }[op]
def format_diff_summary(
    edits: List[Edit],
    classifications: Dict[Operation, Dict[str, int]],
    metrics: Dict[str, Any],
) -> str:
    """Create a human-readable summary of the diff.

    Args:
        edits: Edit list from myers_diff (currently unused directly, kept
            for interface symmetry with the table builders).
        classifications: Output of classify_edits.
        metrics: Output of calculate_change_metrics.

    Returns:
        A Markdown string with overall statistics, per-class counts and
        per-class change percentages.
    """
    lines = ["## Diff Summary\n"]
    # Overall statistics
    lines.append("### Overall Statistics")
    lines.append(
        f"- **Original text**: {metrics['total_original_chars']:,} characters"
    )
    # Format deletions
    del_pct = format_percentage(metrics["deletion_percentage"])
    lines.append(
        f"- **Deletions**: {metrics['total_deleted_chars']:,} characters ({del_pct})"
    )
    # Format insertions
    ins_pct = format_percentage(metrics["insertion_percentage"])
    lines.append(
        f"- **Insertions**: {metrics['total_inserted_chars']:,} characters ({ins_pct})"
    )
    # Format net change: extra precision for near-zero values
    net_pct = metrics["net_change_percentage"]
    if abs(net_pct) < 0.01:
        net_pct_str = f"{net_pct:+.3f}%"
    else:
        net_pct_str = f"{net_pct:+.1f}%"
    lines.append(
        f"- **Net change**: {net_pct_str} "
        f"({'increase' if metrics['net_change_percentage'] > 0 else 'decrease' if metrics['net_change_percentage'] < 0 else 'no change'})"
    )
    # Character classifications
    if classifications:
        lines.append("\n### Character Classifications")
        # Show changes by character class, most frequent first
        for op in [Operation.DELETE, Operation.INSERT]:
            if op in classifications and classifications[op]:
                lines.append(f"\n**{operation_to_past(op).title()} Characters:**")
                for char_class, count in sorted(
                    classifications[op].items(), key=lambda x: -x[1]
                ):
                    lines.append(
                        f"- {char_class.replace('_', ' ').title()}: {count}"
                    )
        # Show percentage changes by character class
        lines.append("\n### Change Percentages by Character Class")
        # Sort by most changed (highest deletion or insertion percentage);
        # inf insertion percentages ("new" classes) are treated as 0 here
        sorted_classes = sorted(
            metrics["char_class_metrics"].items(),
            key=lambda x: max(
                x[1]["deletion_percentage"],
                0
                if x[1]["insertion_percentage"] == float("inf")
                else x[1]["insertion_percentage"],
            ),
            reverse=True,
        )
        for char_class, class_metrics in sorted_classes:
            if (
                class_metrics["deleted_count"] > 0
                or class_metrics["inserted_count"] > 0
            ):
                class_name = char_class.replace("_", " ").title()
                # Format the line
                line_parts = [f"- **{class_name}**:"]
                if class_metrics["original_count"] > 0:
                    line_parts.append(
                        f"Original: {class_metrics['original_count']}"
                    )
                if class_metrics["deleted_count"] > 0:
                    line_parts.append(
                        f"Deleted: {class_metrics['deleted_count']} "
                        f"({class_metrics['deletion_percentage']:.1f}%)"
                    )
                if class_metrics["inserted_count"] > 0:
                    if class_metrics["insertion_percentage"] == float("inf"):
                        line_parts.append(
                            f"Inserted: {class_metrics['inserted_count']} (new)"
                        )
                    else:
                        line_parts.append(
                            f"Inserted: {class_metrics['inserted_count']} "
                            f"({class_metrics['insertion_percentage']:.1f}%)"
                        )
                lines.append(" | ".join(line_parts))
    return "\n".join(lines)
def format_percentage(value: float, min_decimals: int = 1) -> str:
    """Format percentage with adaptive decimal places.

    Smaller magnitudes get more decimals so tiny changes stay visible.
    NOTE: min_decimals is currently unused; it is kept for call-site
    compatibility.
    """
    if value == 0:
        return "0%"
    # (upper bound, format spec) pairs, checked smallest-first.
    for limit, spec in ((0.01, ".3f"), (0.1, ".2f"), (1, ".1f")):
        if value < limit:
            return f"{value:{spec}}%"
    return f"{value:.0f}%"  # >= 1%: whole percent
def classify_edits_with_chars(
    edits: List[Edit],
) -> Dict[Operation, Dict[str, Dict[str, int]]]:
    """
    Classify edit operations by character class and track character frequencies.
    Returns: {operation: {char_class: {char: count}}}
    (EQUAL edits are skipped.)
    """
    from collections import defaultdict, Counter

    buckets = defaultdict(lambda: defaultdict(Counter))
    for edit in edits:
        if edit.operation == Operation.EQUAL:
            continue
        # DELETE counts removed text; INSERT counts added text.
        payload = (
            edit.new_text if edit.operation == Operation.INSERT else edit.old_text
        )
        for ch in payload:
            buckets[edit.operation][classify_char(ch)][ch] += 1
    return dict(buckets)
def get_top_chars(char_counter: Dict[str, int], n: int = 5) -> str:
    """Get top n characters by frequency, formatted for display.

    Whitespace is shown with visible glyphs (·, ¶, →) and other control
    characters as \\xNN escapes; returns "-" for an empty counter.
    """
    if not char_counter:
        return "-"
    ranked = sorted(char_counter.items(), key=lambda item: -item[1])[:n]
    glyphs = {" ": "·", "\n": "¶", "\t": "→"}
    pieces = []
    for ch, _count in ranked:
        if ch in glyphs:
            pieces.append(glyphs[ch])
        elif ord(ch) < 32 or ord(ch) == 127:
            pieces.append(f"\\x{ord(ch):02x}")  # hex escape for control chars
        else:
            pieces.append(ch)
    return " ".join(pieces)
def create_summary_tables(
    edits: List[Edit],
    classifications: Dict[Operation, Dict[str, int]],
    metrics: Dict[str, Any],
) -> Dict[str, GT]:
    """Create great_tables tables for the diff summary.

    Returns:
        {"overall": GT, "char_class": GT | None, "compact": GT} —
        "char_class" is None when no character class changed.
    """
    # Get detailed character data
    detailed_classifications = classify_edits_with_chars(edits)
    # Table 1: Overall Statistics (unchanged)
    overall_data = pd.DataFrame(
        {
            "Metric": [
                "Original Length",
                "Characters Deleted",
                "Characters Inserted",
                "Net Change",
            ],
            "Count": [
                metrics["total_original_chars"],
                metrics["total_deleted_chars"],
                metrics["total_inserted_chars"],
                metrics["total_inserted_chars"] - metrics["total_deleted_chars"],
            ],
            "Percentage": [
                "-",
                format_percentage(metrics["deletion_percentage"]),
                format_percentage(metrics["insertion_percentage"]),
                # Near-zero net change keeps 3 decimals, otherwise 1
                f"{metrics['net_change_percentage']:+.3f}%"
                if abs(metrics["net_change_percentage"]) < 0.01
                else f"{metrics['net_change_percentage']:+.1f}%",
            ],
        }
    )
    overall_table = (
        GT(overall_data)
        .tab_header(
            title="Text Change Summary",
            subtitle=f"Total edits: {len([e for e in edits if e.operation != Operation.EQUAL])}",
        )
        .fmt_number(columns="Count", decimals=0, use_seps=True)
        .tab_style(
            style=[style.fill(color="#f0f0f0"), style.text(weight="bold")],
            locations=loc.body(rows=[3]),  # highlight the "Net Change" row
        )
        .cols_align(align="center", columns=["Count", "Percentage"])
        .opt_stylize(style=1, color="blue")
    )
    # Table 2: Character Class Changes with top characters
    char_class_data = []
    # Get all character classes
    all_classes = set()
    for op_classes in classifications.values():
        all_classes.update(op_classes.keys())
    all_classes.update(metrics["char_class_metrics"].keys())
    # Build rows
    for char_class in sorted(all_classes):
        class_metrics = metrics["char_class_metrics"].get(char_class, {})
        # Get top characters for this class
        del_chars = detailed_classifications.get(Operation.DELETE, {}).get(
            char_class, {}
        )
        ins_chars = detailed_classifications.get(Operation.INSERT, {}).get(
            char_class, {}
        )
        row = {
            "Character Class": char_class.replace("_", " ").title(),
            "Original": class_metrics.get("original_count", 0),
            "Deleted": class_metrics.get("deleted_count", 0),
            "Top Deleted": get_top_chars(del_chars, 5),
            "Inserted": class_metrics.get("inserted_count", 0),
            "Top Inserted": get_top_chars(ins_chars, 5),
            "Del %": format_percentage(class_metrics.get("deletion_percentage", 0))
            if class_metrics.get("deletion_percentage", 0) > 0
            else "-",
            # inf insertion percentage means the class is entirely new
            "Ins %": (
                "new"
                if class_metrics.get("insertion_percentage", 0) == float("inf")
                else format_percentage(class_metrics.get("insertion_percentage", 0))
                if class_metrics.get("insertion_percentage", 0) > 0
                else "-"
            ),
        }
        # Only include rows with changes
        if row["Deleted"] > 0 or row["Inserted"] > 0:
            char_class_data.append(row)
    if char_class_data:
        char_class_df = pd.DataFrame(char_class_data)
        char_class_table = (
            GT(char_class_df)
            .tab_header(title="Changes by Character Classification")
            .fmt_number(
                columns=["Original", "Deleted", "Inserted"],
                decimals=0,
                use_seps=True,
            )
            .tab_style(
                style=style.fill(color="#ffcccc"),
                locations=loc.body(columns=["Deleted", "Top Deleted"]),
            )
            .tab_style(
                style=style.fill(color="#ccffcc"),
                locations=loc.body(columns=["Inserted", "Top Inserted"]),
            )
            .tab_style(
                style=style.text(font="monospace"),
                locations=loc.body(columns=["Top Deleted", "Top Inserted"]),
            )
            .cols_align(
                align="center",
                columns=["Original", "Deleted", "Inserted", "Del %", "Ins %"],
            )
            .cols_align(align="left", columns=["Top Deleted", "Top Inserted"])
            .tab_spanner(
                label="Counts", columns=["Original", "Deleted", "Inserted"]
            )
            .tab_spanner(
                label="Characters", columns=["Top Deleted", "Top Inserted"]
            )
            .tab_spanner(label="Percentages", columns=["Del %", "Ins %"])
            .cols_width(
                {
                    "Character Class": "20%",
                    "Original": "10%",
                    "Deleted": "10%",
                    "Top Deleted": "15%",
                    "Inserted": "10%",
                    "Top Inserted": "15%",
                    "Del %": "10%",
                    "Ins %": "10%",
                }
            )
            .opt_stylize(style=1, color="blue")
        )
    else:
        char_class_table = None
    # Table 3: Compact Combined View (unchanged except for percentage formatting)
    compact_data = []
    # Add summary row
    compact_data.append(
        {
            "Type": "Total",
            "Deleted": metrics["total_deleted_chars"],
            "Inserted": metrics["total_inserted_chars"],
            "Net": metrics["total_inserted_chars"] - metrics["total_deleted_chars"],
            "Change": f"{metrics['net_change_percentage']:+.3f}%"
            if abs(metrics["net_change_percentage"]) < 0.01
            else f"{metrics['net_change_percentage']:+.0f}%",
        }
    )
    # Add top character classes (sorted by total change)
    class_changes = []
    for char_class, class_metrics in metrics["char_class_metrics"].items():
        if (
            class_metrics["deleted_count"] > 0
            or class_metrics["inserted_count"] > 0
        ):
            class_changes.append(
                {
                    "Type": char_class.replace("_", " ").title(),
                    "Deleted": class_metrics["deleted_count"],
                    "Inserted": class_metrics["inserted_count"],
                    "Net": class_metrics["inserted_count"]
                    - class_metrics["deleted_count"],
                    # "Change" is temporarily the total volume, used only
                    # for sorting; it is reformatted as a string below.
                    "Change": class_metrics["deleted_count"]
                    + class_metrics["inserted_count"],
                }
            )
    # Sort by total change and take top 5
    class_changes.sort(key=lambda x: x["Change"], reverse=True)
    for item in class_changes[:5]:
        item["Change"] = f"{item['Net']:+d}" if item["Net"] != 0 else "±0"
        compact_data.append(item)
    compact_df = pd.DataFrame(compact_data)
    compact_table = (
        GT(compact_df)
        .tab_header(title="Edit Summary - Compact View")
        .fmt_number(
            columns=["Deleted", "Inserted", "Net"], decimals=0, use_seps=True
        )
        .tab_style(
            style=[
                style.fill(color="#e8e8e8"),
                style.text(weight="bold"),
                style.borders(sides=["top", "bottom"], color="#666", weight="2px"),
            ],
            locations=loc.body(rows=[0]),  # the "Total" row
        )
        .tab_style(
            style=style.text(color="#880000"),
            locations=loc.body(columns=["Deleted"]),
        )
        .tab_style(
            style=style.text(color="#008800"),
            locations=loc.body(columns=["Inserted"]),
        )
        .cols_align(
            align="center", columns=["Deleted", "Inserted", "Net", "Change"]
        )
        .cols_width(
            {
                "Type": "40%",
                "Deleted": "15%",
                "Inserted": "15%",
                "Net": "15%",
                "Change": "15%",
            }
        )
        .opt_stylize(style=1, color="cyan")
    )
    return {
        "overall": overall_table,
        "char_class": char_class_table,
        "compact": compact_table,
    }
def create_operation_matrix_table(
    edits: List[Edit], classifications: Dict[Operation, Dict[str, int]]
) -> GT:
    """Create a matrix view of operations by character class.

    Args:
        edits: Edit list (unused directly; kept for interface symmetry).
        classifications: Output of classify_edits.

    Returns:
        A great_tables GT with one row per character class, colored by
        deletion/insertion volume and net balance.
    """
    # Get all character classes
    all_classes = set()
    for op_classes in classifications.values():
        all_classes.update(op_classes.keys())
    # Build matrix data
    matrix_data = []
    for char_class in sorted(all_classes):
        row = {
            "Character Type": char_class.replace("_", " ").title(),
            "Deletions": classifications.get(Operation.DELETE, {}).get(
                char_class, 0
            ),
            "Insertions": classifications.get(Operation.INSERT, {}).get(
                char_class, 0
            ),
            # Balance > 0 means more inserted than deleted for this class
            "Balance": (
                classifications.get(Operation.INSERT, {}).get(char_class, 0)
                - classifications.get(Operation.DELETE, {}).get(char_class, 0)
            ),
        }
        matrix_data.append(row)
    # Sort by total changes
    matrix_data.sort(key=lambda x: x["Deletions"] + x["Insertions"], reverse=True)
    # Convert to DataFrame
    matrix_df = pd.DataFrame(matrix_data)
    # Calculate max values for color-scale domains (default=1 avoids an
    # empty-sequence ValueError and a degenerate [0, 0] domain)
    max_del = max((r["Deletions"] for r in matrix_data), default=1)
    max_ins = max((r["Insertions"] for r in matrix_data), default=1)
    max_balance = max((abs(r["Balance"]) for r in matrix_data), default=1)
    matrix_table = (
        GT(matrix_df)
        .tab_header(title="Operation Matrix by Character Type")
        .fmt_number(columns=["Deletions", "Insertions", "Balance"], decimals=0)
        .data_color(
            columns=["Deletions"],
            palette=["white", "#ffcccc"],
            domain=[0, max_del],
        )
        .data_color(
            columns=["Insertions"],
            palette=["white", "#ccffcc"],
            domain=[0, max_ins],
        )
        .data_color(
            columns=["Balance"],
            palette=["#ffcccc", "white", "#ccffcc"],
            domain=[-max_balance, max_balance],
        )
        .cols_align(align="center", columns=["Deletions", "Insertions", "Balance"])
        .opt_stylize(style=2, color="gray")
    )
    return matrix_table
def is_long_diff(edits: "List[Edit]", original: str) -> bool:
    """Heuristic: True when the diff is big enough to need fast rendering."""
    too_many_edits = len(edits) > 1000
    too_much_text = len(original) > 10000
    return too_many_edits or too_much_text
def analyze_text_changes(
    original: str,
    processed: str,
) -> Dict[str, Any]:
    """
    Main function to analyze changes between two texts.

    Runs the Myers diff, classifies the changes by character class,
    computes metrics and pre-builds all presentation artifacts.

    Returns:
        Dict with keys "edits", "classifications", "metrics", "summary"
        (Markdown), "tables" (great_tables dict) and "matrix_table".
    """
    edits = myers_diff(original, processed)
    classifications = classify_edits(edits)
    metrics = calculate_change_metrics(original, edits, classifications)
    summary = format_diff_summary(edits, classifications, metrics)
    result = {
        "edits": edits,
        "classifications": classifications,
        "metrics": metrics,
        "summary": summary,
        "tables": create_summary_tables(edits, classifications, metrics),
        "matrix_table": create_operation_matrix_table(edits, classifications),
    }
    return result
def render_html_diff(
    edits: List[Edit],
    original: str,
    context_chars: int = 5,
    side_by_side: bool = False,
    use_fast_html: bool | None = None,
) -> str:
    """
    Unified function to render HTML diffs with automatic optimization.
    Args:
        edits: List of Edit operations
        original: Original text (used only to gauge diff size)
        context_chars: Amount of context to show around changes in fast mode
        side_by_side: Whether to use side-by-side view
        use_fast_html: Force fast mode on/off (None for auto-detect)
    Returns:
        HTML string of the diff
    """
    fast = is_long_diff(edits, original) if use_fast_html is None else use_fast_html
    # Fast renderers: used for long diffs (or when explicitly requested).
    if fast and side_by_side:
        return generate_side_by_side_html_fast(edits, context_chars=context_chars)
    if fast:
        return generate_html_diff_fast(edits, context_chars=context_chars)
    # Full renderers below; very large edit lists still fall back to the
    # capped fast side-by-side renderer as a length-based optimization.
    if side_by_side:
        if len(edits) > 500:
            return generate_side_by_side_html_fast(edits, max_length=50000)
        return generate_side_by_side_html(edits)
    return generate_html_diff(edits, show_equal=True, max_equal_length=200)
return analyze_text_changes, render_html_diff | |
def _(mo):
    # Drag-and-drop upload areas for the two texts being compared.
    p_file_upload = mo.ui.file(kind="area", label="Preprocessed text")
    o_file_upload = mo.ui.file(kind="area", label="Original text")
    # Show the two upload widgets side by side at equal width.
    file_stack = mo.hstack([o_file_upload, p_file_upload], widths="equal")
    return file_stack, o_file_upload, p_file_upload
def _(mo):
    # Plain text areas as an alternative to uploading files.
    p_textbox = mo.ui.text_area(full_width=True, label="Preprocessed text")
    o_textbox = mo.ui.text_area(full_width=True, label="Original text")
    # Show the two text areas side by side at equal width.
    text_stack = mo.hstack([o_textbox, p_textbox], widths="equal")
    return o_textbox, p_textbox, text_stack
def _(file_stack, mo, text_stack):
    # Let the user supply input either by typing ("Text" tab) or by
    # uploading files ("File" tab); the bare expression is the cell output.
    mo.ui.tabs({"Text": text_stack, "File": file_stack})
    return
def check_text_similarity(text1: str, text2: str, threshold: float = 0.1) -> bool:
    """Heuristic gate deciding whether two texts are comparable.

    Returns True when the Jaccard similarity of their character sets is at
    least ``threshold`` AND their relative length difference is at most
    ``1 - threshold``. Empty input on either side is never similar.
    """
    if not text1 or not text2:
        return False
    chars1, chars2 = set(text1), set(text2)
    char_overlap = len(chars1 & chars2) / len(chars1 | chars2)
    length_gap = abs(len(text1) - len(text2)) / max(len(text1), len(text2))
    return char_overlap >= threshold and length_gap <= (1 - threshold)
def _(mo, o_file_upload, o_textbox, p_file_upload, p_textbox):
    from charset_normalizer import detect

    def detect_encoding(b: bytes) -> str:
        # charset_normalizer may report None when detection fails; fall
        # back to UTF-8 so .decode() always receives a valid codec name
        # (decode(None) would raise TypeError, which is not caught below).
        result = detect(b)
        return result["encoding"] or "utf-8"

    # Demo texts used when neither a file upload nor textbox input exists.
    o_text, p_text = (
        "Example text will be used if none provided!",
        "Example Text will be used, if none provided.",
    )
    try:
        # File uploads take precedence over textbox contents.
        if o_file_upload.contents():
            encoding = detect_encoding(o_file_upload.contents())
            try:
                o_text = o_file_upload.contents().decode(encoding)
            except UnicodeDecodeError:
                o_text = o_file_upload.contents().decode("utf-8")
        elif o_textbox.value:
            o_text = o_textbox.value
        if p_file_upload.contents():
            # BUG FIX: detect the encoding of the *preprocessed* upload;
            # previously this sniffed o_file_upload's bytes instead.
            encoding = detect_encoding(p_file_upload.contents())
            try:
                p_text = p_file_upload.contents().decode(encoding)
            except UnicodeDecodeError:
                p_text = p_file_upload.contents().decode("utf-8")
        elif p_textbox.value:
            p_text = p_textbox.value
    except UnicodeDecodeError:
        # Both the detected encoding and the UTF-8 fallback failed.
        mo.stop(
            True,
            mo.md("Error decoding files. Please try UTF-8.").callout(kind="danger"),
        )
    # Abort the rest of the notebook when the texts look unrelated.
    mo.stop(
        not check_text_similarity(o_text, p_text),
        mo.md(
            f"Texts are too dissimilar! Aborting comparison.\n\n{o_text[:50]}\n\n{p_text[:50]}"
        ).callout(kind="danger"),
    )
    return o_text, p_text
def _(analyze_text_changes, o_text, p_text):
    # Run the full diff/classification/metrics pipeline once; downstream
    # cells read the resulting dict via the `results` variable.
    results = analyze_text_changes(o_text, p_text)
    return (results,)
def _(mo, results):
    # Stack the three pre-built summary tables vertically for display.
    table_keys = ("overall", "char_class", "compact")
    results_tables = mo.vstack([results["tables"][key] for key in table_keys])
    return (results_tables,)
def _(mo, o_text, render_html_diff, results, results_tables):
    # Render both diff layouts up front and let the user switch via tabs.
    combined_html = mo.Html(render_html_diff(results["edits"], o_text))
    side_by_side_html = mo.Html(
        render_html_diff(results["edits"], o_text, side_by_side=True)
    )
    diff_view = mo.ui.tabs(
        {
            "Combined diff": combined_html,
            "Side-by-side diff": side_by_side_html,
        }
    )
    # Final cell output: tables followed by the interactive diff view.
    mo.md(f"""
    # Results
    {results_tables}
    {diff_view}
    """)
    return
def _():
    # Intentionally empty scratch cell.
    return
if __name__ == "__main__":
    # Entry point: launch the marimo app when executed as a script.
    app.run()