""" Utility functions for OCR processing with Mistral AI. Contains helper functions for working with OCR responses and image handling. """ import json import base64 import io import zipfile import logging import numpy as np import time from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Union, Any, Tuple from functools import lru_cache # Configure logging logger = logging.getLogger("ocr_utils") try: from PIL import Image, ImageEnhance, ImageFilter, ImageOps import cv2 PILLOW_AVAILABLE = True CV2_AVAILABLE = True except ImportError as e: # Check which image libraries are available if "PIL" in str(e): PILLOW_AVAILABLE = False if "cv2" in str(e): CV2_AVAILABLE = False from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk # Import configuration try: from config import IMAGE_PREPROCESSING except ImportError: # Fallback defaults if config not available IMAGE_PREPROCESSING = { "enhance_contrast": 1.5, "sharpen": True, "denoise": True, "max_size_mb": 8.0, "target_dpi": 300, "compression_quality": 92 } def replace_images_in_markdown(markdown_str: str, images_dict: dict) -> str: """ Replace image placeholders in markdown with base64-encoded images. Args: markdown_str: Markdown text containing image placeholders images_dict: Dictionary mapping image IDs to base64 strings Returns: Markdown text with images replaced by base64 data """ for img_name, base64_str in images_dict.items(): markdown_str = markdown_str.replace( f"", f"" ) return markdown_str def get_combined_markdown(ocr_response) -> str: """ Combine OCR text and images into a single markdown document. 
Args: ocr_response: OCR response object from Mistral AI Returns: Combined markdown string with embedded images """ markdowns = [] # Process each page of the OCR response for page in ocr_response.pages: # Extract image data if available image_data = {} if hasattr(page, "images"): for img in page.images: if hasattr(img, "id") and hasattr(img, "image_base64"): image_data[img.id] = img.image_base64 # Replace image placeholders with base64 data page_markdown = page.markdown if hasattr(page, "markdown") else "" processed_markdown = replace_images_in_markdown(page_markdown, image_data) markdowns.append(processed_markdown) # Join all pages' markdown with double newlines return "\n\n".join(markdowns) def encode_image_for_api(image_path: Union[str, Path]) -> str: """ Encode an image as base64 data URL for API submission. Args: image_path: Path to the image file Returns: Base64 data URL for the image """ # Convert to Path object if string image_file = Path(image_path) if isinstance(image_path, str) else image_path # Verify image exists if not image_file.is_file(): raise FileNotFoundError(f"Image file not found: {image_file}") # Encode image as base64 encoded = base64.b64encode(image_file.read_bytes()).decode() return f"data:image/jpeg;base64,{encoded}" def process_image_with_ocr(client, image_path: Union[str, Path], model: str = "mistral-ocr-latest"): """ Process an image with OCR and return the response. Args: client: Mistral AI client image_path: Path to the image file model: OCR model to use Returns: OCR response object """ # Encode image as base64 base64_data_url = encode_image_for_api(image_path) # Process image with OCR image_response = client.ocr.process( document=ImageURLChunk(image_url=base64_data_url), model=model ) return image_response def ocr_response_to_json(ocr_response, indent: int = 4) -> str: """ Convert OCR response to a formatted JSON string. 
def ocr_response_to_json(ocr_response, indent: int = 4) -> str:
    """
    Convert OCR response to a formatted JSON string.

    Missing attributes are emitted as empty strings; a missing or ``None``
    ``pages`` / ``images`` collection is emitted as an empty list.

    Args:
        ocr_response: OCR response object
        indent: Indentation level for JSON formatting

    Returns:
        Formatted JSON string
    """
    pages = []
    for page in getattr(ocr_response, "pages", None) or []:
        images = [
            {
                "id": getattr(img, "id", ""),
                "base64": getattr(img, "image_base64", ""),
            }
            for img in getattr(page, "images", None) or []
        ]
        pages.append({
            "text": getattr(page, "text", ""),
            "markdown": getattr(page, "markdown", ""),
            "images": images,
        })

    response_dict = {
        "text": getattr(ocr_response, "text", ""),
        "pages": pages,
    }
    return json.dumps(response_dict, indent=indent)


