""" | |
Utility functions for OCR processing with Mistral AI. | |
Contains helper functions for working with OCR responses and image handling. | |
""" | |
import json | |
import base64 | |
import io | |
import zipfile | |
import logging | |
import numpy as np | |
import time | |
from datetime import datetime | |
from pathlib import Path | |
from typing import Dict, List, Optional, Union, Any, Tuple | |
from functools import lru_cache | |
# Configure logging | |
logger = logging.getLogger("ocr_utils") | |
# Check which image libraries are available; track each one separately so a
# missing cv2 does not leave PILLOW_AVAILABLE undefined (and vice versa)
try:
    from PIL import Image, ImageEnhance, ImageFilter, ImageOps
    PILLOW_AVAILABLE = True
except ImportError:
    PILLOW_AVAILABLE = False

try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False

from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk
from mistralai.models import OCRImageObject
# Import configuration
try:
    from config import IMAGE_PREPROCESSING
except ImportError:
    # Fallback defaults if config not available
    IMAGE_PREPROCESSING = {
        "enhance_contrast": 1.5,
        "sharpen": True,
        "denoise": True,
        "max_size_mb": 8.0,
        "target_dpi": 300,
        "compression_quality": 92
    }
def replace_images_in_markdown(markdown_str: str, images_dict: dict) -> str:
    """
    Replace image placeholders in markdown with base64-encoded images.

    Args:
        markdown_str: Markdown text containing image placeholders
        images_dict: Dictionary mapping image IDs to base64 strings

    Returns:
        Markdown text with images replaced by base64 data
    """
    for img_name, base64_str in images_dict.items():
        markdown_str = markdown_str.replace(
            f"![{img_name}]({img_name})", f"![{img_name}]({base64_str})"
        )
    return markdown_str
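
# Example (sketch): OCR pages reference extracted images by id, e.g. a
# markdown line like ![img-0.jpeg](img-0.jpeg); once images_dict maps that
# id to a data URL, the reference becomes
# ![img-0.jpeg](data:image/jpeg;base64,...), i.e. the image is inlined.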
def get_combined_markdown(ocr_response) -> str:
    """
    Combine OCR text and images into a single markdown document.

    Args:
        ocr_response: OCR response object from Mistral AI

    Returns:
        Combined markdown string with embedded images
    """
    markdowns = []
    # Process each page of the OCR response
    for page in ocr_response.pages:
        # Extract image data if available
        image_data = {}
        if hasattr(page, "images"):
            for img in page.images:
                if hasattr(img, "id") and hasattr(img, "image_base64"):
                    image_data[img.id] = img.image_base64

        # Replace image placeholders with base64 data
        page_markdown = page.markdown if hasattr(page, "markdown") else ""
        processed_markdown = replace_images_in_markdown(page_markdown, image_data)
        markdowns.append(processed_markdown)

    # Join all pages' markdown with double newlines
    return "\n\n".join(markdowns)
def encode_image_for_api(image_path: Union[str, Path]) -> str:
    """
    Encode an image as a base64 data URL for API submission.

    Args:
        image_path: Path to the image file

    Returns:
        Base64 data URL for the image
    """
    # Convert to Path object if string
    image_file = Path(image_path) if isinstance(image_path, str) else image_path

    # Verify image exists
    if not image_file.is_file():
        raise FileNotFoundError(f"Image file not found: {image_file}")

    # Pick a MIME type matching the file extension (default to JPEG)
    mime_type = "image/png" if image_file.suffix.lower() == ".png" else "image/jpeg"

    # Encode image as base64
    encoded = base64.b64encode(image_file.read_bytes()).decode()
    return f"data:{mime_type};base64,{encoded}"
def process_image_with_ocr(client, image_path: Union[str, Path], model: str = "mistral-ocr-latest"):
    """
    Process an image with OCR and return the response.

    Args:
        client: Mistral AI client
        image_path: Path to the image file
        model: OCR model to use

    Returns:
        OCR response object
    """
    # Encode image as base64
    base64_data_url = encode_image_for_api(image_path)

    # Process image with OCR
    image_response = client.ocr.process(
        document=ImageURLChunk(image_url=base64_data_url),
        model=model
    )
    return image_response
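
# Example usage (sketch; the client setup below is an assumption, not part of
# this module, and presumes an API key in MISTRAL_API_KEY):
#
#     import os
#     from mistralai import Mistral
#
#     client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
#     response = process_image_with_ocr(client, "scan.jpg")
#     print(get_combined_markdown(response))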
def ocr_response_to_json(ocr_response, indent: int = 4) -> str:
    """
    Convert an OCR response to a formatted JSON string.

    Args:
        ocr_response: OCR response object
        indent: Indentation level for JSON formatting

    Returns:
        Formatted JSON string
    """
    # Convert OCR response to a dictionary
    response_dict = {
        "text": ocr_response.text if hasattr(ocr_response, "text") else "",
        "pages": []
    }

    # Process pages if available
    if hasattr(ocr_response, "pages"):
        for page in ocr_response.pages:
            page_dict = {
                "text": page.text if hasattr(page, "text") else "",
                "markdown": page.markdown if hasattr(page, "markdown") else "",
                "images": []
            }
            # Process images if available
            if hasattr(page, "images"):
                for img in page.images:
                    img_dict = {
                        "id": img.id if hasattr(img, "id") else "",
                        "base64": img.image_base64 if hasattr(img, "image_base64") else ""
                    }
                    page_dict["images"].append(img_dict)
            response_dict["pages"].append(page_dict)

    # Convert dictionary to JSON
    return json.dumps(response_dict, indent=indent)
def create_results_zip_in_memory(results):
    """
    Create a zip file containing OCR results in memory.

    Args:
        results: Dictionary or list of OCR results

    Returns:
        Binary zip file data
    """
    # Create a BytesIO object
    zip_buffer = io.BytesIO()

    # Check if results is a list or a dictionary
    is_list = isinstance(results, list)

    # Create zip file in memory
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf:
        if is_list:
            # Handle list of results
            for i, result in enumerate(results):
                try:
                    # Create a descriptive base filename for this result
                    base_filename = result.get('file_name', f'document_{i+1}').split('.')[0]

                    # Add document type if available
                    if 'topics' in result and result['topics']:
                        topic = result['topics'][0].lower().replace(' ', '_')
                        base_filename = f"{base_filename}_{topic}"

                    # Add language if available
                    if 'languages' in result and result['languages']:
                        lang = result['languages'][0].lower()
                        # Only add if it's not already in the filename
                        if lang not in base_filename.lower():
                            base_filename = f"{base_filename}_{lang}"

                    # For PDFs, add page information
                    if 'total_pages' in result and 'processed_pages' in result:
                        base_filename = f"{base_filename}_p{result['processed_pages']}of{result['total_pages']}"

                    # Add timestamp if available
                    if 'timestamp' in result:
                        try:
                            # Try to parse the timestamp and reformat it
                            dt = datetime.strptime(result['timestamp'], "%Y-%m-%d %H:%M")
                            timestamp = dt.strftime("%Y%m%d_%H%M%S")
                            base_filename = f"{base_filename}_{timestamp}"
                        except ValueError:
                            pass

                    # Add JSON results for each file with a descriptive name
                    result_json = json.dumps(result, indent=2)
                    zipf.writestr(f"{base_filename}.json", result_json)

                    # Add HTML content (generated from the result)
                    html_content = create_html_with_images(result)
                    zipf.writestr(f"{base_filename}_with_images.html", html_content)

                    # Add raw OCR text if available
                    if "ocr_contents" in result and "raw_text" in result["ocr_contents"]:
                        zipf.writestr(f"{base_filename}.txt", result["ocr_contents"]["raw_text"])

                    # Add HTML visualization if available
                    if "html_visualization" in result:
                        zipf.writestr(f"visualization_{i+1}.html", result["html_visualization"])

                    # Add images if available (limited to conserve memory)
                    if "pages_data" in result:
                        for page_idx, page in enumerate(result["pages_data"]):
                            for img_idx, img in enumerate(page.get("images", [])[:3]):  # Limit to first 3 images per page
                                img_base64 = img.get("image_base64", "")
                                if img_base64:
                                    # Strip data URL prefix if present
                                    if img_base64.startswith("data:image"):
                                        img_base64 = img_base64.split(",", 1)[1]
                                    # Decode base64 and add to zip
                                    try:
                                        img_data = base64.b64decode(img_base64)
                                        zipf.writestr(f"images/result_{i+1}_page_{page_idx+1}_img_{img_idx+1}.jpg", img_data)
                                    except Exception:
                                        pass
                except Exception:
                    # If any result fails, skip it and continue
                    continue
        else:
            # Handle single result
            try:
                # Create a descriptive base filename for this result
                base_filename = results.get('file_name', 'document').split('.')[0]

                # Add document type if available
                if 'topics' in results and results['topics']:
                    topic = results['topics'][0].lower().replace(' ', '_')
                    base_filename = f"{base_filename}_{topic}"

                # Add language if available
                if 'languages' in results and results['languages']:
                    lang = results['languages'][0].lower()
                    # Only add if it's not already in the filename
                    if lang not in base_filename.lower():
                        base_filename = f"{base_filename}_{lang}"

                # For PDFs, add page information
                if 'total_pages' in results and 'processed_pages' in results:
                    base_filename = f"{base_filename}_p{results['processed_pages']}of{results['total_pages']}"

                # Add timestamp if available
                if 'timestamp' in results:
                    try:
                        # Try to parse the timestamp and reformat it
                        dt = datetime.strptime(results['timestamp'], "%Y-%m-%d %H:%M")
                        timestamp = dt.strftime("%Y%m%d_%H%M%S")
                        base_filename = f"{base_filename}_{timestamp}"
                    except ValueError:
                        # If parsing fails, create a new timestamp
                        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                        base_filename = f"{base_filename}_{timestamp}"
                else:
                    # No timestamp in the result, create a new one
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    base_filename = f"{base_filename}_{timestamp}"

                # Add JSON results with a descriptive name
                results_json = json.dumps(results, indent=2)
                zipf.writestr(f"{base_filename}.json", results_json)

                # Add HTML content with a descriptive name
                html_content = create_html_with_images(results)
                zipf.writestr(f"{base_filename}_with_images.html", html_content)

                # Add raw OCR text if available
                if "ocr_contents" in results and "raw_text" in results["ocr_contents"]:
                    zipf.writestr(f"{base_filename}.txt", results["ocr_contents"]["raw_text"])

                # Add HTML visualization if available
                if "html_visualization" in results:
                    zipf.writestr("visualization.html", results["html_visualization"])

                # Add images if available
                if "pages_data" in results:
                    for page_idx, page in enumerate(results["pages_data"]):
                        for img_idx, img in enumerate(page.get("images", [])):
                            img_base64 = img.get("image_base64", "")
                            if img_base64:
                                # Strip data URL prefix if present
                                if img_base64.startswith("data:image"):
                                    img_base64 = img_base64.split(",", 1)[1]
                                # Decode base64 and add to zip
                                try:
                                    img_data = base64.b64decode(img_base64)
                                    zipf.writestr(f"images/page_{page_idx+1}_img_{img_idx+1}.jpg", img_data)
                                except Exception:
                                    pass
            except Exception:
                # If processing fails, return an empty zip
                pass

    # Seek to the beginning of the BytesIO object
    zip_buffer.seek(0)

    # Return the zip file bytes
    return zip_buffer.getvalue()
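
# Example (sketch): because the archive is built entirely in memory, it can
# be written to disk or streamed to a download handler without temp files.
#
#     zip_bytes = create_results_zip_in_memory(results)
#     Path("ocr_results.zip").write_bytes(zip_bytes)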
def create_results_zip(results, output_dir=None, zip_name=None):
    """
    Create a zip file containing OCR results.

    Args:
        results: Dictionary or list of OCR results
        output_dir: Optional output directory
        zip_name: Optional zip file name

    Returns:
        Path to the created zip file
    """
    # Create a default output directory if not provided
    if output_dir is None:
        output_dir = Path.cwd() / "output"
    else:
        output_dir = Path(output_dir)
    output_dir.mkdir(exist_ok=True)

    # Check if results is a list or a dictionary
    is_list = isinstance(results, list)

    # Generate a zip name if not provided
    if zip_name is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if is_list:
            # For a list of results, create a more descriptive name based on the content
            file_count = len(results)

            # Count document types
            pdf_count = sum(1 for r in results if r.get('file_name', '').lower().endswith('.pdf'))
            img_count = sum(1 for r in results if r.get('file_name', '').lower().endswith(('.jpg', '.jpeg', '.png')))

            # Create a descriptive name based on contents
            if pdf_count > 0 and img_count > 0:
                zip_name = f"historical_ocr_mixed_{pdf_count}pdf_{img_count}img_{timestamp}.zip"
            elif pdf_count > 0:
                zip_name = f"historical_ocr_pdf_documents_{pdf_count}_{timestamp}.zip"
            elif img_count > 0:
                zip_name = f"historical_ocr_images_{img_count}_{timestamp}.zip"
            else:
                zip_name = f"historical_ocr_results_{file_count}_{timestamp}.zip"
        else:
            # For a single result, create a descriptive filename
            base_name = results.get("file_name", "document").split('.')[0]

            # Add document type if available
            if 'topics' in results and results['topics']:
                topic = results['topics'][0].lower().replace(' ', '_')
                base_name = f"{base_name}_{topic}"

            # Add language if available
            if 'languages' in results and results['languages']:
                lang = results['languages'][0].lower()
                # Only add if it's not already in the filename
                if lang not in base_name.lower():
                    base_name = f"{base_name}_{lang}"

            # For PDFs, add page information
            if 'total_pages' in results and 'processed_pages' in results:
                base_name = f"{base_name}_p{results['processed_pages']}of{results['total_pages']}"

            # Add timestamp
            zip_name = f"{base_name}_{timestamp}.zip"

    try:
        # Get zip data in memory first
        zip_data = create_results_zip_in_memory(results)

        # Save to file
        zip_path = output_dir / zip_name
        with open(zip_path, 'wb') as f:
            f.write(zip_data)
        return zip_path
    except Exception as e:
        # Create an empty zip file as a fallback
        logger.warning(f"Could not create results archive: {str(e)}")
        zip_path = output_dir / zip_name
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            zipf.writestr("info.txt", "Could not create complete archive")
        return zip_path
# Advanced image preprocessing functions
def preprocess_image_for_ocr(image_path: Union[str, Path]) -> Tuple[Optional[Image.Image], str]:
    """
    Preprocess an image for optimal OCR performance with enhanced speed and memory optimization.
    Enhanced to handle large newspaper and document images.

    Args:
        image_path: Path to the image file

    Returns:
        Tuple of (processed PIL Image or None, base64 data URL string)
    """
    # Fast path: skip all processing if PIL is not available
    if not PILLOW_AVAILABLE:
        logger.info("PIL not available, skipping image preprocessing")
        return None, encode_image_for_api(image_path)

    # Convert to Path object if string
    image_file = Path(image_path) if isinstance(image_path, str) else image_path

    # Cached fast path with early exit for already processed images
    try:
        # Consolidate stat calls for file metadata to reduce I/O
        file_stat = image_file.stat()
        file_size = file_stat.st_size
        file_size_mb = file_size / (1024 * 1024)
        mod_time = file_stat.st_mtime

        # Create a cache key based on essential file properties
        cache_key = f"{image_file.name}_{file_size}_{mod_time}"

        # Fast path: return the cached result if available
        if hasattr(preprocess_image_for_ocr, "_cache") and cache_key in preprocess_image_for_ocr._cache:
            logger.debug(f"Using cached preprocessing result for {image_file.name}")
            return preprocess_image_for_ocr._cache[cache_key]

        # Optimization: skip heavy processing for very small files;
        # images under 100KB are unlikely to need preprocessing
        if file_size < 100000:  # 100KB
            logger.info(f"Image {image_file.name} is small ({file_size/1024:.1f}KB), using minimal processing")
            with Image.open(image_file) as img:
                # Normalize mode only
                if img.mode not in ('RGB', 'L'):
                    img = img.convert('RGB')

                # Save with light optimization
                buffer = io.BytesIO()
                img.save(buffer, format="JPEG", quality=95, optimize=True)
                buffer.seek(0)

                # Get base64
                encoded_image = base64.b64encode(buffer.getvalue()).decode()
                base64_data_url = f"data:image/jpeg;base64,{encoded_image}"

                # Cache and return
                result = (img, base64_data_url)
                if not hasattr(preprocess_image_for_ocr, "_cache"):
                    preprocess_image_for_ocr._cache = {}
                # Evict the oldest entries if the cache grows too large;
                # removing 5 at a time avoids evicting on every call
                if len(preprocess_image_for_ocr._cache) > 20:
                    for _ in range(5):
                        if preprocess_image_for_ocr._cache:
                            preprocess_image_for_ocr._cache.pop(next(iter(preprocess_image_for_ocr._cache)))
                preprocess_image_for_ocr._cache[cache_key] = result
                return result

        # Special handling for large newspaper-style documents
        is_newspaper_format = False
        if file_size_mb > 5 and image_file.name.lower().endswith(('.jpg', '.jpeg', '.png')):
            logger.info(f"Large image detected ({file_size_mb:.2f}MB), checking for newspaper format")
            try:
                # Quickly check dimensions without loading the full image
                with Image.open(image_file) as img:
                    width, height = img.size
                    aspect_ratio = width / height
                    # Newspaper-style documents typically have width > height or are very large
                    is_newspaper_format = (aspect_ratio > 1.2 and width > 2000) or (width > 3000 or height > 3000)
                    if is_newspaper_format:
                        logger.info(f"Newspaper format detected: {width}x{height}, applying specialized processing")
            except Exception as dim_err:
                logger.debug(f"Error checking dimensions: {str(dim_err)}")
    except Exception as e:
        # If stat or cache handling fails, log and continue with processing
        logger.debug(f"Cache handling failed for {image_path}: {str(e)}")
        # Ensure we have a valid file_size_mb for later decisions
        try:
            file_size_mb = image_file.stat().st_size / (1024 * 1024)
        except Exception:
            file_size_mb = 0  # Default if we can't determine the size
        # Default to not-newspaper format on error
        is_newspaper_format = False
    try:
        # Process start time for performance logging
        start_time = time.time()

        # Open and process the image with a minimal memory footprint
        with Image.open(image_file) as img:
            # Normalize image mode
            if img.mode not in ('RGB', 'L'):
                img = img.convert('RGB')

            # Fast path: quick check of image properties to determine the appropriate processing
            width, height = img.size
            image_area = width * height

            # Detect document type only for medium to large images to save processing time
            is_document = False
            is_newspaper = False

            if image_area > 500000:  # Approx 700x700 or larger
                # Store the image for document detection
                _detect_document_type_impl._current_img = img
                is_document = _detect_document_type_impl(None)

                # Additional check for newspaper format
                if is_document:
                    # Newspapers typically have wide formats or very large dimensions
                    aspect_ratio = width / height
                    is_newspaper = (aspect_ratio > 1.2 and width > 2000) or (width > 3000 or height > 3000)

                logger.debug(f"Document type detection for {image_file.name}: " +
                             f"{'newspaper' if is_newspaper else 'document' if is_document else 'photo'}")

            # Special processing for very large images (newspapers and large documents)
            if is_newspaper:
                # Newspaper format needs more specialized processing
                logger.info(f"Processing newspaper format image: {width}x{height}")

                # For newspapers, prioritize text clarity over file size: keep a
                # higher target resolution to preserve small print, but still
                # resize extremely large images to stay within API limits
                max_dimension = max(width, height)
                if max_dimension > 6000:  # Extremely large
                    scale_factor = 0.4  # Preserve more resolution for newspapers (increased from 0.35)
                elif max_dimension > 4000:
                    scale_factor = 0.6  # Higher resolution for better text extraction (increased from 0.5)
                else:
                    scale_factor = 0.8  # Minimal reduction for moderate newspaper size (increased from 0.7)

                # Calculate new dimensions - maintain higher resolution
                new_width = int(width * scale_factor)
                new_height = int(height * scale_factor)

                # Use high-quality resampling to preserve text clarity in newspapers
                processed_img = img.resize((new_width, new_height), Image.LANCZOS)
                logger.debug(f"Resized newspaper image from {width}x{height} to {new_width}x{new_height}")

                # For newspapers, also enhance contrast and sharpness before the
                # main OCR processing for better text extraction
                if img.mode in ('RGB', 'RGBA'):
                    # For color newspapers, enhance the overall image; this helps
                    # with mixed-content pages that have both text and images
                    enhancer = ImageEnhance.Contrast(processed_img)
                    processed_img = enhancer.enhance(1.3)  # Boost contrast, but not too aggressively

                    # Also enhance saturation to make colored text more visible
                    enhancer_sat = ImageEnhance.Color(processed_img)
                    processed_img = enhancer_sat.enhance(1.2)

            # Standard processing for other large images
            elif file_size_mb > IMAGE_PREPROCESSING["max_size_mb"] or max(width, height) > 3000:
                max_dimension = max(width, height)

                # Use a sliding scale for reduction based on image size
                if max_dimension > 5000:
                    scale_factor = 0.3  # Slightly less aggressive reduction (was 0.25)
                elif max_dimension > 3000:
                    scale_factor = 0.45  # Slightly less aggressive reduction (was 0.4)
                else:
                    scale_factor = 0.65  # Slightly less aggressive reduction (was 0.6)

                # Calculate new dimensions
                new_width = int(width * scale_factor)
                new_height = int(height * scale_factor)

                # Choose the resampling filter based on image size
                if image_area > 3000000:  # Very large: use the faster, lower-quality filter
                    processed_img = img.resize((new_width, new_height), Image.BILINEAR)
                else:  # Medium size: use the better-quality filter
                    processed_img = img.resize((new_width, new_height), Image.LANCZOS)
                logger.debug(f"Resized image from {width}x{height} to {new_width}x{new_height}")
            else:
                # Skip resizing for smaller images
                processed_img = img

            # Apply the appropriate processing based on document type and size
            if is_document:
                # Process as a document, with the path chosen by image size
                if image_area > 1000000:  # Full processing for larger documents
                    preprocess_document_image._current_img = processed_img
                    processed = _preprocess_document_image_impl()
                else:  # Lightweight processing for smaller documents
                    # Just enhance contrast for small documents to save time
                    enhancer = ImageEnhance.Contrast(processed_img)
                    processed = enhancer.enhance(1.3)
            else:
                # Process as a photo, with the path chosen by image size
                if image_area > 1000000:  # Full processing for larger photos
                    preprocess_general_image._current_img = processed_img
                    processed = _preprocess_general_image_impl()
                else:  # Skip processing for smaller photos
                    processed = processed_img

            # Optimize memory handling during encoding
            buffer = io.BytesIO()

            # Adjust quality based on image size to optimize the API payload
            if file_size_mb > 5:
                quality = 85  # Lower quality for large files
            else:
                quality = IMAGE_PREPROCESSING["compression_quality"]

            # Save with optimized parameters
            processed.save(buffer, format="JPEG", quality=quality, optimize=True)
            buffer.seek(0)

            # Get base64 with a minimal memory footprint
            encoded_image = base64.b64encode(buffer.getvalue()).decode()
            base64_data_url = f"data:image/jpeg;base64,{encoded_image}"

            # Update the cache
            result = (processed, base64_data_url)
            if not hasattr(preprocess_image_for_ocr, "_cache"):
                preprocess_image_for_ocr._cache = {}

            # LRU-like cache management: evict several entries at once to
            # avoid clearing the cache on every insertion
            if len(preprocess_image_for_ocr._cache) > 20:
                try:
                    for _ in range(5):
                        if preprocess_image_for_ocr._cache:
                            preprocess_image_for_ocr._cache.pop(next(iter(preprocess_image_for_ocr._cache)))
                except Exception:
                    # If removal fails, just continue
                    pass

            # Add to cache
            try:
                preprocess_image_for_ocr._cache[cache_key] = result
            except Exception:
                # If caching fails, just proceed
                pass

            # Log performance metrics
            processing_time = time.time() - start_time
            logger.debug(f"Image preprocessing completed in {processing_time:.3f}s for {image_file.name}")

            # Return both the processed image and the base64 string
            return result
    except Exception as e:
        # If preprocessing fails, log the error and use the original image
        logger.warning(f"Image preprocessing failed: {str(e)}. Using original image.")
        return None, encode_image_for_api(image_path)
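
# Example (sketch): preprocess locally, then send the returned data URL to
# the OCR endpoint (assumes `client` is a configured Mistral client):
#
#     processed_img, data_url = preprocess_image_for_ocr("newspaper_scan.jpg")
#     response = client.ocr.process(
#         document=ImageURLChunk(image_url=data_url),
#         model="mistral-ocr-latest",
#     )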
# Caching decorator removed to avoid an unhashable-type error on PIL images
def detect_document_type(img: Image.Image) -> bool:
    """
    Detect if an image is likely a document (text-heavy) vs. a photo.

    Args:
        img: PIL Image object

    Returns:
        True if likely a document, False otherwise
    """
    # Store the image for the implementation function, then delegate
    _detect_document_type_impl._current_img = img
    return _detect_document_type_impl(None)
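
# Example (sketch, requires Pillow):
#
#     from PIL import Image
#     scan = Image.open("letter_scan.jpg")
#     if detect_document_type(scan):
#         scan = preprocess_document_image(scan)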
def _detect_document_type_impl(img_hash=None) -> bool:
    """
    Optimized implementation of document type detection for faster processing.
    The img_hash parameter is unused but kept for backward compatibility.
    Enhanced to better detect handwritten documents and newspaper formats.
    """
    # Fast path: get the image from the function attribute set by the caller
    if not hasattr(_detect_document_type_impl, "_current_img"):
        return False  # Fail safe in case the image is not set
    img = _detect_document_type_impl._current_img

    # Skip processing for tiny images - just classify them as non-documents
    width, height = img.size
    if width * height < 100000:  # Approx 300x300 or smaller
        return False

    # Convert to grayscale for analysis
    gray_img = img.convert('L')

    # PIL-only path for systems without OpenCV
    if not CV2_AVAILABLE:
        # Faster method: sample a subset of the image for edge detection;
        # downscale the image for faster processing
        sample_size = min(width, height, 1000)
        scale_factor = sample_size / max(width, height)
        if scale_factor < 0.9:  # Only resize if the reduction is significant
            sample_img = gray_img.resize(
                (int(width * scale_factor), int(height * scale_factor)),
                Image.NEAREST  # Fastest resampling method
            )
        else:
            sample_img = gray_img

        # Fast edge detection on the sample
        edges = sample_img.filter(ImageFilter.FIND_EDGES)

        # Count edge pixels above a threshold (faster than summing individual pixels)
        edge_data = edges.getdata()
        edge_threshold = 40  # Lowered threshold to better detect handwritten text
        edge_count = sum(1 for p in edge_data if p > edge_threshold)
        total_pixels = len(edge_data)
        edge_ratio = edge_count / total_pixels

        # Check for bright areas - a simple approximation of text/background contrast
        bright_count = sum(1 for p in gray_img.getdata() if p > 200)
        bright_ratio = bright_count / (width * height)

        # Documents typically have more edges (text boundaries) and bright areas
        # (background); the edge threshold is lowered for handwritten documents
        return edge_ratio > 0.035 or bright_ratio > 0.4

    # OpenCV path - optimized for speed and enhanced for handwritten documents
    img_np = np.array(gray_img)

    # 1. Fast check: variance of pixel values. Documents typically have high
    # variance (text on background); handwritten documents may have less
    # contrast than printed text
    std_dev = np.std(img_np)
    if std_dev > 45:  # Lowered threshold to better detect handwritten documents
        return True

    # 2. Quick edge check on a downsampled image for faster processing
    if max(img_np.shape) > 1000:
        scale = 1000 / max(img_np.shape)
        small_img = cv2.resize(img_np, None, fx=scale, fy=scale, interpolation=cv2.INTER_NEAREST)
    else:
        small_img = img_np

    # Use lowered Canny thresholds to better detect fainter handwritten text
    edges = cv2.Canny(small_img, 30, 130, L2gradient=False)
    edge_ratio = np.count_nonzero(edges) / edges.size

    # 3. Fast histogram approximation using dark and light bins instead of a
    # full histogram, adjusted for handwritten documents with more gray values
    dark_mask = img_np < 60    # Increased threshold to capture lighter handwritten text
    light_mask = img_np > 180  # Lowered threshold to account for aged paper
    dark_ratio = np.count_nonzero(dark_mask) / img_np.size
    light_ratio = np.count_nonzero(light_mask) / img_np.size

    # Special analysis for handwritten documents:
    # check for line-like structures typical of handwritten text
    if edge_ratio > 0.02:  # Lower threshold to capture handwritten documents
        # Try to find line segments that could indicate text lines
        lines = cv2.HoughLinesP(edges, 1, np.pi/180,
                                threshold=50,      # Lower threshold for detection
                                minLineLength=30,  # Shorter lines for handwriting
                                maxLineGap=20)     # Larger gap for discontinuous handwriting
        # If we find enough line segments, it's likely a document with text
        if lines is not None and len(lines) > 10:
            return True

    # Combine heuristics for the final decision: documents typically have both
    # dark (text) and light (background) regions, and/or well-defined edges;
    # thresholds are lowered for handwritten documents
    return (dark_ratio > 0.03 and light_ratio > 0.25) or edge_ratio > 0.03
# Caching removed to avoid an unhashable-type error on PIL images
def preprocess_document_image(img: Image.Image) -> Image.Image:
    """
    Preprocess a document image for optimal OCR.

    Args:
        img: PIL Image object

    Returns:
        Processed PIL Image
    """
    # Store the image for the implementation function
    preprocess_document_image._current_img = img
    # The actual implementation is separated for cleaner code organization
    return _preprocess_document_image_impl()
def _preprocess_document_image_impl() -> Image.Image:
    """
    Optimized implementation of document preprocessing with adaptive processing based on image size.
    Enhanced for better handwritten document and newspaper format processing.
    """
    # Fast path: get the image from the function attribute set by the caller
    if not hasattr(preprocess_document_image, "_current_img"):
        raise ValueError("No image set for document preprocessing")
    img = preprocess_document_image._current_img

    # Analyze image size to determine the processing strategy
    width, height = img.size
    img_size = width * height

    # Detect special document types
    is_handwritten = False
    is_newspaper = False

    # Check for newspaper format first (takes precedence)
    aspect_ratio = width / height
    if (aspect_ratio > 1.2 and width > 2000) or (width > 3000 or height > 3000):
        is_newspaper = True
        logger.debug(f"Newspaper format detected: {width}x{height}, aspect ratio: {aspect_ratio:.2f}")
    else:
        # If not a newspaper, check whether the document is handwritten
        try:
            # Simple check for handwritten document characteristics:
            # handwritten documents often have more varied strokes and less stark contrast
            if CV2_AVAILABLE:
                # Convert to grayscale and inspect edge density
                gray_np = np.array(img.convert('L'))
                edges = cv2.Canny(gray_np, 30, 100)
                if np.count_nonzero(edges) / edges.size > 0.02:  # Low edge threshold for handwriting
                    # Additional check with gradient magnitudes
                    sobelx = cv2.Sobel(gray_np, cv2.CV_64F, 1, 0, ksize=3)
                    sobely = cv2.Sobel(gray_np, cv2.CV_64F, 0, 1, ksize=3)
                    magnitude = np.sqrt(sobelx**2 + sobely**2)
                    # Handwriting typically has more variation in gradient magnitudes
                    if np.std(magnitude) > 20:
                        is_handwritten = True
        except Exception:
            # If detection fails, assume it's not handwritten
            pass

    # Special processing for newspaper format
    if is_newspaper:
        # Convert to grayscale for better text extraction
        gray = img.convert('L')

        # Newspapers need aggressive text enhancement to make small print
        # readable; first boost contrast for small newspaper text
        enhancer = ImageEnhance.Contrast(gray)
        enhanced = enhancer.enhance(2.0)

        # Apply stronger sharpening to make small text more defined
        if IMAGE_PREPROCESSING["sharpen"]:
            # Apply multiple passes of sharpening for newspaper text
            enhanced = enhanced.filter(ImageFilter.SHARPEN)
            enhanced = enhanced.filter(ImageFilter.EDGE_ENHANCE_MORE)  # Stronger edge enhancement

        # Enhanced processing for newspapers with OpenCV when available
        if CV2_AVAILABLE:
            try:
                # Convert to numpy array
                img_np = np.array(enhanced)

                # For newspaper text extraction, CLAHE (Contrast Limited Adaptive
                # Histogram Equalization) works much better than simple contrast enhancement
                clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
                img_np = clahe.apply(img_np)

                # Apply two thresholding approaches and choose the better one:
                # 1. Adaptive threshold with a larger block size for newspaper columns
                binary1 = cv2.adaptiveThreshold(img_np, 255,
                                                cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                                cv2.THRESH_BINARY, 15, 4)
                # 2. Otsu's global thresholding - works well for clean newspaper print
                _, binary2 = cv2.threshold(img_np, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

                # Determine which method preserves text better by counting
                # white pixels and edges in each binary version
                white_pixels1 = np.count_nonzero(binary1 > 200)
                white_pixels2 = np.count_nonzero(binary2 > 200)

                # Edge density helps determine which method preserves text features better
                edges1 = cv2.Canny(binary1, 100, 200)
                edges2 = cv2.Canny(binary2, 100, 200)
                edge_count1 = np.count_nonzero(edges1)
                edge_count2 = np.count_nonzero(edges2)

                # For newspaper text, preserve more edges while maintaining
                # reasonable white space (typical of print on a paper background)
                if (edge_count1 > edge_count2 * 1.2 and white_pixels1 > white_pixels2 * 0.7) or \
                   (white_pixels1 < white_pixels2 * 0.5):  # Otsu removed too much content
                    # Adaptive thresholding usually better preserves small newspaper text
                    logger.debug("Using adaptive thresholding for newspaper text")
                    # Apply light denoising to clean up small speckles
                    result = cv2.fastNlMeansDenoising(binary1, None, 7, 7, 21)
                    return Image.fromarray(result)
                else:
                    # The Otsu method was better
                    logger.debug("Using Otsu thresholding for newspaper text")
                    result = cv2.fastNlMeansDenoising(binary2, None, 7, 7, 21)
                    return Image.fromarray(result)
            except Exception as e:
                logger.debug(f"Advanced newspaper processing failed: {str(e)}")
                # Fall back to PIL processing below

        # If OpenCV is not available or failed, apply additional PIL enhancements:
        # create a more aggressive binary version to better separate text
        binary_threshold = enhanced.point(lambda x: 0 if x < 150 else 255, '1')

        # Return the enhanced binary image
        return binary_threshold
    # Ultra-fast path for tiny images - just convert to grayscale with contrast enhancement
    if img_size < 300000:  # ~500x600 or smaller
        gray = img.convert('L')
        # Gentler contrast enhancement for handwritten documents
        contrast_level = 1.4 if is_handwritten else IMAGE_PREPROCESSING["enhance_contrast"]
        enhancer = ImageEnhance.Contrast(gray)
        return enhancer.enhance(contrast_level)

    # Fast path for small images - minimal processing
    if img_size < 1000000:  # ~1000x1000 or smaller
        gray = img.convert('L')
        # Gentler contrast enhancement for handwritten documents
        contrast_level = 1.4 if is_handwritten else IMAGE_PREPROCESSING["enhance_contrast"]
        enhancer = ImageEnhance.Contrast(gray)
        enhanced = enhancer.enhance(contrast_level)

        # Light sharpening only if enabled; use milder sharpening for
        # handwritten documents to preserve stroke detail
        if IMAGE_PREPROCESSING["sharpen"]:
            if is_handwritten:
                # Edge enhancement is gentler than SHARPEN for handwriting
                enhanced = enhanced.filter(ImageFilter.EDGE_ENHANCE)
            else:
                enhanced = enhanced.filter(ImageFilter.SHARPEN)
        return enhanced

    # Standard path for medium images:
    # convert to grayscale (faster processing)
    gray = img.convert('L')

    # Adaptive contrast enhancement based on document type
    contrast_level = 1.4 if is_handwritten else IMAGE_PREPROCESSING["enhance_contrast"]
    enhancer = ImageEnhance.Contrast(gray)
    enhanced = enhancer.enhance(contrast_level)

    # Apply light sharpening for text clarity, adapted to the document type
    if IMAGE_PREPROCESSING["sharpen"]:
        if is_handwritten:
            # Edge enhancement is gentler than SHARPEN for handwriting
            enhanced = enhanced.filter(ImageFilter.EDGE_ENHANCE)
        else:
            enhanced = enhanced.filter(ImageFilter.SHARPEN)

    # Advanced processing with OpenCV if available
    if CV2_AVAILABLE and IMAGE_PREPROCESSING["denoise"]:
        try:
            # Convert to numpy array for OpenCV processing
            img_np = np.array(enhanced)

            if is_handwritten:
                # Special treatment for handwritten documents: a bilateral filter
                # preserves edges while smoothing, which keeps stroke details intact
                if img_size > 3000000:  # Large images - downsample first
                    scale_factor = 0.5
                    small_img = cv2.resize(img_np, None, fx=scale_factor, fy=scale_factor,
                                           interpolation=cv2.INTER_AREA)
                    # Apply the bilateral filter on the smaller image
                    filtered = cv2.bilateralFilter(small_img, 9, 75, 75)
                    # Resize back
                    filtered = cv2.resize(filtered, (width, height), interpolation=cv2.INTER_LINEAR)
                else:
                    # Use the bilateral filter directly for smaller images
                    filtered = cv2.bilateralFilter(img_np, 7, 50, 50)

                # Convert back to a PIL Image; for handwritten documents, avoid
                # binary thresholding, which can destroy subtle strokes
                enhanced = Image.fromarray(filtered)
                return enhanced
            else:
                # Standard document processing - optimized for printed text,
                # with denoising parameters chosen based on image size
                if img_size > 4000000:  # Very large images
                    # More aggressive downsampling for very large images
                    scale_factor = 0.5
                    downsample = cv2.resize(img_np, None, fx=scale_factor, fy=scale_factor,
                                            interpolation=cv2.INTER_AREA)
                    # Lighter denoising for the downsampled image
                    h_value = 7  # Strength parameter
                    template_window = 5
                    search_window = 13
                    # Apply denoising on the smaller image
                    denoised_np = cv2.fastNlMeansDenoising(downsample, None, h_value, template_window, search_window)
                    # Resize back to the original size
                    denoised_np = cv2.resize(denoised_np, (width, height), interpolation=cv2.INTER_LINEAR)
                else:
                    # Direct denoising for medium-large images
                    h_value = 8  # Balanced for speed and quality
                    template_window = 5
                    search_window = 15
                    # Apply denoising
                    denoised_np = cv2.fastNlMeansDenoising(img_np, None, h_value, template_window, search_window)

                # Convert back to PIL Image
                enhanced = Image.fromarray(denoised_np)

                # Apply adaptive thresholding only if it improves text visibility
                if img_size < 8000000:  # Skip for extremely large images to save processing time
                    binary = cv2.adaptiveThreshold(denoised_np, 255,
                                                   cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                                   cv2.THRESH_BINARY, 11, 2)
                    # Quick verification that binarization preserves text information,
                    # using a simplified check that works well for document images
                    white_pixels_binary = np.count_nonzero(binary > 200)
                    white_pixels_orig = np.count_nonzero(denoised_np > 200)
                    # Accept the binary image only if it preserves a reasonable
                    # amount of white pixels (background)
                    if white_pixels_binary > white_pixels_orig * 0.8:
                        return Image.fromarray(binary)
                return enhanced
        except Exception as e:
            # If OpenCV processing fails, continue with the PIL-enhanced image
            logger.debug(f"OpenCV document processing failed: {str(e)}")
    elif IMAGE_PREPROCESSING["denoise"]:
        # Fallback PIL denoising for systems without OpenCV
        if is_handwritten:
            # Lighter filtering (smaller median filter) for handwritten
            # documents to preserve details
            enhanced = enhanced.filter(ImageFilter.MedianFilter(1))
        else:
            # Standard filtering for printed documents
            enhanced = enhanced.filter(ImageFilter.MedianFilter(3))

    # Return the enhanced grayscale image
    return enhanced
# Caching removed to avoid an unhashable-type error on PIL images
def preprocess_general_image(img: Image.Image) -> Image.Image:
    """
    Preprocess a general image for OCR.

    Args:
        img: PIL Image object

    Returns:
        Processed PIL Image
    """
    # Store the image for the implementation function
    preprocess_general_image._current_img = img
    return _preprocess_general_image_impl()
def _preprocess_general_image_impl() -> Image.Image:
    """
    Optimized implementation of general image preprocessing with size-based processing paths.
    """
    # Fast path: get the image from the function attribute set by the caller
    if not hasattr(preprocess_general_image, "_current_img"):
        raise ValueError("No image set for general preprocessing")
    img = preprocess_general_image._current_img

    # Ultra-fast path: skip processing completely for small images
    width, height = img.size
    img_size = width * height
    if img_size < 300000:  # Tiny images under ~0.3 megapixels
        # Just ensure the correct color mode
        if img.mode != 'RGB':
            return img.convert('RGB')
        return img

    # Fast path: minimal processing for smaller images
    if img_size < 600000:  # ~800x750 or smaller
        # Ensure RGB mode
        if img.mode != 'RGB':
            img = img.convert('RGB')
        # Very light contrast enhancement only
        enhancer = ImageEnhance.Contrast(img)
        return enhancer.enhance(1.15)  # Lighter enhancement for small images

    # Standard path: apply moderate enhancements for medium images;
    # convert to RGB to ensure compatibility
    if img.mode != 'RGB':
        img = img.convert('RGB')

    # Moderate enhancement only
    enhancer = ImageEnhance.Contrast(img)
    enhanced = enhancer.enhance(1.2)  # Less aggressive than document enhancement

    # Skip additional processing for medium-sized images
    if img_size < 1000000:  # Images under ~1 megapixel
        return enhanced

    # Enhanced path: additional processing for larger images
    try:
        # Apply an optimized enhancement pipeline for large non-document images:
        # 1. Improve color saturation slightly for better feature extraction
        saturation = ImageEnhance.Color(enhanced)
        enhanced = saturation.enhance(1.1)

        # 2. Apply adaptive sharpening based on image size
        if img_size > 2500000:  # Very large images (~1600x1600 or larger)
            # Use EDGE_ENHANCE instead of SHARPEN for a more subtle effect on large images
            enhanced = enhanced.filter(ImageFilter.EDGE_ENHANCE)
        else:
            # Standard sharpening for regular large images
            enhanced = enhanced.filter(ImageFilter.SHARPEN)

        # 3. Apply additional OpenCV processing for the largest images, if available
        if CV2_AVAILABLE and img_size > 3000000:
            # Convert to numpy array
            img_np = np.array(enhanced)

            # Apply subtle enhancement of details (CLAHE)
            try:
                # Convert to LAB color space for better processing
                lab = cv2.cvtColor(img_np, cv2.COLOR_RGB2LAB)
                # Only enhance the L channel (luminance)
                l, a, b = cv2.split(lab)
                # Create a CLAHE object with parameters suited to photos
                clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
                # Apply CLAHE to the L channel
                l = clahe.apply(l)
                # Merge the channels back and convert to RGB
                lab = cv2.merge((l, a, b))
                enhanced_np = cv2.cvtColor(lab, cv2.COLOR_LAB2RGB)
                # Convert back to PIL
                enhanced = Image.fromarray(enhanced_np)
            except Exception:
                # If CLAHE fails, continue with the PIL-enhanced image
                pass
    except Exception:
        # If any enhancement fails, fall back to basic contrast enhancement
        if img.mode != 'RGB':
            img = img.convert('RGB')
        enhancer = ImageEnhance.Contrast(img)
        enhanced = enhancer.enhance(1.2)

    return enhanced
# Caching decorator removed to avoid an unhashable-type error on PIL images
def resize_image(img: Image.Image, target_dpi: int = 300) -> Image.Image:
    """
    Resize an image to an optimal size for OCR while preserving quality.

    Args:
        img: PIL Image object
        target_dpi: Target DPI (dots per inch)

    Returns:
        Resized PIL Image
    """
    # Store the image for the implementation function
    resize_image._current_img = img
    return resize_image_impl(target_dpi)


def resize_image_impl(target_dpi: int = 300) -> Image.Image:
    """
    Implementation of the resize function that reads the image from a function attribute.

    Args:
        target_dpi: Target DPI (dots per inch)

    Returns:
        Resized PIL Image
    """
    # Get the image from the function attribute (set by the caller)
    if not hasattr(resize_image, "_current_img"):
        raise ValueError("No image set for resizing")
    img = resize_image._current_img

    # Current dimensions
    width, height = img.size

    # Fixed target dimensions based on DPI, using larger bounds to support
    # newspapers and large documents
    max_width = int(14 * target_dpi)   # Increased from 8.5 to 14 inches
    max_height = int(22 * target_dpi)  # Increased from 11 to 22 inches

    # Quick early return if no resizing is needed
    if width <= max_width and height <= max_height:
        return img

    # Calculate the scaling factor once
    scale_factor = min(max_width / width, max_height / height)

    # Calculate new dimensions
    new_width = int(width * scale_factor)
    new_height = int(height * scale_factor)

    # Use BICUBIC for a good balance of speed and quality
    return img.resize((new_width, new_height), Image.BICUBIC)
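
# Example (sketch): at the default 300 DPI, the bounds above work out to
# 14x22 inches, i.e. 4200x6600 pixels; anything larger is scaled down
# proportionally before OCR:
#
#     capped = resize_image(Image.open("broadsheet_scan.png"), target_dpi=300)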
def calculate_image_entropy(img: Image.Image) -> float:
    """
    Calculate the entropy (information content) of an image.

    Args:
        img: PIL Image object

    Returns:
        Entropy value
    """
    # Convert to grayscale
    if img.mode != 'L':
        img = img.convert('L')

    # Calculate histogram
    histogram = img.histogram()
    total_pixels = img.width * img.height

    # Calculate entropy
    entropy = 0
    for h in histogram:
        if h > 0:
            probability = h / total_pixels
            entropy -= probability * np.log2(probability)
    return entropy
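
# The value above is the Shannon entropy of the grayscale histogram,
# H = -sum_i p_i * log2(p_i): a blank page scores near 0 bits, while busy
# photographic content approaches the 8-bit maximum for a 256-level image.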
def create_html_with_images(result):
    """
    Create an HTML document with embedded images from OCR results.
    Handles serialization of complex OCR objects automatically.

    Args:
        result: OCR result dictionary containing pages_data

    Returns:
        HTML content as string
    """
    # Ensure the result is fully serializable first
    result = serialize_ocr_object(result)

    # Create the HTML document structure
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>OCR Document with Images</title>
        <style>
            body {
                font-family: Georgia, serif;
                line-height: 1.7;
                margin: 0 auto;
                max-width: 800px;
                padding: 20px;
            }
            img {
                max-width: 90%;
                max-height: 500px;
                object-fit: contain;
                margin: 20px auto;
                display: block;
                border: 1px solid #ddd;
                border-radius: 4px;
            }
            .image-container {
                margin: 20px 0;
                text-align: center;
            }
            .page-break {
                border-top: 1px solid #ddd;
                margin: 40px 0;
                padding-top: 40px;
            }
            h3 {
                color: #333;
                border-bottom: 1px solid #eee;
                padding-bottom: 10px;
            }
            p {
                margin: 12px 0;
            }
            .page-text-content {
                margin-bottom: 20px;
            }
            .text-block {
                background-color: #f9f9f9;
                padding: 15px;
                border-radius: 4px;
                border-left: 3px solid #546e7a;
                margin-bottom: 15px;
                color: #333;
            }
            .text-block p {
                margin: 8px 0;
                color: #333;
            }
            .metadata {
                background-color: #f5f5f5;
                padding: 10px 15px;
                border-radius: 4px;
                margin-bottom: 20px;
                font-size: 14px;
            }
            .metadata p {
                margin: 5px 0;
            }
        </style>
    </head>
    <body>
    """
    # Add document metadata
    html_content += f"""
    <div class="metadata">
        <h2>{result.get('file_name', 'Document')}</h2>
        <p><strong>Processed at:</strong> {result.get('timestamp', '')}</p>
        <p><strong>Languages:</strong> {', '.join(result.get('languages', ['Unknown']))}</p>
        <p><strong>Topics:</strong> {', '.join(result.get('topics', ['Unknown']))}</p>
    </div>
    """

    # Check if we have pages_data
    if 'pages_data' in result and result['pages_data']:
        pages_data = result['pages_data']

        # Process each page
        for i, page in enumerate(pages_data):
            page_markdown = page.get('markdown', '')
            images = page.get('images', [])

            # Add a page header if multi-page
            if len(pages_data) > 1:
                html_content += f"<h3>Page {i+1}</h3>"

            # Create image dictionary
            image_dict = {}
            for img in images:
                if 'id' in img and 'image_base64' in img:
                    image_dict[img['id']] = img['image_base64']

            # Process the markdown content
            if page_markdown:
                # Separate image references from text lines
                text_content = []
                image_lines = []
                for line in page_markdown.split('\n'):
                    if '![' in line and '](' in line:
                        image_lines.append(line)
                    elif line.strip():
                        text_content.append(line)

                # Add text content
                if text_content:
                    html_content += '<div class="text-block">'
                    for line in text_content:
                        html_content += f"<p>{line}</p>"
                    html_content += '</div>'

                # Add images
                for line in image_lines:
                    # Extract the image ID and alt text using simple parsing
                    try:
                        alt_start = line.find('![') + 2
                        alt_end = line.find(']', alt_start)
                        alt_text = line[alt_start:alt_end]
                        img_start = line.find('(', alt_end) + 1
                        img_end = line.find(')', img_start)
                        img_id = line[img_start:img_end]
                        if img_id in image_dict:
                            html_content += '<div class="image-container">'
                            html_content += f'<img src="{image_dict[img_id]}" alt="{alt_text}">'
                            html_content += '</div>'
                    except Exception:
                        # If parsing fails, just skip this image
                        continue

            # Add a page separator if not the last page
            if i < len(pages_data) - 1:
                html_content += '<div class="page-break"></div>'
    # Add structured content if available
    if 'ocr_contents' in result and isinstance(result['ocr_contents'], dict):
        html_content += '<h3>Structured Content</h3>'
        for section, content in result['ocr_contents'].items():
            if content and section not in ['error', 'raw_text', 'partial_text']:
                html_content += f'<h4>{section.replace("_", " ").title()}</h4>'
                if isinstance(content, str):
                    html_content += f'<p>{content}</p>'
                elif isinstance(content, list):
                    html_content += '<ul>'
                    for item in content:
                        html_content += f'<li>{str(item)}</li>'
                    html_content += '</ul>'
                elif isinstance(content, dict):
                    html_content += '<dl>'
                    for k, v in content.items():
                        html_content += f'<dt>{k}</dt><dd>{v}</dd>'
                    html_content += '</dl>'

    # Close the HTML document
    html_content += """
    </body>
    </html>
    """
    return html_content
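
# Example (sketch): render a processed result to a standalone HTML file.
#
#     html = create_html_with_images(result)
#     Path("document_with_images.html").write_text(html, encoding="utf-8")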
def generate_document_thumbnail(image_path: Union[str, Path], max_size: int = 300) -> Optional[str]:
    """
    Generate a thumbnail for document preview.

    Args:
        image_path: Path to the image file
        max_size: Maximum dimension for the thumbnail

    Returns:
        Base64-encoded thumbnail data URL, or None if generation fails
    """
    if not PILLOW_AVAILABLE:
        return None

    try:
        # Open the image
        with Image.open(image_path) as img:
            # Calculate the thumbnail size, preserving the aspect ratio
            width, height = img.size
            if width > height:
                new_width = max_size
                new_height = int(height * (max_size / width))
            else:
                new_height = max_size
                new_width = int(width * (max_size / height))

            # Create the thumbnail
            thumbnail = img.resize((new_width, new_height), Image.LANCZOS)

            # JPEG cannot store alpha or palette modes, so normalize to RGB first
            if thumbnail.mode != 'RGB':
                thumbnail = thumbnail.convert('RGB')

            # Save to a buffer
            buffer = io.BytesIO()
            thumbnail.save(buffer, format="JPEG", quality=85)
            buffer.seek(0)

            # Encode as base64
            encoded = base64.b64encode(buffer.getvalue()).decode()
            return f"data:image/jpeg;base64,{encoded}"
    except Exception:
        # Return None if thumbnail generation fails
        return None
def serialize_ocr_object(obj):
    """
    Serialize OCR response objects to a JSON-serializable format.
    Handles OCRImageObject specifically to prevent serialization errors.

    Args:
        obj: The object to serialize

    Returns:
        JSON-serializable representation of the object
    """
    # Fast path: handle primitive types directly
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj

    # Handle collections
    if isinstance(obj, list):
        return [serialize_ocr_object(item) for item in obj]
    elif isinstance(obj, dict):
        return {k: serialize_ocr_object(v) for k, v in obj.items()}
    elif isinstance(obj, OCRImageObject):
        # Special handling for OCRImageObject
        return {
            'id': obj.id if hasattr(obj, 'id') else None,
            'image_base64': obj.image_base64 if hasattr(obj, 'image_base64') else None
        }
    elif hasattr(obj, '__dict__'):
        # For objects with a __dict__ attribute, skip private attributes
        return {k: serialize_ocr_object(v) for k, v in obj.__dict__.items()
                if not k.startswith('_')}
    else:
        # Try to convert to string as a last resort
        try:
            return str(obj)
        except Exception:
            return None
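
# Example (sketch): make a raw OCR response JSON-safe before dumping it.
#
#     serializable = serialize_ocr_object(ocr_response)
#     Path("response.json").write_text(json.dumps(serializable, indent=2))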
def try_local_ocr_fallback(image_path: Union[str, Path], base64_data_url: str = None) -> Optional[str]:
    """
    Attempt to use local pytesseract OCR as a fallback when the API fails.

    Args:
        image_path: Path to the image file
        base64_data_url: Optional base64 data URL if already available

    Returns:
        OCR text string if successful, None if failed
    """
    logger.info("Attempting local OCR fallback using pytesseract...")

    # Guard against missing Pillow, which the enhancement steps below require
    if not PILLOW_AVAILABLE:
        logger.warning("PIL not installed - local OCR not available")
        return None

    try:
        import pytesseract

        # Load the image - either from a path or from base64
        if base64_data_url and base64_data_url.startswith('data:image'):
            # Extract the image from base64
            image_data = base64_data_url.split(',', 1)[1]
            image_bytes = base64.b64decode(image_data)
            image = Image.open(io.BytesIO(image_bytes))
        else:
            # Load from the file path
            image_path = Path(image_path) if isinstance(image_path, str) else image_path
            image = Image.open(image_path)

        # Convert to RGB if not already (pytesseract works best with RGB)
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Apply image enhancements for better OCR:
        # convert to grayscale for better text recognition
        image = image.convert('L')

        # Enhance contrast
        enhancer = ImageEnhance.Contrast(image)
        image = enhancer.enhance(2.0)  # Higher contrast for better OCR

        # Run OCR
        ocr_text = pytesseract.image_to_string(image, lang='eng')

        if ocr_text and len(ocr_text.strip()) > 50:
            logger.info(f"Local OCR successful: extracted {len(ocr_text)} characters")
            return ocr_text
        else:
            logger.warning("Local OCR produced minimal or no text")
            return None
    except ImportError:
        logger.warning("Pytesseract not installed - local OCR not available")
        return None
    except Exception as e:
        logger.error(f"Local OCR fallback failed: {str(e)}")
        return None
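
# Example (sketch): fall back to local OCR only when the API call errors out.
#
#     try:
#         response = process_image_with_ocr(client, "scan.jpg")
#         text = get_combined_markdown(response)
#     except Exception:
#         text = try_local_ocr_fallback("scan.jpg")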