historical-ocr / utils /image_utils.py
milwright's picture
Fix OCR processing variable scope issue by using explicit module reference for apply_preprocessing_to_file
3dd2ff2
"""
Utility functions for OCR image processing with Mistral AI.
Contains helper functions for working with OCR responses and image handling.
"""
# Standard library imports
import json
import base64
import io
import zipfile
import logging
import re
import time
import math
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union, Any, Tuple
from functools import lru_cache
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Third-party imports
import numpy as np
# Mistral AI imports
from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk
from mistralai.models import OCRImageObject
# Check for image processing libraries
try:
from PIL import Image, ImageEnhance, ImageFilter, ImageOps
PILLOW_AVAILABLE = True
except ImportError:
logger.warning("PIL not available - image preprocessing will be limited")
PILLOW_AVAILABLE = False
try:
import cv2
CV2_AVAILABLE = True
except ImportError:
logger.warning("OpenCV (cv2) not available - advanced image processing will be limited")
CV2_AVAILABLE = False
# Import configuration
try:
from config import IMAGE_PREPROCESSING
except ImportError:
# Fallback defaults if config not available
IMAGE_PREPROCESSING = {
"enhance_contrast": 1.5,
"sharpen": True,
"denoise": True,
"max_size_mb": 8.0,
"target_dpi": 300,
"compression_quality": 92
}
def detect_skew(image: Union[Image.Image, np.ndarray]) -> float:
"""
Quick skew detection that returns angle in degrees.
Uses a computationally efficient approach by analyzing at 1% resolution.
Args:
image: PIL Image or numpy array
Returns:
Estimated skew angle in degrees (positive or negative)
"""
# Convert PIL Image to numpy array if needed
if isinstance(image, Image.Image):
# Convert to grayscale for processing
if image.mode != 'L':
img_np = np.array(image.convert('L'))
else:
img_np = np.array(image)
else:
# If already numpy array, ensure it's grayscale
if len(image.shape) == 3:
if CV2_AVAILABLE:
img_np = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
else:
# Fallback grayscale conversion
img_np = np.mean(image, axis=2).astype(np.uint8)
else:
img_np = image
# Downsample to 1% resolution for faster processing
height, width = img_np.shape
target_size = int(min(width, height) * 0.01)
# Use a sane minimum size and ensure we have enough pixels to detect lines
target_size = max(target_size, 100)
if CV2_AVAILABLE:
# OpenCV-based implementation (faster)
# Resize the image to the target size
scale_factor = target_size / max(width, height)
small_img = cv2.resize(img_np, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_AREA)
# Apply binary thresholding to get cleaner edges
_, binary = cv2.threshold(small_img, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
# Use Hough Line Transform to detect lines
lines = cv2.HoughLinesP(binary, 1, np.pi/180, threshold=target_size//10,
minLineLength=target_size//5, maxLineGap=target_size//10)
if lines is None or len(lines) < 3:
# Not enough lines detected, assume no significant skew
return 0.0
# Calculate angles of lines
angles = []
for line in lines:
x1, y1, x2, y2 = line[0]
if x2 - x1 == 0: # Avoid division by zero
continue
angle = math.atan2(y2 - y1, x2 - x1) * 180.0 / np.pi
# Normalize angle to -45 to 45 range
angle = angle % 180
if angle > 90:
angle -= 180
if angle > 45:
angle -= 90
if angle < -45:
angle += 90
angles.append(angle)
if not angles:
return 0.0
# Use median to reduce impact of outliers
angles.sort()
median_angle = angles[len(angles) // 2]
return median_angle
else:
# PIL-only fallback implementation
# Resize using PIL
small_img = Image.fromarray(img_np).resize(
(int(width * target_size / max(width, height)),
int(height * target_size / max(width, height))),
Image.NEAREST
)
# Find edges
edges = small_img.filter(ImageFilter.FIND_EDGES)
edges_data = np.array(edges)
# Simple edge orientation analysis (less precise than OpenCV)
# Count horizontal vs vertical edges
h_edges = np.sum(np.abs(np.diff(edges_data, axis=1)))
v_edges = np.sum(np.abs(np.diff(edges_data, axis=0)))
# If horizontal edges dominate, no significant skew
if h_edges > v_edges * 1.2:
return 0.0
# Simple angle estimation based on edge distribution
# This is a simplified approach that works for slight skews
rows, cols = edges_data.shape
xs, ys = [], []
# Sample strong edge points
for r in range(0, rows, 2):
for c in range(0, cols, 2):
if edges_data[r, c] > 128:
xs.append(c)
ys.append(r)
if len(xs) < 10: # Not enough edge points
return 0.0
# Use simple linear regression to estimate the slope
n = len(xs)
mean_x = sum(xs) / n
mean_y = sum(ys) / n
# Calculate slope
numerator = sum((xs[i] - mean_x) * (ys[i] - mean_y) for i in range(n))
denominator = sum((xs[i] - mean_x) ** 2 for i in range(n))
if abs(denominator) < 1e-6: # Avoid division by zero
return 0.0
slope = numerator / denominator
angle = math.atan(slope) * 180.0 / math.pi
# Normalize to -45 to 45 degrees
if angle > 45:
angle -= 90
elif angle < -45:
angle += 90
return angle
def replace_images_in_markdown(md: str, images: dict[str, str]) -> str:
"""
Replace image placeholders in markdown with base64-encoded images.
Uses regex-based matching to handle variations in image IDs and formats.
Args:
md: Markdown text containing image placeholders
images: Dictionary mapping image IDs to base64 strings
Returns:
Markdown text with images replaced by base64 data
"""
# Process each image ID in the dictionary
for img_id, base64_str in images.items():
# Extract the base ID without extension for more flexible matching
base_id = img_id.split('.')[0]
# Match markdown image pattern where URL contains the base ID
# Using a single regex with groups to capture the full pattern
pattern = re.compile(rf'!\[([^\]]*)\]\(([^\)]*{base_id}[^\)]*)\)')
# Process all matches
matches = list(pattern.finditer(md))
for match in reversed(matches): # Process in reverse to avoid offset issues
# Replace the entire match with a properly formatted base64 image
md = md[:match.start()] + f"![{img_id}](data:image/jpeg;base64,{base64_str})" + md[match.end():]
return md
def get_combined_markdown(ocr_response) -> str:
"""
Combine OCR text and images into a single markdown document.
Args:
ocr_response: OCR response object from Mistral AI
Returns:
Combined markdown string with embedded images
"""
markdowns = []
# Process each page of the OCR response
for page in ocr_response.pages:
# Extract image data if available
image_data = {}
if hasattr(page, "images"):
for img in page.images:
if hasattr(img, "id") and hasattr(img, "image_base64"):
image_data[img.id] = img.image_base64
# Replace image placeholders with base64 data
page_markdown = page.markdown if hasattr(page, "markdown") else ""
processed_markdown = replace_images_in_markdown(page_markdown, image_data)
markdowns.append(processed_markdown)
# Join all pages' markdown with double newlines
return "\n\n".join(markdowns)
def encode_image_for_api(image_path: Union[str, Path]) -> str:
"""
Encode an image as base64 data URL for API submission.
Args:
image_path: Path to the image file
Returns:
Base64 data URL for the image
"""
# Convert to Path object if string
image_file = Path(image_path) if isinstance(image_path, str) else image_path
# Verify image exists
if not image_file.is_file():
raise FileNotFoundError(f"Image file not found: {image_file}")
# Determine mime type based on file extension
mime_type = 'image/jpeg' # Default mime type
suffix = image_file.suffix.lower()
if suffix == '.png':
mime_type = 'image/png'
elif suffix == '.gif':
mime_type = 'image/gif'
elif suffix in ['.jpg', '.jpeg']:
mime_type = 'image/jpeg'
elif suffix == '.pdf':
mime_type = 'application/pdf'
# Encode image as base64
encoded = base64.b64encode(image_file.read_bytes()).decode()
return f"data:{mime_type};base64,{encoded}"
def encode_bytes_for_api(file_bytes: bytes, mime_type: str) -> str:
"""
Encode binary data as base64 data URL for API submission.
Args:
file_bytes: Binary file data
mime_type: MIME type of the file (e.g., 'image/jpeg', 'application/pdf')
Returns:
Base64 data URL for the data
"""
# Encode data as base64
encoded = base64.b64encode(file_bytes).decode()
return f"data:{mime_type};base64,{encoded}"
def calculate_image_entropy(pil_img: Image.Image) -> float:
"""
Calculate the entropy of a PIL image.
Entropy is a measure of randomness; low entropy indicates a blank or simple image,
high entropy indicates more complex content (e.g., text or detailed images).
Args:
pil_img: PIL Image object
Returns:
float: Entropy value
"""
# Convert to grayscale for entropy calculation
gray_img = pil_img.convert("L")
arr = np.array(gray_img)
# Compute histogram
hist, _ = np.histogram(arr, bins=256, range=(0, 255), density=True)
# Remove zero entries to avoid log(0)
hist = hist[hist > 0]
# Calculate entropy
entropy = -np.sum(hist * np.log2(hist))
return float(entropy)
def estimate_text_density(image_np):
"""
Estimate text density patterns in an image.
Returns metrics on text distribution and special cases.
Args:
image_np: Numpy array of the image
Returns:
dict: Text density metrics
"""
# Convert to grayscale
if len(image_np.shape) > 2 and image_np.shape[2] == 3:
gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
else:
gray = image_np
# Binarize image
_, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
# Analyze vertical text density profile (important for headers/footers)
height, width = gray.shape
vertical_profile = np.sum(binary, axis=1) / width
# Analyze horizontal text density profile
horizontal_profile = np.sum(binary, axis=0) / height
# Calculate statistics
v_mean = np.mean(vertical_profile)
v_std = np.std(vertical_profile)
v_max = np.max(vertical_profile)
# Detect uppercase text regions (common in headers of Baldwin document)
# Uppercase text tends to have more consistent height and uniform vertical density
section_height = height // 10 # Divide into 10 vertical sections
uppercase_sections = 0
for i in range(0, height, section_height):
section = binary[i:min(i+section_height, height), :]
section_profile = np.sum(section, axis=1) / width
# Uppercase characteristics: high density with low variation
if np.mean(section_profile) > v_mean * 1.5 and np.std(section_profile) < v_std * 0.7:
uppercase_sections += 1
# Determine overall pattern
if v_std / v_mean > 0.8:
pattern = 'varied' # High variance indicates sections with different text densities
else:
pattern = 'uniform' # Low variance indicates uniform text distribution
return {
'mean_density': float(v_mean),
'density_variation': float(v_std),
'pattern': pattern,
'uppercase_sections': uppercase_sections,
'max_density': float(v_max)
}
def serialize_ocr_object(obj):
"""
Serialize OCR response objects to JSON serializable format.
Handles OCRImageObject specifically to prevent serialization errors.
Args:
obj: The object to serialize
Returns:
JSON serializable representation of the object
"""
# Fast path: Handle primitive types directly
if obj is None or isinstance(obj, (str, int, float, bool)):
return obj
# Handle collections
if isinstance(obj, list):
return [serialize_ocr_object(item) for item in obj]
elif isinstance(obj, dict):
return {k: serialize_ocr_object(v) for k, v in obj.items()}
elif isinstance(obj, OCRImageObject):
# Special handling for OCRImageObject
return {
'id': obj.id if hasattr(obj, 'id') else None,
'image_base64': obj.image_base64 if hasattr(obj, 'image_base64') else None
}
elif hasattr(obj, '__dict__'):
# For objects with __dict__ attribute
return {k: serialize_ocr_object(v) for k, v in obj.__dict__.items()
if not k.startswith('_')} # Skip private attributes
else:
# Try to convert to string as last resort
try:
return str(obj)
except:
return None
# Clean OCR result with focus on Mistral compatibility
def clean_ocr_result(result, use_segmentation=False, vision_enabled=True, preprocessing_options=None):
"""
Clean text content in OCR results, preserving original structure from Mistral API.
Only removes markdown/HTML conflicts without duplicating content across fields.
Args:
result: OCR result object or dictionary
use_segmentation: Whether image segmentation was used
vision_enabled: Whether vision model was used
preprocessing_options: Dictionary of preprocessing options
Returns:
Cleaned result object
"""
if not result:
return result
# Import text utilities for cleaning
try:
from utils.text_utils import clean_raw_text
text_cleaner_available = True
except ImportError:
text_cleaner_available = False
def clean_text(text):
"""Clean text content, removing markdown image references and base64 data"""
if not text or not isinstance(text, str):
return ""
if text_cleaner_available:
text = clean_raw_text(text)
else:
# Remove image references like ![image](data:image/...)
text = re.sub(r'!\[.*?\]\(data:image/[^)]+\)', '', text)
# Remove basic markdown image references like ![alt](img-1.jpg)
text = re.sub(r'!\[[^\]]*\]\([^)]+\)', '', text)
# Remove base64 encoded image data
text = re.sub(r'data:image/[^;]+;base64,[a-zA-Z0-9+/=]+', '', text)
# Clean up any JSON-like image object references
text = re.sub(r'{"image(_data)?":("[^"]*"|null|true|false|\{[^}]*\}|\[[^\]]*\])}', '', text)
# Clean up excessive whitespace and line breaks created by removals
text = re.sub(r'\n{3,}', '\n\n', text)
text = re.sub(r'\s{3,}', ' ', text)
return text.strip()
# Process dictionary
if isinstance(result, dict):
# For PDF documents, preserve original structure from Mistral API
is_pdf = result.get('file_type', '') == 'pdf' or (
result.get('file_name', '').lower().endswith('.pdf')
)
# Ensure ocr_contents exists
if 'ocr_contents' not in result:
result['ocr_contents'] = {}
# Clean raw_text if it exists but don't duplicate it
if 'raw_text' in result:
result['raw_text'] = clean_text(result['raw_text'])
# Handle ocr_contents fields - clean them but don't duplicate
if 'ocr_contents' in result:
for key, value in list(result['ocr_contents'].items()):
# Skip binary fields and image data
if key in ['image_base64', 'images', 'binary_data'] and value:
continue
# Clean string values to remove markdown/HTML conflicts
if isinstance(value, str):
result['ocr_contents'][key] = clean_text(value)
# Handle segmentation data
if use_segmentation and preprocessing_options and 'segmentation_data' in preprocessing_options:
# Store segmentation metadata
result['segmentation_applied'] = True
# Extract combined text if available
if 'combined_text' in preprocessing_options['segmentation_data']:
segmentation_text = clean_text(preprocessing_options['segmentation_data']['combined_text'])
# Add as dedicated field
result['ocr_contents']['segmentation_text'] = segmentation_text
# IMPORTANT: For documents with overlapping regions like baldwin-15th-north,
# the intelligently merged segmentation text is more accurate than the raw OCR
# Always use segmentation text as the primary source when available
# This ensures clean, non-duplicated content from overlapping regions
result['ocr_contents']['raw_text'] = segmentation_text
# Also update the 'text' field which is used in some contexts
if 'text' in result['ocr_contents']:
result['ocr_contents']['text'] = segmentation_text
# Clean pages_data if available (Mistral OCR format)
if 'pages_data' in result:
for page in result['pages_data']:
if isinstance(page, dict):
# Clean text field
if 'text' in page:
page['text'] = clean_text(page['text'])
# Clean markdown field
if 'markdown' in page:
page['markdown'] = clean_text(page['markdown'])
# Handle list content recursively
elif isinstance(result, list):
return [clean_ocr_result(item, use_segmentation, vision_enabled, preprocessing_options)
for item in result]
return result
def create_results_zip(results, output_dir=None, zip_name=None):
"""
Create a zip file containing OCR results.
Args:
results: Dictionary or list of OCR results
output_dir: Optional output directory
zip_name: Optional zip file name
Returns:
Path to the created zip file
"""
# Create temporary output directory if not provided
if output_dir is None:
output_dir = Path.cwd() / "output"
output_dir.mkdir(exist_ok=True)
else:
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True)
# Generate zip name if not provided
if zip_name is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if isinstance(results, list):
# For a list of results, create a descriptive name
file_count = len(results)
zip_name = f"ocr_results_{file_count}_{timestamp}.zip"
else:
# For single result, create descriptive filename
base_name = results.get('file_name', 'document').split('.')[0]
zip_name = f"{base_name}_{timestamp}.zip"
try:
# Get zip data in memory first
zip_data = create_results_zip_in_memory(results)
# Save to file
zip_path = output_dir / zip_name
with open(zip_path, 'wb') as f:
f.write(zip_data)
return zip_path
except Exception as e:
# Create an empty zip file as fallback
logger.error(f"Error creating zip file: {str(e)}")
zip_path = output_dir / zip_name
with zipfile.ZipFile(zip_path, 'w') as zipf:
zipf.writestr("info.txt", "Could not create complete archive")
return zip_path
def create_results_zip_in_memory(results):
"""
Create a zip file containing OCR results in memory.
Packages markdown with embedded image tags, raw text, and JSON file
in a contextually relevant structure.
Args:
results: Dictionary or list of OCR results
Returns:
Binary zip file data
"""
# Create a BytesIO object
zip_buffer = io.BytesIO()
# Create a ZipFile instance
with zipfile.ZipFile(zip_buffer, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
# Check if results is a list or a dictionary
is_list = isinstance(results, list)
if is_list:
# Handle multiple results by creating subdirectories
for idx, result in enumerate(results):
if result and isinstance(result, dict):
# Create a folder name based on the file name or index
folder_name = result.get('file_name', f'document_{idx+1}')
folder_name = Path(folder_name).stem # Remove file extension
# Add files to this folder
add_result_files_to_zip(zipf, result, f"{folder_name}/")
else:
# Single result - add files directly to root of zip
add_result_files_to_zip(zipf, results)
# Seek to the beginning of the BytesIO object
zip_buffer.seek(0)
# Return the zip file bytes
return zip_buffer.getvalue()
def truncate_base64_in_result(result, prefix_length=32, suffix_length=32):
"""
Create a copy of the result dictionary with base64 image data truncated.
This keeps the structure intact while making the JSON more readable.
Args:
result: OCR result dictionary
prefix_length: Number of characters to keep at the beginning
suffix_length: Number of characters to keep at the end
Returns:
Dictionary with truncated base64 data
"""
if not result or not isinstance(result, dict):
return {}
# Create a deep copy to avoid modifying the original
import copy
truncated_result = copy.deepcopy(result)
# Helper function to truncate base64 strings
def truncate_base64(data):
if not isinstance(data, str) or len(data) <= prefix_length + suffix_length + 10:
return data
# Extract prefix and suffix based on whether this is a data URI or raw base64
if data.startswith('data:'):
# Handle data URIs like 'data:image/jpeg;base64,/9j/4AAQ...'
parts = data.split(',', 1)
if len(parts) != 2:
return data # Unexpected format, return as is
header = parts[0] + ','
base64_content = parts[1]
if len(base64_content) <= prefix_length + suffix_length + 10:
return data # Not long enough to truncate
truncated = (f"{header}{base64_content[:prefix_length]}..."
f"[truncated {len(base64_content) - prefix_length - suffix_length} chars]..."
f"{base64_content[-suffix_length:]}")
else:
# Handle raw base64 strings
truncated = (f"{data[:prefix_length]}..."
f"[truncated {len(data) - prefix_length - suffix_length} chars]..."
f"{data[-suffix_length:]}")
return truncated
# Helper function to recursively truncate base64 in nested structures
def truncate_base64_recursive(obj):
if isinstance(obj, dict):
# Check for keys that typically contain base64 data
for key in list(obj.keys()):
if key in ['image_base64', 'base64'] and isinstance(obj[key], str):
obj[key] = truncate_base64(obj[key])
elif isinstance(obj[key], (dict, list)):
truncate_base64_recursive(obj[key])
elif isinstance(obj, list):
for item in obj:
if isinstance(item, (dict, list)):
truncate_base64_recursive(item)
# Truncate base64 data throughout the result
truncate_base64_recursive(truncated_result)
# Specifically handle the pages_data structure
if 'pages_data' in truncated_result:
for page in truncated_result['pages_data']:
if isinstance(page, dict) and 'images' in page:
for img in page['images']:
if isinstance(img, dict) and 'image_base64' in img and isinstance(img['image_base64'], str):
img['image_base64'] = truncate_base64(img['image_base64'])
# Handle raw_response_data if present
if 'raw_response_data' in truncated_result and isinstance(truncated_result['raw_response_data'], dict):
if 'pages' in truncated_result['raw_response_data']:
for page in truncated_result['raw_response_data']['pages']:
if isinstance(page, dict) and 'images' in page:
for img in page['images']:
if isinstance(img, dict) and 'base64' in img and isinstance(img['base64'], str):
img['base64'] = truncate_base64(img['base64'])
return truncated_result
def clean_base64_from_result(result):
"""
Create a clean copy of the result dictionary with base64 image data removed.
This ensures JSON files don't contain large base64 strings.
Args:
result: OCR result dictionary
Returns:
Cleaned dictionary without base64 data
"""
if not result or not isinstance(result, dict):
return {}
# Create a deep copy to avoid modifying the original
import copy
clean_result = copy.deepcopy(result)
# Helper function to recursively clean base64 from nested structures
def clean_base64_recursive(obj):
if isinstance(obj, dict):
# Check for keys that typically contain base64 data
for key in list(obj.keys()):
if key in ['image_base64', 'base64']:
obj[key] = "[BASE64_DATA_REMOVED]"
elif isinstance(obj[key], (dict, list)):
clean_base64_recursive(obj[key])
elif isinstance(obj, list):
for item in obj:
if isinstance(item, (dict, list)):
clean_base64_recursive(item)
# Clean the entire result
clean_base64_recursive(clean_result)
# Specifically handle the pages_data structure
if 'pages_data' in clean_result:
for page in clean_result['pages_data']:
if isinstance(page, dict) and 'images' in page:
for img in page['images']:
if isinstance(img, dict) and 'image_base64' in img:
img['image_base64'] = "[BASE64_DATA_REMOVED]"
# Handle raw_response_data if present
if 'raw_response_data' in clean_result and isinstance(clean_result['raw_response_data'], dict):
if 'pages' in clean_result['raw_response_data']:
for page in clean_result['raw_response_data']['pages']:
if isinstance(page, dict) and 'images' in page:
for img in page['images']:
if isinstance(img, dict) and 'base64' in img:
img['base64'] = "[BASE64_DATA_REMOVED]"
return clean_result
def create_markdown_with_file_references(result, image_path_prefix="images/"):
"""
Create a markdown document with file references to images instead of base64 embedding.
Ideal for use in zip archives where images are stored as separate files.
Args:
result: OCR result dictionary
image_path_prefix: Path prefix for image references (e.g., "images/")
Returns:
Markdown content as string with file references
"""
# Similar to create_markdown_with_images but uses file references
# Import content utils to use classification functions
try:
from utils.content_utils import classify_document_content, extract_document_text, extract_image_description
content_utils_available = True
except ImportError:
content_utils_available = False
# Get content classification
has_text = True
has_images = False
if content_utils_available:
classification = classify_document_content(result)
has_text = classification['has_content']
has_images = result.get('has_images', False)
else:
# Minimal fallback detection
if 'has_images' in result:
has_images = result['has_images']
# Check for image data more thoroughly
if 'pages_data' in result and isinstance(result['pages_data'], list):
for page in result['pages_data']:
if isinstance(page, dict) and 'images' in page and page['images']:
has_images = True
break
# Start building the markdown document
md = []
# Add document title/header
md.append(f"# {result.get('file_name', 'Document')}\n")
# Add metadata section
md.append("## Document Metadata\n")
# Add timestamp
if 'timestamp' in result:
md.append(f"**Processed:** {result['timestamp']}\n")
# Add languages if available
if 'languages' in result and result['languages']:
languages = [lang for lang in result['languages'] if lang]
if languages:
md.append(f"**Languages:** {', '.join(languages)}\n")
# Add document type and topics
if 'detected_document_type' in result:
md.append(f"**Document Type:** {result['detected_document_type']}\n")
if 'topics' in result and result['topics']:
md.append(f"**Topics:** {', '.join(result['topics'])}\n")
md.append("\n---\n")
# Document title - extract from result if available
if 'ocr_contents' in result and 'title' in result['ocr_contents'] and result['ocr_contents']['title']:
title_content = result['ocr_contents']['title']
md.append(f"## {title_content}\n")
# Add images if present
if has_images and 'pages_data' in result:
md.append("## Images\n")
# Extract and display all images with file references
for page_idx, page in enumerate(result['pages_data']):
if 'images' in page and isinstance(page['images'], list):
for img_idx, img in enumerate(page['images']):
if 'image_base64' in img:
# Create image reference to file in the zip
image_filename = f"image_{page_idx+1}_{img_idx+1}.jpg"
image_path = f"{image_path_prefix}{image_filename}"
image_caption = f"Image {page_idx+1}-{img_idx+1}"
md.append(f"![{image_caption}]({image_path})\n")
# Add image description if available through utils
if content_utils_available:
description = extract_image_description(result)
if description:
md.append(f"*{description}*\n")
md.append("\n---\n")
# Add document text section
md.append("## Text Content\n")
# Extract text content systematically
text_content = ""
structured_sections = {}
# Helper function to extract clean text from dictionary objects
def extract_clean_text(content):
if isinstance(content, str):
# Check if content is a stringified JSON
if content.strip().startswith("{") and content.strip().endswith("}"):
try:
# Try to parse as JSON
content_dict = json.loads(content.replace("'", '"'))
if 'text' in content_dict:
return content_dict['text']
return content
except:
return content
return content
elif isinstance(content, dict):
# If it's a dictionary with a 'text' key, return just that value
if 'text' in content and isinstance(content['text'], str):
return content['text']
return content
return content
if content_utils_available:
# Use the systematic utility function for main text
text_content = extract_document_text(result)
text_content = extract_clean_text(text_content)
# Collect all available structured sections
if 'ocr_contents' in result:
for field, content in result['ocr_contents'].items():
# Skip certain fields that are handled separately
if field in ["raw_text", "error", "partial_text", "main_text"]:
continue
if content:
# Extract clean text from content if possible
clean_content = extract_clean_text(content)
# Add this as a structured section
structured_sections[field] = clean_content
else:
# Fallback extraction logic
if 'ocr_contents' in result:
# First find main text
for field in ["main_text", "content", "text", "transcript", "raw_text"]:
if field in result['ocr_contents'] and result['ocr_contents'][field]:
content = result['ocr_contents'][field]
if isinstance(content, str) and content.strip():
text_content = content
break
elif isinstance(content, dict):
# Try to convert complex objects to string
try:
text_content = json.dumps(content, indent=2)
break
except:
pass
# Then collect all structured sections
for field, content in result['ocr_contents'].items():
# Skip certain fields that are handled separately
if field in ["raw_text", "error", "partial_text", "main_text", "content", "text", "transcript"]:
continue
if content:
# Add this as a structured section
structured_sections[field] = content
# Add the main text content - display raw text without a field label
if text_content:
# Check if this is from raw_text (based on content match)
is_raw_text = False
if 'ocr_contents' in result and 'raw_text' in result['ocr_contents']:
if result['ocr_contents']['raw_text'] == text_content:
is_raw_text = True
# Display content without adding a "raw_text:" label
md.append(text_content + "\n\n")
# Add structured sections if available
if structured_sections:
for section_name, section_content in structured_sections.items():
# Use proper markdown header for sections - consistently capitalize all section names
display_name = section_name.replace("_", " ").capitalize()
# Handle different content types
if isinstance(section_content, str):
md.append(section_content + "\n\n")
elif isinstance(section_content, dict):
# Dictionary content - format as key-value pairs
for key, value in section_content.items():
# Treat all values as plain text to maintain content purity
# This prevents JSON-like structures from being formatted as code blocks
md.append(f"**{key}:** {value}\n\n")
elif isinstance(section_content, list):
# List content - create a markdown list
for item in section_content:
# Treat all items as plain text
md.append(f"- {item}\n")
md.append("\n")
# Join all markdown parts into a single string
return "\n".join(md)
def add_result_files_to_zip(zipf, result, prefix=""):
"""
Add files for a single result to a zip file.
Args:
zipf: ZipFile instance to add files to
result: OCR result dictionary
prefix: Optional prefix for file paths in the zip
"""
if not result or not isinstance(result, dict):
return
# Create a timestamp for filename if not in result
timestamp = result.get('timestamp', datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
# Get base name for files
file_name = result.get('file_name', 'document')
base_name = Path(file_name).stem
try:
# 1. Add JSON file - with base64 data cleaned out
clean_result = clean_base64_from_result(result)
json_str = json.dumps(clean_result, indent=2)
zipf.writestr(f"{prefix}{base_name}.json", json_str)
# 2. Add markdown file that exactly matches Tab 1 display
# Use the create_markdown_with_images function to ensure it matches the UI exactly
try:
markdown_content = create_markdown_with_images(result)
zipf.writestr(f"{prefix}{base_name}.md", markdown_content)
except Exception as e:
logger.error(f"Error creating markdown: {str(e)}")
# Fallback to simpler markdown if error occurs
zipf.writestr(f"{prefix}{base_name}.md", f"# {file_name}\n\nError generating complete markdown output.")
# Extract and save images first to ensure they exist before creating markdown
img_paths = {}
has_images = result.get('has_images', False)
# 3. Add individual images if available
if has_images and 'pages_data' in result:
img_folder = f"{prefix}images/"
for page_idx, page in enumerate(result['pages_data']):
if 'images' in page and isinstance(page['images'], list):
for img_idx, img in enumerate(page['images']):
if 'image_base64' in img and img['image_base64']:
# Extract the base64 data
try:
# Get the base64 data
img_data = img['image_base64']
# Handle the base64 data carefully
if isinstance(img_data, str):
# If it has a data URI prefix, remove it
if ',' in img_data and ';base64,' in img_data:
# Keep the complete data after the comma
img_data = img_data.split(',', 1)[1]
# Make sure we have the complete data (not truncated)
try:
# Decode the base64 data with padding correction
# Add padding if needed to prevent truncation errors
missing_padding = len(img_data) % 4
if missing_padding:
img_data += '=' * (4 - missing_padding)
img_bytes = base64.b64decode(img_data)
except Exception as e:
logger.error(f"Base64 decoding error: {str(e)} for image {page_idx}-{img_idx}")
# Skip this image if we can't decode it
continue
else:
# If it's not a string (e.g., already bytes), use it directly
img_bytes = img_data
# Create image filename
image_filename = f"image_{page_idx+1}_{img_idx+1}.jpg"
img_paths[(page_idx, img_idx)] = image_filename
# Write the image to the zip file
zipf.writestr(f"{img_folder}{image_filename}", img_bytes)
except Exception as e:
logger.warning(f"Could not add image to zip: {str(e)}")
# 4. Add markdown with file references to images for offline viewing
try:
if has_images:
# Create markdown with file references
file_ref_markdown = create_markdown_with_file_references(result, "images/")
zipf.writestr(f"{prefix}{base_name}_with_files.md", file_ref_markdown)
except Exception as e:
logger.warning(f"Error creating markdown with file references: {str(e)}")
# 5. Add README.txt with explanation of file contents
readme_content = f"""
OCR RESULTS FOR: {file_name}
Processed: {timestamp}
This archive contains the following files:
- {base_name}.json: Complete JSON data with all extracted information
- {base_name}.md: Markdown document with embedded base64 images (exactly as shown in the app)
- {base_name}_with_files.md: Alternative markdown with file references instead of base64 (for offline viewing)
- images/ folder: Contains extracted images from the document (if present)
Generated by Historical OCR using Mistral AI
"""
zipf.writestr(f"{prefix}README.txt", readme_content.strip())
except Exception as e:
logger.error(f"Error adding files to zip: {str(e)}")
def create_markdown_with_images(result):
"""
Create a clean Markdown document from OCR results that properly preserves
image references and text structure, following the principle of content purity.
Args:
result: OCR result dictionary
Returns:
Markdown content as string
"""
# Similar to create_markdown_with_file_references but embeds base64 images
# Import content utils to use classification functions
try:
from utils.content_utils import classify_document_content, extract_document_text, extract_image_description
content_utils_available = True
except ImportError:
content_utils_available = False
# Get content classification
has_text = True
has_images = False
if content_utils_available:
classification = classify_document_content(result)
has_text = classification['has_content']
has_images = result.get('has_images', False)
else:
# Minimal fallback detection
if 'has_images' in result:
has_images = result['has_images']
# Check for image data more thoroughly
if 'pages_data' in result and isinstance(result['pages_data'], list):
for page in result['pages_data']:
if isinstance(page, dict) and 'images' in page and page['images']:
has_images = True
break
# Start building the markdown document
md = []
# Add document title/header
md.append(f"# {result.get('file_name', 'Document')}\n")
# Add metadata section
md.append("## Document Metadata\n")
# Add timestamp
if 'timestamp' in result:
md.append(f"**Processed:** {result['timestamp']}\n")
# Add languages if available
if 'languages' in result and result['languages']:
languages = [lang for lang in result['languages'] if lang]
if languages:
md.append(f"**Languages:** {', '.join(languages)}\n")
# Add document type and topics
if 'detected_document_type' in result:
md.append(f"**Document Type:** {result['detected_document_type']}\n")
if 'topics' in result and result['topics']:
md.append(f"**Topics:** {', '.join(result['topics'])}\n")
md.append("\n---\n")
# Document title - extract from result if available
if 'ocr_contents' in result and 'title' in result['ocr_contents'] and result['ocr_contents']['title']:
title_content = result['ocr_contents']['title']
md.append(f"## {title_content}\n")
# Add images if present - with base64 embedding
if has_images and 'pages_data' in result:
md.append("## Images\n")
# Extract and display all images with embedded base64
for page_idx, page in enumerate(result['pages_data']):
if 'images' in page and isinstance(page['images'], list):
for img_idx, img in enumerate(page['images']):
if 'image_base64' in img:
# Use the base64 data directly
image_caption = f"Image {page_idx+1}-{img_idx+1}"
img_data = img['image_base64']
# Make sure it has proper data URI format
if isinstance(img_data, str) and not img_data.startswith('data:'):
img_data = f"data:image/jpeg;base64,{img_data}"
md.append(f"![{image_caption}]({img_data})\n")
# Add image description if available through utils
if content_utils_available:
description = extract_image_description(result)
if description:
md.append(f"*{description}*\n")
md.append("\n---\n")
# Add document text section
md.append("## Text Content\n")
# Extract text content systematically
text_content = ""
structured_sections = {}
if content_utils_available:
# Use the systematic utility function for main text
text_content = extract_document_text(result)
# Collect all available structured sections
if 'ocr_contents' in result:
for field, content in result['ocr_contents'].items():
# Skip certain fields that are handled separately
if field in ["raw_text", "error", "partial_text", "main_text"]:
continue
if content:
# Add this as a structured section
structured_sections[field] = content
else:
# Fallback extraction logic
if 'ocr_contents' in result:
# First find main text
for field in ["main_text", "content", "text", "transcript", "raw_text"]:
if field in result['ocr_contents'] and result['ocr_contents'][field]:
content = result['ocr_contents'][field]
if isinstance(content, str) and content.strip():
text_content = content
break
elif isinstance(content, dict):
# Try to convert complex objects to string
try:
text_content = json.dumps(content, indent=2)
break
except:
pass
# Then collect all structured sections
for field, content in result['ocr_contents'].items():
# Skip certain fields that are handled separately
if field in ["raw_text", "error", "partial_text", "main_text", "content", "text", "transcript"]:
continue
if content:
# Add this as a structured section
structured_sections[field] = content
# Add the main text content
if text_content:
md.append(text_content + "\n\n")
# Add structured sections if available
if structured_sections:
for section_name, section_content in structured_sections.items():
# Use proper markdown header for sections - consistently capitalize all section names
display_name = section_name.replace("_", " ").capitalize()
md.append(f"### {display_name}\n")
# Add a separator for clarity
md.append("\n---\n\n")
# Handle different content types
if isinstance(section_content, str):
md.append(section_content + "\n\n")
elif isinstance(section_content, dict):
# Dictionary content - format as key-value pairs
for key, value in section_content.items():
# Treat all values as plain text to maintain content purity
md.append(f"**{key}:** {value}\n\n")
elif isinstance(section_content, list):
# List content - create a markdown list
for item in section_content:
# Keep list items as plain text
md.append(f"- {item}\n")
md.append("\n")
# Join all markdown parts into a single string
return "\n".join(md)