def _add_result_to_zip(zipf, result, suffix="", image_prefix="", max_images_per_page=None):
    """Write the JSON, HTML, text and image artifacts of one OCR result into zipf."""
    zipf.writestr(f"results{suffix}.json", json.dumps(result, indent=2))

    # HTML content generated from the result
    html_content = create_html_with_images(result)
    filename = result.get('file_name', f'document{suffix}').split('.')[0]
    zipf.writestr(f"{filename}_with_images.html", html_content)

    # Raw OCR text if available
    ocr_contents = result.get("ocr_contents")
    if isinstance(ocr_contents, dict) and "raw_text" in ocr_contents:
        zipf.writestr(f"ocr_text{suffix}.txt", ocr_contents["raw_text"])

    # HTML visualization if available
    if "html_visualization" in result:
        zipf.writestr(f"visualization{suffix}.html", result["html_visualization"])

    # Embedded images if available
    for page_no, page in enumerate(result.get("pages_data") or [], start=1):
        images = page.get("images") or []
        if max_images_per_page is not None:
            images = images[:max_images_per_page]
        for img_no, img in enumerate(images, start=1):
            img_base64 = img.get("image_base64", "")
            if not img_base64:
                continue
            # Strip data URL prefix if present; partition() cannot raise when
            # the comma is missing, it just leaves nothing to decode.
            if img_base64.startswith("data:image"):
                img_base64 = img_base64.partition(",")[2]
            try:
                img_data = base64.b64decode(img_base64)
            except (ValueError, TypeError) as exc:
                # One undecodable image must not cost the rest of the archive.
                logger.debug(
                    "Skipping undecodable image %d on page %d: %s", img_no, page_no, exc
                )
                continue
            zipf.writestr(
                f"images/{image_prefix}page_{page_no}_img_{img_no}.jpg", img_data
            )


def create_results_zip_in_memory(results):
    """
    Create a zip file containing OCR results in memory.

    Building the archive is best-effort: a result that cannot be processed is
    logged and skipped (entries already written for it are kept).

    Args:
        results: Dictionary or list of OCR results

    Returns:
        Binary zip file data
    """
    zip_buffer = io.BytesIO()

    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf:
        if isinstance(results, list):
            for number, result in enumerate(results, start=1):
                try:
                    _add_result_to_zip(
                        zipf,
                        result,
                        suffix=f"_{number}",
                        image_prefix=f"result_{number}_",
                        max_images_per_page=3,  # Limit images per page to conserve memory
                    )
                except Exception:
                    logger.warning(
                        "Skipping result %d while building zip", number, exc_info=True
                    )
        else:
            try:
                _add_result_to_zip(zipf, results)
            except Exception:
                logger.warning("Could not add OCR result to zip", exc_info=True)

    return zip_buffer.getvalue()


def create_results_zip(results, output_dir=None, zip_name=None):
    """
    Create a zip file containing OCR results.

    Args:
        results: Dictionary or list of OCR results
        output_dir: Optional output directory (defaults to ``./output``)
        zip_name: Optional zip file name

    Returns:
        Path to the created zip file
    """
    output_dir = Path.cwd() / "output" if output_dir is None else Path(output_dir)
    # parents=True so a nested output directory can be created in one go
    output_dir.mkdir(parents=True, exist_ok=True)

    # Generate zip name if not provided
    if zip_name is None:
        now = datetime.now().strftime("%Y%m%d-%H%M%S")
        if isinstance(results, list):
            # For list of results, use timestamp and generic name
            zip_name = f"ocr-results_{now}.zip"
        else:
            # For a single result, prefer its own processing timestamp
            processed_at = results.get("processed_at")
            if processed_at:
                timestamp = str(processed_at).replace(":", "-").replace(".", "-")
            else:
                timestamp = now
            # Only the final path component may end up in the archive name
            file_name = Path(str(results.get("file_name", "ocr-results"))).name
            zip_name = f"{file_name}_{timestamp}.zip"

    zip_path = output_dir / zip_name
    try:
        # Get zip data in memory first, then save to file
        zip_path.write_bytes(create_results_zip_in_memory(results))
    except Exception:
        logger.exception("Could not create complete archive at %s", zip_path)
        # Create a placeholder zip file as fallback
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            zipf.writestr("info.txt", "Could not create complete archive")
    return zip_path
