"""
Utility functions for text processing.

Contains helper functions for working with text data from OCR.
"""
import re
import logging
import difflib
from typing import List, Dict, Any, Optional

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def format_ocr_text(text: str, for_display: bool = False) -> str:
    """
    Format OCR text for display or processing.

    Maintains a clean separation between data and presentation: the
    processing path returns plain cleaned text with no markup, and HTML
    is only produced when explicitly requested for display.

    Args:
        text: OCR text to format.
        for_display: If True, format as simple HTML; otherwise plain text.

    Returns:
        Formatted text ("" for empty/None input).
    """
    if not text:
        return ""

    # Strip OCR artifacts (image refs, base64 blobs, excess whitespace).
    text = clean_raw_text(text)

    if not for_display:
        # Processing path: clean text only - no markup.
        # (The original code ran a no-op replace("\n", "\n") here.)
        return text

    # Display path: convert line breaks to <br> and wrap in a paragraph tag,
    # avoiding unnecessary divs to keep the content pure.
    # NOTE(review): the text is not HTML-escaped here; if OCR output can
    # contain "<" or "&", callers should escape before rendering - confirm.
    html_body = text.replace("\n", "<br>")
    return f"<p>{html_body}</p>"
def format_markdown_text(text: str, preserve_format: bool = True) -> str:
    """
    Format text as Markdown, preserving or enhancing its structure.

    Ensures clean markdown formatting without introducing unnecessary
    presentation elements.

    Args:
        text: Raw text to format as Markdown.
        preserve_format: Whether to preserve original paragraph breaks.

    Returns:
        Markdown-formatted text ("" for empty/None input).
    """
    if not text:
        return ""

    cleaned = clean_raw_text(text)

    # Normalize all line endings to "\n".
    cleaned = cleaned.replace('\r\n', '\n').replace('\r', '\n')

    if preserve_format:
        # Collapse runs of 3+ newlines into a single paragraph break.
        cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
    else:
        # Join single line breaks inside paragraphs into spaces, then
        # normalize paragraph separators to exactly one blank line.
        cleaned = re.sub(r'(?<!\n)\n(?!\n)', ' ', cleaned)
        cleaned = re.sub(r'\n{2,}', '\n\n', cleaned)

    # Squeeze repeated spaces.
    cleaned = re.sub(r' {2,}', ' ', cleaned)

    # Repair common markdown spacing issues: headers, bullet lists, and
    # numbered lists must have a space after their marker.
    spacing_fixes = (
        r'(^|\n)(#{1,6})([^#\s])',
        r'(^|\n)([*+-])([^\s])',
        r'(^|\n)(\d+\.)([^\s])',
    )
    for pattern in spacing_fixes:
        cleaned = re.sub(pattern, r'\1\2 \3', cleaned)

    return cleaned.strip()
def clean_raw_text(text: str) -> str:
    """
    Clean raw text by removing unnecessary whitespace and artifacts.

    Args:
        text: Raw text to clean.

    Returns:
        Cleaned text ("" for empty/None input).
    """
    if not text:
        return ""

    # Artifact patterns, applied in order:
    artifact_patterns = (
        # Markdown image with inline base64 payload: 
        r'!\[.*?\]\(data:image/[^)]+\)',
        # Plain markdown image reference: 
        r'!\[[^\]]*\]\([^)]+\)',
        # Bare base64-encoded image data URIs.
        r'data:image/[^;]+;base64,[a-zA-Z0-9+/=]+',
        # JSON-like {"image": ...} / {"image_data": ...} object fragments.
        r'{"image(_data)?":("[^"]*"|null|true|false|\{[^}]*\}|\[[^\]]*\])}',
    )
    for pattern in artifact_patterns:
        text = re.sub(pattern, '', text)

    # Collapse the whitespace and blank-line runs the removals left behind.
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = re.sub(r'\s{3,}', ' ', text)

    return text.strip()
