""" Utility functions for OCR image processing with Mistral AI. Contains helper functions for working with OCR responses and image handling. """ # Standard library imports import json import base64 import io import zipfile import logging import re import time import math from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Union, Any, Tuple from functools import lru_cache # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Third-party imports import numpy as np # Mistral AI imports from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk from mistralai.models import OCRImageObject # Check for image processing libraries try: from PIL import Image, ImageEnhance, ImageFilter, ImageOps PILLOW_AVAILABLE = True except ImportError: logger.warning("PIL not available - image preprocessing will be limited") PILLOW_AVAILABLE = False try: import cv2 CV2_AVAILABLE = True except ImportError: logger.warning("OpenCV (cv2) not available - advanced image processing will be limited") CV2_AVAILABLE = False # Import configuration try: from config import IMAGE_PREPROCESSING except ImportError: # Fallback defaults if config not available IMAGE_PREPROCESSING = { "enhance_contrast": 1.5, "sharpen": True, "denoise": True, "max_size_mb": 8.0, "target_dpi": 300, "compression_quality": 92 } def detect_skew(image: Union[Image.Image, np.ndarray]) -> float: """ Quick skew detection that returns angle in degrees. Uses a computationally efficient approach by analyzing at 1% resolution. 
def detect_skew(image: "Union[Image.Image, np.ndarray]") -> float:
    """
    Estimate the skew angle of a document image, in degrees.

    The image is converted to grayscale and resized so that its longest side
    is ``max(100, 1% of the shortest side)`` pixels. With OpenCV the result is
    the median orientation of the segments found by a probabilistic Hough
    transform; otherwise a Pillow/NumPy fallback fits a regression line
    through strong edge pixels.

    Args:
        image: PIL Image or numpy array (grayscale, RGB or RGBA)

    Returns:
        Estimated skew angle in degrees, normalized to [-45, 45];
        0.0 when no reliable estimate can be made
    """
    # The annotation above is quoted on purpose: Pillow is an optional import
    # in this module, and an unquoted ``Image.Image`` would raise NameError at
    # import time when it is missing.
    if PILLOW_AVAILABLE and isinstance(image, Image.Image):
        img_np = np.array(image if image.mode == 'L' else image.convert('L'))
    else:
        img_np = np.asarray(image)
        if img_np.ndim == 3:
            # Ignore an alpha channel if present (RGBA input)
            rgb = img_np[:, :, :3]
            if CV2_AVAILABLE and rgb.shape[2] == 3 and rgb.dtype == np.uint8:
                img_np = cv2.cvtColor(np.ascontiguousarray(rgb), cv2.COLOR_RGB2GRAY)
            else:
                # Fallback grayscale conversion
                img_np = np.mean(rgb, axis=2)

    if img_np.ndim != 2 or img_np.size == 0:
        return 0.0
    if img_np.dtype != np.uint8:
        # Thresholding and Image.fromarray below expect 8-bit data
        img_np = np.clip(img_np, 0, 255).astype(np.uint8)

    height, width = img_np.shape
    # 1% of the short side, but never fewer than 100 px so lines stay detectable
    target_size = max(int(min(width, height) * 0.01), 100)
    scale_factor = target_size / max(width, height)
    # Clamp to 1 px so extreme aspect ratios cannot produce an empty image
    small_w = max(1, int(width * scale_factor))
    small_h = max(1, int(height * scale_factor))

    if CV2_AVAILABLE:
        small_img = cv2.resize(img_np, (small_w, small_h), interpolation=cv2.INTER_AREA)

        # Binary thresholding gives cleaner edges for the Hough transform
        _, binary = cv2.threshold(small_img, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

        lines = cv2.HoughLinesP(
            binary, 1, np.pi / 180,
            threshold=target_size // 10,
            minLineLength=target_size // 5,
            maxLineGap=target_size // 10,
        )
        if lines is None or len(lines) < 3:
            # Not enough lines detected, assume no significant skew
            return 0.0

        angles = []
        for line in lines:
            x1, y1, x2, y2 = line[0]
            if x2 - x1 == 0:
                # Exactly vertical segments are excluded from the estimate
                continue
            angle = math.degrees(math.atan2(y2 - y1, x2 - x1)) % 180
            # Fold into the -45..45 range
            if angle > 90:
                angle -= 180
            if angle > 45:
                angle -= 90
            if angle < -45:
                angle += 90
            angles.append(angle)

        if not angles:
            return 0.0

        # Median (upper median for even counts) limits the impact of outliers
        angles.sort()
        return float(angles[len(angles) // 2])

    if not PILLOW_AVAILABLE:
        logger.warning("Neither OpenCV nor PIL available - skew detection skipped")
        return 0.0

    # PIL-only fallback implementation
    small_img = Image.fromarray(img_np).resize((small_w, small_h), Image.NEAREST)
    edges_data = np.array(small_img.filter(ImageFilter.FIND_EDGES))

    # Compare variation along each axis. The data is widened to a signed type
    # first: differencing uint8 values wraps around modulo 256.
    signed_edges = edges_data.astype(np.int32)
    h_edges = np.abs(np.diff(signed_edges, axis=1)).sum()
    v_edges = np.abs(np.diff(signed_edges, axis=0)).sum()

    # If horizontal edges dominate, no significant skew
    if h_edges > v_edges * 1.2:
        return 0.0

    # Sample strong edge points on every second row and column
    sample_rows, sample_cols = np.nonzero(edges_data[::2, ::2] > 128)
    if sample_cols.size < 10:
        # Not enough edge points
        return 0.0
    xs = sample_cols.astype(np.float64) * 2
    ys = sample_rows.astype(np.float64) * 2

    # Least-squares slope of y on x through the sampled edge points
    dx = xs - xs.mean()
    dy = ys - ys.mean()
    denominator = float(np.sum(dx * dx))
    if abs(denominator) < 1e-6:
        # All points share one column; slope is undefined
        return 0.0
    slope = float(np.sum(dx * dy)) / denominator
    angle = math.degrees(math.atan(slope))

    # Normalize to -45 to 45 degrees
    if angle > 45:
        angle -= 90
    elif angle < -45:
        angle += 90
    return angle
def replace_images_in_markdown(md: str, images: dict[str, str]) -> str:
    """
    Replace image placeholders in markdown with base64-encoded images.

    Every markdown image link is inspected once. A link is replaced when its
    URL equals an image ID, or when it contains the ID without its extension
    as a whole token (so ``img-1`` does not match ``img-10``). Links that
    already point at a ``data:`` URI are left alone.

    Args:
        md: Markdown text containing image placeholders
        images: Dictionary mapping image IDs to base64 strings; values may be
            raw base64 or complete ``data:`` URIs

    Returns:
        Markdown text with images replaced by base64 data
    """
    if not md or not images:
        return md

    exact_lookup = {}
    candidates = []
    for img_id, base64_str in images.items():
        if not base64_str:
            continue  # nothing to embed; leave the placeholder untouched
        # A payload that is already a data URI must not be prefixed again
        data_uri = base64_str if base64_str.startswith('data:') else f"data:image/jpeg;base64,{base64_str}"
        exact_lookup[img_id] = data_uri

        # Extension-less ID for more flexible matching
        base_id = img_id.split('.')[0]
        if not base_id:
            continue  # an empty token would match every link
        # re.escape keeps IDs with regex metacharacters literal; the
        # lookarounds stop one ID from matching inside a longer one.
        matcher = re.compile(rf'(?<![A-Za-z0-9]){re.escape(base_id)}(?![A-Za-z0-9])')
        candidates.append((img_id, data_uri, matcher))

    def _embed(match):
        url = match.group(2)
        target = url.strip()
        if target.startswith('data:'):
            return match.group(0)
        if target in exact_lookup:
            return f"![{target}]({exact_lookup[target]})"
        # First matching image wins, in the caller's dictionary order
        for img_id, data_uri, matcher in candidates:
            if matcher.search(url):
                return f"![{img_id}]({data_uri})"
        return match.group(0)

    # A single pass means embedded base64 is never re-scanned for other IDs
    return re.sub(r'!\[([^\]]*)\]\(([^\)]*)\)', _embed, md)


def get_combined_markdown(ocr_response) -> str:
    """
    Combine OCR text and images into a single markdown document.

    Args:
        ocr_response: OCR response object from Mistral AI; must expose
            ``pages``, each page optionally exposing ``markdown`` and
            ``images`` (objects with ``id`` and ``image_base64``)

    Returns:
        Combined markdown string with embedded images, pages separated by
        blank lines
    """
    markdowns = []
    for page in ocr_response.pages or []:
        # Pages may carry images=None; images may lack base64 data
        image_data = {
            img.id: img.image_base64
            for img in (getattr(page, "images", None) or [])
            if getattr(img, "id", None) and getattr(img, "image_base64", None)
        }
        page_markdown = getattr(page, "markdown", None) or ""
        markdowns.append(replace_images_in_markdown(page_markdown, image_data))

    # Join all pages' markdown with double newlines
    return "\n\n".join(markdowns)
# MIME types for data URLs, keyed by lowercase file suffix
_MIME_TYPES_BY_SUFFIX = {
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.webp': 'image/webp',
    '.bmp': 'image/bmp',
    '.tif': 'image/tiff',
    '.tiff': 'image/tiff',
    '.pdf': 'application/pdf',
}


def encode_image_for_api(image_path: Union[str, Path]) -> str:
    """
    Encode an image as base64 data URL for API submission.

    Args:
        image_path: Path to the image file

    Returns:
        Base64 data URL for the image. The MIME type is chosen from the file
        suffix; unrecognized suffixes fall back to 'image/jpeg'.

    Raises:
        FileNotFoundError: If image_path is not an existing file
    """
    image_file = Path(image_path)
    if not image_file.is_file():
        raise FileNotFoundError(f"Image file not found: {image_file}")

    mime_type = _MIME_TYPES_BY_SUFFIX.get(image_file.suffix.lower(), 'image/jpeg')
    return encode_bytes_for_api(image_file.read_bytes(), mime_type)


def encode_bytes_for_api(file_bytes: bytes, mime_type: str) -> str:
    """
    Encode binary data as base64 data URL for API submission.

    Args:
        file_bytes: Binary file data
        mime_type: MIME type of the file (e.g., 'image/jpeg', 'application/pdf')

    Returns:
        Base64 data URL for the data
    """
    encoded = base64.b64encode(file_bytes).decode('ascii')
    return f"data:{mime_type};base64,{encoded}"


def calculate_image_entropy(pil_img: "Image.Image") -> float:
    """
    Calculate the Shannon entropy (in bits) of a PIL image's gray levels.

    Entropy is a measure of randomness; low entropy indicates a blank or
    simple image, high entropy indicates more complex content (e.g., text or
    detailed images).

    Args:
        pil_img: PIL Image object

    Returns:
        float: Entropy value between 0.0 (single gray level) and 8.0
    """
    # The annotation is quoted because Pillow is an optional import here.
    arr = np.asarray(pil_img.convert("L"), dtype=np.uint8)
    if arr.size == 0:
        return 0.0

    # Use exact per-level probabilities. np.histogram(..., density=True)
    # returns densities that do not sum to 1 for 256 bins over (0, 255),
    # which skews the result and makes it negative for blank images.
    counts = np.bincount(arr.ravel(), minlength=256)
    probabilities = counts[counts > 0] / arr.size

    # log2(1/p) rather than -log2(p) so a blank image yields 0.0, not -0.0
    return float(np.sum(probabilities * np.log2(1.0 / probabilities)))
def estimate_text_density(image_np):
    """
    Estimate text density patterns in an image.

    The image is binarized with an inverted Otsu threshold (dark pixels
    become 255) and the mean ink per row is analyzed. OpenCV is used when it
    can be imported; otherwise an equivalent NumPy implementation runs.

    Args:
        image_np: Numpy array of the image (grayscale, RGB or RGBA)

    Returns:
        dict: Text density metrics with keys 'mean_density',
        'density_variation', 'pattern' ('varied' or 'uniform'),
        'uppercase_sections' and 'max_density'

    Raises:
        ValueError: If the input is empty or not a 2-D/3-D image array
    """
    # OpenCV is optional in this module, so it is resolved lazily here
    try:
        import cv2
    except ImportError:
        cv2 = None

    image_np = np.asarray(image_np)

    # Convert to grayscale
    if image_np.ndim == 3:
        rgb = image_np[:, :, :3]  # ignore an alpha channel if present
        if rgb.shape[2] == 3 and cv2 is not None and rgb.dtype == np.uint8:
            gray = cv2.cvtColor(np.ascontiguousarray(rgb), cv2.COLOR_RGB2GRAY)
        elif rgb.shape[2] == 3:
            # Same luma weights OpenCV uses for RGB -> gray
            gray = rgb @ np.array([0.299, 0.587, 0.114])
        else:
            gray = rgb.mean(axis=2)
    else:
        gray = image_np

    if gray.ndim != 2 or gray.size == 0:
        raise ValueError("estimate_text_density expects a non-empty 2-D or 3-D image array")
    if gray.dtype != np.uint8:
        gray = np.clip(np.rint(gray), 0, 255).astype(np.uint8)

    # Binarize image (text -> 255, background -> 0)
    if cv2 is not None:
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    else:
        # Otsu: pick the level that maximizes between-class variance
        counts = np.bincount(gray.ravel(), minlength=256).astype(np.float64)
        levels = np.arange(256)
        weight_low = np.cumsum(counts)
        weight_high = counts.sum() - weight_low
        sum_low = np.cumsum(counts * levels)
        sum_high = sum_low[-1] - sum_low
        with np.errstate(divide='ignore', invalid='ignore'):
            between = weight_low * weight_high * (sum_low / weight_low - sum_high / weight_high) ** 2
        # Levels that leave one class empty produce NaN; they cannot win
        between = np.nan_to_num(between, nan=0.0, posinf=0.0, neginf=0.0)
        threshold = int(np.argmax(between))
        binary = np.where(gray <= threshold, 255, 0).astype(np.uint8)

    # Vertical text density profile: mean ink per row (important for headers/footers)
    height, width = gray.shape
    vertical_profile = binary.sum(axis=1, dtype=np.float64) / width

    # Calculate statistics
    v_mean = float(np.mean(vertical_profile))
    v_std = float(np.std(vertical_profile))
    v_max = float(np.max(vertical_profile))

    # Heuristic for uppercase (header-like) bands: dense rows with little
    # variation. Split into roughly 10 horizontal bands; the band height is
    # clamped to 1 so images shorter than 10 px do not produce a zero step.
    section_height = max(1, height // 10)
    uppercase_sections = 0
    for start in range(0, height, section_height):
        section_profile = vertical_profile[start:start + section_height]
        if np.mean(section_profile) > v_mean * 1.5 and np.std(section_profile) < v_std * 0.7:
            uppercase_sections += 1

    # High relative variance indicates sections with different text
    # densities. A blank page (v_mean == 0) counts as uniform.
    if v_mean > 0 and v_std / v_mean > 0.8:
        pattern = 'varied'
    else:
        pattern = 'uniform'

    return {
        'mean_density': v_mean,
        'density_variation': v_std,
        'pattern': pattern,
        'uppercase_sections': uppercase_sections,
        'max_density': v_max,
    }
def serialize_ocr_object(obj):
    """
    Serialize OCR response objects to JSON serializable format.
    Handles OCRImageObject specifically to prevent serialization errors.

    Args:
        obj: The object to serialize

    Returns:
        JSON serializable representation of the object: primitives are
        returned unchanged, sequences and sets become lists, mappings and
        objects with ``__dict__`` become dicts (private attributes skipped),
        and anything else becomes ``str(obj)`` (None if that fails)
    """
    # Fast path: Handle primitive types directly
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj

    # Sequences and sets all map to JSON arrays
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [serialize_ocr_object(item) for item in obj]

    if isinstance(obj, dict):
        # json.dumps only accepts primitive keys; stringify anything else
        return {
            (key if key is None or isinstance(key, (str, int, float, bool)) else str(key)):
                serialize_ocr_object(value)
            for key, value in obj.items()
        }

    if isinstance(obj, OCRImageObject):
        # Special handling for OCRImageObject: keep only id and image data
        return {
            'id': getattr(obj, 'id', None),
            'image_base64': getattr(obj, 'image_base64', None),
        }

    if hasattr(obj, '__dict__'):
        # Generic objects: serialize public attributes only
        return {
            name: serialize_ocr_object(value)
            for name, value in vars(obj).items()
            if not name.startswith('_')
        }

    # Last resort: string representation
    try:
        return str(obj)
    except Exception:
        return None
# Fallback clean-up rules applied in order when utils.text_utils is unavailable
_FALLBACK_CLEANUP_PATTERNS = (
    # Image references like ![image](data:image/...)
    (re.compile(r'!\[.*?\]\(data:image/[^)]+\)'), ''),
    # Basic markdown image references like ![alt](img-1.jpg)
    (re.compile(r'!\[[^\]]*\]\([^)]+\)'), ''),
    # Base64 encoded image data
    (re.compile(r'data:image/[^;]+;base64,[a-zA-Z0-9+/=]+'), ''),
    # JSON-like image object references
    (re.compile(r'{"image(_data)?":("[^"]*"|null|true|false|\{[^}]*\}|\[[^\]]*\])}'), ''),
    # Excessive whitespace and line breaks created by removals
    (re.compile(r'\n{3,}'), '\n\n'),
    (re.compile(r'\s{3,}'), ' '),
)


# Clean OCR result with focus on Mistral compatibility
def clean_ocr_result(result, use_segmentation=False, vision_enabled=True, preprocessing_options=None):
    """
    Clean text content in OCR results, preserving original structure from Mistral API.
    Only removes markdown/HTML conflicts without duplicating content across fields.

    Dictionaries are cleaned in place and returned; lists are returned as a
    new list of cleaned items; any other value is returned unchanged.

    Args:
        result: OCR result object or dictionary
        use_segmentation: Whether image segmentation was used
        vision_enabled: Whether vision model was used (currently unused)
        preprocessing_options: Dictionary of preprocessing options; its
            'segmentation_data' entry is consulted when use_segmentation is set

    Returns:
        Cleaned result object
    """
    if not result:
        return result

    # Prefer the project text cleaner; fall back to local regex rules
    try:
        from utils.text_utils import clean_raw_text
    except ImportError:
        clean_raw_text = None

    def clean_text(text):
        """Clean text content, removing markdown image references and base64 data"""
        if not text or not isinstance(text, str):
            return ""
        if clean_raw_text is not None:
            text = clean_raw_text(text)
        else:
            for pattern, replacement in _FALLBACK_CLEANUP_PATTERNS:
                text = pattern.sub(replacement, text)
        return text.strip()

    # Handle list content recursively
    if isinstance(result, list):
        return [clean_ocr_result(item, use_segmentation, vision_enabled, preprocessing_options)
                for item in result]

    if not isinstance(result, dict):
        return result

    # Ensure ocr_contents exists (a missing or None entry becomes an empty dict)
    if result.get('ocr_contents') is None:
        result['ocr_contents'] = {}
    ocr_contents = result['ocr_contents']
    contents_is_dict = isinstance(ocr_contents, dict)

    # Clean raw_text if it exists but don't duplicate it
    if 'raw_text' in result:
        result['raw_text'] = clean_text(result['raw_text'])

    # Clean ocr_contents string fields; binary/image fields are left untouched
    if contents_is_dict:
        for key, value in list(ocr_contents.items()):
            if key in ('image_base64', 'images', 'binary_data') and value:
                continue
            if isinstance(value, str):
                ocr_contents[key] = clean_text(value)

    # Handle segmentation data
    if (use_segmentation and isinstance(preprocessing_options, dict)
            and 'segmentation_data' in preprocessing_options):
        # Store segmentation metadata
        result['segmentation_applied'] = True

        segmentation_data = preprocessing_options['segmentation_data']
        if contents_is_dict and isinstance(segmentation_data, dict) and 'combined_text' in segmentation_data:
            segmentation_text = clean_text(segmentation_data['combined_text'])

            # Add as dedicated field
            ocr_contents['segmentation_text'] = segmentation_text

            # Segmentation text is always the primary source when available,
            # so text from overlapping regions is not duplicated
            ocr_contents['raw_text'] = segmentation_text

            # Also update the 'text' field which is used in some contexts
            if 'text' in ocr_contents:
                ocr_contents['text'] = segmentation_text

    # Clean pages_data if available (Mistral OCR format)
    pages = result.get('pages_data')
    if isinstance(pages, (list, tuple)):
        for page in pages:
            if not isinstance(page, dict):
                continue
            for field in ('text', 'markdown'):
                if field in page:
                    page[field] = clean_text(page[field])

    return result
def create_results_zip(results, output_dir=None, zip_name=None):
    """
    Create a zip file containing OCR results.

    If building the archive fails, the error is logged and a placeholder
    archive containing only ``info.txt`` is written instead.

    Args:
        results: Dictionary or list of OCR results
        output_dir: Optional output directory (defaults to ``./output``);
            created, including parents, if missing
        zip_name: Optional zip file name; generated from the result's file
            name (or the result count) and a timestamp when omitted

    Returns:
        Path to the created zip file
    """
    output_dir = Path.cwd() / "output" if output_dir is None else Path(output_dir)
    # parents=True so a nested, not-yet-existing output directory works
    output_dir.mkdir(parents=True, exist_ok=True)

    # Generate zip name if not provided
    if zip_name is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if isinstance(results, list):
            zip_name = f"ocr_results_{len(results)}_{timestamp}.zip"
        else:
            file_name = results.get('file_name') if isinstance(results, dict) else None
            # Use only the final path component so a file_name with directory
            # parts cannot point the archive into a non-existent folder
            base_name = Path(str(file_name or 'document')).name.split('.')[0] or 'document'
            zip_name = f"{base_name}_{timestamp}.zip"

    zip_path = output_dir / zip_name
    try:
        # Build the archive in memory first, then write it in one go
        zip_path.write_bytes(create_results_zip_in_memory(results))
    except Exception as e:
        # Create a placeholder zip file as fallback
        logger.error(f"Error creating zip file: {str(e)}")
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            zipf.writestr("info.txt", "Could not create complete archive")
    return zip_path
def create_results_zip_in_memory(results):
    """
    Create a zip file containing OCR results in memory.

    A single result is written to the archive root. A list of results is
    written one folder per result, named after the result's file stem (or
    ``document_<n>``). Repeated folder names get a numeric suffix so no
    archive entry is written twice. Empty or non-dict list items are skipped.

    Args:
        results: Dictionary or list of OCR results

    Returns:
        Binary zip file data
    """
    zip_buffer = io.BytesIO()

    with zipfile.ZipFile(zip_buffer, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
        if isinstance(results, list):
            used_folders = set()
            for idx, result in enumerate(results):
                if not result or not isinstance(result, dict):
                    continue

                # Folder name from the file name (without extension) or index
                fallback_name = f'document_{idx+1}'
                folder_name = Path(str(result.get('file_name') or fallback_name)).stem or fallback_name

                # Disambiguate results that share a file stem
                unique_name = folder_name
                suffix = 2
                while unique_name in used_folders:
                    unique_name = f"{folder_name}_{suffix}"
                    suffix += 1
                used_folders.add(unique_name)

                add_result_files_to_zip(zipf, result, f"{unique_name}/")
        else:
            # Single result - add files directly to root of zip
            add_result_files_to_zip(zipf, results)

    # getvalue() returns the whole buffer regardless of stream position
    return zip_buffer.getvalue()
def truncate_base64_in_result(result, prefix_length=32, suffix_length=32):
    """
    Create a copy of the result dictionary with base64 image data truncated.
    This keeps the structure intact while making the JSON more readable.

    String values stored under the keys 'image_base64' or 'base64', at any
    nesting depth of dicts and lists, are shortened to
    ``<prefix>...[truncated N chars]...<suffix>``. For data URIs the header up
    to the first comma is kept in full. Each value is truncated exactly once.

    Args:
        result: OCR result dictionary
        prefix_length: Number of characters to keep at the beginning
        suffix_length: Number of characters to keep at the end

    Returns:
        Dictionary with truncated base64 data ({} for empty or non-dict
        input); the input is not modified
    """
    if not result or not isinstance(result, dict):
        return {}

    # Create a deep copy to avoid modifying the original
    import copy
    truncated_result = copy.deepcopy(result)

    prefix_length = max(0, prefix_length)
    suffix_length = max(0, suffix_length)
    # Payloads no longer than this are left alone
    min_length = prefix_length + suffix_length + 10

    def shorten(payload):
        # Slice from an explicit start index: payload[-0:] would be the
        # whole string when suffix_length is 0.
        tail = payload[len(payload) - suffix_length:]
        omitted = len(payload) - prefix_length - suffix_length
        return f"{payload[:prefix_length]}...[truncated {omitted} chars]...{tail}"

    def truncate_base64(data):
        if not isinstance(data, str) or len(data) <= min_length:
            return data
        if data.startswith('data:'):
            # Data URIs like 'data:image/jpeg;base64,/9j/4AAQ...'
            header, separator, payload = data.partition(',')
            if not separator or len(payload) <= min_length:
                return data  # unexpected format or not long enough
            return f"{header},{shorten(payload)}"
        return shorten(data)  # raw base64 string

    def truncate_base64_recursive(obj):
        if isinstance(obj, dict):
            for key in list(obj.keys()):
                value = obj[key]
                if key in ('image_base64', 'base64') and isinstance(value, str):
                    obj[key] = truncate_base64(value)
                elif isinstance(value, (dict, list)):
                    truncate_base64_recursive(value)
        elif isinstance(obj, list):
            for item in obj:
                if isinstance(item, (dict, list)):
                    truncate_base64_recursive(item)

    # One recursive pass covers pages_data and raw_response_data as well.
    # Visiting them again would truncate the already-truncated text and
    # report a wrong character count.
    truncate_base64_recursive(truncated_result)
    return truncated_result
def clean_base64_from_result(result):
    """
    Create a clean copy of the result dictionary with base64 image data removed.
    This ensures JSON files don't contain large base64 strings.

    Every value stored under the keys 'image_base64' or 'base64', at any
    nesting depth of dicts and lists, is replaced by the placeholder
    "[BASE64_DATA_REMOVED]".

    Args:
        result: OCR result dictionary

    Returns:
        Cleaned dictionary without base64 data ({} for empty or non-dict
        input); the input is not modified
    """
    if not result or not isinstance(result, dict):
        return {}

    # Create a deep copy to avoid modifying the original
    import copy
    clean_result = copy.deepcopy(result)

    placeholder = "[BASE64_DATA_REMOVED]"

    # One walk over every nested dict/list. This already covers pages_data
    # and raw_response_data, so no dedicated passes are needed for them.
    pending = [clean_result]
    while pending:
        node = pending.pop()
        if isinstance(node, dict):
            for key, value in node.items():
                if key in ('image_base64', 'base64'):
                    # Replacing the value of an existing key is safe while iterating
                    node[key] = placeholder
                elif isinstance(value, (dict, list)):
                    pending.append(value)
        else:
            pending.extend(item for item in node if isinstance(item, (dict, list)))

    return clean_result
def create_markdown_with_file_references(result, image_path_prefix="images/"):
    """
    Create a markdown document with file references to images instead of base64 embedding.
    Ideal for use in zip archives where images are stored as separate files.

    Images are referenced as ``<prefix>image_<page>_<index>.jpg`` (1-based),
    and only when they carry non-empty base64 data.

    Args:
        result: OCR result dictionary
        image_path_prefix: Path prefix for image references (e.g., "images/")

    Returns:
        Markdown content as string with file references
    """
    # Optional project helpers; without them a minimal built-in extraction is used
    try:
        from utils.content_utils import classify_document_content, extract_document_text, extract_image_description
        content_utils_available = True
    except ImportError:
        content_utils_available = False

    # Normalize loosely-typed inputs once so malformed entries cannot crash rendering
    pages_data = result.get('pages_data')
    pages = pages_data if isinstance(pages_data, list) else []
    ocr_contents = result.get('ocr_contents')
    if not isinstance(ocr_contents, dict):
        ocr_contents = {}

    has_images = bool(result.get('has_images', False))
    if content_utils_available:
        # NOTE(review): the classification result is not used here; the call
        # is retained in case the helper validates or annotates `result`.
        classify_document_content(result)
    elif not has_images:
        # Minimal fallback detection: any page that carries image entries
        has_images = any(isinstance(page, dict) and page.get('images') for page in pages)

    # Document title/header and metadata section
    md = [f"# {result.get('file_name', 'Document')}\n", "## Document Metadata\n"]

    if 'timestamp' in result:
        md.append(f"**Processed:** {result['timestamp']}\n")

    languages = result.get('languages')
    if languages:
        if isinstance(languages, str):
            languages = [languages]  # a bare string would be joined per character
        language_names = [str(lang) for lang in languages if lang]
        if language_names:
            md.append(f"**Languages:** {', '.join(language_names)}\n")

    if 'detected_document_type' in result:
        md.append(f"**Document Type:** {result['detected_document_type']}\n")

    topics = result.get('topics')
    if topics:
        if isinstance(topics, str):
            topics = [topics]
        md.append(f"**Topics:** {', '.join(str(topic) for topic in topics)}\n")

    md.append("\n---\n")

    # Document title - extract from result if available
    title_content = ocr_contents.get('title')
    if title_content:
        md.append(f"## {title_content}\n")

    # Add images if present
    if has_images and 'pages_data' in result:
        md.append("## Images\n")

        for page_idx, page in enumerate(pages):
            images = page.get('images') if isinstance(page, dict) else None
            if not isinstance(images, list):
                continue
            for img_idx, img in enumerate(images):
                # Reference only images with data, since the zip writer
                # only stores those
                if isinstance(img, dict) and img.get('image_base64'):
                    image_filename = f"image_{page_idx+1}_{img_idx+1}.jpg"
                    image_caption = f"Image {page_idx+1}-{img_idx+1}"
                    md.append(f"![{image_caption}]({image_path_prefix}{image_filename})\n")

        # Add image description if available through utils
        if content_utils_available:
            description = extract_image_description(result)
            if description:
                md.append(f"*{description}*\n")

        md.append("\n---\n")

    # Add document text section
    md.append("## Text Content\n")

    def extract_clean_text(content):
        """Unwrap {'text': ...} containers, including stringified ones."""
        if isinstance(content, str):
            stripped = content.strip()
            if stripped.startswith("{") and stripped.endswith("}"):
                try:
                    parsed = json.loads(content.replace("'", '"'))
                except ValueError:
                    return content  # not JSON after all
                if isinstance(parsed, dict) and 'text' in parsed:
                    return parsed['text']
            return content
        if isinstance(content, dict) and isinstance(content.get('text'), str):
            return content['text']
        return content

    text_content = ""
    structured_sections = {}

    if content_utils_available:
        # Use the systematic utility function for main text
        text_content = extract_clean_text(extract_document_text(result))

        for field, content in ocr_contents.items():
            # Skip fields that are handled separately
            if field in ("raw_text", "error", "partial_text", "main_text"):
                continue
            if content:
                structured_sections[field] = extract_clean_text(content)
    else:
        # Fallback: first non-empty candidate field becomes the main text
        for field in ("main_text", "content", "text", "transcript", "raw_text"):
            content = ocr_contents.get(field)
            if not content:
                continue
            if isinstance(content, str):
                if content.strip():
                    text_content = content
                    break
            elif isinstance(content, dict):
                try:
                    text_content = json.dumps(content, indent=2)
                    break
                except (TypeError, ValueError):
                    pass  # not serializable; try the next candidate field

        for field, content in ocr_contents.items():
            if field in ("raw_text", "error", "partial_text", "main_text", "content", "text", "transcript"):
                continue
            if content:
                structured_sections[field] = content

    # Main text is shown without a field label
    if text_content:
        if not isinstance(text_content, str):
            # Helpers may hand back non-string content; never crash on it
            text_content = str(text_content)
        md.append(text_content + "\n\n")

    # Structured sections are emitted as plain content (no section headers),
    # so JSON-like values are never rendered as code blocks
    for section_content in structured_sections.values():
        if isinstance(section_content, str):
            md.append(section_content + "\n\n")
        elif isinstance(section_content, dict):
            for key, value in section_content.items():
                md.append(f"**{key}:** {value}\n\n")
        elif isinstance(section_content, list):
            for item in section_content:
                md.append(f"- {item}\n")
            md.append("\n")

    # Join all markdown parts into a single string
    return "\n".join(md)
def add_result_files_to_zip(zipf, result, prefix=""):
    """
    Add files for a single result to a zip file.

    Writes, under ``prefix``: ``<name>.json`` (base64 removed), ``<name>.md``
    (from create_markdown_with_images), extracted images under ``images/``,
    ``<name>_with_files.md`` when the result has images, and ``README.txt``.
    Failures are logged rather than raised.

    Args:
        zipf: ZipFile instance to add files to
        result: OCR result dictionary; empty or non-dict values are ignored
        prefix: Optional prefix for file paths in the zip
    """
    if not result or not isinstance(result, dict):
        return

    # Create a timestamp for the README if not in result
    timestamp = result.get('timestamp', datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))

    # Base name for files; a missing or None file_name falls back to 'document'
    file_name = result.get('file_name') or 'document'
    base_name = Path(str(file_name)).stem

    try:
        # 1. JSON file with base64 data cleaned out. serialize_ocr_object
        #    converts non-JSON values first, so one stray object does not
        #    abort the whole archive entry.
        clean_result = clean_base64_from_result(result)
        json_str = json.dumps(serialize_ocr_object(clean_result), indent=2)
        zipf.writestr(f"{prefix}{base_name}.json", json_str)

        # 2. Markdown file produced by create_markdown_with_images
        try:
            markdown_content = create_markdown_with_images(result)
            zipf.writestr(f"{prefix}{base_name}.md", markdown_content)
        except Exception as e:
            logger.error(f"Error creating markdown: {str(e)}")
            # Fallback to simpler markdown if error occurs
            zipf.writestr(f"{prefix}{base_name}.md",
                          f"# {file_name}\n\nError generating complete markdown output.")

        has_images = result.get('has_images', False)
        pages = result.get('pages_data')

        # 3. Individual images if available
        if has_images and isinstance(pages, list):
            img_folder = f"{prefix}images/"
            for page_idx, page in enumerate(pages):
                images = page.get('images') if isinstance(page, dict) else None
                if not isinstance(images, list):
                    continue
                for img_idx, img in enumerate(images):
                    if not isinstance(img, dict) or not img.get('image_base64'):
                        continue
                    img_data = img['image_base64']
                    try:
                        if isinstance(img_data, str):
                            # Drop a data URI prefix, keeping everything after the comma
                            if ',' in img_data and ';base64,' in img_data:
                                img_data = img_data.split(',', 1)[1]
                            # Remove whitespace so the padding is computed on
                            # the payload alone
                            img_data = "".join(img_data.split())
                            missing_padding = len(img_data) % 4
                            if missing_padding:
                                img_data += '=' * (4 - missing_padding)
                            try:
                                img_bytes = base64.b64decode(img_data)
                            except ValueError as e:  # includes binascii.Error
                                logger.error(f"Base64 decoding error: {str(e)} for image {page_idx}-{img_idx}")
                                continue  # skip images that cannot be decoded
                        else:
                            # Not a string (e.g., already bytes): use directly
                            img_bytes = img_data

                        # Same naming scheme as create_markdown_with_file_references
                        image_filename = f"image_{page_idx+1}_{img_idx+1}.jpg"
                        zipf.writestr(f"{img_folder}{image_filename}", img_bytes)
                    except Exception as e:
                        logger.warning(f"Could not add image to zip: {str(e)}")

        # 4. Markdown with file references to images for offline viewing
        try:
            if has_images:
                file_ref_markdown = create_markdown_with_file_references(result, "images/")
                zipf.writestr(f"{prefix}{base_name}_with_files.md", file_ref_markdown)
        except Exception as e:
            logger.warning(f"Error creating markdown with file references: {str(e)}")

        # 5. README.txt with explanation of file contents
        readme_content = "\n".join([
            f"OCR RESULTS FOR: {file_name}",
            f"Processed: {timestamp}",
            "",
            "This archive contains the following files:",
            "",
            f"- {base_name}.json: Complete JSON data with all extracted information",
            f"- {base_name}.md: Markdown document with embedded base64 images (exactly as shown in the app)",
            f"- {base_name}_with_files.md: Alternative markdown with file references instead of base64 (for offline viewing)",
            "- images/ folder: Contains extracted images from the document (if present)",
            "",
            "Generated by Historical OCR using Mistral AI",
        ])
        zipf.writestr(f"{prefix}README.txt", readme_content)

    except Exception as e:
        logger.error(f"Error adding files to zip: {str(e)}")
Add README.txt with explanation of file contents readme_content = f""" OCR RESULTS FOR: {file_name} Processed: {timestamp} This archive contains the following files: - {base_name}.json: Complete JSON data with all extracted information - {base_name}.md: Markdown document with embedded base64 images (exactly as shown in the app) - {base_name}_with_files.md: Alternative markdown with file references instead of base64 (for offline viewing) - images/ folder: Contains extracted images from the document (if present) Generated by Historical OCR using Mistral AI """ zipf.writestr(f"{prefix}README.txt", readme_content.strip()) except Exception as e: logger.error(f"Error adding files to zip: {str(e)}") def create_markdown_with_images(result): """ Create a clean Markdown document from OCR results that properly preserves image references and text structure, following the principle of content purity. Args: result: OCR result dictionary Returns: Markdown content as string """ # Similar to create_markdown_with_file_references but embeds base64 images # Import content utils to use classification functions try: from utils.content_utils import classify_document_content, extract_document_text, extract_image_description content_utils_available = True except ImportError: content_utils_available = False # Get content classification has_text = True has_images = False if content_utils_available: classification = classify_document_content(result) has_text = classification['has_content'] has_images = result.get('has_images', False) else: # Minimal fallback detection if 'has_images' in result: has_images = result['has_images'] # Check for image data more thoroughly if 'pages_data' in result and isinstance(result['pages_data'], list): for page in result['pages_data']: if isinstance(page, dict) and 'images' in page and page['images']: has_images = True break # Start building the markdown document md = [] # Add document title/header md.append(f"# {result.get('file_name', 'Document')}\n") # Add 
metadata section md.append("## Document Metadata\n") # Add timestamp if 'timestamp' in result: md.append(f"**Processed:** {result['timestamp']}\n") # Add languages if available if 'languages' in result and result['languages']: languages = [lang for lang in result['languages'] if lang] if languages: md.append(f"**Languages:** {', '.join(languages)}\n") # Add document type and topics if 'detected_document_type' in result: md.append(f"**Document Type:** {result['detected_document_type']}\n") if 'topics' in result and result['topics']: md.append(f"**Topics:** {', '.join(result['topics'])}\n") md.append("\n---\n") # Document title - extract from result if available if 'ocr_contents' in result and 'title' in result['ocr_contents'] and result['ocr_contents']['title']: title_content = result['ocr_contents']['title'] md.append(f"## {title_content}\n") # Add images if present - with base64 embedding if has_images and 'pages_data' in result: md.append("## Images\n") # Extract and display all images with embedded base64 for page_idx, page in enumerate(result['pages_data']): if 'images' in page and isinstance(page['images'], list): for img_idx, img in enumerate(page['images']): if 'image_base64' in img: # Use the base64 data directly image_caption = f"Image {page_idx+1}-{img_idx+1}" img_data = img['image_base64'] # Make sure it has proper data URI format if isinstance(img_data, str) and not img_data.startswith('data:'): img_data = f"data:image/jpeg;base64,{img_data}" md.append(f"![{image_caption}]({img_data})\n") # Add image description if available through utils if content_utils_available: description = extract_image_description(result) if description: md.append(f"*{description}*\n") md.append("\n---\n") # Add document text section md.append("## Text Content\n") # Extract text content systematically text_content = "" structured_sections = {} if content_utils_available: # Use the systematic utility function for main text text_content = extract_document_text(result) # Collect 
all available structured sections if 'ocr_contents' in result: for field, content in result['ocr_contents'].items(): # Skip certain fields that are handled separately if field in ["raw_text", "error", "partial_text", "main_text"]: continue if content: # Add this as a structured section structured_sections[field] = content else: # Fallback extraction logic if 'ocr_contents' in result: # First find main text for field in ["main_text", "content", "text", "transcript", "raw_text"]: if field in result['ocr_contents'] and result['ocr_contents'][field]: content = result['ocr_contents'][field] if isinstance(content, str) and content.strip(): text_content = content break elif isinstance(content, dict): # Try to convert complex objects to string try: text_content = json.dumps(content, indent=2) break except: pass # Then collect all structured sections for field, content in result['ocr_contents'].items(): # Skip certain fields that are handled separately if field in ["raw_text", "error", "partial_text", "main_text", "content", "text", "transcript"]: continue if content: # Add this as a structured section structured_sections[field] = content # Add the main text content if text_content: md.append(text_content + "\n\n") # Add structured sections if available if structured_sections: for section_name, section_content in structured_sections.items(): # Use proper markdown header for sections - consistently capitalize all section names display_name = section_name.replace("_", " ").capitalize() md.append(f"### {display_name}\n") # Add a separator for clarity md.append("\n---\n\n") # Handle different content types if isinstance(section_content, str): md.append(section_content + "\n\n") elif isinstance(section_content, dict): # Dictionary content - format as key-value pairs for key, value in section_content.items(): # Treat all values as plain text to maintain content purity md.append(f"**{key}:** {value}\n\n") elif isinstance(section_content, list): # List content - create a markdown 
list for item in section_content: # Keep list items as plain text md.append(f"- {item}\n") md.append("\n") # Join all markdown parts into a single string return "\n".join(md)