# Advanced image preprocessing functions

def _store_preprocess_result(cache_key, result):
    """Remember a preprocessing result, evicting the five oldest entries once the cache exceeds 20."""
    if cache_key is None:
        return
    cache = getattr(preprocess_image_for_ocr, "_cache", None)
    if cache is None:
        cache = preprocess_image_for_ocr._cache = {}
    if len(cache) > 20:
        # dicts keep insertion order, so the first keys are the oldest ones
        for stale_key in list(cache)[:5]:
            del cache[stale_key]
    cache[cache_key] = result


def preprocess_image_for_ocr(image_path: Union[str, Path]) -> "Tuple[Optional[Image.Image], str]":
    """
    Preprocess an image for OCR and encode it as a JPEG data URL.

    Results are memoised in a plain dict stored on this function, keyed by
    path, size and modification time. The cache is shared module-wide and is
    not synchronised.

    Args:
        image_path: Path to the image file

    Returns:
        Tuple of (processed PIL Image, base64 data URL). The image is None
        when PIL is unavailable or preprocessing failed; in that case the
        data URL is the unprocessed file from encode_image_for_api.
    """
    # Fast path: Skip all processing if PIL not available
    if not PILLOW_AVAILABLE:
        logger.info("PIL not available, skipping image preprocessing")
        return None, encode_image_for_api(image_path)

    image_file = Path(image_path)

    # Both stay at these defaults if the file cannot be stat()-ed.
    cache_key = None
    file_size_mb = 0

    try:
        file_stat = image_file.stat()
        file_size = file_stat.st_size
        file_size_mb = file_size / (1024 * 1024)

        # Key on the full path so equally named files in different
        # directories cannot be served each other's result.
        cache_key = f"{image_file}_{file_size}_{file_stat.st_mtime}"

        cache = getattr(preprocess_image_for_ocr, "_cache", None)
        if cache is not None and cache_key in cache:
            logger.debug(f"Using cached preprocessing result for {image_file.name}")
            return cache[cache_key]

        # Small images (less than 100KB) only get mode normalisation
        if file_size < 100000:
            logger.info(f"Image {image_file.name} is small ({file_size/1024:.1f}KB), using minimal processing")
            with Image.open(image_file) as img:
                if img.mode not in ('RGB', 'L'):
                    img = img.convert('RGB')

                buffer = io.BytesIO()
                img.save(buffer, format="JPEG", quality=95, optimize=True)

                encoded_image = base64.b64encode(buffer.getvalue()).decode()
                result = (img, f"data:image/jpeg;base64,{encoded_image}")
                _store_preprocess_result(cache_key, result)
                return result
    except Exception as e:
        # If stat, cache handling or the small-image path fails, log and
        # continue with the full processing path below
        logger.debug(f"Cache handling failed for {image_path}: {str(e)}")

    try:
        # Process start time for performance logging
        start_time = time.time()

        with Image.open(image_file) as img:
            # Normalize image mode
            if img.mode not in ('RGB', 'L'):
                img = img.convert('RGB')

            width, height = img.size
            image_area = width * height

            # Detect document type only for medium to large images
            is_document = False
            if image_area > 500000:  # Approx 700x700 or larger
                # The detector reads its input from this function attribute
                _detect_document_type_impl._current_img = img
                is_document = _detect_document_type_impl(None)
                logger.debug(f"Document type detection for {image_file.name}: {'document' if is_document else 'photo'}")

            # Resize large images for API efficiency
            max_dimension = max(width, height)
            if file_size_mb > IMAGE_PREPROCESSING["max_size_mb"] or max_dimension > 3000:
                # Sliding scale for reduction based on image size
                if max_dimension > 5000:
                    scale_factor = 0.25
                elif max_dimension > 3000:
                    scale_factor = 0.4
                else:
                    scale_factor = 0.6

                # Never let an extreme aspect ratio round a side down to 0
                new_width = max(1, int(width * scale_factor))
                new_height = max(1, int(height * scale_factor))

                # Very large images use the faster, lower quality filter
                resample = Image.BILINEAR if image_area > 3000000 else Image.LANCZOS
                processed_img = img.resize((new_width, new_height), resample)
                logger.debug(f"Resized image from {width}x{height} to {new_width}x{new_height}")
            else:
                processed_img = img

            # Apply processing based on document type and original size
            if is_document:
                if image_area > 1000000:
                    preprocess_document_image._current_img = processed_img
                    processed = _preprocess_document_image_impl()
                else:
                    # Small documents only get a contrast boost
                    processed = ImageEnhance.Contrast(processed_img).enhance(1.3)
            elif image_area > 1000000:
                preprocess_general_image._current_img = processed_img
                processed = _preprocess_general_image_impl()
            else:
                processed = processed_img

            # Lower JPEG quality for large source files to limit the payload
            if file_size_mb > 5:
                quality = 85
            else:
                quality = IMAGE_PREPROCESSING["compression_quality"]

            buffer = io.BytesIO()
            processed.save(buffer, format="JPEG", quality=quality, optimize=True)

            encoded_image = base64.b64encode(buffer.getvalue()).decode()
            result = (processed, f"data:image/jpeg;base64,{encoded_image}")

            try:
                _store_preprocess_result(cache_key, result)
            except Exception as cache_error:
                # Caching is an optimisation only; never fail the call for it
                logger.debug(f"Could not cache preprocessing result: {cache_error}")

            processing_time = time.time() - start_time
            logger.debug(f"Image preprocessing completed in {processing_time:.3f}s for {image_file.name}")

            return result
    except Exception as e:
        # If preprocessing fails, log error and use original image
        logger.warning(f"Image preprocessing failed: {str(e)}. Using original image.")
        return None, encode_image_for_api(image_path)


