diff --git "a/structured_ocr.py" "b/structured_ocr.py" --- "a/structured_ocr.py" +++ "b/structured_ocr.py" @@ -1,770 +1,1994 @@ -# structured_ocr.py -""" -Core OCR processing using Mistral models with structured data extraction. - -This module handles the interaction with the Mistral API for OCR and -structured data extraction from document images. -""" - -import base64 +# Standard library imports import os -import io +import sys import time +import random import json +import base64 import logging -import traceback from enum import Enum from pathlib import Path -from typing import List, Dict, Any, Optional, Union, Tuple -from datetime import datetime -from PIL import Image +from functools import lru_cache +from typing import Optional, Dict, Any, List, Union, Tuple # Configure logging +logging.basicConfig(level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO) -# Try to import Mistral SDK - in some environments it may be optional +# Third-party imports +from pydantic import BaseModel + +# Try to import pycountry, provide fallback if not available try: - from mistralai.client import MistralClient - from mistralai.models.chat_completion import ChatMessage - MISTRAL_SDK_AVAILABLE = True + import pycountry + PYCOUNTRY_AVAILABLE = True except ImportError: - MISTRAL_SDK_AVAILABLE = False - logger.warning("Mistral SDK not available. Some features will be limited.") - # Create stub classes for type checking - class MistralClient: - def __init__(self, *args, **kwargs): - pass - - class ChatMessage: - def __init__(self, *args, **kwargs): - pass + PYCOUNTRY_AVAILABLE = False + logger.warning("pycountry module not available - using language code fallback") -# Pydantic is used for structured OCR response validation +# Try to import Mistral AI, provide fallback if not available try: - from pydantic import BaseModel, Field, validator, root_validator - from typing import Optional, List, Dict, Any, Union - - # Define response models - class OCRImageObject(BaseModel): - """Represents an image within the OCR result""" - caption: Optional[str] = None - image_base64: Optional[str] = None - - class OCRStructuredContent(BaseModel): - """Structured OCR content with typed fields""" - # Document body content - raw_text: str - title: Optional[str] = None - author: Optional[str] = None - date: Optional[str] = None - summary: Optional[str] = None - - # Additional structured data - main_text: Optional[str] = None - headings: Optional[List[str]] = None - paragraphs: Optional[List[str]] = None - sections: Optional[Dict[str, str]] = None - metadata: Optional[Dict[str, Any]] = None - - # Layout-specific content - header: Optional[str] = None - footer: Optional[str] = None - marginalia: Optional[str] = None - page_number: Optional[Union[str, int]] = None - - # Multi-column support - left_column: Optional[str] = None - right_column: Optional[str] = None - - # Document parts for scientific papers, letters, etc. - abstract: Optional[str] = None - introduction: Optional[str] = None - conclusion: Optional[str] = None - bibliography: Optional[str] = None - references: Optional[str] = None - - # Letter/correspondence specific fields - recipient: Optional[str] = None - sender: Optional[str] = None - signature: Optional[str] = None - salutation: Optional[str] = None - closing: Optional[str] = None - subject: Optional[str] = None - - # Table content - can be text or structured - tables: Optional[Union[str, List[Dict[str, Any]]]] = None - - # Additional fields that might be appropriate for specific documents - publication: Optional[str] = None - volume: Optional[str] = None - issue: Optional[str] = None - location: Optional[str] = None - - # Images - illustrations: Optional[List[OCRImageObject]] = None - - # Allow additional props for flexibility - class Config: - extra = "allow" - - class StructuredOCRModel(BaseModel): - """Top-level OCR result model""" - file_name: str - languages: Optional[List[str]] = None - topics: Optional[List[str]] = None - confidence: Optional[float] = None - ocr_contents: OCRStructuredContent - - class Config: - extra = "allow" - + from mistralai import Mistral + from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk + from mistralai.models import OCRImageObject + MISTRAL_AVAILABLE = True except ImportError: - logger.warning("Pydantic not available. Model validation will be limited.") - # Create stub classes for type checking - class BaseModel: - pass - - class StructuredOCRModel(BaseModel): - pass + MISTRAL_AVAILABLE = False + logger.warning("mistralai module not available - OCR functionality will be limited") + +# Import our language detection module +try: + from utils.helpers.language_detection import LanguageDetector + LANG_DETECTOR_AVAILABLE = True +except ImportError: + LANG_DETECTOR_AVAILABLE = False + logger.warning("language_detection module not available - using fallback language detection") + +# Import utilities for OCR processing +try: + from utils.image_utils import replace_images_in_markdown, get_combined_markdown +except ImportError: + # Define minimal fallback functions if module not found + logger.warning("Could not import utils.image_utils - using minimal fallback functions") - class OCRStructuredContent(BaseModel): - pass + def replace_images_in_markdown(markdown_str, images_dict): + """Minimal fallback implementation of replace_images_in_markdown""" + import re + for img_id, base64_str in images_dict.items(): + # Match alt text OR link part, ignore extension + base_id = img_id.split('.')[0] + pattern = re.compile(rf"!\[[^\]]*{base_id}[^\]]*\]\([^\)]+\)") + markdown_str = pattern.sub(f"![{img_id}](data:image/jpeg;base64,{base64_str})", markdown_str) + return markdown_str + + def get_combined_markdown(ocr_response): + """Minimal fallback implementation of get_combined_markdown""" + markdowns = [] + for page in ocr_response.pages: + image_data = {} + if hasattr(page, "images"): + for img in page.images: + if hasattr(img, "id") and hasattr(img, "image_base64"): + image_data[img.id] = img.image_base64 + page_markdown = page.markdown if hasattr(page, "markdown") else "" + processed_markdown = replace_images_in_markdown(page_markdown, image_data) + markdowns.append(processed_markdown) + return "\n\n".join(markdowns) # Import config directly (now local to historical-ocr) try: - from config import ( - MISTRAL_API_KEY, OCR_MODEL, TEXT_MODEL, VISION_MODEL, - VISION_MODEL_SMALL, PERFORMANCE_MODES, - TEST_MODE, IMAGE_PREPROCESSING - ) + from config import MISTRAL_API_KEY, OCR_MODEL, TEXT_MODEL, VISION_MODEL, TEST_MODE, IMAGE_PREPROCESSING except ImportError: # Fallback defaults if config is not available import os MISTRAL_API_KEY = os.environ.get("MISTRAL_API_KEY", "") OCR_MODEL = "mistral-ocr-latest" TEXT_MODEL = "mistral-large-latest" - VISION_MODEL_SMALL = "mistral-small-latest" - VISION_MODEL = VISION_MODEL_SMALL # Always use small model - # Define performance modes for fallback - both use small model - PERFORMANCE_MODES = { - "Speed": { - "model": VISION_MODEL_SMALL, - "timeout_ms": 45000, - "max_retries": 2, - "thread_count": 2 - }, - "Quality": { - "model": VISION_MODEL_SMALL, # Also using small model for Quality mode - "timeout_ms": 120000, - "max_retries": 1, - "thread_count": 1 - } - } + VISION_MODEL = "mistral-large-latest" TEST_MODE = True # Default image preprocessing settings if config not available IMAGE_PREPROCESSING = { - "enhance_contrast": 1.5, - "sharpen": True, + "max_size_mb": 8.0, + # Add basic defaults for preprocessing + "enhance_contrast": 1.2, "denoise": True, - "deskew": True, - "deskew_threshold": 1.0, - "handwritten": { - "block_size": 21, - "constant": 5, - "use_dilation": True, - "dilation_iterations": 1, - "dilation_kernel_size": 2 - } + "compression_quality": 95 } + logging.warning("Config module not found. Using environment variables and defaults.") -# Import OCR-specific constants -try: - from constants import MAX_IMAGE_DIMENSION -except ImportError: - MAX_IMAGE_DIMENSION = 3000 # Default if constants not available - -# Helper functions for OCR processing - -def is_valid_base64(s): - """Check if a string is valid base64""" - try: - # Check if the string is properly padded - padding_needed = len(s) % 4 - if padding_needed: - s += '=' * (4 - padding_needed) - - # Try to decode - base64.b64decode(s) - return True - except Exception: - return False - +# Helper function to make OCR objects JSON serializable +# Removed caching to fix unhashable type error def serialize_ocr_response(obj): - """Custom JSON serializer for OCR responses""" - if isinstance(obj, datetime): - return obj.isoformat() - elif isinstance(obj, bytes): - return base64.b64encode(obj).decode('utf-8') - elif hasattr(obj, 'model_dump'): - # For pydantic models (v2+) - return obj.model_dump() - elif hasattr(obj, 'dict'): - # For pydantic models (v1) - return obj.dict() - elif isinstance(obj, BaseModel): - # Fallback for pydantic-like models - return {k: v for k, v in obj.__dict__.items() if not k.startswith('_')} - elif isinstance(obj, Image.Image): - # For PIL images, convert to base64 - buffer = io.BytesIO() - obj.save(buffer, format="JPEG") - img_str = base64.b64encode(buffer.getvalue()).decode() - return f"data:image/jpeg;base64,{img_str}" - else: - # Special handling for OCRImageObject and similar types - if hasattr(obj, '__class__') and obj.__class__.__name__ == 'OCRImageObject': - try: - # Extract attributes manually, with special handling for image data - result = {} - for key, value in obj.__dict__.items(): - if key.startswith('_'): - continue + """ + Convert OCR response objects to JSON serializable format + Optimized for speed and memory usage + """ + # Fast path: Handle primitive types directly + if obj is None or isinstance(obj, (str, int, float, bool)): + return obj + + # Handle collections with optimized recursion + if isinstance(obj, list): + return [serialize_ocr_response(item) for item in obj] + elif isinstance(obj, dict): + return {k: serialize_ocr_response(v) for k, v in obj.items()} + elif hasattr(obj, '__dict__'): + # For OCR objects with __dict__ attribute + result = {} + for key, value in obj.__dict__.items(): + if key.startswith('_'): + continue # Skip private attributes + # Fast path for OCRImageObject - most common complex object + if isinstance(value, OCRImageObject): # Get image base64 data for validation image_base64 = value.image_base64 if hasattr(value, 'image_base64') else None - - # COMMENTED OUT: Extensive validation logic that's rarely needed and adds overhead - # Simple validation - check for image data URL prefix as reliable indicator - is_valid_image = image_base64 and isinstance(image_base64, str) and image_base64.startswith('data:image/') - - # Quick handling for markdown image references - if image_base64 and isinstance(image_base64, str) and image_base64.startswith('![') and '](' in image_base64: + + # COMPLETELY REWRITTEN validation logic using proven test approach + # Default to FALSE (treating as text) unless proven to be an image + is_valid_image = False + + # Quick exit conditions + if not image_base64 or not isinstance(image_base64, str): + # No data or not a string - not a valid image is_valid_image = False - + logging.warning("Invalid image data (not a string)") + + # Case 1: Definite image with proper data URL prefix + elif image_base64.startswith('data:image/'): + is_valid_image = True + logging.debug("Valid image with data:image/ prefix") + + # Case 2: Markdown image reference, not an actual image + elif image_base64.startswith('![') and '](' in image_base64 and image_base64.endswith(')'): + is_valid_image = False + logging.warning("Markdown image reference detected") + + # Extract the image ID for logging + try: + img_id = image_base64.split('![')[1].split('](')[0] + logging.debug(f"Markdown reference for image: {img_id}") + except: + img_id = "unknown" + + # Case 3: Needs detailed text content detection + else: + # Use the same proven approach as in our tests + # Take a sample for efficiency + sample = image_base64[:min(len(image_base64), 1000)] + sample_lower = sample.lower() + + # Check for obvious text features using multiple indicators + has_spaces = ' ' in sample + has_newlines = '\n' in sample + has_punctuation = any(p in sample for p in ',.;:!?"\'()[]{}') + + # Check for sentence-like structures + has_sentences = False + for i in range(len(sample) - 5): + if sample[i] in '.!?\n' and i+2 < len(sample) and sample[i+1] == ' ' and sample[i+2].isupper(): + has_sentences = True + break + + # Check for common words with word boundary protection + common_words = ['the', 'and', 'of', 'to', 'a', 'in', 'is', 'that', 'this', 'for'] + has_common_words = any(f" {word} " in f" {sample_lower} " for word in common_words) + + # Count the text indicators + text_indicators = [has_spaces, has_newlines, has_punctuation, has_sentences, has_common_words] + text_indicator_count = sum(1 for indicator in text_indicators if indicator) + + # Log detailed findings for debugging + logging.debug(f"Text detection - spaces: {has_spaces}, newlines: {has_newlines}, " + + f"punctuation: {has_punctuation}, sentences: {has_sentences}, " + + f"common words: {has_common_words}") + logging.debug(f"Text indicators found: {text_indicator_count}/5") + + # CRITICAL FIX: If we detect 2 or more text indicators, this is TEXT not an image! + if text_indicator_count >= 2: + is_valid_image = False + logging.warning(f"Content identified as TEXT with {text_indicator_count}/5 indicators") + # Only if we have no clear text indicators AND valid base64 chars, treat as image + elif all(c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=' + for c in image_base64[:100]): + is_valid_image = True + logging.debug("Valid base64 data with no text indicators") + else: + # Default to TEXT for anything else - safer approach + is_valid_image = False + logging.warning("No clear image patterns detected - treating as text by default") + + # Final validation result with definitive message + logging.warning(f"FINAL CLASSIFICATION: OCRImageObject content type = {'IMAGE' if is_valid_image else 'TEXT'}") + # Process based on final validation result if is_valid_image: - # It's a valid image, keep it - return {k: serialize_ocr_response(v) for k, v in obj.__dict__.items() if not k.startswith('_')} + # Process as image if validation passes + result[key] = { + 'id': value.id if hasattr(value, 'id') else None, + 'image_base64': image_base64 + } else: - # It's actually text content masquerading as an image, extract just the text - text_content = None + # Process as text if validation fails, but properly handle markdown references if image_base64 and isinstance(image_base64, str): - # Clean up the text content - text_content = image_base64 - # Remove Markdown image syntax if present - if text_content.startswith('![') and text_content.endswith(')'): - if '](' in text_content: - text_content = text_content.split('](')[0][2:] # Extract text between ![ and ]( - - # Return just the caption (or fallback to text content) - caption = obj.caption if hasattr(obj, 'caption') else text_content - return caption - except Exception as e: - logger.warning(f"Error serializing OCRImageObject: {str(e)}") - return str(obj) - - # Handle list-like objects - try: - if hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, dict)): - return [serialize_ocr_response(item) for item in obj] - except Exception: - pass - - # Default fallback - return str(obj) + # Special handling for markdown image references + if image_base64.startswith('![') and '](' in image_base64 and image_base64.endswith(')'): + # Extract the image description (alt text) if available + try: + # Parse the alt text from ![alt_text](url) + alt_text = image_base64.split('![')[1].split('](')[0] + # Use the alt text or a placeholder if it's just the image name + if alt_text and not alt_text.endswith('.jpeg') and not alt_text.endswith('.jpg'): + result[key] = f"[Image: {alt_text}]" + else: + # Just note that there's an image without the reference + result[key] = "[Image]" + logging.info(f"Converted markdown reference to text placeholder: {result[key]}") + except: + # Fallback for parsing errors + result[key] = "[Image]" + else: + # Regular text content + result[key] = image_base64 + else: + result[key] = str(value) + # Handle collections + elif isinstance(value, list): + result[key] = [serialize_ocr_response(item) for item in value] + # Handle nested objects + elif hasattr(value, '__dict__'): + result[key] = serialize_ocr_response(value) + # Handle primitives and other types + else: + result[key] = value + return result + else: + return obj -class OCRDocumentType(str, Enum): - """Enum for document types to optimize OCR processing""" - STANDARD = "standard" - HANDWRITTEN = "handwritten" - NEWSPAPER = "newspaper" - BOOK = "book" - SCIENTIFIC = "scientific" - MANUSCRIPT = "manuscript" - MAP = "map" - LETTERHEAD = "letterhead" - RECEIPT = "receipt" - CERTIFICATE = "certificate" +# Create language enum for structured output - cache language lookup to avoid repeated processing +@lru_cache(maxsize=1) +def get_language_dict(): + if PYCOUNTRY_AVAILABLE: + return {lang.alpha_2: lang.name for lang in pycountry.languages if hasattr(lang, 'alpha_2')} + else: + # Fallback with basic languages when pycountry is not available + return { + "en": "English", + "es": "Spanish", + "fr": "French", + "de": "German", + "it": "Italian", + "pt": "Portuguese", + "ru": "Russian", + "zh": "Chinese", + "ja": "Japanese", + "ar": "Arabic", + "hi": "Hindi", + "la": "Latin" + } + +class LanguageMeta(Enum.__class__): + def __new__(metacls, cls, bases, classdict): + languages = get_language_dict() + for code, name in languages.items(): + classdict[name.upper().replace(' ', '_')] = name + return super().__new__(metacls, cls, bases, classdict) + +class Language(Enum, metaclass=LanguageMeta): + pass + +class StructuredOCRModel(BaseModel): + file_name: str + topics: list[str] + languages: list[Language] + ocr_contents: dict class StructuredOCR: - """Core class for OCR processing with structured output""" - - def __init__(self): - """Initialize OCR processor""" - self.logger = logging.getLogger("structured_ocr") + def __init__(self, api_key=None): + """Initialize the OCR processor with API key""" + # Set up logger for this class instance + self.logger = logging.getLogger(__name__) - # Set up Mistral client if API key is available - if MISTRAL_API_KEY and MISTRAL_SDK_AVAILABLE and not TEST_MODE: - self.client = MistralClient(api_key=MISTRAL_API_KEY) - self.logger.info(f"OCR initialized with Mistral SDK, models: {OCR_MODEL}, {TEXT_MODEL}, {VISION_MODEL}") - else: - # Test mode or missing API key + # Check if we're running in test mode or if Mistral is not available + self.test_mode = TEST_MODE or not MISTRAL_AVAILABLE + # Initialize current filename for language detection + self.current_filename = None + + if not MISTRAL_AVAILABLE: + self.logger.warning("Mistral AI package not available - running in test mode") + self.api_key = "placeholder_key" self.client = None - if TEST_MODE: - self.logger.info("OCR initialized in TEST_MODE with mock responses") - else: - self.logger.warning("OCR initialized without Mistral API key - functionality limited") + return + + # Initialize API key - use provided key, or environment var + if self.test_mode and not api_key: + self.api_key = "placeholder_key" + else: + self.api_key = api_key or MISTRAL_API_KEY - # Try to import language detection module if available + # Ensure we have a valid API key when not in test mode + if not self.api_key and not self.test_mode: + raise ValueError("No Mistral API key provided. Please set the MISTRAL_API_KEY environment variable or enable TEST_MODE.") + + # Clean the API key by removing any whitespace + self.api_key = self.api_key.strip() + + # Check if API key exists but don't enforce length requirements + if not self.test_mode and not self.api_key: + self.logger.warning("Warning: No API key provided") + + # Initialize client with the API key try: - # This is an optional dependency, we can work without it - from language_detection import detect_languages - self.language_detector = detect_languages - self.logger.info("Language detection module loaded") - except ImportError: + self.client = Mistral(api_key=self.api_key) + # Skip validation to avoid unnecessary API calls + except Exception as e: + error_msg = str(e).lower() + if "unauthorized" in error_msg or "401" in error_msg: + raise ValueError(f"API key authentication failed. Please check your Mistral API key: {str(e)}") + else: + self.logger.warning(f"Failed to initialize Mistral client: {str(e)}") + self.test_mode = True + self.client = None + + # Initialize language detector + if LANG_DETECTOR_AVAILABLE: + self.logger.info("Using statistical language detection module") + self.language_detector = LanguageDetector() + else: self.logger.warning("External language detection not available - using internal fallback") self.language_detector = None - - def process_file(self, file_path, file_type=None, use_vision=True, max_pages=None, file_size_mb=None, custom_pages=None, custom_prompt=None, perf_mode="Speed"): + + def process_file(self, file_path, file_type=None, use_vision=True, max_pages=None, file_size_mb=None, custom_pages=None, custom_prompt=None): """Process a file and return structured OCR results - + Args: - file_path: Path to the file (image or PDF) - file_type: Type of file ('image' or 'pdf'), inferred from extension if None - use_vision: Whether to use vision model for additional processing - max_pages: Maximum number of pages to process (PDFs only) - file_size_mb: File size in MB, calculated if not provided - custom_pages: List of specific pages to process (PDFs only) - custom_prompt: Custom instructions for more accurate extraction - perf_mode: Performance mode ('Speed' or 'Quality') - + file_path: Path to the file to process + file_type: 'pdf' or 'image' (will be auto-detected if None) + use_vision: Whether to use vision model for improved analysis + max_pages: Optional limit on number of pages to process + file_size_mb: Optional file size in MB (used for automatic page limiting) + custom_pages: Optional list of specific page numbers to process + custom_prompt: Optional instructions for the AI to handle unusual document formatting or specific extraction needs + Returns: - Structured OCR results as a dictionary + Dictionary with structured OCR results """ - self.logger.info(f"Processing file: {file_path}") - start_time = time.time() + # Convert file_path to Path object if it's a string + file_path = Path(file_path) - # Ensure file_path is a Path object - if not isinstance(file_path, Path): - file_path = Path(file_path) + # Store current filename for language detection + self.current_filename = file_path.name - # Check if file exists - if not file_path.exists(): - self.logger.error(f"File not found: {file_path}") - return {"error": f"File not found: {file_path}"} - - # Determine file type from extension if not provided + # Auto-detect file type if not provided if file_type is None: - ext = file_path.suffix.lower() - if ext in ['.pdf']: - file_type = 'pdf' - elif ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp']: - file_type = 'image' - else: - self.logger.error(f"Unsupported file type: {ext}") - return {"error": f"Unsupported file type: {ext}"} - + suffix = file_path.suffix.lower() + file_type = "pdf" if suffix == ".pdf" else "image" + # Check for handwritten document by filename filename_lower = file_path.name.lower() if "handwritten" in filename_lower or "manuscript" in filename_lower or "letter" in filename_lower: - self.logger.info(f"Detected likely handwritten document from filename: {file_path.name}") + logger.info(f"Detected likely handwritten document from filename: {file_path.name}") # This will be used during processing to apply handwritten-specific handling - - # Get file size if not provided - if file_size_mb is None: - try: - file_size_bytes = file_path.stat().st_size - file_size_mb = file_size_bytes / (1024 * 1024) - except Exception as e: - self.logger.warning(f"Could not determine file size: {str(e)}") - file_size_mb = 0 - # Check if file is too large - max_size_mb = IMAGE_PREPROCESSING.get("max_size_mb", 200.0) - if file_size_mb > max_size_mb: - self.logger.warning(f"File size ({file_size_mb:.1f} MB) exceeds maximum ({max_size_mb:.1f} MB)") - - # Return error for PDFs that are too large - if file_type == "pdf" and file_size_mb > max_size_mb * 1.5: # Even more lenient for PDFs - return { - "error": f"PDF file is too large ({file_size_mb:.1f} MB). Maximum size is {max_size_mb:.1f} MB.", - "file_name": file_path.name, - "file_size_mb": file_size_mb, - "processing_time": time.time() - start_time - } + # Get file size if not provided + if file_size_mb is None and file_path.exists(): + file_size_mb = file_path.stat().st_size / (1024 * 1024) # Convert bytes to MB - # For images, we'll try to proceed but with a warning - if file_type == "image": - self.logger.warning(f"Large image will be processed but may be downscaled") - - # Check if we have a valid client in non-test mode - if not TEST_MODE and not self.client: - self.logger.error("No Mistral API key provided and not in test mode.") + # Check if file exceeds API limits (50 MB) + if file_size_mb and file_size_mb > 50: + logging.warning(f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB") return { - "error": "OCR processing requires a valid Mistral API key.", "file_name": file_path.name, - "processing_time": time.time() - start_time + "topics": ["Document"], + "languages": ["English"], + "confidence_score": 0.0, + "error": f"File size {file_size_mb:.2f} MB exceeds API limit of 50 MB", + "ocr_contents": { + "error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB", + "partial_text": "Document could not be processed due to size limitations." + } } + + # For PDF files, limit pages based on file size if no explicit limit is given + if file_type == "pdf" and file_size_mb and max_pages is None and custom_pages is None: + if file_size_mb > 100: # Very large files + max_pages = 3 + elif file_size_mb > 50: # Large files + max_pages = 5 + elif file_size_mb > 20: # Medium files + max_pages = 10 + else: # Small files + max_pages = None # Process all pages - # Process the file based on type + # Start processing timer + start_time = time.time() + + # Read and process the file if file_type == "pdf": result = self._process_pdf(file_path, use_vision, max_pages, custom_pages, custom_prompt) else: - result = self._process_image(file_path, use_vision, custom_prompt, perf_mode) - + result = self._process_image(file_path, use_vision, custom_prompt) + # Add processing time information processing_time = time.time() - start_time - result["processing_time"] = processing_time - self.logger.info(f"Processing completed in {processing_time:.2f} seconds") + result['processing_time'] = processing_time + # Add a default confidence score if not present + if 'confidence_score' not in result: + result['confidence_score'] = 0.85 # Default confidence + + # Ensure the entire result is fully JSON serializable by running it through our serializer + try: + # First convert to a standard dict if it's not already + if not isinstance(result, dict): + result = serialize_ocr_response(result) + + # Make a final pass to check for any remaining non-serializable objects + # Proactively check for OCRImageObject instances to avoid serialization warnings + def has_ocr_image_objects(obj): + """Check if object contains any OCRImageObject instances recursively""" + if isinstance(obj, dict): + return any(has_ocr_image_objects(v) for v in obj.values()) + elif isinstance(obj, list): + return any(has_ocr_image_objects(item) for item in obj) + else: + return 'OCRImageObject' in str(type(obj)) + + # Apply serialization preemptively if OCRImageObjects are detected + if has_ocr_image_objects(result): + # Quietly apply full serialization before any errors occur + result = serialize_ocr_response(result) + else: + # Test JSON serialization to catch any other issues + json.dumps(result) + except TypeError as e: + # If there's still a serialization error, run the whole result through our serializer + logger = logging.getLogger("serializer") + logger.warning(f"JSON serialization error in result: {str(e)}. Applying full serialization.") + # Use a more robust approach to ensure complete serialization + try: + # First attempt with our custom serializer + result = serialize_ocr_response(result) + # Test if it's fully serializable now + json.dumps(result) + except Exception as inner_e: + # If still not serializable, convert to a simpler format + logger.warning(f"Secondary serialization error: {str(inner_e)}. Converting to basic format.") + # Create a simplified result with just the essential information + simplified_result = { + "file_name": result.get("file_name", "unknown"), + "topics": result.get("topics", ["Document"]), + "languages": [str(lang) for lang in result.get("languages", ["English"]) if lang is not None], + "ocr_contents": { + "raw_text": result.get("ocr_contents", {}).get("raw_text", "Text extraction failed due to serialization error") + }, + "serialization_error": f"Original result could not be fully serialized: {str(e)}" + } + result = simplified_result + return result - + def _process_pdf(self, file_path, use_vision=True, max_pages=None, custom_pages=None, custom_prompt=None): - """Process a PDF file with OCR""" + """ + Process a PDF file with OCR - optimized version with smart page handling and memory management + + Args: + file_path: Path to the PDF file + use_vision: Whether to use vision model for enhanced analysis + max_pages: Optional limit on the number of pages to process + custom_pages: Optional list of specific page numbers to process + custom_prompt: Optional custom prompt for specialized extraction + """ logger = logging.getLogger("pdf_processor") logger.info(f"Processing PDF: {file_path}") + + # Track processing time start_time = time.time() - # Default max pages if not specified - if max_pages is None: - max_pages = 5 # Default to processing first 5 pages + # Fast path: Return placeholder if in test mode + if self.test_mode: + logger.info("Test mode active, returning placeholder response") + # Enhanced test mode placeholder that's more realistic + return { + "file_name": file_path.name, + "topics": ["Historical Document", "Literature", "American History"], + "languages": ["English"], + "ocr_contents": { + "title": "Harper's New Monthly Magazine", + "publication_date": "1855", + "publisher": "Harper & Brothers, New York", + "raw_text": "This is a test mode placeholder for Harper's New Monthly Magazine from 1855. The actual document contains articles on literature, politics, science, and culture from mid-19th century America.", + "content": "The magazine includes various literary pieces, poetry, political commentary, and illustrations typical of 19th century periodicals. Known for publishing works by prominent American authors including Herman Melville and Charles Dickens.", + "key_figures": ["Herman Melville", "Charles Dickens", "Henry Wadsworth Longfellow"], + "noted_articles": ["Continued serialization of popular novels", "Commentary on contemporary political events", "Scientific discoveries and technological advancements"] + }, + "pdf_processing_method": "enhanced_test_mode", + "total_pages": 12, + "processed_pages": 3, + "processing_time": 0.5, + "confidence_score": 0.9 + } try: - # We'll use pdf2image to convert PDF pages to images - try: - from pdf2image import convert_from_path - import pdf2image - except ImportError: - logger.error("pdf2image module not found. Please install it to process PDF files.") - return { - "error": "PDF processing requires the pdf2image module.", - "file_name": file_path.name, - "processing_time": time.time() - start_time - } + # PDF processing strategy decision based on file size + file_size_mb = file_path.stat().st_size / (1024 * 1024) + logger.info(f"PDF size: {file_size_mb:.2f} MB") - # Check if poppler is installed - if not pdf2image.pdfinfo_from_path: - logger.error("Poppler utilities not found. Please install poppler-utils.") - return { - "error": "PDF processing requires poppler-utils to be installed.", - "file_name": file_path.name, - "processing_time": time.time() - start_time - } - - # Get PDF info to determine number of pages - try: - pdf_info = pdf2image.pdfinfo_from_path(file_path) - total_pages = pdf_info["Pages"] - logger.info(f"PDF has {total_pages} pages") - except Exception as e: - logger.error(f"Error getting PDF info: {str(e)}") - return { - "error": f"Error analyzing PDF: {str(e)}", - "file_name": file_path.name, - "processing_time": time.time() - start_time - } - - # Limit pages to process - pages_to_process = min(total_pages, max_pages) - - # If specific pages are requested, use those instead - page_numbers = list(range(1, pages_to_process + 1)) # Default: process first N pages - if custom_pages and isinstance(custom_pages, list): - # Filter out page numbers that are out of range - valid_pages = [p for p in custom_pages if 1 <= p <= total_pages] - if valid_pages: - page_numbers = valid_pages - pages_to_process = len(valid_pages) - logger.info(f"Processing {pages_to_process} custom pages: {valid_pages}") - else: - logger.warning(f"No valid custom pages specified. Using first {pages_to_process} pages.") - - # Extract the pages as images - dpi = 300 # Default DPI for better OCR + # Always use pdf2image for better control and consistency across all PDF files + use_pdf2image = True - # Batch convert to reduce memory usage - batch_size = 3 # Process small batches to limit memory usage - all_pages_data = [] - - for batch_start in range(0, len(page_numbers), batch_size): - batch_pages = page_numbers[batch_start:batch_start + batch_size] - logger.info(f"Processing PDF batch: pages {batch_pages}") - + # First try local PDF processing for better performance and control + if use_pdf2image: try: - # Convert the batch of pages - images = convert_from_path( - file_path, - dpi=dpi, - first_page=min(batch_pages), - last_page=max(batch_pages), - fmt="jpeg", - thread_count=1, # Single thread to avoid memory issues - use_pdftocairo=True, - transparent=False - ) + import tempfile + from pdf2image import convert_from_path + + logger.info("Processing PDF using pdf2image for better multi-page handling") - # Process each image in the batch - for i, img in enumerate(images): - page_idx = batch_pages[i] - 1 # Convert to 0-based index - page_num = batch_pages[i] # 1-based page number + # Convert PDF to images with optimized parameters + conversion_start = time.time() + + # Use consistent DPI for all files to ensure reliable results + dpi = 200 # Higher quality DPI for all files to ensure better text recognition - logger.info(f"Processing page {page_num}/{total_pages}") + # Only convert first page initially to check document type + pdf_first_page = convert_from_path(file_path, dpi=dpi, first_page=1, last_page=1) + logger.info(f"First page converted in {time.time() - conversion_start:.2f}s") + + # Quick check if PDF has readable content + if not pdf_first_page: + logger.warning("PDF conversion produced no images, falling back to API") + raise Exception("PDF conversion failed to produce images") + + # Determine total pages in the document + # First, try simple estimate from first page conversion + total_pages = 1 + + # Try pdf2image info extraction + try: + # Try with pdf2image page counting - use simpler parameters + logger.info("Determining PDF page count...") + count_start = time.time() - # Generate page-specific prompt - page_prompt = f"{custom_prompt}" if custom_prompt else "" - page_prompt += f" This is page {page_num} of {total_pages}." + # Use a lightweight approach with multi-threading for faster processing + pdf_info = convert_from_path( + file_path, + dpi=72, # Low DPI just for info + first_page=1, + last_page=1, + size=(100, 100), # Tiny image to save memory + fmt="jpeg", + thread_count=4, # Increased thread count for faster processing + output_file=None + ) - # Save the image to a temporary buffer - img_buffer = io.BytesIO() - img.save(img_buffer, format="JPEG", quality=85) - img_buffer.seek(0) + # Extract page count + if hasattr(pdf_info, 'n_pages'): + total_pages = pdf_info.n_pages + elif isinstance(pdf_info, dict) and "Pages" in pdf_info: + total_pages = int(pdf_info.get("Pages", "1")) + elif len(pdf_first_page) > 0: + # Just estimate based on first page - at least we have one + total_pages = 1 + + logger.info(f"Page count determined in {time.time() - count_start:.2f}s") + except Exception as count_error: + logger.warning(f"Error determining page count: {str(count_error)}. Using default of 1") + total_pages = 1 + + logger.info(f"PDF has {total_pages} total pages") + + # Determine which pages to process + pages_to_process = [] + + # Handle custom page selection if provided + if custom_pages and any(0 < p <= total_pages for p in custom_pages): + # Filter valid page numbers + pages_to_process = [p for p in custom_pages if 0 < p <= total_pages] + logger.info(f"Processing {len(pages_to_process)} custom-selected pages: {pages_to_process}") + # Otherwise use max_pages limit if provided + elif max_pages and max_pages < total_pages: + pages_to_process = list(range(1, max_pages + 1)) + logger.info(f"Processing first {max_pages} pages of {total_pages} total") + # Or process all pages if reasonable count + elif total_pages <= 10: + pages_to_process = list(range(1, total_pages + 1)) + logger.info(f"Processing all {total_pages} pages") + # For large documents without limits, process subset of pages + else: + # Smart sampling: first page, last page, and some pages in between + pages_to_process = [1] # Always include first page - # Create a temporary path for the image - temp_path = Path(f"{file_path.stem}_page_{page_num}.jpg") + if total_pages > 1: + if total_pages <= 5: + # For few pages, process all + pages_to_process = list(range(1, total_pages + 1)) + else: + # For many pages, sample intelligently + # Add pages from the middle of the document + middle = total_pages // 2 + # Add last page if more than 3 pages + if total_pages > 3: + pages_to_process.append(total_pages) + # Add up to 3 pages from middle if document is large + if total_pages > 5: + pages_to_process.append(middle) + if total_pages > 10: + pages_to_process.append(middle // 2) + pages_to_process.append(middle + (middle // 2)) + + # Sort pages for sequential processing + pages_to_process = sorted(list(set(pages_to_process))) + logger.info(f"Processing {len(pages_to_process)} sampled pages out of {total_pages} total: {pages_to_process}") + + # Convert only the selected pages to minimize memory usage + selected_images = [] + combined_text = [] + detected_languages = set() # Track detected languages across all pages + + # Process pages in larger batches for better efficiency + batch_size = 5 # Process 5 pages at a time for better throughput + for i in range(0, len(pages_to_process), batch_size): + batch_pages = pages_to_process[i:i+batch_size] + logger.info(f"Converting batch of pages {batch_pages}") - # Process the page image - result = self._process_image(temp_path, use_vision, page_prompt) + # Convert batch of pages with multi-threading for better performance + batch_start = time.time() + batch_images = convert_from_path( + file_path, + dpi=dpi, + first_page=min(batch_pages), + last_page=max(batch_pages), + thread_count=4, # Use multi-threading for faster PDF processing + fmt="jpeg" # Use JPEG format for better compatibility + ) + logger.info(f"Batch conversion completed in {time.time() - batch_start:.2f}s") - # Add page-specific information - result["page_number"] = page_num - result["total_pages"] = total_pages + # Map converted images to requested page numbers + for idx, page_num in enumerate(range(min(batch_pages), max(batch_pages) + 1)): + if page_num in pages_to_process and idx < len(batch_images): + if page_num == pages_to_process[0]: # First page to process + selected_images.append(batch_images[idx]) + + # Process each page individually + with tempfile.NamedTemporaryFile(suffix='.jpeg', delete=False) as tmp: + batch_images[idx].save(tmp.name, format='JPEG') + # Simple OCR to extract text + try: + page_result = self._process_image(Path(tmp.name), False, None) + if 'ocr_contents' in page_result and 'raw_text' in page_result['ocr_contents']: + # Add page text to combined text without obvious page markers + page_text = page_result['ocr_contents']['raw_text'] + combined_text.append(f"{page_text}") + + # Collect detected languages from each page + if 'languages' in page_result: + for lang in page_result['languages']: + detected_languages.add(lang) + except Exception as page_e: + logger.warning(f"Error processing page {page_num}: {str(page_e)}") + # Clean up temp file + import os + os.unlink(tmp.name) + + # If we have processed pages + if selected_images and combined_text: + # Save first image to temp file for vision model + with tempfile.NamedTemporaryFile(suffix='.jpeg', delete=False) as tmp: + selected_images[0].save(tmp.name, format='JPEG', quality=95) + first_image_path = tmp.name - # Replace the filename with the PDF name and page number - result["file_name"] = f"{file_path.stem} (Page {page_num})" + # Combine all extracted text + all_text = "\n\n".join(combined_text) - # Add to results - all_pages_data.append(result) - - except Exception as e: - logger.error(f"Error processing PDF batch: {str(e)}") - logger.error(traceback.format_exc()) - # Continue with other batches even if one fails + # For custom prompts, use specialized processing + if custom_prompt: + try: + # Process image with vision model + result = self._process_image(Path(first_image_path), use_vision, None) + + # Enhance with text analysis using combined text from all pages + enhanced_result = self._extract_structured_data_text_only(all_text, file_path.name, custom_prompt) + + # Merge results, keeping images from original result + for key, value in enhanced_result.items(): + if key not in ('raw_response_data', 'pages_data', 'has_images'): + result[key] = value + + # Update raw text with full document text + if 'ocr_contents' in result: + result['ocr_contents']['raw_text'] = all_text + + # Add flag to indicate custom prompt was applied + result['custom_prompt_applied'] = 'text_only' + + # Simplified approach - no document type detection + + except Exception as e: + logger.warning(f"Custom prompt processing failed: {str(e)}. Using standard processing.") + # Fall back to standard processing + result = self._process_image(Path(first_image_path), use_vision, None) + if 'ocr_contents' in result: + result['ocr_contents']['raw_text'] = all_text + else: + # Standard processing with combined text + result = self._process_image(Path(first_image_path), use_vision, None) + if 'ocr_contents' in result: + result['ocr_contents']['raw_text'] = all_text + + # Merge detected languages if available + if detected_languages: + result['languages'] = list(detected_languages) + + # Add PDF metadata + result['file_name'] = file_path.name + result['pdf_processing_method'] = 'pdf2image_optimized' + result['total_pages'] = total_pages + result['processed_pages'] = len(pages_to_process) + result['pages_processed'] = pages_to_process + + # Add processing info + result['processing_info'] = { + 'method': 'local_pdf_processing', + 'dpi': dpi, + 'pages_sampled': pages_to_process, + 'processing_time': time.time() - start_time + } + + # Clean up + os.unlink(first_image_path) + + return result + else: + logger.warning("No pages successfully processed with pdf2image, falling back to API") + raise Exception("Failed to process PDF pages locally") + + except Exception as pdf2image_error: + logger.warning(f"Local PDF processing failed, falling back to API: {str(pdf2image_error)}") + # Fall back to API processing - # Combine results from all pages - combined_result = self._combine_pdf_results(file_path.name, all_pages_data, total_pages, pages_to_process) - combined_result["processing_time"] = time.time() - start_time + # API-based PDF processing + logger.info("Processing PDF via Mistral API") - return combined_result + # Optimize file upload for faster processing + logger.info("Uploading PDF file to Mistral API") + upload_start = time.time() + # Set appropriate timeout based on file size + upload_timeout = max(60, min(300, int(file_size_mb * 5))) # 60s to 300s based on size + + try: + # Upload the file (Mistral client doesn't support timeout parameter for upload) + uploaded_file = self.client.files.upload( + file={ + "file_name": file_path.stem, + "content": file_path.read_bytes(), + }, + purpose="ocr" + ) + + logger.info(f"PDF uploaded in {time.time() - upload_start:.2f}s") + + # Get a signed URL for the uploaded file + signed_url = self.client.files.get_signed_url(file_id=uploaded_file.id, expiry=1) + + # Process the PDF with OCR - use adaptive timeout based on file size + logger.info(f"Processing PDF with OCR using {OCR_MODEL}") + + # Adaptive retry strategy based on file size + max_retries = 3 if file_size_mb < 20 else 2 # Fewer retries for large files + base_retry_delay = 1 if file_size_mb < 10 else 2 # Longer delays for large files + + # Adaptive timeout based on file size + ocr_timeout_ms = min(180000, max(60000, int(file_size_mb * 3000))) # 60s to 180s + + # Try processing with retries + for retry in range(max_retries): + try: + ocr_start = time.time() + pdf_response = self.client.ocr.process( + document=DocumentURLChunk(document_url=signed_url.url), + model=OCR_MODEL, + include_image_base64=True, + timeout_ms=ocr_timeout_ms + ) + logger.info(f"PDF OCR processing completed in {time.time() - ocr_start:.2f}s") + break # Success, exit retry loop + except Exception as e: + error_msg = str(e) + logger.warning(f"API error on attempt {retry+1}/{max_retries}: {error_msg}") + + # Handle errors with optimized retry logic + error_lower = error_msg.lower() + + # Authentication errors - no point in retrying + if any(term in error_lower for term in ["unauthorized", "401", "403", "authentication"]): + logger.error("API authentication failed. Check your API key.") + raise ValueError(f"Authentication failed. Please verify your Mistral API key: {error_msg}") + + # Connection or server errors - worth retrying + elif any(term in error_lower for term in ["connection", "timeout", "520", "server error", "502", "503", "504"]): + if retry < max_retries - 1: + # Exponential backoff with jitter for better retry behavior + wait_time = base_retry_delay * (2 ** retry) * (0.8 + 0.4 * random.random()) + logger.info(f"Connection issue detected. Waiting {wait_time:.1f}s before retry...") + time.sleep(wait_time) + else: + # Last retry failed + logger.error("Maximum retries reached, API connection error persists.") + raise ValueError(f"Could not connect to Mistral API after {max_retries} attempts: {error_msg}") + + # Rate limit errors - much longer wait + elif any(term in error_lower for term in ["rate limit", "429", "too many requests", "requests rate limit exceeded"]): + # Check specifically for token exhaustion vs temporary rate limit + if "quota" in error_lower or "credit" in error_lower or "subscription" in error_lower: + logger.error("API quota or credit limit reached. No retry will help.") + raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") + elif retry < max_retries - 1: + wait_time = base_retry_delay * (2 ** retry) * 6.0 # Significantly longer wait for rate limits + logger.info(f"Rate limit exceeded. Waiting {wait_time:.1f}s before retry...") + time.sleep(wait_time) + else: + logger.error("Maximum retries reached, rate limit error persists.") + raise ValueError(f"API rate limit exceeded. Please try again later: {error_msg}") + + # Misc errors - typically no retry will help + else: + if retry < max_retries - 1 and any(term in error_lower for term in ["transient", "temporary"]): + # Only retry for errors explicitly marked as transient + wait_time = base_retry_delay * (2 ** retry) + logger.info(f"Transient error detected. Waiting {wait_time:.1f}s before retry...") + time.sleep(wait_time) + else: + logger.error(f"Unrecoverable API error: {error_msg}") + raise + + # Calculate the number of pages to process + pages_to_process = pdf_response.pages + total_pages = len(pdf_response.pages) + limited_pages = False + + logger.info(f"API returned {total_pages} total PDF pages") + + # Smart page selection logic for better performance + if custom_pages: + # Convert to 0-based indexing and filter valid page numbers + valid_indices = [i-1 for i in custom_pages if 0 < i <= total_pages] + if valid_indices: + pages_to_process = [pdf_response.pages[i] for i in valid_indices] + limited_pages = True + logger.info(f"Processing {len(valid_indices)} custom-selected pages") + # Max pages limit with smart sampling + elif max_pages and total_pages > max_pages: + if max_pages == 1: + # Just first page + pages_to_process = pages_to_process[:1] + elif max_pages < 5 and total_pages > 10: + # For small max_pages on large docs, include first, last, and middle + indices = [0] # First page + if max_pages > 1: + indices.append(total_pages - 1) # Last page + if max_pages > 2: + indices.append(total_pages // 2) # Middle page + # Add more pages up to max_pages if needed + if max_pages > 3: + remaining = max_pages - len(indices) + step = total_pages // (remaining + 1) + for i in range(1, remaining + 1): + idx = i * step + if idx not in indices and 0 <= idx < total_pages: + indices.append(idx) + indices.sort() + pages_to_process = [pdf_response.pages[i] for i in indices] + else: + # Default: first max_pages + pages_to_process = pages_to_process[:max_pages] + + limited_pages = True + logger.info(f"Processing {len(pages_to_process)} pages out of {total_pages} total") + + # Directly extract any language information from the OCR response + detected_languages = set() + + # Check if the response has a 'languages' attribute in any form + # First check direct attributes on the response object + if hasattr(pdf_response, 'languages') and pdf_response.languages: + for lang in pdf_response.languages: + detected_languages.add(str(lang)) + logger.info(f"Found language in OCR response: {lang}") + + # Then check if it's in the response as a dictionary format + elif hasattr(pdf_response, '__dict__'): + response_dict = pdf_response.__dict__ + if 'languages' in response_dict and response_dict['languages']: + for lang in response_dict['languages']: + detected_languages.add(str(lang)) + logger.info(f"Found language in OCR response dict: {lang}") + + # Calculate confidence score if available + try: + confidence_values = [page.confidence for page in pages_to_process if hasattr(page, 'confidence')] + confidence_score = sum(confidence_values) / len(confidence_values) if confidence_values else 0.89 + except Exception: + confidence_score = 0.89 # Improved default + + # Merge page content intelligently - include page numbers for better context + all_markdown = [] + for idx, page in enumerate(pages_to_process): + # Try to determine actual page number + if custom_pages and len(custom_pages) == len(pages_to_process): + page_num = custom_pages[idx] + else: + # Estimate page number - may not be accurate with sampling + page_num = idx + 1 + + page_markdown = page.markdown if hasattr(page, 'markdown') else "" + # Add page content without obvious page markers + if page_markdown.strip(): + all_markdown.append(f"{page_markdown}") + + # Collect language information from individual pages if available + if hasattr(page, 'languages') and page.languages: + for lang in page.languages: + detected_languages.add(str(lang)) + logger.info(f"Found language in page {page_num}: {lang}") + + # Join all pages with separation + combined_markdown = "\n\n".join(all_markdown) + + # Extract structured data with the appropriate model + if use_vision: + # Try to get a good image for vision model + vision_image = None + + # Try first page with images + for page in pages_to_process: + if hasattr(page, 'images') and page.images: + vision_image = page.images[0].image_base64 + break + + if vision_image: + # Use vision model with enhanced prompt + logger.info(f"Using vision model: {VISION_MODEL}") + result = self._extract_structured_data_with_vision( + vision_image, combined_markdown, file_path.name, custom_prompt + ) + else: + # Fall back to text-only if no images available + logger.info(f"No images in PDF, falling back to text model: {TEXT_MODEL}") + result = self._extract_structured_data_text_only( + combined_markdown, file_path.name, custom_prompt + ) + else: + # Use text-only model as requested + logger.info(f"Using text-only model as specified: {TEXT_MODEL}") + result = self._extract_structured_data_text_only( + combined_markdown, file_path.name, custom_prompt + ) + + # If we have detected languages directly from the OCR model, use them + if detected_languages: + logger.info(f"Using languages detected by OCR model: {', '.join(detected_languages)}") + result['languages'] = list(detected_languages) + # Add flag to indicate source of language detection + result['language_detection_source'] = 'mistral-ocr-latest' + + # Add metadata about pages + if limited_pages: + result['limited_pages'] = { + 'processed': len(pages_to_process), + 'total': total_pages + } + + # Set confidence score from OCR + result['confidence_score'] = confidence_score + + # Add processing method info + result['pdf_processing_method'] = 'api' + result['total_pages'] = total_pages + result['processed_pages'] = len(pages_to_process) + + # Store serialized OCR response for rendering + serialized_response = serialize_ocr_response(pdf_response) + result['raw_response_data'] = serialized_response + + # Check if there are images to include + has_images = hasattr(pdf_response, 'pages') and any( + hasattr(page, 'images') and page.images for page in pdf_response.pages + ) + result['has_images'] = has_images + + # Include image data for rendering if available + if has_images: + # Prepare pages data with image references + result['pages_data'] = [] + + # Get serialized pages - handle different formats + serialized_pages = None + try: + if hasattr(serialized_response, 'pages'): + serialized_pages = serialized_response.pages + elif isinstance(serialized_response, dict) and 'pages' in serialized_response: + serialized_pages = serialized_response.get('pages', []) + else: + # No pages found in response + logger.warning("No pages found in OCR response") + serialized_pages = [] + except Exception as pages_err: + logger.warning(f"Error extracting pages from OCR response: {str(pages_err)}") + serialized_pages = [] + + # Process each page to extract images + for page_idx, page in enumerate(serialized_pages): + try: + # Skip processing pages not in our selection + if limited_pages and page_idx >= len(pages_to_process): + continue + + # Extract page data with careful error handling + markdown = "" + images = [] + + # Handle different page formats safely + if isinstance(page, dict): + markdown = page.get('markdown', '') + images = page.get('images', []) + else: + # Try attribute access + if hasattr(page, 'markdown'): + markdown = page.markdown + if hasattr(page, 'images'): + images = page.images + + # Create page data record + page_data = { + 'page_number': page_idx + 1, + 'markdown': markdown, + 'images': [] + } + + # Process images with careful error handling + for img_idx, img in enumerate(images): + try: + # Extract image ID and base64 data + img_id = None + img_base64 = None + + if isinstance(img, dict): + img_id = img.get('id') + img_base64 = img.get('image_base64') + else: + # Try attribute access + if hasattr(img, 'id'): + img_id = img.id + if hasattr(img, 'image_base64'): + img_base64 = img.image_base64 + + # Only add if we have valid image data + if img_base64 and isinstance(img_base64, str): + # Ensure ID exists + safe_id = img_id if img_id else f"img_{page_idx}_{img_idx}" + page_data['images'].append({ + 'id': safe_id, + 'image_base64': img_base64 + }) + except Exception as img_err: + logger.warning(f"Error processing image {img_idx} on page {page_idx+1}: {str(img_err)}") + continue # Skip this image + + # Add page data if it has content + if page_data['markdown'] or page_data['images']: + result['pages_data'].append(page_data) + + except Exception as page_err: + logger.warning(f"Error processing page {page_idx+1}: {str(page_err)}") + continue # Skip this page + + # Record final processing time + total_time = time.time() - start_time + result['processing_time'] = total_time + logger.info(f"PDF API processing completed in {total_time:.2f}s") + + return result + + except Exception as api_e: + logger.error(f"Error in API-based PDF processing: {str(api_e)}") + # Re-raise to be caught by outer exception handler + raise + except Exception as e: + # Log the error and return a helpful error result logger.error(f"Error processing PDF: {str(e)}") - logger.error(traceback.format_exc()) + + # Return basic result on error return { - "error": f"Error processing PDF: {str(e)}", "file_name": file_path.name, + "topics": ["Document"], + "languages": ["English"], + "confidence_score": 0.0, + "error": str(e), + "ocr_contents": { + "error": f"Failed to process PDF: {str(e)}", + "partial_text": "Document could not be fully processed." + }, "processing_time": time.time() - start_time } - - def _combine_pdf_results(self, filename, pages_data, total_pages, processed_pages): - """Combine OCR results from multiple PDF pages""" - logger = logging.getLogger("pdf_combiner") - - # Create combined result structure - combined_result = { - "file_name": filename, - "file_type": "pdf", - "limited_pages": { - "processed": processed_pages, - "total": total_pages - }, - "pages_data": pages_data, - "languages": [], - "topics": [] - } + + def _process_image(self, file_path, use_vision=True, custom_prompt=None): + """Process an image file with OCR""" + logger = logging.getLogger("image_processor") + logger.info(f"Processing image: {file_path}") - # Collect all topics and languages - all_languages = set() - all_topics = set() - confidence_values = [] + # Check if we're in test mode + if self.test_mode: + # Return a placeholder document response + return { + "file_name": file_path.name, + "topics": ["Document"], + "languages": ["English"], + "ocr_contents": { + "title": "Document", + "content": "Please set up API key to process documents." + }, + "processing_time": 0.5, + "confidence_score": 0.0 + } - # Combine text content from all pages - combined_text = "" - combined_contents = {} + # No automatic document type detection - rely on the document type specified in the custom prompt + # The document type is passed from the UI through the custom prompt in ocr_processing.py - for page_data in pages_data: - # Add languages and topics - if "languages" in page_data and page_data["languages"]: - for lang in page_data["languages"]: - if lang and lang.strip(): - all_languages.add(lang.strip()) - - if "topics" in page_data and page_data["topics"]: - for topic in page_data["topics"]: - if topic and topic.strip(): - all_topics.add(topic.strip()) - - # Collect confidence values - if "confidence" in page_data and page_data["confidence"]: - confidence_values.append(float(page_data["confidence"])) - - # Add page text content - if "ocr_contents" in page_data and page_data["ocr_contents"]: - ocr_contents = page_data["ocr_contents"] - - # Add raw text to combined text - if "raw_text" in ocr_contents and ocr_contents["raw_text"]: - page_text = ocr_contents["raw_text"].strip() - page_num = page_data.get("page_number", None) - - if page_num: - page_header = f"\n\n--- Page {page_num} ---\n\n" + try: + # Check file size + file_size_mb = file_path.stat().st_size / (1024 * 1024) + logger.info(f"Original image size: {file_size_mb:.2f} MB") + + # Use enhanced preprocessing functions from ocr_utils + try: + from preprocessing import preprocess_image + from utils.file_utils import get_base64_from_bytes + + logger.info(f"Applying image preprocessing for OCR") + + # Get preprocessing settings from config + max_size_mb = IMAGE_PREPROCESSING.get("max_size_mb", 8.0) + + if file_size_mb > max_size_mb: + logger.info(f"Image is large ({file_size_mb:.2f} MB), optimizing for API submission") + + # Use standard preprocessing - document type will be handled by preprocessing.py + # based on the options passed from the UI + base64_data_url = get_base64_from_bytes( + preprocess_image(file_path.read_bytes(), + {"document_type": "standard", + "grayscale": True, + "denoise": True, + "contrast": 0}) + ) + + logger.info(f"Image preprocessing completed successfully") + + except (ImportError, AttributeError) as e: + # Fallback to basic processing if advanced functions not available + logger.warning(f"Advanced preprocessing not available: {str(e)}. Using basic image processing.") + + # If image is larger than 8MB, resize it to reduce API payload size + if file_size_mb > 8: + logger.info("Image is large, resizing before API submission") + try: + from PIL import Image + import io + + # Open and process the image + with Image.open(file_path) as img: + # Convert to RGB if not already (prevents mode errors) + if img.mode != 'RGB': + img = img.convert('RGB') + + # Calculate new dimensions (maintain aspect ratio) + # Target around 2000-2500 pixels on longest side for better OCR quality + width, height = img.size + max_dimension = max(width, height) + target_dimension = 2000 # Restored to 2000 for better image quality + + if max_dimension > target_dimension: + scale_factor = target_dimension / max_dimension + resized_width = int(width * scale_factor) + resized_height = int(height * scale_factor) + # Use LANCZOS instead of BILINEAR for better quality + img = img.resize((resized_width, resized_height), Image.LANCZOS) + + # Enhance contrast for better text recognition + from PIL import ImageEnhance + enhancer = ImageEnhance.Contrast(img) + img = enhancer.enhance(1.3) + + # Save to bytes with compression + buffer = io.BytesIO() + img.save(buffer, format="JPEG", quality=92, optimize=True) # Higher quality for better OCR + buffer.seek(0) + + # Get the base64 + encoded_image = base64.b64encode(buffer.getvalue()).decode() + base64_data_url = f"data:image/jpeg;base64,{encoded_image}" + + # Log the new size + new_size_mb = len(buffer.getvalue()) / (1024 * 1024) + logger.info(f"Resized image to {new_size_mb:.2f} MB") + except ImportError: + logger.warning("PIL not available for resizing. Using original image.") + # Use enhanced encoder with proper MIME type detection + from utils.image_utils import encode_image_for_api + base64_data_url = encode_image_for_api(file_path) + except Exception as e: + logger.warning(f"Image resize failed: {str(e)}. Using original image.") + # Use enhanced encoder with proper MIME type detection + from utils.image_utils import encode_image_for_api + base64_data_url = encode_image_for_api(file_path) + else: + # For smaller images, use as-is with proper MIME type + from utils.image_utils import encode_image_for_api + base64_data_url = encode_image_for_api(file_path) + except Exception as e: + # Fallback to original image if any preprocessing fails + logger.warning(f"Image preprocessing failed: {str(e)}. Using original image.") + # Use enhanced encoder with proper MIME type detection + from utils.image_utils import encode_image_for_api + base64_data_url = encode_image_for_api(file_path) + + # Process the image with OCR + logger.info(f"Processing image with OCR using {OCR_MODEL}") + + # Add retry logic with more retries and longer backoff periods for rate limit issues + max_retries = 2 # Reduced to prevent rate limiting + retry_delay = 1 # Shorter delay between retries + + for retry in range(max_retries): + try: + image_response = self.client.ocr.process( + document=ImageURLChunk(image_url=base64_data_url), + model=OCR_MODEL, + include_image_base64=True, + timeout_ms=45000 # 45 second timeout for better performance + ) + break # Success, exit retry loop + except Exception as e: + error_msg = str(e) + logger.warning(f"API error on attempt {retry+1}/{max_retries}: {error_msg}") + + # Check specific error types to handle them appropriately + error_lower = error_msg.lower() + + # Authentication errors - no point in retrying + if "unauthorized" in error_lower or "401" in error_lower: + logger.error("API authentication failed. Check your API key.") + raise ValueError(f"Authentication failed with API key. Please verify your Mistral API key is correct and active: {error_msg}") + + # Connection errors - worth retrying + elif "connection" in error_lower or "timeout" in error_lower or "520" in error_msg or "server error" in error_lower: + if retry < max_retries - 1: + # Wait with shorter delay before retrying + wait_time = retry_delay * (2 ** retry) + logger.info(f"Connection issue detected. Waiting {wait_time}s before retry...") + time.sleep(wait_time) + else: + # Last retry failed + logger.error("Maximum retries reached, API connection error persists.") + raise ValueError(f"Could not connect to Mistral API after {max_retries} attempts: {error_msg}") + + # Rate limit errors + elif "rate limit" in error_lower or "429" in error_lower or "requests rate limit exceeded" in error_lower: + # Check specifically for token exhaustion vs temporary rate limit + if "quota" in error_lower or "credit" in error_lower or "subscription" in error_lower: + logger.error("API quota or credit limit reached. No retry will help.") + raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") + elif retry < max_retries - 1: + # More aggressive backoff for rate limits + wait_time = retry_delay * (2 ** retry) * 5 # 5x longer wait for rate limits + logger.info(f"Rate limit exceeded. Waiting {wait_time}s before retry...") + time.sleep(wait_time) + else: + # Last retry failed, try local OCR as fallback + logger.error("Maximum retries reached, rate limit error persists.") + try: + # Try to import the local OCR fallback function + from utils.image_utils import try_local_ocr_fallback + + # Attempt local OCR fallback + ocr_text = try_local_ocr_fallback(file_path, base64_data_url) + + if ocr_text: + logger.info("Successfully used local OCR fallback") + # Return a basic result with the local OCR text + return { + "file_name": file_path.name, + "topics": ["Document"], + "languages": ["English"], + "ocr_contents": { + "title": "Document (Local OCR)", + "content": "This document was processed with local OCR due to API rate limiting.", + "raw_text": ocr_text + }, + "processing_method": "local_fallback", + "processing_note": "Used local OCR due to API rate limit" + } + except (ImportError, Exception) as local_err: + logger.warning(f"Local OCR fallback failed: {str(local_err)}") + + # If we get here, both API and local OCR failed + raise ValueError(f"Mistral API rate limit exceeded. Please try again later: {error_msg}") + + # Other errors - no retry else: - page_header = "\n\n--- New Page ---\n\n" - - combined_text += page_header + page_text - - # Add other page-specific content - for key, value in ocr_contents.items(): - if key != "raw_text" and value: - # Handle special fields that should be combined - if key in ["title", "author", "date", "summary"]: - if key not in combined_contents: - combined_contents[key] = value - # For other fields, add page number suffix + logger.error(f"Unrecoverable API error: {error_msg}") + raise + + # Get the OCR markdown from the first page + image_ocr_markdown = image_response.pages[0].markdown if image_response.pages else "" + + # Check if the OCR response has images + has_images = hasattr(image_response, 'pages') and image_response.pages and hasattr(image_response.pages[0], 'images') and image_response.pages[0].images + + # Check for language information directly from the OCR model + detected_languages = set() + + # Check if the response has a 'languages' attribute in any form + # First check direct attributes on the response object + if hasattr(image_response, 'languages') and image_response.languages: + for lang in image_response.languages: + detected_languages.add(str(lang)) + logger.info(f"Found language in OCR response: {lang}") + + # Then check if it's in the response as a dictionary format + elif hasattr(image_response, '__dict__'): + response_dict = image_response.__dict__ + if 'languages' in response_dict and response_dict['languages']: + for lang in response_dict['languages']: + detected_languages.add(str(lang)) + logger.info(f"Found language in OCR response dict: {lang}") + + # Check for languages in individual pages + if hasattr(image_response, 'pages') and image_response.pages: + for page in image_response.pages: + if hasattr(page, 'languages') and page.languages: + for lang in page.languages: + detected_languages.add(str(lang)) + logger.info(f"Found language in page: {lang}") + + # Optimize: Skip vision model step if ocr_markdown is very small or empty + # BUT make an exception if custom_prompt is provided + # OR if the image has visual content worth preserving + if (not custom_prompt and not has_images) and (not image_ocr_markdown or len(image_ocr_markdown) < 50): + logger.warning("OCR produced minimal text with no images. Returning basic result.") + return { + "file_name": file_path.name, + "topics": ["Document"], + "languages": ["English"], + "ocr_contents": { + "raw_text": image_ocr_markdown if image_ocr_markdown else "No text could be extracted from the image." + }, + "processing_note": "OCR produced minimal text content", + # Include raw response data for images + "raw_response_data": serialize_ocr_response(image_response) + } + + # For images with minimal text but visual content, enhance the prompt + elif has_images and (not image_ocr_markdown or len(image_ocr_markdown) < 100): + logger.info("Document with images but minimal text detected. Using enhanced prompt for mixed media.") + if not custom_prompt: + custom_prompt = "This is a mixed media document with both text and important visual elements. Please carefully describe the image content and extract all visible text, preserving the relationship between text and visuals." + elif "visual" not in custom_prompt.lower() and "image" not in custom_prompt.lower(): + custom_prompt += " The document contains important visual elements that should be described along with the text content." + + # Extract structured data using the appropriate model, with a single API call + if use_vision: + logger.info(f"Using vision model: {VISION_MODEL}") + result = self._extract_structured_data_with_vision(base64_data_url, image_ocr_markdown, file_path.name, custom_prompt) + else: + logger.info(f"Using text-only model: {TEXT_MODEL}") + result = self._extract_structured_data_text_only(image_ocr_markdown, file_path.name, custom_prompt) + + # If we have detected languages directly from the OCR model, use them + if detected_languages: + logger.info(f"Using languages detected by OCR model: {', '.join(detected_languages)}") + result['languages'] = list(detected_languages) + # Add flag to indicate source of language detection + result['language_detection_source'] = 'mistral-ocr-latest' + + # Store the serialized OCR response for image rendering (for compatibility with original version) + # Don't store raw_response directly as it's not JSON serializable + serialized_response = serialize_ocr_response(image_response) + result['raw_response_data'] = serialized_response + + # Store key parts of the OCR response for image rendering + # With serialized format that can be stored in JSON + result['has_images'] = has_images + + if has_images: + # Serialize the entire response to ensure it's JSON serializable + serialized_response = serialize_ocr_response(image_response) + + # Create a structured representation of images that can be serialized + result['pages_data'] = [] + + if hasattr(serialized_response, 'pages'): + serialized_pages = serialized_response.pages + else: + # Handle case where serialization returns a dict instead of an object + serialized_pages = serialized_response.get('pages', []) + + for page_idx, page in enumerate(serialized_pages): + # Handle both object and dict forms + if isinstance(page, dict): + markdown = page.get('markdown', '') + images = page.get('images', []) + else: + markdown = page.markdown if hasattr(page, 'markdown') else '' + images = page.images if hasattr(page, 'images') else [] + + page_data = { + 'page_number': page_idx + 1, + 'markdown': markdown, + 'images': [] + } + + # Extract images if present + for img_idx, img in enumerate(images): + img_id = None + img_base64 = None + + if isinstance(img, dict): + img_id = img.get('id') + img_base64 = img.get('image_base64') else: - page_num = page_data.get("page_number", None) - if page_num: - combined_contents[f"{key}_page_{page_num}"] = value - else: - # Use existing field if we can't add page number - combined_contents[key] = value - - # Add combined languages and topics - combined_result["languages"] = list(all_languages) - combined_result["topics"] = list(all_topics) - - # Set average confidence - if confidence_values: - combined_result["confidence"] = sum(confidence_values) / len(confidence_values) - - # Add combined text content - combined_contents["raw_text"] = combined_text.strip() - combined_result["ocr_contents"] = combined_contents - - return combined_result - - def _extract_text_from_image(self, image, model=OCR_MODEL, timeout_ms=30000): - """Extract text from image using OCR model""" - logger = logging.getLogger("ocr_extractor") - - # Convert image to base64 if it's a PIL Image - if isinstance(image, Image.Image): - buffer = io.BytesIO() - image.save(buffer, format="JPEG") - image_bytes = buffer.getvalue() - base64_image = base64.b64encode(image_bytes).decode("utf-8") - elif isinstance(image, bytes): - base64_image = base64.b64encode(image).decode("utf-8") - elif isinstance(image, str) and is_valid_base64(image): - base64_image = image - else: - logger.error("Invalid image format for OCR") - return "Error: Invalid image format" - - if TEST_MODE: - # Mock response in test mode - logger.info("Test mode: Returning mock OCR result") - return "This is a mock OCR result for testing purposes." + img_id = img.id if hasattr(img, 'id') else None + img_base64 = img.image_base64 if hasattr(img, 'image_base64') else None + + if img_base64: + page_data['images'].append({ + 'id': img_id if img_id else f"img_{page_idx}_{img_idx}", + 'image_base64': img_base64 + }) + + result['pages_data'].append(page_data) + + logger.info("Image processing completed successfully") + return result + + except Exception as e: + logger.error(f"Error processing image: {str(e)}") + # Return basic result on error + return { + "file_name": file_path.name, + "topics": ["Document"], + "languages": ["English"], + "error": str(e), + "ocr_contents": { + "error": f"Failed to process image: {str(e)}", + "partial_text": "Image could not be processed." + } + } + + def _extract_structured_data_with_vision(self, image_base64, ocr_markdown, filename, custom_prompt=None): + """ + Extract structured data using vision model with detailed historical context prompting + Optimized for speed, accuracy, and resilience + """ + logger = logging.getLogger("vision_processor") try: - logger.info(f"Extracting text with model: {model}") - response = self.client.chat( - model=model, - messages=[ - ChatMessage(role="user", content=[ + # Check if this is a newspaper or document with columns by filename + is_likely_newspaper = False + newspaper_keywords = ["newspaper", "gazette", "herald", "times", "journal", + "chronicle", "post", "tribune", "news", "press", "gender"] + + # Check filename for newspaper indicators + filename_lower = filename.lower() + for keyword in newspaper_keywords: + if keyword in filename_lower: + is_likely_newspaper = True + logger.info(f"Likely newspaper document detected in vision processing: {filename}") + break + + # Fast path: Skip vision API if OCR already produced reasonable text + # We'll define "reasonable" as having at least 300 characters + if len(ocr_markdown.strip()) > 300: + logger.info("Sufficient OCR text detected, analyzing language before using OCR text directly") + + # Perform language detection on the OCR text before returning + if LANG_DETECTOR_AVAILABLE and self.language_detector: + detected_languages = self.language_detector.detect_languages( + ocr_markdown, + filename=getattr(self, 'current_filename', None) + ) + else: + # If language detector is not available, use default English + detected_languages = ["English"] + + return { + "file_name": filename, + "topics": ["Document"], + "languages": detected_languages, + "ocr_contents": { + "raw_text": ocr_markdown + } + } + + # Only use vision model for minimal OCR text or when document has columns + if is_likely_newspaper and (not ocr_markdown or len(ocr_markdown.strip()) < 300): + logger.info("Using vision model for newspaper with minimal OCR text") + if not custom_prompt: + custom_prompt = "Document has columns. Extract text by reading each column top to bottom." + + # Fast path: Skip if in test mode or no API key + if self.test_mode or not self.api_key: + logger.info("Test mode or no API key, using text-only processing") + return self._extract_structured_data_text_only(ocr_markdown, filename) + + # Use only the first part of OCR text to keep prompts small and processing fast + if len(ocr_markdown) > 1000: + truncated_ocr = ocr_markdown[:1000] + logger.info(f"Truncated OCR text from {len(ocr_markdown)} to 1000 chars for faster processing") + else: + truncated_ocr = ocr_markdown + + # Build a comprehensive prompt with OCR text and detailed instructions for title detection and language handling + enhanced_prompt = f"This is a document's OCR text:\n\n{truncated_ocr}\n\n\n" + + # Add custom prompt if provided + if custom_prompt: + enhanced_prompt += f"User instructions: {custom_prompt}\n\n" + + # Primary focus on document structure and title detection + enhanced_prompt += "You are analyzing a historical document. Follow these extraction priorities:\n" + enhanced_prompt += "1. FIRST PRIORITY: Identify and extract the TITLE of the document. Look for large text at the top, decorative typography, or centered text that appears to be a title. The title is often one of the first elements in historical documents.\n" + enhanced_prompt += "2. SECOND: Extract all text content accurately from this document, including any text visible in the image that may not have been captured by OCR.\n\n" + enhanced_prompt += "Document Title Guidelines:\n" + enhanced_prompt += "- For printed historical works: Look for primary heading at top of the document, all-caps text, or larger font size text\n" + enhanced_prompt += "- For newspapers/periodicals: Extract both newspaper name and article title if present\n" + enhanced_prompt += "- For handwritten documents: Look for centered text at the top or underlined headings\n" + enhanced_prompt += "- For engravings/illustrations: Include the title or caption, which often appears below the image\n\n" + + # Language detection guidance + enhanced_prompt += "IMPORTANT: After extracting the title and text content, determine the languages present.\n" + enhanced_prompt += "Precisely identify and list ALL languages present in the document separately. Look closely for multiple languages that might appear together.\n" + enhanced_prompt += "For language detection, examine these specific indicators:\n" + enhanced_prompt += "- French: accents (é, è, ê, à, ç, â, î, ô, û), words like 'le', 'la', 'les', 'et', 'en', 'de', 'du', 'des', 'dans', 'ce', 'cette', 'ces', 'par', 'pour', 'qui', 'que', 'où', 'avec'\n" + enhanced_prompt += "- Portuguese: accents (ã, õ, á, é, ê, ó, ç), words like 'e', 'o', 'de', 'da', 'do', 'em', 'para', 'que', 'não', 'com'\n" + enhanced_prompt += "- Spanish: ñ, inverted punctuation (¿, ¡), accents (á, é, í, ó, ú), words like 'el', 'la', 'los', 'las', 'y', 'en', 'por', 'que', 'con'\n" + enhanced_prompt += "- German: umlauts (ä, ö, ü), sharp s (ß), words like 'und', 'der', 'die', 'das', 'in', 'mit'\n" + enhanced_prompt += "- Italian: accents (à, è, é, ì, ò, ù), words like 'il', 'la', 'e', 'di', 'che', 'per', 'con'\n" + enhanced_prompt += "- Chinese: hanzi characters (汉字), lack of spaces between words, markers like 的, 是, 了, 在, 和, 有\n" + enhanced_prompt += "- Latin: words like 'et', 'in', 'ad', 'est', 'sunt', 'non', 'cum', 'sed'\n\n" + enhanced_prompt += "If the document contains multiple columns or sections, process each section independently and then combine them logically.\n" + enhanced_prompt += "Return ALL detected languages as separate entries in the languages array, never combine them.\n" + enhanced_prompt += "CRITICAL: Do NOT default to English unless absolutely certain. If you see French characteristics like 'é', 'è', 'ê', 'ç' or French words, prioritize French in your language detection." + + # Measure API call time for optimization feedback + start_time = time.time() + + try: + # Use a fixed, shorter timeout for single-page documents + timeout_ms = 45000 # 45 seconds is optimal for most single-page documents + + logger.info(f"Calling vision model with {timeout_ms}ms timeout") + chat_response = self.client.chat.parse( + model=VISION_MODEL, + messages=[ { - "type": "image", - "data": base64_image, + "role": "user", + "content": [ + ImageURLChunk(image_url=image_base64), + TextChunk(text=enhanced_prompt) + ], }, - { - "type": "text", - "text": "Extract all text from this image accurately." - } - ]) - ], - temperature=0, - timeout_ms=timeout_ms - ) - - # Extract the OCR text from the response - if response and hasattr(response, 'choices') and response.choices: - return response.choices[0].message.content - else: - logger.warning("Empty or invalid OCR response") - return "" + ], + response_format=StructuredOCRModel, + temperature=0, + timeout_ms=timeout_ms + ) + + api_time = time.time() - start_time + logger.info(f"Vision model completed in {api_time:.2f}s") + + except Exception as e: + # If there's an error with the enhanced prompt, try progressively simpler approaches + logger.warning(f"Enhanced prompt failed after {time.time() - start_time:.2f}s: {str(e)}") + + # Try a very simplified approach with minimal context + try: + # Ultra-short prompt for faster processing + simplified_prompt = ( + f"Extract text from this document image. " + f"\n{truncated_ocr[:500]}\n\n" + f"Return a JSON with file_name, topics, languages, and ocr_contents fields." + ) + + # Only add minimal custom prompt if provided + if custom_prompt and len(custom_prompt) < 100: + simplified_prompt += f"\n{custom_prompt}" + + logger.info(f"Trying simplified prompt approach") + chat_response = self.client.chat.parse( + model=VISION_MODEL, + messages=[ + { + "role": "user", + "content": [ + ImageURLChunk(image_url=image_base64), + TextChunk(text=simplified_prompt) + ], + }, + ], + response_format=StructuredOCRModel, + temperature=0, + timeout_ms=30000 # Very short timeout for simplified approach (30 seconds) + ) + + logger.info(f"Simplified prompt approach succeeded") + + except Exception as second_e: + # If that fails, try with minimal prompt and just image analysis + logger.warning(f"Simplified prompt failed: {str(second_e)}. Trying minimal prompt.") + + try: + # Minimal prompt focusing only on OCR task + minimal_prompt = ( + f"Extract the text from this image. " + f"Return JSON with file_name, topics, languages, and ocr_contents.raw_text fields." + ) + + logger.info(f"Trying minimal prompt with image-only focus") + chat_response = self.client.chat.parse( + model=VISION_MODEL, + messages=[ + { + "role": "user", + "content": [ + ImageURLChunk(image_url=image_base64), + TextChunk(text=minimal_prompt) + ], + }, + ], + response_format=StructuredOCRModel, + temperature=0, + timeout_ms=25000 # Minimal timeout for last attempt (25 seconds) + ) + + logger.info(f"Minimal prompt approach succeeded") + + except Exception as third_e: + # If all vision attempts fail, fall back to text-only model + logger.warning(f"All vision model attempts failed, falling back to text-only model: {str(third_e)}") + return self._extract_structured_data_text_only(ocr_markdown, filename) + + # Convert the response to a dictionary + result = json.loads(chat_response.choices[0].message.parsed.json()) + + # Ensure languages is a list of strings, not Language enum objects + if 'languages' in result: + result['languages'] = [str(lang) for lang in result.get('languages', [])] + + # Add simplified metadata about processing + result['processing_info'] = { + 'method': 'vision_model', + 'ocr_text_length': len(ocr_markdown), + 'api_response_time': time.time() - start_time + } + + # Note if custom prompt was applied + if custom_prompt: + result['custom_prompt_applied'] = 'vision_model' + + # Add confidence score if not present + if 'confidence_score' not in result: + result['confidence_score'] = 0.92 # Vision model typically has higher confidence + + # If OCR text has clear French patterns but language is English or missing, fix it + if ocr_markdown and 'languages' in result: + if LANG_DETECTOR_AVAILABLE and self.language_detector: + result['languages'] = self.language_detector.detect_languages( + ocr_markdown, + filename=getattr(self, 'current_filename', None), + current_languages=result['languages'] + ) except Exception as e: - logger.error(f"OCR extraction error: {str(e)}") - return f"Error: {str(e)}" + # Fall back to text-only model if vision model fails + logger.warning(f"Vision model processing failed, falling back to text-only model: {str(e)}") + result = self._extract_structured_data_text_only(ocr_markdown, filename) + + return result + + # We've removed document type detection entirely for simplicity - def _process_image(self, file_path, use_vision=True, custom_prompt=None, perf_mode="Speed"): - """Process an image file with OCR""" - logger = logging.getLogger("image_processor") - logger.info(f"Processing image: {file_path}") + + # Create a prompt with enhanced language detection instructions + generic_section = ( + f"You are an OCR specialist processing historical documents. " + f"Focus on accurately extracting text content and image chunks while preserving structure and formatting. " + f"Pay attention to any historical features and document characteristics.\n\n" + f"Create a structured JSON response with the following fields:\n" + f"- file_name: The document's name\n" + f"- topics: An array of topics covered in the document\n" + f"- languages: An array of languages used in the document (be precise and specific about language detection)\n" + f"- ocr_contents: A comprehensive dictionary with the document's contents including:\n" + f" * title: The title or heading (if present)\n" + f" * transcript: The full text of the document\n" + f" * text: The main text content (if different from transcript)\n" + f" * content: The body content (if different than transcript)\n" + f" * images: An array of image objects with their base64 data\n" + f" * alt_text: The alt text or description of the images\n" + f" * caption: The caption or title of the images\n" + f" * raw_text: The complete OCR text\n" + ) + + # Add custom prompt if provided + custom_section = "" + if custom_prompt: + custom_section = f"\n\nUser-provided instructions: {custom_prompt}\n" + + # Return the enhanced prompt + return generic_section + custom_section + + def _extract_structured_data_text_only(self, ocr_markdown, filename, custom_prompt=None): + """ + Extract structured data using text-only model with detailed historical context prompting + and improved error handling with enhanced language detection + """ + logger = logging.getLogger("text_processor") start_time = time.time() try: - # Check if we're dealing with a path or already loaded image - if isinstance(file_path, (str, Path)): - # It's a path, load the image - try: - if not Path(file_path).exists(): - # This might be a temporary path for a PDF page - # In this case, we'll get the image from memory rather than a file - if hasattr(file_path, '_image') and file_path._image: - image = file_path._image - else: - logger.error(f"Image file not found: {file_path}") - return { - "error": f"Image file not found: {file_path}", - "file_name": str(file_path).split('/')[-1] if isinstance(file_path, (str, Path)) else "unknown", - "processing_time": time.time() - start_time - } - else: - # Load the image from disk - image = Image.open(file_path) - except Exception as e: - logger.error(f"Error loading image: {str(e)}") - return { - "error": f"Error loading image: {str(e)}", - "file_name": str(file_path).split('/')[-1] if isinstance(file_path, (str, Path)) else "unknown", - "processing_time": time.time() - start_time - } - elif isinstance(file_path, Image.Image): - # It's already a PIL Image - image = file_path - # Use a generic filename if actual path is not available - file_path = getattr(image, '_filename', 'image.jpg') - elif isinstance(file_path, bytes): - # It's image bytes - try: - image = Image.open(io.BytesIO(file_path)) - file_path = getattr(image, '_filename', 'image.jpg') - except Exception as e: - logger.error(f"Error loading image from bytes: {str(e)}") - return { - "error": f"Error loading image from bytes: {str(e)}", - "file_name": "unknown", - "processing_time": time.time() - start_time + # Fast path: Skip for minimal OCR text + if not ocr_markdown or len(ocr_markdown.strip()) < 50: + logger.info("Minimal OCR text - returning basic result") + + # Attempt comprehensive language detection even for minimal text + detected_languages = [] + + # Simple language detection based on character frequency + if ocr_markdown and len(ocr_markdown.strip()) > 10: + # Define indicators for all supported languages + language_indicators = { + "Portuguese": { + "chars": ['ã', 'õ', 'á', 'é', 'ê', 'í', 'ó', 'ú', 'ç'], + "words": ['e', 'o', 'de', 'da', 'do', 'em', 'para', 'que', 'não', 'com'] + }, + "Spanish": { + "chars": ['ñ', 'á', 'é', 'í', 'ó', 'ú', '¿', '¡'], + "words": ['el', 'la', 'los', 'las', 'y', 'en', 'por', 'que', 'con', 'del'] + }, + "French": { + "chars": ['é', 'è', 'ê', 'à', 'ç', 'ù', 'â', 'î', 'ô', 'û'], + "words": ['le', 'la', 'les', 'et', 'en', 'de', 'du', 'des', 'un', 'une', 'ce', 'cette', 'qui', 'que', 'pour', 'dans', 'par', 'sur'] + }, + "German": { + "chars": ['ä', 'ö', 'ü', 'ß'], + "words": ['der', 'die', 'das', 'und', 'ist', 'von', 'mit', 'für', 'sich'] + }, + "Italian": { + "chars": ['à', 'è', 'é', 'ì', 'ò', 'ù'], + "words": ['il', 'la', 'e', 'di', 'che', 'per', 'con', 'sono', 'non'] + }, + "Latin": { + "chars": [], + "words": ['et', 'in', 'ad', 'est', 'sunt', 'non', 'cum', 'sed', 'qui', 'quod'] + } } - else: - logger.error(f"Unsupported image input type: {type(file_path)}") + + words = ocr_markdown.lower().split() + + # Check for indicators of each language + for language, indicators in language_indicators.items(): + chars = indicators["chars"] + lang_words = indicators["words"] + + has_chars = any(char in ocr_markdown for char in chars) if chars else False + word_count = sum(1 for word in words if word in lang_words) + + # Add language if strong enough indicators are present + if has_chars or word_count >= 2: + detected_languages.append(language) + + # Check for English separately + english_words = ['the', 'and', 'of', 'to', 'in', 'a', 'is', 'that', 'for', 'it'] + english_count = sum(1 for word in words if word in english_words) + if english_count >= 2: + detected_languages.append("English") + + # If no languages detected, default to English + if not detected_languages: + detected_languages = ["English"] + return { - "error": f"Unsupported image input type: {type(file_path)}", - "file_name": "unknown", - "processing_time": time.time() - start_time + "file_name": filename, + "topics": ["Document"], + "languages": detected_languages, + "ocr_contents": { + "raw_text": ocr_markdown if ocr_markdown else "No text could be extracted" + }, + "processing_method": "minimal_text" } - # Convert file_path to string if it's a Path object - if isinstance(file_path, Path): - file_path = str(file_path) + # Check for API key to avoid unnecessary processing + if self.test_mode or not self.api_key: + logger.info("Test mode or no API key - returning basic result") + return { + "file_name": filename, + "topics": ["Document"], + "languages": ["English"], + "ocr_contents": { + "raw_text": ocr_markdown[:10000] if ocr_markdown else "No text could be extracted", + "note": "API key not provided - showing raw OCR text only" + }, + "processing_method": "test_mode" + } + + # If OCR text is very large, truncate it to avoid API limits + truncated_text = ocr_markdown + if len(ocr_markdown) > 25000: + # Keep first 15000 chars and last 5000 chars + truncated_text = ocr_markdown[:15000] + "\n...[content truncated]...\n" + ocr_markdown[-5000:] + logger.info(f"OCR text truncated from {len(ocr_markdown)} to {len(truncated_text)} chars") - # Rest of image processing... - # (Code truncated for brevity) + # Build a prompt with enhanced title detection and language detection instructions + enhanced_prompt = f"This is a document's OCR text:\n\n{truncated_text}\n\n\n" - # Return a basic result to complete the function - return { - "file_name": os.path.basename(file_path) if isinstance(file_path, str) else "unknown", - "processing_time": time.time() - start_time, - "ocr_contents": {"raw_text": "Processed image content would appear here"} - } + # Add custom prompt if provided + if custom_prompt: + enhanced_prompt += f"User instructions: {custom_prompt}\n\n" + + # Add title detection focus + enhanced_prompt += "You are analyzing a historical document. Please follow these extraction priorities:\n" + enhanced_prompt += "1. FIRST PRIORITY: Identify and extract the TITLE of the document. Look for prominent text at the top, decorative typography, or centered text that appears to be a title.\n" + enhanced_prompt += " - For historical documents with prominent headings at the top\n" + enhanced_prompt += " - For newspapers or periodicals, extract both the publication name and article title\n" + enhanced_prompt += " - For manuscripts or letters, identify any heading or subject line\n" + enhanced_prompt += "2. SECOND PRIORITY: Extract all text content accurately and return structured data with the document's contents.\n\n" + enhanced_prompt += "IMPORTANT: Precisely identify and list ALL languages present in the document separately. Look closely for multiple languages that might appear together.\n" + enhanced_prompt += "For language detection, examine these specific indicators:\n" + enhanced_prompt += "- French: accents (é, è, ê, à, ç), words like 'le', 'la', 'les', 'et', 'en', 'de', 'du'\n" + enhanced_prompt += "- German: umlauts (ä, ö, ü), sharp s (ß), words like 'und', 'der', 'die', 'das', 'in', 'mit'\n" + enhanced_prompt += "- Spanish: ñ, inverted punctuation (¿, ¡), accents (á, é, í, ó, ú), words like 'el', 'la', 'los', 'las', 'y', 'en'\n" + enhanced_prompt += "- Italian: words like 'il', 'la', 'e', 'di', 'che', 'per', 'con'\n" + enhanced_prompt += "- Chinese: hanzi characters (汉字), lack of spaces between words, markers like 的, 是, 了, 在, 和, 有\n" + enhanced_prompt += "- Latin: words like 'et', 'in', 'ad', 'est', 'sunt', 'non', 'cum', 'sed'\n" + enhanced_prompt += "Do NOT classify text as English unless you can positively confirm it contains specifically English words and phrases.\n\n" + enhanced_prompt += "Return ALL detected languages as separate entries in the languages array. If multiple languages are present, list them ALL separately." + + # Use enhanced prompt with text-only model - with retry logic + max_retries = 2 + retry_delay = 1 + + for retry in range(max_retries): + try: + logger.info(f"Calling text model ({TEXT_MODEL})") + api_start = time.time() + + # Set appropriate timeout based on text length + timeout_ms = min(120000, max(30000, len(truncated_text) * 5)) # 30-120s based on length + + # Make API call with appropriate timeout + chat_response = self.client.chat.parse( + model=TEXT_MODEL, + messages=[ + { + "role": "user", + "content": enhanced_prompt + }, + ], + response_format=StructuredOCRModel, + temperature=0, + timeout_ms=timeout_ms + ) + + api_time = time.time() - api_start + logger.info(f"Text model API call completed in {api_time:.2f}s") + + # Convert the response to a dictionary + result = json.loads(chat_response.choices[0].message.parsed.json()) + + # Ensure languages is a list of strings, not Language enum objects + if 'languages' in result: + result['languages'] = [str(lang) for lang in result.get('languages', [])] + + # Add simplified processing metadata + result['processing_method'] = 'text_model' + result['model_used'] = TEXT_MODEL + result['processing_time'] = time.time() - start_time + + # Flag when custom prompt has been successfully applied + if custom_prompt: + result['custom_prompt_applied'] = 'text_model' + + # Add raw text for reference if not already present + if 'ocr_contents' in result and 'raw_text' not in result['ocr_contents']: + # Add truncated raw text if very large + if len(ocr_markdown) > 50000: + result['ocr_contents']['raw_text'] = ocr_markdown[:50000] + "\n...[content truncated]..." + else: + result['ocr_contents']['raw_text'] = ocr_markdown + + return result + + except Exception as api_error: + error_msg = str(api_error).lower() + logger.warning(f"API error on attempt {retry+1}/{max_retries}: {str(api_error)}") + + # Check if retry would help + if retry < max_retries - 1: + # Rate limit errors - special handling with longer wait + if any(term in error_msg for term in ["rate limit", "429", "too many requests", "requests rate limit exceeded"]): + # Check specifically for token exhaustion vs temporary rate limit + if any(term in error_msg for term in ["quota", "credit", "subscription"]): + logger.error("API quota or credit limit reached. No retry will help.") + raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") + # Longer backoff for rate limit errors + wait_time = retry_delay * (2 ** retry) * 6.0 # 6x longer wait for rate limits + logger.info(f"Rate limit exceeded. Waiting {wait_time:.1f}s before retry...") + time.sleep(wait_time) + # Other transient errors + elif any(term in error_msg for term in ["timeout", "connection", "500", "503", "504"]): + # Wait before retrying + wait_time = retry_delay * (2 ** retry) + logger.info(f"Transient error, retrying in {wait_time}s") + time.sleep(wait_time) + else: + # Non-retryable error + raise + else: + # Last retry failed + raise + + # This shouldn't be reached due to raise in the loop, but just in case + raise Exception("All retries failed for text model") except Exception as e: - logger.error(f"Error processing image: {str(e)}") - logger.error(traceback.format_exc()) - return { - "error": f"Error processing image: {str(e)}", - "file_name": str(file_path).split('/')[-1] if isinstance(file_path, (str, Path)) else "unknown", - "processing_time": time.time() - start_time - } + logger.error(f"Text model failed: {str(e)}. Creating basic result.") + + # Create a basic result with available OCR text + try: + # Create a more informative fallback result + result = { + "file_name": filename, + "topics": ["Document"], + "languages": ["English"], + "ocr_contents": { + "raw_text": ocr_markdown[:50000] if ocr_markdown else "No text could be extracted", + "error": "AI processing failed: " + str(e).replace('"', '\\"') + }, + "processing_method": "fallback", + "processing_error": str(e), + "processing_time": time.time() - start_time + } + + # No topic detection to avoid issue with document misclassification + + except Exception as inner_e: + logger.error(f"Error creating basic result: {str(inner_e)}") + result = { + "file_name": str(filename) if filename else "unknown", + "topics": ["Document"], + "languages": ["English"], + "ocr_contents": { + "error": "Processing failed completely", + "partial_text": ocr_markdown[:1000] if ocr_markdown else "Document could not be processed." + } + } + + return result + +# For testing directly +if __name__ == "__main__": + import sys + + if len(sys.argv) < 2: + print("Usage: python structured_ocr.py ") + sys.exit(1) + + file_path = sys.argv[1] + processor = StructuredOCR() + result = processor.process_file(file_path) + + print(json.dumps(result, indent=2))