"""
Utility functions for OCR image processing with Mistral AI.
Contains helper functions for working with OCR responses and image handling.
"""
# Standard library imports | |
import json | |
import base64 | |
import io | |
import zipfile | |
import logging | |
import re | |
import time | |
import math | |
from datetime import datetime | |
from pathlib import Path | |
from typing import Dict, List, Optional, Union, Any, Tuple | |
from functools import lru_cache | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# Third-party imports | |
import numpy as np | |
# Mistral AI imports | |
from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk | |
from mistralai.models import OCRImageObject | |
# Check for image processing libraries | |
try: | |
from PIL import Image, ImageEnhance, ImageFilter, ImageOps | |
PILLOW_AVAILABLE = True | |
except ImportError: | |
logger.warning("PIL not available - image preprocessing will be limited") | |
PILLOW_AVAILABLE = False | |
try: | |
import cv2 | |
CV2_AVAILABLE = True | |
except ImportError: | |
logger.warning("OpenCV (cv2) not available - advanced image processing will be limited") | |
CV2_AVAILABLE = False | |
# Import configuration | |
try: | |
from config import IMAGE_PREPROCESSING | |
except ImportError: | |
# Fallback defaults if config not available | |
IMAGE_PREPROCESSING = { | |
"enhance_contrast": 1.5, | |
"sharpen": True, | |
"denoise": True, | |
"max_size_mb": 8.0, | |
"target_dpi": 300, | |
"compression_quality": 92 | |
} | |
def detect_skew(image: Union[Image.Image, np.ndarray]) -> float:
    """
    Quick skew detection that returns angle in degrees.

    Uses a computationally efficient approach by analyzing at a heavily
    downsampled resolution (~1% of the shorter side, minimum 100 px).

    Args:
        image: PIL Image or numpy array (grayscale or RGB)

    Returns:
        Estimated skew angle in degrees (positive or negative), normalized
        to the -45..45 range; 0.0 when no reliable estimate is possible
        (too few detected lines or edge points).
    """
    # Convert PIL Image to numpy array if needed
    if isinstance(image, Image.Image):
        # Convert to grayscale for processing
        if image.mode != 'L':
            img_np = np.array(image.convert('L'))
        else:
            img_np = np.array(image)
    else:
        # If already numpy array, ensure it's grayscale
        if len(image.shape) == 3:
            if CV2_AVAILABLE:
                img_np = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
            else:
                # Fallback grayscale conversion: unweighted channel mean
                # (good enough for edge analysis)
                img_np = np.mean(image, axis=2).astype(np.uint8)
        else:
            img_np = image
    # Downsample to 1% resolution for faster processing
    height, width = img_np.shape
    target_size = int(min(width, height) * 0.01)
    # Use a sane minimum size and ensure we have enough pixels to detect lines
    target_size = max(target_size, 100)
    if CV2_AVAILABLE:
        # OpenCV-based implementation (faster)
        # Resize the image to the target size
        # NOTE(review): scale_factor uses max(width, height) while target_size
        # comes from min(width, height), so the short side of a non-square
        # image ends up below target_size — confirm this is intentional.
        scale_factor = target_size / max(width, height)
        small_img = cv2.resize(img_np, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_AREA)
        # Apply binary thresholding (Otsu, inverted) to get cleaner edges
        _, binary = cv2.threshold(small_img, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
        # Use probabilistic Hough Line Transform to detect line segments;
        # thresholds are scaled with the downsampled image size
        lines = cv2.HoughLinesP(binary, 1, np.pi/180, threshold=target_size//10,
                                minLineLength=target_size//5, maxLineGap=target_size//10)
        if lines is None or len(lines) < 3:
            # Not enough lines detected, assume no significant skew
            return 0.0
        # Calculate angles of lines
        angles = []
        for line in lines:
            x1, y1, x2, y2 = line[0]
            if x2 - x1 == 0:  # Avoid division by zero
                continue
            angle = math.atan2(y2 - y1, x2 - x1) * 180.0 / np.pi
            # Normalize angle to -45 to 45 range so near-horizontal and
            # near-vertical strokes vote for the same skew estimate
            angle = angle % 180
            if angle > 90:
                angle -= 180
            if angle > 45:
                angle -= 90
            if angle < -45:
                angle += 90
            angles.append(angle)
        if not angles:
            return 0.0
        # Use median to reduce impact of outliers
        angles.sort()
        median_angle = angles[len(angles) // 2]
        return median_angle
    else:
        # PIL-only fallback implementation
        # Resize using PIL
        small_img = Image.fromarray(img_np).resize(
            (int(width * target_size / max(width, height)),
             int(height * target_size / max(width, height))),
            Image.NEAREST
        )
        # Find edges
        edges = small_img.filter(ImageFilter.FIND_EDGES)
        edges_data = np.array(edges)
        # Simple edge orientation analysis (less precise than OpenCV)
        # Count horizontal vs vertical edges
        h_edges = np.sum(np.abs(np.diff(edges_data, axis=1)))
        v_edges = np.sum(np.abs(np.diff(edges_data, axis=0)))
        # If horizontal edges dominate, no significant skew
        if h_edges > v_edges * 1.2:
            return 0.0
        # Simple angle estimation based on edge distribution
        # This is a simplified approach that works for slight skews
        rows, cols = edges_data.shape
        xs, ys = [], []
        # Sample strong edge points (every other pixel to keep it cheap)
        for r in range(0, rows, 2):
            for c in range(0, cols, 2):
                if edges_data[r, c] > 128:
                    xs.append(c)
                    ys.append(r)
        if len(xs) < 10:  # Not enough edge points
            return 0.0
        # Use simple linear regression to estimate the slope
        n = len(xs)
        mean_x = sum(xs) / n
        mean_y = sum(ys) / n
        # Calculate slope
        numerator = sum((xs[i] - mean_x) * (ys[i] - mean_y) for i in range(n))
        denominator = sum((xs[i] - mean_x) ** 2 for i in range(n))
        if abs(denominator) < 1e-6:  # Avoid division by zero
            return 0.0
        slope = numerator / denominator
        angle = math.atan(slope) * 180.0 / math.pi
        # Normalize to -45 to 45 degrees
        if angle > 45:
            angle -= 90
        elif angle < -45:
            angle += 90
        return angle
def replace_images_in_markdown(md: str, images: dict[str, str]) -> str:
    """
    Replace image placeholders in markdown with base64-encoded images.

    Uses regex-based matching to handle variations in image IDs and formats.

    Args:
        md: Markdown text containing image placeholders
        images: Dictionary mapping image IDs to base64 data-URL strings

    Returns:
        Markdown text with image URLs replaced by base64 data
    """
    for img_id, base64_str in images.items():
        # Match on the ID without its extension so "img-1.jpeg" also matches
        # references written as "img-1" (and vice versa). re.escape prevents
        # regex metacharacters in the ID from corrupting the pattern.
        base_id = re.escape(img_id.split('.')[0])
        # Markdown image pattern whose URL contains the base ID:
        # group 1 = alt text, group 2 = original URL.
        pattern = re.compile(rf'!\[([^\]]*)\]\(([^\)]*{base_id}[^\)]*)\)')
        # Replace in reverse so earlier match offsets stay valid.
        for match in reversed(list(pattern.finditer(md))):
            # BUG FIX: the replacement used to be an empty string, which
            # deleted the image instead of embedding it. Keep the alt text
            # and substitute the base64 payload as the URL.
            replacement = f"![{match.group(1)}]({base64_str})"
            md = md[:match.start()] + replacement + md[match.end():]
    return md
def get_combined_markdown(ocr_response) -> str:
    """
    Combine OCR text and images into a single markdown document.

    Args:
        ocr_response: OCR response object from Mistral AI

    Returns:
        Combined markdown string with embedded images, pages separated by
        blank lines
    """
    pages_md = []
    for page in ocr_response.pages:
        # Collect id -> base64 for every image that carries both attributes.
        image_data = {
            img.id: img.image_base64
            for img in getattr(page, "images", [])
            if hasattr(img, "id") and hasattr(img, "image_base64")
        }
        # Substitute image placeholders in this page's markdown (if any).
        raw_markdown = getattr(page, "markdown", "")
        pages_md.append(replace_images_in_markdown(raw_markdown, image_data))
    # Pages are joined with a blank line between them.
    return "\n\n".join(pages_md)
def encode_image_for_api(image_path: Union[str, Path]) -> str:
    """
    Encode an image as base64 data URL for API submission.

    Args:
        image_path: Path to the image file (str or Path)

    Returns:
        Base64 data URL for the image

    Raises:
        FileNotFoundError: if the path does not point to an existing file
    """
    # Path() accepts both str and Path inputs.
    image_file = Path(image_path)
    if not image_file.is_file():
        raise FileNotFoundError(f"Image file not found: {image_file}")
    # Map known extensions to MIME types; anything else falls back to JPEG.
    mime_by_suffix = {
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.pdf': 'application/pdf',
    }
    mime_type = mime_by_suffix.get(image_file.suffix.lower(), 'image/jpeg')
    payload = base64.b64encode(image_file.read_bytes()).decode()
    return f"data:{mime_type};base64,{payload}"
def encode_bytes_for_api(file_bytes: bytes, mime_type: str) -> str:
    """
    Encode binary data as base64 data URL for API submission.

    Args:
        file_bytes: Binary file data
        mime_type: MIME type of the file (e.g., 'image/jpeg', 'application/pdf')

    Returns:
        Base64 data URL for the data
    """
    payload = base64.b64encode(file_bytes).decode()
    return "".join(("data:", mime_type, ";base64,", payload))
def calculate_image_entropy(pil_img: "Image.Image") -> float:
    """
    Calculate the Shannon entropy of a PIL image, in bits.

    Entropy is a measure of randomness; low entropy indicates a blank or
    simple image, high entropy indicates more complex content (e.g., text
    or detailed images). A single-color image yields exactly 0.0; an image
    using all 256 gray levels uniformly yields 8.0.

    Args:
        pil_img: PIL Image object

    Returns:
        float: Entropy value in bits (>= 0.0)
    """
    # Convert to grayscale for entropy calculation
    gray_img = pil_img.convert("L")
    arr = np.array(gray_img)
    # One bin per integer pixel value; range=(0, 256) keeps bin edges aligned
    # with integers (range=(0, 255) would misalign them).
    hist, _ = np.histogram(arr, bins=256, range=(0, 256))
    # BUG FIX: the previous density=True histogram produced bin *densities*
    # (which sum to 1/bin_width, not 1), so the "entropy" of a blank image
    # came out slightly negative. Use true probabilities instead.
    prob = hist / max(arr.size, 1)
    # Remove zero entries to avoid log(0)
    prob = prob[prob > 0]
    entropy = -np.sum(prob * np.log2(prob))
    return float(entropy)
def estimate_text_density(image_np):
    """
    Estimate text density patterns in an image.
    Returns metrics on text distribution and special cases.

    NOTE(review): this function uses cv2 unconditionally, unlike detect_skew
    which honors CV2_AVAILABLE — callers must ensure OpenCV is installed.

    Args:
        image_np: Numpy array of the image (grayscale 2-D or RGB 3-channel)

    Returns:
        dict: Text density metrics with keys 'mean_density',
        'density_variation', 'pattern', 'uppercase_sections', 'max_density'
    """
    # Convert to grayscale if we received a 3-channel image
    if len(image_np.shape) > 2 and image_np.shape[2] == 3:
        gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
    else:
        gray = image_np
    # Binarize image (Otsu, inverted so text pixels become 255)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    # Analyze vertical text density profile (important for headers/footers)
    height, width = gray.shape
    vertical_profile = np.sum(binary, axis=1) / width
    # Calculate statistics
    v_mean = np.mean(vertical_profile)
    v_std = np.std(vertical_profile)
    v_max = np.max(vertical_profile)
    # Detect uppercase text regions (common in document headers).
    # Uppercase text tends to have more consistent height and uniform
    # vertical density.
    # BUG FIX: guard against height < 10, which made the step size 0 and
    # raised "range() arg 3 must not be zero".
    section_height = max(1, height // 10)  # Divide into ~10 vertical sections
    uppercase_sections = 0
    for i in range(0, height, section_height):
        section = binary[i:min(i + section_height, height), :]
        section_profile = np.sum(section, axis=1) / width
        # Uppercase characteristics: high density with low variation
        if np.mean(section_profile) > v_mean * 1.5 and np.std(section_profile) < v_std * 0.7:
            uppercase_sections += 1
    # Determine overall pattern.
    # BUG FIX: a completely blank page gives v_mean == 0; the old 0/0
    # division produced NaN plus a RuntimeWarning. Guarding on v_mean > 0
    # keeps the same 'uniform' classification without the warning.
    if v_mean > 0 and v_std / v_mean > 0.8:
        pattern = 'varied'  # High variance: sections with different text densities
    else:
        pattern = 'uniform'  # Low variance: uniform text distribution
    return {
        'mean_density': float(v_mean),
        'density_variation': float(v_std),
        'pattern': pattern,
        'uppercase_sections': uppercase_sections,
        'max_density': float(v_max)
    }
def serialize_ocr_object(obj):
    """
    Serialize OCR response objects to a JSON-serializable format.
    Handles OCRImageObject specifically to prevent serialization errors.

    Args:
        obj: The object to serialize (primitive, list, dict, OCRImageObject,
             or any object exposing __dict__)

    Returns:
        JSON-serializable representation of the object, or None when the
        object cannot be represented at all
    """
    # Fast path: primitives are already serializable.
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    # Collections are serialized element by element.
    if isinstance(obj, list):
        return [serialize_ocr_object(item) for item in obj]
    if isinstance(obj, dict):
        return {k: serialize_ocr_object(v) for k, v in obj.items()}
    if isinstance(obj, OCRImageObject):
        # Special handling for OCRImageObject: keep only the two fields
        # downstream consumers use.
        return {
            'id': getattr(obj, 'id', None),
            'image_base64': getattr(obj, 'image_base64', None)
        }
    if hasattr(obj, '__dict__'):
        # Generic objects: serialize public attributes only.
        return {k: serialize_ocr_object(v) for k, v in obj.__dict__.items()
                if not k.startswith('_')}  # Skip private attributes
    # Last resort: string representation.
    try:
        return str(obj)
    except Exception:  # BUG FIX: bare except also swallowed SystemExit/KeyboardInterrupt
        return None
# Clean OCR result with focus on Mistral compatibility
def clean_ocr_result(result, use_segmentation=False, vision_enabled=True, preprocessing_options=None):
    """
    Clean text content in OCR results, preserving original structure from Mistral API.
    Only removes markdown/HTML conflicts without duplicating content across fields.

    Args:
        result: OCR result dictionary (mutated in place) or list of results
        use_segmentation: Whether image segmentation was used
        vision_enabled: Whether vision model was used (currently unused here;
            kept for call-site compatibility)
        preprocessing_options: Dictionary of preprocessing options; segmentation
            output is read from preprocessing_options['segmentation_data']

    Returns:
        Cleaned result object (same dict, or a new list for list input)
    """
    if not result:
        return result
    # Prefer the project-wide cleaner when available; fall back to the local
    # regex-based cleanup otherwise.
    try:
        from utils.text_utils import clean_raw_text
        text_cleaner_available = True
    except ImportError:
        text_cleaner_available = False

    def clean_text(text):
        """Clean text content, removing markdown image references and base64 data."""
        if not text or not isinstance(text, str):
            return ""
        if text_cleaner_available:
            text = clean_raw_text(text)
        else:
            # Remove image references like ![image](data:image/...)
            text = re.sub(r'!\[.*?\]\(data:image/[^)]+\)', '', text)
            # Remove basic markdown image references like ![alt](url)
            text = re.sub(r'!\[[^\]]*\]\([^)]+\)', '', text)
            # Remove bare base64 encoded image data
            text = re.sub(r'data:image/[^;]+;base64,[a-zA-Z0-9+/=]+', '', text)
            # Clean up any JSON-like image object references
            text = re.sub(r'{"image(_data)?":("[^"]*"|null|true|false|\{[^}]*\}|\[[^\]]*\])}', '', text)
        # Collapse excessive whitespace and line breaks left by the removals
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r'\s{3,}', ' ', text)
        return text.strip()

    # Dictionary results are cleaned in place.
    if isinstance(result, dict):
        # Make sure the structured-content container exists.
        # (A previously computed but unused 'is_pdf' flag was removed.)
        result.setdefault('ocr_contents', {})
        # Clean raw_text if it exists but don't duplicate it
        if 'raw_text' in result:
            result['raw_text'] = clean_text(result['raw_text'])
        # Clean string fields inside ocr_contents, skipping binary/image payloads.
        for key, value in list(result['ocr_contents'].items()):
            if key in ('image_base64', 'images', 'binary_data') and value:
                continue
            if isinstance(value, str):
                result['ocr_contents'][key] = clean_text(value)
        # Fold in segmentation output when it was produced upstream.
        if use_segmentation and preprocessing_options and 'segmentation_data' in preprocessing_options:
            result['segmentation_applied'] = True
            segmentation_data = preprocessing_options['segmentation_data']
            if 'combined_text' in segmentation_data:
                segmentation_text = clean_text(segmentation_data['combined_text'])
                result['ocr_contents']['segmentation_text'] = segmentation_text
                # For documents with overlapping regions, the intelligently
                # merged segmentation text is more accurate than the raw OCR,
                # so it becomes the primary text source.
                result['ocr_contents']['raw_text'] = segmentation_text
                # Also update the 'text' field, which is used in some contexts.
                if 'text' in result['ocr_contents']:
                    result['ocr_contents']['text'] = segmentation_text
        # Clean per-page text/markdown if available (Mistral OCR format).
        for page in result.get('pages_data', []):
            if isinstance(page, dict):
                if 'text' in page:
                    page['text'] = clean_text(page['text'])
                if 'markdown' in page:
                    page['markdown'] = clean_text(page['markdown'])
    # Handle list content recursively.
    elif isinstance(result, list):
        return [clean_ocr_result(item, use_segmentation, vision_enabled, preprocessing_options)
                for item in result]
    return result
def create_results_zip(results, output_dir=None, zip_name=None):
    """
    Create a zip file containing OCR results.

    Args:
        results: Dictionary or list of OCR results
        output_dir: Optional output directory (defaults to ./output)
        zip_name: Optional zip file name (auto-generated when omitted)

    Returns:
        Path to the created zip file
    """
    # Resolve the output directory, creating it if necessary.
    output_dir = Path(output_dir) if output_dir is not None else Path.cwd() / "output"
    output_dir.mkdir(exist_ok=True)
    # Derive a descriptive archive name when one was not supplied.
    if zip_name is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if isinstance(results, list):
            # Multiple results: include the document count in the name.
            zip_name = f"ocr_results_{len(results)}_{timestamp}.zip"
        else:
            # Single result: derive the name from the source file.
            base_name = results.get('file_name', 'document').split('.')[0]
            zip_name = f"{base_name}_{timestamp}.zip"
    zip_path = output_dir / zip_name
    try:
        # Build the archive fully in memory, then persist it in one write.
        zip_path.write_bytes(create_results_zip_in_memory(results))
    except Exception as e:
        # Fall back to a stub archive so callers always get a file back.
        logger.error(f"Error creating zip file: {str(e)}")
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            zipf.writestr("info.txt", "Could not create complete archive")
    return zip_path
def create_results_zip_in_memory(results):
    """
    Create a zip archive of OCR results entirely in memory.

    Packages markdown with embedded image tags, raw text, and a JSON file
    in a contextually relevant structure.

    Args:
        results: Dictionary or list of OCR results

    Returns:
        Binary zip file data
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
        if isinstance(results, list):
            # Multiple results: each document gets its own sub-folder named
            # after the source file (extension stripped).
            for idx, result in enumerate(results):
                if result and isinstance(result, dict):
                    folder_name = Path(result.get('file_name', f'document_{idx+1}')).stem
                    add_result_files_to_zip(zipf, result, f"{folder_name}/")
        else:
            # Single result: files go at the archive root.
            add_result_files_to_zip(zipf, results)
    # Rewind and hand back the raw bytes.
    buffer.seek(0)
    return buffer.getvalue()
def truncate_base64_in_result(result, prefix_length=32, suffix_length=32):
    """
    Create a copy of the result dictionary with base64 image data truncated.
    This keeps the structure intact while making the JSON more readable.

    Args:
        result: OCR result dictionary (anything else yields {})
        prefix_length: Number of characters to keep at the beginning
        suffix_length: Number of characters to keep at the end

    Returns:
        Deep-copied dictionary with truncated base64 data; the input is
        never modified
    """
    if not result or not isinstance(result, dict):
        return {}
    # Create a deep copy to avoid modifying the original
    import copy
    truncated_result = copy.deepcopy(result)

    def truncate_base64(data):
        """Shorten one base64 string (raw or data-URI) to prefix...suffix form."""
        # Strings shorter than prefix + suffix + 10 aren't worth truncating.
        if not isinstance(data, str) or len(data) <= prefix_length + suffix_length + 10:
            return data
        if data.startswith('data:'):
            # Handle data URIs like 'data:image/jpeg;base64,/9j/4AAQ...'
            parts = data.split(',', 1)
            if len(parts) != 2:
                return data  # Unexpected format, return as is
            header = parts[0] + ','
            base64_content = parts[1]
            if len(base64_content) <= prefix_length + suffix_length + 10:
                return data  # Payload not long enough to truncate
            return (f"{header}{base64_content[:prefix_length]}..."
                    f"[truncated {len(base64_content) - prefix_length - suffix_length} chars]..."
                    f"{base64_content[-suffix_length:]}")
        # Handle raw base64 strings
        return (f"{data[:prefix_length]}..."
                f"[truncated {len(data) - prefix_length - suffix_length} chars]..."
                f"{data[-suffix_length:]}")

    def truncate_base64_recursive(obj):
        """Walk nested dicts/lists, truncating 'image_base64'/'base64' values."""
        if isinstance(obj, dict):
            for key in list(obj.keys()):
                if key in ('image_base64', 'base64') and isinstance(obj[key], str):
                    obj[key] = truncate_base64(obj[key])
                elif isinstance(obj[key], (dict, list)):
                    truncate_base64_recursive(obj[key])
        elif isinstance(obj, list):
            for item in obj:
                if isinstance(item, (dict, list)):
                    truncate_base64_recursive(item)

    # The recursive walk already reaches pages_data[*].images[*].image_base64
    # and raw_response_data.pages[*].images[*].base64.
    # BUG FIX: the previous explicit second passes over those two paths
    # truncated the already-truncated strings a second time, destroying the
    # "[truncated N chars]" counts. A single recursive pass is sufficient.
    truncate_base64_recursive(truncated_result)
    return truncated_result
def clean_base64_from_result(result):
    """
    Create a clean copy of the result dictionary with base64 image data removed.
    This ensures JSON files don't contain large base64 strings.

    Args:
        result: OCR result dictionary (anything else yields {})

    Returns:
        Deep-copied dictionary with every 'image_base64'/'base64' value
        replaced by a placeholder; the input is never modified
    """
    if not result or not isinstance(result, dict):
        return {}
    # Create a deep copy to avoid modifying the original
    import copy
    clean_result = copy.deepcopy(result)

    def clean_base64_recursive(obj):
        """Walk nested dicts/lists, replacing base64 payloads with a marker."""
        if isinstance(obj, dict):
            for key in list(obj.keys()):
                if key in ('image_base64', 'base64'):
                    obj[key] = "[BASE64_DATA_REMOVED]"
                elif isinstance(obj[key], (dict, list)):
                    clean_base64_recursive(obj[key])
        elif isinstance(obj, list):
            for item in obj:
                if isinstance(item, (dict, list)):
                    clean_base64_recursive(item)

    # The recursive walk already covers pages_data[*].images[*] and
    # raw_response_data.pages[*].images[*]; the former explicit second passes
    # over those paths were redundant and have been removed.
    clean_base64_recursive(clean_result)
    return clean_result
def create_markdown_with_file_references(result, image_path_prefix="images/"):
    """
    Create a markdown document with file references to images instead of base64 embedding.
    Ideal for use in zip archives where images are stored as separate files.

    Args:
        result: OCR result dictionary
        image_path_prefix: Path prefix for image references (e.g., "images/")

    Returns:
        Markdown content as string with file references
    """
    # Similar to create_markdown_with_images but uses file references.
    # Import content utils to use classification functions when available.
    try:
        from utils.content_utils import classify_document_content, extract_document_text, extract_image_description
        content_utils_available = True
    except ImportError:
        content_utils_available = False
    # Get content classification
    has_text = True
    has_images = False
    if content_utils_available:
        classification = classify_document_content(result)
        has_text = classification['has_content']
        has_images = result.get('has_images', False)
    else:
        # Minimal fallback detection
        if 'has_images' in result:
            has_images = result['has_images']
        # Check for image data more thoroughly
        if 'pages_data' in result and isinstance(result['pages_data'], list):
            for page in result['pages_data']:
                if isinstance(page, dict) and 'images' in page and page['images']:
                    has_images = True
                    break
    # Start building the markdown document
    md = []
    # Add document title/header
    md.append(f"# {result.get('file_name', 'Document')}\n")
    # Add metadata section
    md.append("## Document Metadata\n")
    # Add timestamp
    if 'timestamp' in result:
        md.append(f"**Processed:** {result['timestamp']}\n")
    # Add languages if available (empty/None entries filtered out)
    if 'languages' in result and result['languages']:
        languages = [lang for lang in result['languages'] if lang]
        if languages:
            md.append(f"**Languages:** {', '.join(languages)}\n")
    # Add document type and topics
    if 'detected_document_type' in result:
        md.append(f"**Document Type:** {result['detected_document_type']}\n")
    if 'topics' in result and result['topics']:
        md.append(f"**Topics:** {', '.join(result['topics'])}\n")
    md.append("\n---\n")
    # Document title - extract from result if available
    if 'ocr_contents' in result and 'title' in result['ocr_contents'] and result['ocr_contents']['title']:
        title_content = result['ocr_contents']['title']
        md.append(f"## {title_content}\n")
    # Add images if present
    if has_images and 'pages_data' in result:
        md.append("## Images\n")
        # Extract and display all images with file references
        for page_idx, page in enumerate(result['pages_data']):
            if 'images' in page and isinstance(page['images'], list):
                for img_idx, img in enumerate(page['images']):
                    if 'image_base64' in img:
                        # Create an image reference pointing at the file in
                        # the zip. NOTE(review): .jpg is hard-coded even when
                        # the source image may be PNG — confirm against the
                        # code that writes image files into the archive.
                        image_filename = f"image_{page_idx+1}_{img_idx+1}.jpg"
                        image_path = f"{image_path_prefix}{image_filename}"
                        image_caption = f"Image {page_idx+1}-{img_idx+1}"
                        # BUG FIX: the append below was an empty f-string, so
                        # the computed path/caption were never emitted and no
                        # image reference appeared in the markdown.
                        md.append(f"![{image_caption}]({image_path})\n")
                        # Add image description if available through utils
                        if content_utils_available:
                            description = extract_image_description(result)
                            if description:
                                md.append(f"*{description}*\n")
    md.append("\n---\n")
    # Add document text section
    md.append("## Text Content\n")
    # Extract text content systematically
    text_content = ""
    structured_sections = {}

    # Helper function to extract clean text from dictionary objects
    def extract_clean_text(content):
        if isinstance(content, str):
            # Check if content is a stringified JSON
            if content.strip().startswith("{") and content.strip().endswith("}"):
                try:
                    # Try to parse as JSON (single quotes normalized first)
                    content_dict = json.loads(content.replace("'", '"'))
                    if 'text' in content_dict:
                        return content_dict['text']
                    return content
                except Exception:
                    return content
            return content
        elif isinstance(content, dict):
            # If it's a dictionary with a 'text' key, return just that value
            if 'text' in content and isinstance(content['text'], str):
                return content['text']
            return content
        return content

    if content_utils_available:
        # Use the systematic utility function for main text
        text_content = extract_document_text(result)
        text_content = extract_clean_text(text_content)
        # Collect all available structured sections
        if 'ocr_contents' in result:
            for field, content in result['ocr_contents'].items():
                # Skip certain fields that are handled separately
                if field in ["raw_text", "error", "partial_text", "main_text"]:
                    continue
                if content:
                    # Extract clean text from content if possible
                    structured_sections[field] = extract_clean_text(content)
    else:
        # Fallback extraction logic
        if 'ocr_contents' in result:
            # First find main text
            for field in ["main_text", "content", "text", "transcript", "raw_text"]:
                if field in result['ocr_contents'] and result['ocr_contents'][field]:
                    content = result['ocr_contents'][field]
                    if isinstance(content, str) and content.strip():
                        text_content = content
                        break
                    elif isinstance(content, dict):
                        # Try to convert complex objects to string
                        try:
                            text_content = json.dumps(content, indent=2)
                            break
                        except Exception:
                            pass
            # Then collect all structured sections
            for field, content in result['ocr_contents'].items():
                # Skip certain fields that are handled separately
                if field in ["raw_text", "error", "partial_text", "main_text", "content", "text", "transcript"]:
                    continue
                if content:
                    # Add this as a structured section
                    structured_sections[field] = content
    # Add the main text content - display raw text without a field label
    if text_content:
        # NOTE(review): the original code computed an 'is_raw_text' flag here
        # (comparing text_content against ocr_contents['raw_text']) but never
        # used it; the dead computation has been removed.
        md.append(text_content + "\n\n")
    # Add structured sections if available
    if structured_sections:
        for section_name, section_content in structured_sections.items():
            # NOTE(review): a capitalized display name was computed here but
            # never emitted — sections are rendered without headers. Confirm
            # whether a "### {display_name}" header should precede each body.
            display_name = section_name.replace("_", " ").capitalize()
            # Handle different content types
            if isinstance(section_content, str):
                md.append(section_content + "\n\n")
            elif isinstance(section_content, dict):
                # Dictionary content - format as key-value pairs; values are
                # treated as plain text to keep content out of code blocks
                for key, value in section_content.items():
                    md.append(f"**{key}:** {value}\n\n")
            elif isinstance(section_content, list):
                # List content - create a markdown list
                for item in section_content:
                    md.append(f"- {item}\n")
                md.append("\n")
    # Join all markdown parts into a single string
    return "\n".join(md)
def _write_images_to_zip(zipf, pages_data, img_folder):
    """
    Decode and write every embedded page image into the zip under img_folder.

    Each image is written as ``image_<page>_<index>.jpg``. Images whose base64
    payload cannot be decoded are logged and skipped so one bad image never
    aborts the rest of the archive.

    Args:
        zipf: ZipFile instance to add files to
        pages_data: List of page dicts, each optionally carrying an 'images' list
        img_folder: Zip-internal folder prefix (e.g. "images/")
    """
    for page_idx, page in enumerate(pages_data):
        if 'images' not in page or not isinstance(page['images'], list):
            continue
        for img_idx, img in enumerate(page['images']):
            if not ('image_base64' in img and img['image_base64']):
                continue
            try:
                img_data = img['image_base64']
                if isinstance(img_data, str):
                    # Strip a data URI prefix ("data:image/jpeg;base64,...") if present
                    if ',' in img_data and ';base64,' in img_data:
                        img_data = img_data.split(',', 1)[1]
                    try:
                        # Repair missing '=' padding to prevent truncation errors
                        missing_padding = len(img_data) % 4
                        if missing_padding:
                            img_data += '=' * (4 - missing_padding)
                        img_bytes = base64.b64decode(img_data)
                    except Exception as e:
                        logger.error(f"Base64 decoding error: {str(e)} for image {page_idx}-{img_idx}")
                        # Skip this image if we can't decode it
                        continue
                else:
                    # Not a string (e.g. already bytes) - use it directly
                    img_bytes = img_data
                image_filename = f"image_{page_idx+1}_{img_idx+1}.jpg"
                zipf.writestr(f"{img_folder}{image_filename}", img_bytes)
            except Exception as e:
                logger.warning(f"Could not add image to zip: {str(e)}")


def add_result_files_to_zip(zipf, result, prefix=""):
    """
    Add files for a single result to a zip file.

    Writes, under ``prefix``:
      - ``<base>.json``            : result JSON with base64 payloads cleaned out
      - ``<base>.md``              : markdown matching the Tab 1 app display
      - ``images/`` folder         : extracted page images (when present)
      - ``<base>_with_files.md``   : markdown with file references for offline viewing
      - ``README.txt``             : explanation of the archive contents

    All errors are logged rather than raised, so a failing result never
    aborts a larger export.

    Args:
        zipf: ZipFile instance to add files to
        result: OCR result dictionary
        prefix: Optional prefix for file paths in the zip
    """
    if not result or not isinstance(result, dict):
        return
    # Create a timestamp for filename if not in result
    timestamp = result.get('timestamp', datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
    # Get base name for files
    file_name = result.get('file_name', 'document')
    base_name = Path(file_name).stem
    try:
        # 1. Add JSON file - with base64 data cleaned out
        clean_result = clean_base64_from_result(result)
        json_str = json.dumps(clean_result, indent=2)
        zipf.writestr(f"{prefix}{base_name}.json", json_str)
        # 2. Add markdown file that exactly matches Tab 1 display
        # Use the create_markdown_with_images function to ensure it matches the UI exactly
        try:
            markdown_content = create_markdown_with_images(result)
            zipf.writestr(f"{prefix}{base_name}.md", markdown_content)
        except Exception as e:
            logger.error(f"Error creating markdown: {str(e)}")
            # Fallback to simpler markdown if error occurs
            zipf.writestr(f"{prefix}{base_name}.md", f"# {file_name}\n\nError generating complete markdown output.")
        has_images = result.get('has_images', False)
        # 3. Add individual images if available
        if has_images and 'pages_data' in result:
            _write_images_to_zip(zipf, result['pages_data'], f"{prefix}images/")
        # 4. Add markdown with file references to images for offline viewing
        try:
            if has_images:
                # Create markdown with file references
                file_ref_markdown = create_markdown_with_file_references(result, "images/")
                zipf.writestr(f"{prefix}{base_name}_with_files.md", file_ref_markdown)
        except Exception as e:
            logger.warning(f"Error creating markdown with file references: {str(e)}")
        # 5. Add README.txt with explanation of file contents
        readme_content = f"""
OCR RESULTS FOR: {file_name}
Processed: {timestamp}
This archive contains the following files:
- {base_name}.json: Complete JSON data with all extracted information
- {base_name}.md: Markdown document with embedded base64 images (exactly as shown in the app)
- {base_name}_with_files.md: Alternative markdown with file references instead of base64 (for offline viewing)
- images/ folder: Contains extracted images from the document (if present)
Generated by Historical OCR using Mistral AI
"""
        zipf.writestr(f"{prefix}README.txt", readme_content.strip())
    except Exception as e:
        logger.error(f"Error adding files to zip: {str(e)}")
def create_markdown_with_images(result):
    """
    Create a clean Markdown document from OCR results that properly preserves
    image references and text structure, following the principle of content purity.

    Images are embedded inline as base64 data URIs so the document is fully
    self-contained (create_markdown_with_file_references produces the
    file-reference variant for offline viewing).

    Args:
        result: OCR result dictionary

    Returns:
        Markdown content as string
    """
    # Import content utils for text/description extraction when available;
    # fall back to local extraction logic otherwise.
    try:
        from utils.content_utils import extract_document_text, extract_image_description
        content_utils_available = True
    except ImportError:
        content_utils_available = False

    # Determine whether the result carries extractable images.
    # The 'has_images' flag is trusted when True; otherwise scan pages_data
    # for actual image payloads so a missing/false flag doesn't drop images.
    has_images = result.get('has_images', False)
    if not has_images and isinstance(result.get('pages_data'), list):
        for page in result['pages_data']:
            if isinstance(page, dict) and page.get('images'):
                has_images = True
                break

    ocr_contents = result.get('ocr_contents')

    # Start building the markdown document
    md = []
    # Document title/header
    md.append(f"# {result.get('file_name', 'Document')}\n")

    # Metadata section
    md.append("## Document Metadata\n")
    if 'timestamp' in result:
        md.append(f"**Processed:** {result['timestamp']}\n")
    if result.get('languages'):
        # Filter out empty/None entries before joining
        languages = [lang for lang in result['languages'] if lang]
        if languages:
            md.append(f"**Languages:** {', '.join(languages)}\n")
    if 'detected_document_type' in result:
        md.append(f"**Document Type:** {result['detected_document_type']}\n")
    if result.get('topics'):
        md.append(f"**Topics:** {', '.join(result['topics'])}\n")
    md.append("\n---\n")

    # Document title - extracted from the OCR contents if available
    if isinstance(ocr_contents, dict) and ocr_contents.get('title'):
        md.append(f"## {ocr_contents['title']}\n")

    # Add images if present - embedded as base64 data URIs
    if has_images and 'pages_data' in result:
        md.append("## Images\n")
        for page_idx, page in enumerate(result['pages_data']):
            if 'images' in page and isinstance(page['images'], list):
                for img_idx, img in enumerate(page['images']):
                    if 'image_base64' in img:
                        image_caption = f"Image {page_idx+1}-{img_idx+1}"
                        img_data = img['image_base64']
                        # Ensure a proper data URI so markdown viewers render it
                        if isinstance(img_data, str) and not img_data.startswith('data:'):
                            img_data = f"data:image/jpeg;base64,{img_data}"
                        # Embed the image inline (fix: embed markup was missing,
                        # leaving the computed caption and data unused)
                        md.append(f"![{image_caption}]({img_data})\n")
        # Add an overall image description when the utility is available
        if content_utils_available:
            description = extract_image_description(result)
            if description:
                md.append(f"*{description}*\n")
        md.append("\n---\n")

    # Document text section
    md.append("## Text Content\n")
    text_content = ""
    structured_sections = {}
    if content_utils_available:
        # Use the systematic utility function for the main text
        text_content = extract_document_text(result)
        # Fields rendered elsewhere are not repeated as sections;
        # 'title' is skipped so the document title isn't duplicated.
        skip_fields = {"raw_text", "error", "partial_text", "main_text", "title"}
        if isinstance(ocr_contents, dict):
            for field, content in ocr_contents.items():
                if field not in skip_fields and content:
                    structured_sections[field] = content
    else:
        # Fallback extraction logic: first non-empty candidate becomes main text
        if isinstance(ocr_contents, dict):
            for field in ("main_text", "content", "text", "transcript", "raw_text"):
                content = ocr_contents.get(field)
                if isinstance(content, str) and content.strip():
                    text_content = content
                    break
                elif isinstance(content, dict):
                    # Render complex objects as pretty-printed JSON
                    try:
                        text_content = json.dumps(content, indent=2)
                        break
                    except (TypeError, ValueError):
                        pass
            skip_fields = {"raw_text", "error", "partial_text", "main_text",
                           "content", "text", "transcript", "title"}
            for field, content in ocr_contents.items():
                if field not in skip_fields and content:
                    structured_sections[field] = content

    # Main text content
    if text_content:
        md.append(text_content + "\n\n")

    # Structured sections, each under a capitalized header
    for section_name, section_content in structured_sections.items():
        display_name = section_name.replace("_", " ").capitalize()
        md.append(f"### {display_name}\n")
        # Separator for clarity
        md.append("\n---\n\n")
        if isinstance(section_content, str):
            md.append(section_content + "\n\n")
        elif isinstance(section_content, dict):
            # Key-value pairs as plain text to maintain content purity
            # (prevents JSON-like structures from becoming code blocks)
            for key, value in section_content.items():
                md.append(f"**{key}:** {value}\n\n")
        elif isinstance(section_content, list):
            # List content - render as a markdown list of plain-text items
            for item in section_content:
                md.append(f"- {item}\n")
            md.append("\n")

    # Join all markdown parts into a single string
    return "\n".join(md)