def detect_document_type(img: "Image.Image") -> bool:
    """
    Detect if an image is likely a document (text-heavy) vs. a photo.

    Args:
        img: PIL Image object

    Returns:
        True if likely a document, False otherwise
    """
    # The implementation reads its input from a function attribute, so the
    # image has to be stored there first; otherwise a previously analysed
    # image (or none at all) would be classified instead of `img`.
    _detect_document_type_impl._current_img = img
    return _detect_document_type_impl(None)
def _detect_document_type_impl(img_hash=None) -> bool:
    """
    Heuristically decide whether the stored image is a document or a photo.

    The image is read from the ``_current_img`` attribute of this function,
    which the caller must set beforehand (a shared function attribute, not
    thread-local storage).

    Args:
        img_hash: Unused; kept for backward compatibility

    Returns:
        True if the image looks like a document, False otherwise (including
        when no image has been stored or the image is under 100,000 pixels)
    """
    img = getattr(_detect_document_type_impl, "_current_img", None)
    if img is None:
        return False  # Fail safe in case image is not set

    # Skip processing for tiny images - just classify as non-documents
    width, height = img.size
    if width * height < 100000:  # Approx 300x300 or smaller
        return False

    # Convert to grayscale for analysis
    gray_img = img.convert('L')

    # PIL-only path for systems without OpenCV
    if not CV2_AVAILABLE:
        # Downscale so the longer side is at most min(width, height, 1000)
        sample_size = min(width, height, 1000)
        scale_factor = sample_size / max(width, height)

        if scale_factor < 0.9:  # Only resize if significant reduction
            sample_img = gray_img.resize(
                # max(1, ...) keeps very elongated images from rounding to 0
                (max(1, int(width * scale_factor)), max(1, int(height * scale_factor))),
                Image.NEAREST
            )
        else:
            sample_img = gray_img

        edges = sample_img.filter(ImageFilter.FIND_EDGES)

        # Histogram bins give the same counts as testing every pixel in a
        # Python loop: pixels above 40 are edges, pixels above 200 are bright.
        edge_hist = edges.histogram()
        edge_ratio = sum(edge_hist[41:]) / sum(edge_hist)

        bright_ratio = sum(gray_img.histogram()[201:]) / (width * height)

        # Documents typically have more edges (text boundaries) and bright
        # areas (background)
        return edge_ratio > 0.035 or bright_ratio > 0.4

    # OpenCV path
    img_np = np.array(gray_img)

    # 1. Fast check: high spread of pixel values (text on background)
    if np.std(img_np) > 45:
        return True

    # 2. Edge density on a downsampled copy (longest side capped at 1000)
    if max(img_np.shape) > 1000:
        scale = 1000 / max(img_np.shape)
        small_img = cv2.resize(img_np, None, fx=scale, fy=scale, interpolation=cv2.INTER_NEAREST)
    else:
        small_img = img_np

    edges = cv2.Canny(small_img, 30, 130, L2gradient=False)
    edge_ratio = np.count_nonzero(edges) / edges.size

    # 3. Share of dark (< 60) and light (> 180) pixels
    dark_ratio = np.count_nonzero(img_np < 60) / img_np.size
    light_ratio = np.count_nonzero(img_np > 180) / img_np.size

    # Many short line segments are treated as a sign of text lines
    if edge_ratio > 0.02:
        lines = cv2.HoughLinesP(edges, 1, np.pi/180,
                                threshold=50,
                                minLineLength=30,
                                maxLineGap=20)
        if lines is not None and len(lines) > 10:
            return True

    # Combine heuristics for final decision: both dark (text) and light
    # (background) regions, and/or well-defined edges
    return (dark_ratio > 0.03 and light_ratio > 0.25) or edge_ratio > 0.03