def detect_content_regions(image_np):
    """
    Detect content regions based on vertical text-density analysis.

    Splits the page into three horizontally-full regions (header, middle,
    body) whose boundaries follow sharp changes in row ink density, with
    deliberate overlap so boundary text is not lost between regions.

    Args:
        image_np: Numpy array image, grayscale (H, W) or RGB (H, W, 3).

    Returns:
        list: Three (x, y, width, height) tuples - header, middle, body.
    """
    import numpy as np

    # Only import cv2 when a color conversion is actually needed, so
    # grayscale inputs work without OpenCV installed (the original imported
    # cv2 unconditionally).
    if len(image_np.shape) > 2 and image_np.shape[2] == 3:
        import cv2
        gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
    else:
        gray = image_np

    # Vertical text-density profile: sum inverted pixel values per row, so
    # rows with more ink (darker pixels) score higher.
    v_profile = np.sum(255 - gray, axis=1)
    # Normalize to [0, 1]; leave an all-zero profile untouched to avoid 0/0.
    v_profile = v_profile / np.max(v_profile) if np.max(v_profile) > 0 else v_profile

    # Rows where the density jumps by more than the threshold mark likely
    # region boundaries.
    threshold = 0.2
    changes = [i for i in range(1, len(v_profile))
               if abs(v_profile[i] - v_profile[i - 1]) > threshold]

    img_height, img_width = gray.shape
    overlap = int(img_height * 0.05)  # 5% of height shared between regions

    if len(changes) < 2:
        # Too few boundaries detected: fall back to fixed overlapping bands
        # (header 0-30%, middle 20-60%, body 50-100%).
        header_height = int(img_height * 0.3)
        middle_start = int(img_height * 0.2)
        middle_height = int(img_height * 0.4)
        body_start = int(img_height * 0.5)
    else:
        # Use the first two detected boundaries for more precise regions.
        # len(changes) >= 2 here, so changes[1] always exists (the original
        # carried an unreachable fallback branch for len(changes) == 1).
        changes.sort()
        header_height = changes[0] + overlap
        middle_start = max(0, changes[0] - overlap)
        middle_height = (changes[1] - middle_start) + overlap
        body_start = max(0, changes[1] - overlap)
    body_height = img_height - body_start

    return [
        (0, 0, img_width, header_height),             # header
        (0, middle_start, img_width, middle_height),  # middle (overlaps header)
        (0, body_start, img_width, body_height),      # body (overlaps middle)
    ]
def merge_region_texts(regions: List[Dict[str, Any]], min_similarity_threshold: float = 0.7) -> str:
    """
    Intelligently merge text from multiple document regions, handling
    overlapping content.

    Uses text similarity detection to avoid duplicating content from
    overlapping regions.

    Args:
        regions: List of region dicts, each with 'text' and 'order' keys.
        min_similarity_threshold: Minimum similarity ratio to treat the end
            of one region and the start of the next as duplicated content.

    Returns:
        Merged text with duplications removed.
    """
    if not regions:
        return ""
    if len(regions) == 1:
        # Single region: nothing to merge or deduplicate.
        return regions[0]['text']

    # Order regions as declared, then drop empty segments.
    ordered = sorted(regions, key=lambda r: r.get('order', 0))
    segments = [s for s in (r.get('text', '').strip() for r in ordered) if s]
    if not segments:
        return ""

    result = segments[0]
    for segment in segments[1:]:
        prior_lines = result.splitlines()
        incoming_lines = segment.splitlines()

        # Search for duplicated content: compare the last j lines of the
        # accumulated text against the first j lines of the next segment,
        # for j up to min(20, lines accumulated so far).
        best_ratio = 0.0
        best_count = -1
        window = min(20, len(prior_lines))
        for count in range(1, window + 1):
            tail = "\n".join(prior_lines[-count:])
            head = "\n".join(incoming_lines[:count])
            # Very short snippets match too easily; skip them.
            if len(tail) < 10 or len(head) < 10:
                continue
            ratio = difflib.SequenceMatcher(None, tail, head).ratio()
            if ratio > best_ratio and ratio >= min_similarity_threshold:
                best_ratio = ratio
                best_count = count

        # Skip duplicated leading lines of the incoming segment, if any.
        skip = 0
        if best_count > 0:
            logger.info(f"Found overlapping text with similarity {best_ratio:.2f}, skipping {best_count} lines")
            skip = best_count

        if skip < len(incoming_lines):
            leftover = "\n".join(incoming_lines[skip:])
            if leftover.strip():
                result += "\n\n" + leftover

    return result