"""
Utility functions for text processing.
Contains helper functions for working with text data from OCR.
"""
import re
import logging
import difflib
from typing import List, Dict, Any, Optional
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def format_ocr_text(text: str, for_display: bool = False) -> str:
"""
Format OCR text for display or processing.
This function maintains clean separation between data and presentation.
Args:
text: OCR text to format
for_display: Whether to format for display (HTML) or plain text
Returns:
Formatted text
"""
if not text:
return ""
# Clean the text first
text = clean_raw_text(text)
# Basic text formatting (line breaks, etc.)
formatted_text = text.replace("\n", "
" if for_display else "\n")
if for_display:
# For display, wrap in paragraph tags but avoid unnecessary divs
# to maintain content purity
return f"
{formatted_text}
" else: # For processing, return clean text only - no markup return formatted_text def format_markdown_text(text: str, preserve_format: bool = True) -> str: """ Format text as Markdown, preserving or enhancing its structure. Ensures that text has clean markdown formatting without introducing unnecessary presentation elements. Args: text: Raw text to format as Markdown preserve_format: Whether to preserve original formatting Returns: Markdown-formatted text """ if not text: return "" # Clean the text first text = clean_raw_text(text) # Normalize line endings text = text.replace('\r\n', '\n').replace('\r', '\n') # Preserve paragraphs if requested if preserve_format: # Ensure paragraphs are separated by double line breaks text = re.sub(r'\n{3,}', '\n\n', text) else: # Convert single line breaks within paragraphs to spaces text = re.sub(r'(? str: """ Clean raw text by removing unnecessary whitespace and artifacts. Args: text: Raw text to clean Returns: Cleaned text """ if not text: return "" # Remove image references like  text = re.sub(r'!\[.*?\]\(data:image/[^)]+\)', '', text) # Remove basic markdown image references like  text = re.sub(r'!\[[^\]]*\]\([^)]+\)', '', text) # Remove base64 encoded image data text = re.sub(r'data:image/[^;]+;base64,[a-zA-Z0-9+/=]+', '', text) # Clean up any JSON-like image object references text = re.sub(r'{"image(_data)?":("[^"]*"|null|true|false|\{[^}]*\}|\[[^\]]*\])}', '', text) # Clean up excessive whitespace and line breaks created by removals text = re.sub(r'\n{3,}', '\n\n', text) text = re.sub(r'\s{3,}', ' ', text) return text.strip() def detect_content_regions(image_np): """ Detect content regions based on text density analysis. Returns regions with adaptive overlapping. Args: image_np: Numpy array image Returns: list: List of region tuples (x, y, width, height) """ # Import necessary modules import numpy as np import cv2 # Convert to grayscale for text detection if len(image_np.shape) > 2 and image_np.shape[2] == 3: gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY) else: gray = image_np # Create text density profile # Sum pixel values horizontally to get vertical text density v_profile = np.sum(255 - gray, axis=1) # Normalize the profile v_profile = v_profile / np.max(v_profile) if np.max(v_profile) > 0 else v_profile # Find significant density changes changes = [] threshold = 0.2 for i in range(1, len(v_profile)): if abs(v_profile[i] - v_profile[i-1]) > threshold: changes.append(i) # Create adaptive regions based on density changes img_height, img_width = gray.shape # Default to at least 3 regions with overlap if len(changes) < 2: # If no significant changes, use default division with overlapping regions header_height = int(img_height * 0.3) middle_start = int(img_height * 0.2) middle_height = int(img_height * 0.4) body_start = int(img_height * 0.5) body_height = img_height - body_start else: # Use detected density changes for more precise regions changes = sorted(changes) header_height = changes[0] + int(img_height * 0.05) # Add overlap middle_start = max(0, changes[0] - int(img_height * 0.05)) if len(changes) > 1: middle_height = (changes[1] - middle_start) + int(img_height * 0.05) body_start = max(0, changes[1] - int(img_height * 0.05)) else: middle_height = int(img_height * 0.4) body_start = int(img_height * 0.5) body_height = img_height - body_start # Define regions with adaptive overlap regions = [ (0, 0, img_width, header_height), # Header region (0, middle_start, img_width, middle_height), # Middle region with overlap (0, body_start, img_width, body_height) # Body region with overlap ] return regions def merge_region_texts(regions: List[Dict[str, Any]], min_similarity_threshold: float = 0.7) -> str: """ Intelligently merge text from multiple document regions, handling overlapping content. Uses text similarity detection to avoid duplicating content from overlapping regions. Args: regions: List of region dictionaries, each containing 'text' and 'order' keys min_similarity_threshold: Minimum similarity ratio to consider text as duplicate Returns: Merged text with duplications removed """ # If no regions, return empty string if not regions: return "" # If only one region, return its text directly if len(regions) == 1: return regions[0]['text'] # Sort regions by their defined order sorted_regions = sorted(regions, key=lambda x: x.get('order', 0)) # Extract text segments from each region texts = [region.get('text', '').strip() for region in sorted_regions] # Remove empty texts texts = [t for t in texts if t] if not texts: return "" # Start with the first region's text merged_text = texts[0] # Process each subsequent region for i in range(1, len(texts)): current_text = texts[i] # Skip if current text is empty if not current_text: continue # Find potential overlap with existing merged text # Split both texts into lines for line-by-line comparison merged_lines = merged_text.splitlines() current_lines = current_text.splitlines() # Initialize variables to track where to start appending append_from_line = 0 # Default: append all lines from current text max_similarity = 0.0 max_similarity_pos = -1 # Check for potential line duplications # Look at the last N lines of merged text (N = min(20, len(merged_lines))) # to see if they match the first N lines of current text check_lines = min(20, len(merged_lines)) for j in range(1, check_lines + 1): # Get the last j lines from merged text merged_end = "\n".join(merged_lines[-j:]) # Get the first j lines from current text current_start = "\n".join(current_lines[:j]) # Skip comparison if either section is too short if len(merged_end) < 10 or len(current_start) < 10: continue # Calculate similarity ratio similarity = difflib.SequenceMatcher(None, merged_end, current_start).ratio() # If we found a better match, update if similarity > max_similarity and similarity >= min_similarity_threshold: max_similarity = similarity max_similarity_pos = j # If we found a good match, skip those lines from current text if max_similarity_pos > 0: logger.info(f"Found overlapping text with similarity {max_similarity:.2f}, skipping {max_similarity_pos} lines") append_from_line = max_similarity_pos # Append non-duplicated content with a separator if append_from_line < len(current_lines): remaining_text = "\n".join(current_lines[append_from_line:]) if remaining_text.strip(): merged_text += "\n\n" + remaining_text return merged_text