def preprocess_document_image(img: "Image.Image") -> "Image.Image":
    """
    Preprocess a document image for optimal OCR.

    Args:
        img: PIL Image object

    Returns:
        Processed PIL Image
    """
    # The implementation reads the image from this function attribute
    preprocess_document_image._current_img = img
    return _preprocess_document_image_impl()
def _looks_handwritten(img) -> bool:
    """Return True when edge density and gradient spread exceed the handwriting thresholds (needs OpenCV)."""
    if not CV2_AVAILABLE:
        return False
    try:
        gray_np = np.array(img.convert('L'))
        edges = cv2.Canny(gray_np, 30, 100)
        if np.count_nonzero(edges) / edges.size <= 0.02:
            return False
        # Additional check on the spread of gradient magnitudes
        sobelx = cv2.Sobel(gray_np, cv2.CV_64F, 1, 0, ksize=3)
        sobely = cv2.Sobel(gray_np, cv2.CV_64F, 0, 1, ksize=3)
        magnitude = np.sqrt(sobelx**2 + sobely**2)
        return bool(np.std(magnitude) > 20)
    except Exception as exc:
        # If detection fails, assume it's not handwritten
        logger.debug(f"Handwriting detection failed: {exc}")
        return False


def _preprocess_document_image_impl() -> "Image.Image":
    """
    Preprocess the stored document image, doing more work for larger images.

    The image is read from ``preprocess_document_image._current_img`` (a
    shared function attribute set by the caller). Processing by pixel count:

    * under 300,000: grayscale + contrast only
    * under 1,000,000: additionally sharpened (if enabled in the config)
    * otherwise: additionally denoised (if enabled) and, for printed text,
      binarised when that keeps most of the bright background

    Returns:
        Processed grayscale (or binarised) PIL Image

    Raises:
        ValueError: If no image has been stored for processing
    """
    if not hasattr(preprocess_document_image, "_current_img"):
        raise ValueError("No image set for document preprocessing")

    img = preprocess_document_image._current_img

    width, height = img.size
    img_size = width * height

    is_handwritten = _looks_handwritten(img)

    # Every path starts with grayscale + contrast; handwriting gets a fixed,
    # gentler level than the configured one.
    gray = img.convert('L')
    contrast_level = 1.4 if is_handwritten else IMAGE_PREPROCESSING["enhance_contrast"]
    enhanced = ImageEnhance.Contrast(gray).enhance(contrast_level)

    # Ultra-fast path for tiny images
    if img_size < 300000:  # ~500x600 or smaller
        return enhanced

    if IMAGE_PREPROCESSING["sharpen"]:
        # EDGE_ENHANCE is used for handwriting, SHARPEN for printed text
        sharpen_filter = ImageFilter.EDGE_ENHANCE if is_handwritten else ImageFilter.SHARPEN
        enhanced = enhanced.filter(sharpen_filter)

    # Fast path for small images - no denoising
    if img_size < 1000000:  # ~1000x1000 or smaller
        return enhanced

    if not IMAGE_PREPROCESSING["denoise"]:
        return enhanced

    if not CV2_AVAILABLE:
        # Fallback PIL denoising for systems without OpenCV
        median_size = 1 if is_handwritten else 3
        return enhanced.filter(ImageFilter.MedianFilter(median_size))

    try:
        img_np = np.array(enhanced)

        if is_handwritten:
            # Bilateral filtering smooths while preserving edges
            if img_size > 3000000:
                # Large images - filter a half-size copy, then scale back
                small_img = cv2.resize(img_np, None, fx=0.5, fy=0.5,
                                       interpolation=cv2.INTER_AREA)
                filtered = cv2.bilateralFilter(small_img, 9, 75, 75)
                filtered = cv2.resize(filtered, (width, height),
                                      interpolation=cv2.INTER_LINEAR)
            else:
                filtered = cv2.bilateralFilter(img_np, 7, 50, 50)

            # No binary thresholding for handwriting
            return Image.fromarray(filtered)

        # Printed text: non-local means denoising
        if img_size > 4000000:
            # Very large images - denoise a half-size copy, then scale back
            downsample = cv2.resize(img_np, None, fx=0.5, fy=0.5,
                                    interpolation=cv2.INTER_AREA)
            denoised_np = cv2.fastNlMeansDenoising(downsample, None, 7, 5, 13)
            denoised_np = cv2.resize(denoised_np, (width, height),
                                     interpolation=cv2.INTER_LINEAR)
        else:
            denoised_np = cv2.fastNlMeansDenoising(img_np, None, 8, 5, 15)

        # From here on the denoised image is the result even if the
        # binarisation below fails.
        enhanced = Image.fromarray(denoised_np)

        if img_size < 8000000:  # Skip binarisation for extremely large images
            binary = cv2.adaptiveThreshold(denoised_np, 255,
                                           cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                           cv2.THRESH_BINARY, 11, 2)

            # Use the binary version only if it keeps at least 80% of the
            # bright (background) pixels
            white_pixels_binary = np.count_nonzero(binary > 200)
            white_pixels_orig = np.count_nonzero(denoised_np > 200)
            if white_pixels_binary > white_pixels_orig * 0.8:
                return Image.fromarray(binary)
    except Exception as exc:
        # If OpenCV processing fails, continue with what we have so far
        logger.debug(f"OpenCV document processing failed: {exc}")

    return enhanced
def preprocess_general_image(img: "Image.Image") -> "Image.Image":
    """
    Preprocess a general image for OCR.

    Args:
        img: PIL Image object

    Returns:
        Processed PIL Image
    """
    # The implementation reads the image from this function attribute
    preprocess_general_image._current_img = img
    return _preprocess_general_image_impl()


def _preprocess_general_image_impl() -> "Image.Image":
    """
    Preprocess the stored non-document image, doing more work for larger images.

    The image is read from ``preprocess_general_image._current_img`` (a shared
    function attribute set by the caller). Processing by pixel count:

    * under 300,000: RGB conversion only
    * under 600,000: contrast x1.15
    * under 1,000,000: contrast x1.2
    * otherwise: contrast x1.2, saturation x1.1, sharpening, and CLAHE on the
      luminance channel when OpenCV is available and the image exceeds
      3,000,000 pixels

    Returns:
        Processed RGB PIL Image

    Raises:
        ValueError: If no image has been stored for processing
    """
    if not hasattr(preprocess_general_image, "_current_img"):
        raise ValueError("No image set for general preprocessing")

    img = preprocess_general_image._current_img

    width, height = img.size
    img_size = width * height

    # Every path works on (and returns) an RGB image
    if img.mode != 'RGB':
        img = img.convert('RGB')

    # Ultra-fast path: no enhancement for tiny images
    if img_size < 300000:
        return img

    # Fast path: very light contrast enhancement only
    if img_size < 600000:  # ~800x750 or smaller
        return ImageEnhance.Contrast(img).enhance(1.15)

    # Standard path: moderate contrast enhancement
    enhanced = ImageEnhance.Contrast(img).enhance(1.2)

    if img_size < 1000000:  # Skip the rest for images under ~1 megapixel
        return enhanced

    # Enhanced path: additional processing for larger images
    try:
        # 1. Slightly raise color saturation
        enhanced = ImageEnhance.Color(enhanced).enhance(1.1)

        # 2. EDGE_ENHANCE for very large images, SHARPEN otherwise
        if img_size > 2500000:
            enhanced = enhanced.filter(ImageFilter.EDGE_ENHANCE)
        else:
            enhanced = enhanced.filter(ImageFilter.SHARPEN)

        # 3. CLAHE on the luminance channel for the largest images
        if CV2_AVAILABLE and img_size > 3000000:
            img_np = np.array(enhanced)
            try:
                lab = cv2.cvtColor(img_np, cv2.COLOR_RGB2LAB)
                lightness, a_channel, b_channel = cv2.split(lab)

                clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
                lightness = clahe.apply(lightness)

                lab = cv2.merge((lightness, a_channel, b_channel))
                enhanced = Image.fromarray(cv2.cvtColor(lab, cv2.COLOR_LAB2RGB))
            except Exception as exc:
                # If CLAHE fails, continue with PIL-enhanced image
                logger.debug(f"CLAHE enhancement failed: {exc}")
    except Exception as exc:
        # If any enhancement fails, fall back to basic contrast enhancement
        logger.debug(f"General image enhancement failed: {exc}")
        enhanced = ImageEnhance.Contrast(img).enhance(1.2)

    return enhanced


def resize_image(img: "Image.Image", target_dpi: int = 300) -> "Image.Image":
    """
    Resize an image to an optimal size for OCR while preserving quality.

    Args:
        img: PIL Image object
        target_dpi: Target DPI (dots per inch)

    Returns:
        Resized PIL Image
    """
    # The implementation reads the image from this function attribute
    resize_image._current_img = img
    return resize_image_impl(target_dpi)
def resize_image_impl(target_dpi: int = 300) -> "Image.Image":
    """
    Shrink the stored image to fit an 8.5 x 11 inch page at the target DPI.

    The image is read from ``resize_image._current_img`` (a shared function
    attribute set by the caller). Images that already fit are returned as-is;
    larger ones are scaled down with their aspect ratio preserved.

    Args:
        target_dpi: Target DPI (dots per inch); must be positive

    Returns:
        Resized PIL Image

    Raises:
        ValueError: If no image has been stored or target_dpi is not positive
    """
    if not hasattr(resize_image, "_current_img"):
        raise ValueError("No image set for resizing")
    if target_dpi <= 0:
        raise ValueError(f"target_dpi must be positive, got {target_dpi}")

    img = resize_image._current_img
    width, height = img.size

    # Using 8.5x11 inches (standard paper size) as reference
    max_width = int(8.5 * target_dpi)
    max_height = int(11 * target_dpi)

    # Quick early return when no resizing is needed
    if width <= max_width and height <= max_height:
        return img

    scale_factor = min(max_width / width, max_height / height)

    # max(1, ...) keeps extreme aspect ratios from rounding a side down to 0
    new_width = max(1, int(width * scale_factor))
    new_height = max(1, int(height * scale_factor))

    return img.resize((new_width, new_height), Image.BICUBIC)


def calculate_image_entropy(img: "Image.Image") -> float:
    """
    Calculate the entropy (information content) of an image.

    The image is converted to grayscale and the Shannon entropy, in bits, of
    its intensity histogram is returned.

    Args:
        img: PIL Image object

    Returns:
        Entropy value (0.0 for a single-intensity or empty image)
    """
    # Convert to grayscale
    if img.mode != 'L':
        img = img.convert('L')

    counts = np.asarray(img.histogram(), dtype=np.float64)
    total_pixels = counts.sum()
    if total_pixels <= 0:
        return 0.0

    # Empty bins contribute nothing and would give log2(0)
    probabilities = counts[counts > 0] / total_pixels

    # "0.0 - x" instead of "-x" so a zero result is +0.0, never -0.0
    return float(0.0 - np.sum(probabilities * np.log2(probabilities)))
{line}
" html_content += '{content}
' elif isinstance(content, list): html_content += '