Spaces:
Running
Running
# Standard library imports | |
import os | |
import sys | |
import time | |
import random | |
import json | |
import base64 | |
import logging | |
from enum import Enum | |
from pathlib import Path | |
from functools import lru_cache | |
from typing import Optional, Dict, Any, List, Union, Tuple | |
# Configure logging: one root configuration at import time, plus a
# module-level logger named after this module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Third-party imports | |
from pydantic import BaseModel | |
# Try to import pycountry, provide fallback if not available | |
try: | |
import pycountry | |
PYCOUNTRY_AVAILABLE = True | |
except ImportError: | |
PYCOUNTRY_AVAILABLE = False | |
logger.warning("pycountry module not available - using language code fallback") | |
# Try to import Mistral AI, provide fallback if not available | |
try: | |
from mistralai import Mistral | |
from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk | |
from mistralai.models import OCRImageObject | |
MISTRAL_AVAILABLE = True | |
except ImportError: | |
MISTRAL_AVAILABLE = False | |
logger.warning("mistralai module not available - OCR functionality will be limited") | |
# Import our language detection module | |
try: | |
from utils.helpers.language_detection import LanguageDetector | |
LANG_DETECTOR_AVAILABLE = True | |
except ImportError: | |
LANG_DETECTOR_AVAILABLE = False | |
logger.warning("language_detection module not available - using fallback language detection") | |
# Import utilities for OCR processing | |
try: | |
from utils.image_utils import replace_images_in_markdown, get_combined_markdown | |
except ImportError: | |
# Define minimal fallback functions if module not found | |
logger.warning("Could not import utils.image_utils - using minimal fallback functions") | |
def replace_images_in_markdown(markdown_str, images_dict):
    """Minimal fallback implementation of replace_images_in_markdown.

    Rewrites each markdown image reference whose alt text contains an image
    id (extension ignored) so that the reference points at that image's
    payload string.

    Args:
        markdown_str: Markdown text that may contain ``![alt](target)``
            image references.
        images_dict: Mapping of image id (e.g. ``"img-0.jpeg"``) to the
            string that becomes the new link target.

    Returns:
        The markdown with every matching reference rewritten. Text without
        a matching reference is returned unchanged.
    """
    import re

    for img_id, base64_str in images_dict.items():
        # Match on the id without its extension.
        base_id = img_id.split('.')[0]
        if not base_id:
            # An empty stem would turn the pattern into "match every image".
            continue
        # The id is data, not a pattern: escape it so ids containing regex
        # metacharacters neither raise re.error nor match unrelated text.
        pattern = re.compile(rf"!\[[^\]]*{re.escape(base_id)}[^\]]*\]\([^\)]+\)")
        # NOTE(review): the replacement literal was empty in this copy of the
        # file (which deleted images and left base64_str unused); it is
        # reconstructed here as a reference to the payload - confirm against
        # utils.image_utils.
        replacement = f"![{img_id}]({base64_str})"
        # A callable replacement inserts the payload literally, with no
        # backslash/group-reference expansion.
        markdown_str = pattern.sub(lambda _match, _text=replacement: _text, markdown_str)
    return markdown_str
def get_combined_markdown(ocr_response):
    """Minimal fallback implementation of get_combined_markdown.

    For every page of ``ocr_response.pages``, collects the page's images
    (those exposing both ``id`` and ``image_base64``), runs the page markdown
    through ``replace_images_in_markdown`` and joins the per-page results
    with a blank line.
    """
    rendered_pages = []
    for page in ocr_response.pages:
        # Pages without an ``images`` attribute contribute no image data.
        images_by_id = {
            img.id: img.image_base64
            for img in getattr(page, "images", ())
            if hasattr(img, "id") and hasattr(img, "image_base64")
        }
        # Pages without markdown are treated as empty text.
        page_text = getattr(page, "markdown", "")
        rendered_pages.append(replace_images_in_markdown(page_text, images_by_id))
    return "\n\n".join(rendered_pages)
# Import config directly (now local to historical-ocr) | |
try:
    from config import (
        MISTRAL_API_KEY,
        OCR_MODEL,
        TEXT_MODEL,
        VISION_MODEL,
        TEST_MODE,
        IMAGE_PREPROCESSING,
    )
except ImportError:
    # No config module: take the API key from the environment and use
    # built-in defaults for everything else.
    import os

    MISTRAL_API_KEY = os.environ.get("MISTRAL_API_KEY", "")

    # Model identifiers
    OCR_MODEL = "mistral-ocr-latest"
    TEXT_MODEL = "mistral-large-latest"
    VISION_MODEL = "mistral-large-latest"

    # Without a config module the processor defaults to test mode.
    TEST_MODE = True

    # Basic image preprocessing defaults
    IMAGE_PREPROCESSING = dict(
        max_size_mb=8.0,
        enhance_contrast=1.2,
        denoise=True,
        compression_quality=95,
    )

    logging.warning("Config module not found. Using environment variables and defaults.")
# Helper function to make OCR objects JSON serializable | |
# Removed caching to fix unhashable type error | |
def serialize_ocr_response(obj):
    """Convert OCR response objects to a JSON serializable structure.

    Args:
        obj: A primitive, list, dict, or an object exposing ``__dict__``
            (e.g. an OCR response/page object).

    Returns:
        * primitives (``None``, str, int, float, bool) unchanged;
        * lists and dicts rebuilt with every element/value serialized;
        * objects with ``__dict__`` as a dict of their public attributes
          (names starting with ``_`` are skipped), serialized recursively;
          attribute values that are ``OCRImageObject`` instances become
          either ``{'id': ..., 'image_base64': ...}`` or a text value,
          depending on what their payload looks like;
        * anything else unchanged.
    """
    # Fast path: primitives are already serializable.
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    if isinstance(obj, list):
        return [serialize_ocr_response(item) for item in obj]
    if isinstance(obj, dict):
        return {k: serialize_ocr_response(v) for k, v in obj.items()}
    if not hasattr(obj, '__dict__'):
        return obj

    # OCRImageObject only exists when the optional mistralai import
    # succeeded; look it up lazily so serialization keeps working (instead
    # of raising NameError) when the package is missing.
    image_cls = globals().get("OCRImageObject")

    result = {}
    for key, value in obj.__dict__.items():
        if key.startswith('_'):
            continue  # Skip private attributes
        if image_cls is not None and isinstance(value, image_cls):
            result[key] = _serialize_image_object(value)
        else:
            # Recursing handles lists, nested objects AND dict-valued
            # attributes; primitives and unknown types come back unchanged.
            result[key] = serialize_ocr_response(value)
    return result


def _serialize_image_object(value):
    """Serialize one OCR image object as an image dict or as plain text."""
    image_base64 = value.image_base64 if hasattr(value, 'image_base64') else None
    is_valid_image = _is_image_payload(image_base64)

    # Final validation result with definitive message
    logging.warning(f"FINAL CLASSIFICATION: OCRImageObject content type = {'IMAGE' if is_valid_image else 'TEXT'}")

    if is_valid_image:
        return {
            'id': value.id if hasattr(value, 'id') else None,
            'image_base64': image_base64
        }

    # Not an image: fall back to a textual representation.
    if not image_base64 or not isinstance(image_base64, str):
        return str(value)
    if not image_base64.startswith('!['):
        # Regular text content
        return image_base64

    # Markdown image reference: keep only a readable placeholder. The
    # startswith('![') check guarantees both splits yield an element.
    alt_text = image_base64.split('![')[1].split('](')[0]
    if alt_text and not alt_text.endswith(('.jpeg', '.jpg')):
        placeholder = f"[Image: {alt_text}]"
    else:
        # Alt text is empty or just the image file name.
        placeholder = "[Image]"
    logging.info(f"Converted markdown reference to text placeholder: {placeholder}")
    return placeholder


def _is_image_payload(image_base64):
    """Return True when the payload looks like image data rather than text.

    Defaults to False (text) unless the payload is a ``data:image/`` URL or
    shows no text features and starts with base64-alphabet characters only.
    """
    if not image_base64 or not isinstance(image_base64, str):
        logging.warning("Invalid image data (not a string)")
        return False

    # Case 1: Definite image with proper data URL prefix
    if image_base64.startswith('data:image/'):
        logging.debug("Valid image with data:image/ prefix")
        return True

    # Case 2: Markdown image reference, not an actual image. The prefix
    # must be '![': an empty prefix matches every string and would make the
    # heuristic below unreachable.
    if image_base64.startswith('!['):
        logging.warning("Markdown image reference detected")
        img_id = image_base64.split('![')[1].split('](')[0]
        logging.debug(f"Markdown reference for image: {img_id}")
        return False

    # Case 3: Needs detailed text content detection on a bounded sample.
    sample = image_base64[:1000]
    padded_lower = f" {sample.lower()} "

    has_spaces = ' ' in sample
    has_newlines = '\n' in sample
    has_punctuation = any(p in sample for p in ',.;:!?"\'()[]{}')
    # Sentence-like structure: terminator, space, then an uppercase letter.
    has_sentences = any(
        sample[i] in '.!?\n' and sample[i + 1] == ' ' and sample[i + 2].isupper()
        for i in range(len(sample) - 5)
    )
    # Common words, space-delimited so substrings of base64 do not count.
    common_words = ['the', 'and', 'of', 'to', 'a', 'in', 'is', 'that', 'this', 'for']
    has_common_words = any(f" {word} " in padded_lower for word in common_words)

    text_indicators = [has_spaces, has_newlines, has_punctuation, has_sentences, has_common_words]
    text_indicator_count = sum(1 for indicator in text_indicators if indicator)

    logging.debug(f"Text detection - spaces: {has_spaces}, newlines: {has_newlines}, " +
                  f"punctuation: {has_punctuation}, sentences: {has_sentences}, " +
                  f"common words: {has_common_words}")
    logging.debug(f"Text indicators found: {text_indicator_count}/5")

    # Two or more text indicators: this is TEXT, not an image.
    if text_indicator_count >= 2:
        logging.warning(f"Content identified as TEXT with {text_indicator_count}/5 indicators")
        return False

    # No clear text indicators AND only base64 characters up front: image.
    if all(c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/='
           for c in image_base64[:100]):
        logging.debug("Valid base64 data with no text indicators")
        return True

    # Default to TEXT for anything else - safer approach
    logging.warning("No clear image patterns detected - treating as text by default")
    return False
# Create language enum for structured output. NOTE: the language lookup below is rebuilt on every call; no caching is applied.
def get_language_dict():
    """Return a mapping of two-letter language code to language name.

    Uses pycountry when it was importable (only languages that expose an
    ``alpha_2`` code are included); otherwise returns a small built-in table.
    """
    if not PYCOUNTRY_AVAILABLE:
        # Fallback with basic languages when pycountry is not available
        return {
            "en": "English",
            "es": "Spanish",
            "fr": "French",
            "de": "German",
            "it": "Italian",
            "pt": "Portuguese",
            "ru": "Russian",
            "zh": "Chinese",
            "ja": "Japanese",
            "ar": "Arabic",
            "hi": "Hindi",
            "la": "Latin",
        }

    names_by_code = {}
    for entry in pycountry.languages:
        if hasattr(entry, 'alpha_2'):
            names_by_code[entry.alpha_2] = entry.name
    return names_by_code
class LanguageMeta(type(Enum)):
    """Enum metaclass that injects one member per known language.

    While a class using this metaclass is being created, every name returned
    by ``get_language_dict()`` is added to the class namespace: the key is
    the name upper-cased with spaces replaced by underscores, the value is
    the language name itself.
    """

    def __new__(metacls, cls, bases, classdict):
        for language_name in get_language_dict().values():
            member_name = language_name.upper().replace(' ', '_')
            classdict[member_name] = language_name
        return super().__new__(metacls, cls, bases, classdict)
class Language(Enum, metaclass=LanguageMeta):
    # No members are declared here: LanguageMeta adds one member per entry of
    # get_language_dict() while this class is being created. Each member's
    # value is the language name; its key is that name upper-cased with
    # spaces replaced by underscores.
    pass
class StructuredOCRModel(BaseModel):
    """Pydantic schema for a structured OCR result."""
    # Name of the processed file.
    file_name: str
    # Topic labels for the document.
    topics: list[str]
    # Languages of the document, as Language enum members.
    languages: list[Language]
    # Extracted content; presumably a free-form dict - TODO confirm expected keys.
    ocr_contents: dict
class StructuredOCR: | |
def __init__(self, api_key=None):
    """Initialize the OCR processor with API key.

    Args:
        api_key: Optional Mistral API key. When omitted, MISTRAL_API_KEY is
            used; in test mode a placeholder key is used instead.

    Raises:
        ValueError: If no API key is available outside test mode, or if
            creating the Mistral client fails with an authorization error.
    """
    # Set up logger for this class instance
    self.logger = logging.getLogger(__name__)

    # Check if we're running in test mode or if Mistral is not available
    self.test_mode = TEST_MODE or not MISTRAL_AVAILABLE

    # Initialize current filename for language detection
    self.current_filename = None
    self.client = None

    # Initialize the language detector BEFORE any early return so that
    # self.language_detector exists on every code path, including the one
    # where the mistralai package is missing.
    if LANG_DETECTOR_AVAILABLE:
        self.logger.info("Using statistical language detection module")
        self.language_detector = LanguageDetector()
    else:
        self.logger.warning("External language detection not available - using internal fallback")
        self.language_detector = None

    if not MISTRAL_AVAILABLE:
        self.logger.warning("Mistral AI package not available - running in test mode")
        self.api_key = "placeholder_key"
        return

    # Initialize API key - use provided key, or the configured one
    if self.test_mode and not api_key:
        self.api_key = "placeholder_key"
    else:
        self.api_key = api_key or MISTRAL_API_KEY

    # Ensure we have a valid API key when not in test mode
    if not self.api_key and not self.test_mode:
        raise ValueError("No Mistral API key provided. Please set the MISTRAL_API_KEY environment variable or enable TEST_MODE.")

    # Clean the API key by removing any whitespace
    self.api_key = self.api_key.strip()

    # A whitespace-only key becomes empty here; warn but don't enforce
    # any length requirements.
    if not self.test_mode and not self.api_key:
        self.logger.warning("Warning: No API key provided")

    # Initialize client with the API key (no validation call is made, to
    # avoid unnecessary API traffic).
    try:
        self.client = Mistral(api_key=self.api_key)
    except Exception as e:
        error_msg = str(e).lower()
        if "unauthorized" in error_msg or "401" in error_msg:
            raise ValueError(f"API key authentication failed. Please check your Mistral API key: {str(e)}") from e
        # Any other construction failure degrades to test mode.
        self.logger.warning(f"Failed to initialize Mistral client: {str(e)}")
        self.test_mode = True
        self.client = None
def process_file(self, file_path, file_type=None, use_vision=True, max_pages=None, file_size_mb=None, custom_pages=None, custom_prompt=None):
    """Process a file and return structured OCR results.

    Args:
        file_path: Path to the file to process
        file_type: 'pdf' or 'image' (will be auto-detected if None)
        use_vision: Whether to use vision model for improved analysis
        max_pages: Optional limit on number of pages to process
        file_size_mb: Optional file size in MB (used for automatic page limiting)
        custom_pages: Optional list of specific page numbers to process
        custom_prompt: Optional instructions for the AI to handle unusual
            document formatting or specific extraction needs

    Returns:
        Dictionary with structured OCR results. Files larger than 50 MB are
        not processed; an error dictionary is returned for them instead.
    """
    # Local logger aliases in this method must NOT be named `logger`:
    # assigning to that name anywhere in the function makes it local for the
    # whole function and turns earlier uses into UnboundLocalError.
    log = logging.getLogger(__name__)

    # Convert file_path to Path object if it's a string
    file_path = Path(file_path)

    # Store current filename for language detection
    self.current_filename = file_path.name

    # Auto-detect file type if not provided
    if file_type is None:
        file_type = "pdf" if file_path.suffix.lower() == ".pdf" else "image"

    # Check for handwritten document by filename (informational only here)
    filename_lower = file_path.name.lower()
    if any(hint in filename_lower for hint in ("handwritten", "manuscript", "letter")):
        log.info(f"Detected likely handwritten document from filename: {file_path.name}")

    # Get file size if not provided
    if file_size_mb is None and file_path.exists():
        file_size_mb = file_path.stat().st_size / (1024 * 1024)  # bytes -> MB

    # Check if file exceeds API limits (50 MB)
    if file_size_mb and file_size_mb > 50:
        log.warning(f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB")
        return {
            "file_name": file_path.name,
            "topics": ["Document"],
            "languages": ["English"],
            "confidence_score": 0.0,
            "error": f"File size {file_size_mb:.2f} MB exceeds API limit of 50 MB",
            "ocr_contents": {
                "error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB",
                "partial_text": "Document could not be processed due to size limitations."
            }
        }

    # For PDF files, limit pages based on file size if no explicit limit is
    # given. Files above 50 MB were rejected above, so only the 20-50 MB
    # tier can apply; smaller files are processed in full.
    if file_type == "pdf" and file_size_mb and max_pages is None and custom_pages is None:
        if file_size_mb > 20:
            max_pages = 10

    # Start processing timer
    start_time = time.time()

    # Read and process the file
    if file_type == "pdf":
        result = self._process_pdf(file_path, use_vision, max_pages, custom_pages, custom_prompt)
    else:
        result = self._process_image(file_path, use_vision, custom_prompt)

    # Add processing time information
    result['processing_time'] = time.time() - start_time

    # Add a default confidence score if not present
    if 'confidence_score' not in result:
        result['confidence_score'] = 0.85  # Default confidence

    def has_ocr_image_objects(obj):
        """Check if object contains any OCRImageObject instances recursively"""
        if isinstance(obj, dict):
            return any(has_ocr_image_objects(v) for v in obj.values())
        if isinstance(obj, list):
            return any(has_ocr_image_objects(item) for item in obj)
        return 'OCRImageObject' in str(type(obj))

    # Ensure the entire result is fully JSON serializable
    try:
        # First convert to a standard dict if it's not already
        if not isinstance(result, dict):
            result = serialize_ocr_response(result)

        if has_ocr_image_objects(result):
            # Serialize preemptively so json.dumps never sees image objects
            result = serialize_ocr_response(result)
        else:
            # Test JSON serialization to catch any other issues
            json.dumps(result)
    except (TypeError, ValueError) as e:
        # json.dumps raises TypeError for unsupported types and ValueError
        # for e.g. circular references; both get the full serializer.
        serializer_log = logging.getLogger("serializer")
        serializer_log.warning(f"JSON serialization error in result: {str(e)}. Applying full serialization.")
        try:
            # First attempt with our custom serializer
            result = serialize_ocr_response(result)
            # Test if it's fully serializable now
            json.dumps(result)
        except Exception as inner_e:
            # If still not serializable, convert to a simpler format
            serializer_log.warning(f"Secondary serialization error: {str(inner_e)}. Converting to basic format.")
            # Be defensive about shapes: this is the last-resort path and
            # must not raise on an unexpected result structure.
            source = result if isinstance(result, dict) else {}
            contents = source.get("ocr_contents")
            if not isinstance(contents, dict):
                contents = {}
            languages = source.get("languages", ["English"]) or []
            result = {
                "file_name": source.get("file_name", "unknown"),
                "topics": source.get("topics", ["Document"]),
                "languages": [str(lang) for lang in languages if lang is not None],
                "ocr_contents": {
                    "raw_text": contents.get("raw_text", "Text extraction failed due to serialization error")
                },
                "serialization_error": f"Original result could not be fully serialized: {str(e)}"
            }

    return result
def _process_pdf(self, file_path, use_vision=True, max_pages=None, custom_pages=None, custom_prompt=None): | |
""" | |
Process a PDF file with OCR - optimized version with smart page handling and memory management | |
Args: | |
file_path: Path to the PDF file | |
use_vision: Whether to use vision model for enhanced analysis | |
max_pages: Optional limit on the number of pages to process | |
custom_pages: Optional list of specific page numbers to process | |
custom_prompt: Optional custom prompt for specialized extraction | |
""" | |
logger = logging.getLogger("pdf_processor") | |
logger.info(f"Processing PDF: {file_path}") | |
# Track processing time | |
start_time = time.time() | |
# Fast path: Return placeholder if in test mode | |
if self.test_mode: | |
logger.info("Test mode active, returning placeholder response") | |
# Enhanced test mode placeholder that's more realistic | |
return { | |
"file_name": file_path.name, | |
"topics": ["Historical Document", "Literature", "American History"], | |
"languages": ["English"], | |
"ocr_contents": { | |
"title": "Harper's New Monthly Magazine", | |
"publication_date": "1855", | |
"publisher": "Harper & Brothers, New York", | |
"raw_text": "This is a test mode placeholder for Harper's New Monthly Magazine from 1855. The actual document contains articles on literature, politics, science, and culture from mid-19th century America.", | |
"content": "The magazine includes various literary pieces, poetry, political commentary, and illustrations typical of 19th century periodicals. Known for publishing works by prominent American authors including Herman Melville and Charles Dickens.", | |
"key_figures": ["Herman Melville", "Charles Dickens", "Henry Wadsworth Longfellow"], | |
"noted_articles": ["Continued serialization of popular novels", "Commentary on contemporary political events", "Scientific discoveries and technological advancements"] | |
}, | |
"pdf_processing_method": "enhanced_test_mode", | |
"total_pages": 12, | |
"processed_pages": 3, | |
"processing_time": 0.5, | |
"confidence_score": 0.9 | |
} | |
try: | |
# PDF processing strategy decision based on file size | |
file_size_mb = file_path.stat().st_size / (1024 * 1024) | |
logger.info(f"PDF size: {file_size_mb:.2f} MB") | |
# Always use pdf2image for better control and consistency across all PDF files | |
use_pdf2image = True | |
# First try local PDF processing for better performance and control | |
if use_pdf2image: | |
try: | |
import tempfile | |
from pdf2image import convert_from_path | |
logger.info("Processing PDF using pdf2image for better multi-page handling") | |
# Convert PDF to images with optimized parameters | |
conversion_start = time.time() | |
# Use consistent DPI for all files to ensure reliable results | |
dpi = 200 # Higher quality DPI for all files to ensure better text recognition | |
# Only convert first page initially to check document type | |
pdf_first_page = convert_from_path(file_path, dpi=dpi, first_page=1, last_page=1) | |
logger.info(f"First page converted in {time.time() - conversion_start:.2f}s") | |
# Quick check if PDF has readable content | |
if not pdf_first_page: | |
logger.warning("PDF conversion produced no images, falling back to API") | |
raise Exception("PDF conversion failed to produce images") | |
# Determine total pages in the document | |
# First, try simple estimate from first page conversion | |
total_pages = 1 | |
# Try pdf2image info extraction | |
try: | |
# Try with pdf2image page counting - use simpler parameters | |
logger.info("Determining PDF page count...") | |
count_start = time.time() | |
# Use a lightweight approach with multi-threading for faster processing | |
pdf_info = convert_from_path( | |
file_path, | |
dpi=72, # Low DPI just for info | |
first_page=1, | |
last_page=1, | |
size=(100, 100), # Tiny image to save memory | |
fmt="jpeg", | |
thread_count=4, # Increased thread count for faster processing | |
output_file=None | |
) | |
# Extract page count | |
if hasattr(pdf_info, 'n_pages'): | |
total_pages = pdf_info.n_pages | |
elif isinstance(pdf_info, dict) and "Pages" in pdf_info: | |
total_pages = int(pdf_info.get("Pages", "1")) | |
elif len(pdf_first_page) > 0: | |
# Just estimate based on first page - at least we have one | |
total_pages = 1 | |
logger.info(f"Page count determined in {time.time() - count_start:.2f}s") | |
except Exception as count_error: | |
logger.warning(f"Error determining page count: {str(count_error)}. Using default of 1") | |
total_pages = 1 | |
logger.info(f"PDF has {total_pages} total pages") | |
# Determine which pages to process | |
pages_to_process = [] | |
# Handle custom page selection if provided | |
if custom_pages and any(0 < p <= total_pages for p in custom_pages): | |
# Filter valid page numbers | |
pages_to_process = [p for p in custom_pages if 0 < p <= total_pages] | |
logger.info(f"Processing {len(pages_to_process)} custom-selected pages: {pages_to_process}") | |
# Otherwise use max_pages limit if provided | |
elif max_pages and max_pages < total_pages: | |
pages_to_process = list(range(1, max_pages + 1)) | |
logger.info(f"Processing first {max_pages} pages of {total_pages} total") | |
# Or process all pages if reasonable count | |
elif total_pages <= 10: | |
pages_to_process = list(range(1, total_pages + 1)) | |
logger.info(f"Processing all {total_pages} pages") | |
# For large documents without limits, process subset of pages | |
else: | |
# Smart sampling: first page, last page, and some pages in between | |
pages_to_process = [1] # Always include first page | |
if total_pages > 1: | |
if total_pages <= 5: | |
# For few pages, process all | |
pages_to_process = list(range(1, total_pages + 1)) | |
else: | |
# For many pages, sample intelligently | |
# Add pages from the middle of the document | |
middle = total_pages // 2 | |
# Add last page if more than 3 pages | |
if total_pages > 3: | |
pages_to_process.append(total_pages) | |
# Add up to 3 pages from middle if document is large | |
if total_pages > 5: | |
pages_to_process.append(middle) | |
if total_pages > 10: | |
pages_to_process.append(middle // 2) | |
pages_to_process.append(middle + (middle // 2)) | |
# Sort pages for sequential processing | |
pages_to_process = sorted(list(set(pages_to_process))) | |
logger.info(f"Processing {len(pages_to_process)} sampled pages out of {total_pages} total: {pages_to_process}") | |
# Convert only the selected pages to minimize memory usage | |
selected_images = [] | |
combined_text = [] | |
detected_languages = set() # Track detected languages across all pages | |
# Process pages in larger batches for better efficiency | |
batch_size = 5 # Process 5 pages at a time for better throughput | |
for i in range(0, len(pages_to_process), batch_size): | |
batch_pages = pages_to_process[i:i+batch_size] | |
logger.info(f"Converting batch of pages {batch_pages}") | |
# Convert batch of pages with multi-threading for better performance | |
batch_start = time.time() | |
batch_images = convert_from_path( | |
file_path, | |
dpi=dpi, | |
first_page=min(batch_pages), | |
last_page=max(batch_pages), | |
thread_count=4, # Use multi-threading for faster PDF processing | |
fmt="jpeg" # Use JPEG format for better compatibility | |
) | |
logger.info(f"Batch conversion completed in {time.time() - batch_start:.2f}s") | |
# Map converted images to requested page numbers | |
for idx, page_num in enumerate(range(min(batch_pages), max(batch_pages) + 1)): | |
if page_num in pages_to_process and idx < len(batch_images): | |
if page_num == pages_to_process[0]: # First page to process | |
selected_images.append(batch_images[idx]) | |
# Process each page individually | |
with tempfile.NamedTemporaryFile(suffix='.jpeg', delete=False) as tmp: | |
batch_images[idx].save(tmp.name, format='JPEG') | |
# Simple OCR to extract text | |
try: | |
page_result = self._process_image(Path(tmp.name), False, None) | |
if 'ocr_contents' in page_result and 'raw_text' in page_result['ocr_contents']: | |
# Add page text to combined text without obvious page markers | |
page_text = page_result['ocr_contents']['raw_text'] | |
combined_text.append(f"{page_text}") | |
# Collect detected languages from each page | |
if 'languages' in page_result: | |
for lang in page_result['languages']: | |
detected_languages.add(lang) | |
except Exception as page_e: | |
logger.warning(f"Error processing page {page_num}: {str(page_e)}") | |
# Clean up temp file | |
import os | |
os.unlink(tmp.name) | |
# If we have processed pages | |
if selected_images and combined_text: | |
# Save first image to temp file for vision model | |
with tempfile.NamedTemporaryFile(suffix='.jpeg', delete=False) as tmp: | |
selected_images[0].save(tmp.name, format='JPEG', quality=95) | |
first_image_path = tmp.name | |
# Combine all extracted text | |
all_text = "\n\n".join(combined_text) | |
# For custom prompts, use specialized processing | |
if custom_prompt: | |
try: | |
# Process image with vision model | |
result = self._process_image(Path(first_image_path), use_vision, None) | |
# Enhance with text analysis using combined text from all pages | |
enhanced_result = self._extract_structured_data_text_only(all_text, file_path.name, custom_prompt) | |
# Merge results, keeping images from original result | |
for key, value in enhanced_result.items(): | |
if key not in ('raw_response_data', 'pages_data', 'has_images'): | |
result[key] = value | |
# Update raw text with full document text | |
if 'ocr_contents' in result: | |
result['ocr_contents']['raw_text'] = all_text | |
# Add flag to indicate custom prompt was applied | |
result['custom_prompt_applied'] = 'text_only' | |
# Simplified approach - no document type detection | |
except Exception as e: | |
logger.warning(f"Custom prompt processing failed: {str(e)}. Using standard processing.") | |
# Fall back to standard processing | |
result = self._process_image(Path(first_image_path), use_vision, None) | |
if 'ocr_contents' in result: | |
result['ocr_contents']['raw_text'] = all_text | |
else: | |
# Standard processing with combined text | |
result = self._process_image(Path(first_image_path), use_vision, None) | |
if 'ocr_contents' in result: | |
result['ocr_contents']['raw_text'] = all_text | |
# Merge detected languages if available | |
if detected_languages: | |
result['languages'] = list(detected_languages) | |
# Add PDF metadata | |
result['file_name'] = file_path.name | |
result['pdf_processing_method'] = 'pdf2image_optimized' | |
result['total_pages'] = total_pages | |
result['processed_pages'] = len(pages_to_process) | |
result['pages_processed'] = pages_to_process | |
# Add processing info | |
result['processing_info'] = { | |
'method': 'local_pdf_processing', | |
'dpi': dpi, | |
'pages_sampled': pages_to_process, | |
'processing_time': time.time() - start_time | |
} | |
# Clean up | |
os.unlink(first_image_path) | |
return result | |
else: | |
logger.warning("No pages successfully processed with pdf2image, falling back to API") | |
raise Exception("Failed to process PDF pages locally") | |
except Exception as pdf2image_error: | |
logger.warning(f"Local PDF processing failed, falling back to API: {str(pdf2image_error)}") | |
# Fall back to API processing | |
# API-based PDF processing | |
logger.info("Processing PDF via Mistral API") | |
# Optimize file upload for faster processing | |
logger.info("Uploading PDF file to Mistral API") | |
upload_start = time.time() | |
# Set appropriate timeout based on file size | |
upload_timeout = max(60, min(300, int(file_size_mb * 5))) # 60s to 300s based on size | |
try: | |
# Upload the file (Mistral client doesn't support timeout parameter for upload) | |
uploaded_file = self.client.files.upload( | |
file={ | |
"file_name": file_path.stem, | |
"content": file_path.read_bytes(), | |
}, | |
purpose="ocr" | |
) | |
logger.info(f"PDF uploaded in {time.time() - upload_start:.2f}s") | |
# Get a signed URL for the uploaded file | |
signed_url = self.client.files.get_signed_url(file_id=uploaded_file.id, expiry=1) | |
# Process the PDF with OCR - use adaptive timeout based on file size | |
logger.info(f"Processing PDF with OCR using {OCR_MODEL}") | |
# Adaptive retry strategy based on file size | |
max_retries = 3 if file_size_mb < 20 else 2 # Fewer retries for large files | |
base_retry_delay = 1 if file_size_mb < 10 else 2 # Longer delays for large files | |
# Adaptive timeout based on file size | |
ocr_timeout_ms = min(180000, max(60000, int(file_size_mb * 3000))) # 60s to 180s | |
# Try processing with retries | |
for retry in range(max_retries): | |
try: | |
ocr_start = time.time() | |
pdf_response = self.client.ocr.process( | |
document=DocumentURLChunk(document_url=signed_url.url), | |
model=OCR_MODEL, | |
include_image_base64=True, | |
timeout_ms=ocr_timeout_ms | |
) | |
logger.info(f"PDF OCR processing completed in {time.time() - ocr_start:.2f}s") | |
break # Success, exit retry loop | |
except Exception as e: | |
error_msg = str(e) | |
logger.warning(f"API error on attempt {retry+1}/{max_retries}: {error_msg}") | |
# Handle errors with optimized retry logic | |
error_lower = error_msg.lower() | |
# Authentication errors - no point in retrying | |
if any(term in error_lower for term in ["unauthorized", "401", "403", "authentication"]): | |
logger.error("API authentication failed. Check your API key.") | |
raise ValueError(f"Authentication failed. Please verify your Mistral API key: {error_msg}") | |
# Connection or server errors - worth retrying | |
elif any(term in error_lower for term in ["connection", "timeout", "520", "server error", "502", "503", "504"]): | |
if retry < max_retries - 1: | |
# Exponential backoff with jitter for better retry behavior | |
wait_time = base_retry_delay * (2 ** retry) * (0.8 + 0.4 * random.random()) | |
logger.info(f"Connection issue detected. Waiting {wait_time:.1f}s before retry...") | |
time.sleep(wait_time) | |
else: | |
# Last retry failed | |
logger.error("Maximum retries reached, API connection error persists.") | |
raise ValueError(f"Could not connect to Mistral API after {max_retries} attempts: {error_msg}") | |
# Rate limit errors - much longer wait | |
elif any(term in error_lower for term in ["rate limit", "429", "too many requests", "requests rate limit exceeded"]): | |
# Check specifically for token exhaustion vs temporary rate limit | |
if "quota" in error_lower or "credit" in error_lower or "subscription" in error_lower: | |
logger.error("API quota or credit limit reached. No retry will help.") | |
raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") | |
elif retry < max_retries - 1: | |
wait_time = base_retry_delay * (2 ** retry) * 6.0 # Significantly longer wait for rate limits | |
logger.info(f"Rate limit exceeded. Waiting {wait_time:.1f}s before retry...") | |
time.sleep(wait_time) | |
else: | |
logger.error("Maximum retries reached, rate limit error persists.") | |
raise ValueError(f"API rate limit exceeded. Please try again later: {error_msg}") | |
# Misc errors - typically no retry will help | |
else: | |
if retry < max_retries - 1 and any(term in error_lower for term in ["transient", "temporary"]): | |
# Only retry for errors explicitly marked as transient | |
wait_time = base_retry_delay * (2 ** retry) | |
logger.info(f"Transient error detected. Waiting {wait_time:.1f}s before retry...") | |
time.sleep(wait_time) | |
else: | |
logger.error(f"Unrecoverable API error: {error_msg}") | |
raise | |
# Calculate the number of pages to process | |
pages_to_process = pdf_response.pages | |
total_pages = len(pdf_response.pages) | |
limited_pages = False | |
logger.info(f"API returned {total_pages} total PDF pages") | |
# Smart page selection logic for better performance | |
if custom_pages: | |
# Convert to 0-based indexing and filter valid page numbers | |
valid_indices = [i-1 for i in custom_pages if 0 < i <= total_pages] | |
if valid_indices: | |
pages_to_process = [pdf_response.pages[i] for i in valid_indices] | |
limited_pages = True | |
logger.info(f"Processing {len(valid_indices)} custom-selected pages") | |
# Max pages limit with smart sampling | |
elif max_pages and total_pages > max_pages: | |
if max_pages == 1: | |
# Just first page | |
pages_to_process = pages_to_process[:1] | |
elif max_pages < 5 and total_pages > 10: | |
# For small max_pages on large docs, include first, last, and middle | |
indices = [0] # First page | |
if max_pages > 1: | |
indices.append(total_pages - 1) # Last page | |
if max_pages > 2: | |
indices.append(total_pages // 2) # Middle page | |
# Add more pages up to max_pages if needed | |
if max_pages > 3: | |
remaining = max_pages - len(indices) | |
step = total_pages // (remaining + 1) | |
for i in range(1, remaining + 1): | |
idx = i * step | |
if idx not in indices and 0 <= idx < total_pages: | |
indices.append(idx) | |
indices.sort() | |
pages_to_process = [pdf_response.pages[i] for i in indices] | |
else: | |
# Default: first max_pages | |
pages_to_process = pages_to_process[:max_pages] | |
limited_pages = True | |
logger.info(f"Processing {len(pages_to_process)} pages out of {total_pages} total") | |
# Directly extract any language information from the OCR response | |
detected_languages = set() | |
# Check if the response has a 'languages' attribute in any form | |
# First check direct attributes on the response object | |
if hasattr(pdf_response, 'languages') and pdf_response.languages: | |
for lang in pdf_response.languages: | |
detected_languages.add(str(lang)) | |
logger.info(f"Found language in OCR response: {lang}") | |
# Then check if it's in the response as a dictionary format | |
elif hasattr(pdf_response, '__dict__'): | |
response_dict = pdf_response.__dict__ | |
if 'languages' in response_dict and response_dict['languages']: | |
for lang in response_dict['languages']: | |
detected_languages.add(str(lang)) | |
logger.info(f"Found language in OCR response dict: {lang}") | |
# Calculate confidence score if available | |
try: | |
confidence_values = [page.confidence for page in pages_to_process if hasattr(page, 'confidence')] | |
confidence_score = sum(confidence_values) / len(confidence_values) if confidence_values else 0.89 | |
except Exception: | |
confidence_score = 0.89 # Improved default | |
# Merge page content intelligently - include page numbers for better context | |
all_markdown = [] | |
for idx, page in enumerate(pages_to_process): | |
# Try to determine actual page number | |
if custom_pages and len(custom_pages) == len(pages_to_process): | |
page_num = custom_pages[idx] | |
else: | |
# Estimate page number - may not be accurate with sampling | |
page_num = idx + 1 | |
page_markdown = page.markdown if hasattr(page, 'markdown') else "" | |
# Add page content without obvious page markers | |
if page_markdown.strip(): | |
all_markdown.append(f"{page_markdown}") | |
# Collect language information from individual pages if available | |
if hasattr(page, 'languages') and page.languages: | |
for lang in page.languages: | |
detected_languages.add(str(lang)) | |
logger.info(f"Found language in page {page_num}: {lang}") | |
# Join all pages with separation | |
combined_markdown = "\n\n".join(all_markdown) | |
# Extract structured data with the appropriate model | |
if use_vision: | |
# Try to get a good image for vision model | |
vision_image = None | |
# Try first page with images | |
for page in pages_to_process: | |
if hasattr(page, 'images') and page.images: | |
vision_image = page.images[0].image_base64 | |
break | |
if vision_image: | |
# Use vision model with enhanced prompt | |
logger.info(f"Using vision model: {VISION_MODEL}") | |
result = self._extract_structured_data_with_vision( | |
vision_image, combined_markdown, file_path.name, custom_prompt | |
) | |
else: | |
# Fall back to text-only if no images available | |
logger.info(f"No images in PDF, falling back to text model: {TEXT_MODEL}") | |
result = self._extract_structured_data_text_only( | |
combined_markdown, file_path.name, custom_prompt | |
) | |
else: | |
# Use text-only model as requested | |
logger.info(f"Using text-only model as specified: {TEXT_MODEL}") | |
result = self._extract_structured_data_text_only( | |
combined_markdown, file_path.name, custom_prompt | |
) | |
# If we have detected languages directly from the OCR model, use them | |
if detected_languages: | |
logger.info(f"Using languages detected by OCR model: {', '.join(detected_languages)}") | |
result['languages'] = list(detected_languages) | |
# Add flag to indicate source of language detection | |
result['language_detection_source'] = 'mistral-ocr-latest' | |
# Add metadata about pages | |
if limited_pages: | |
result['limited_pages'] = { | |
'processed': len(pages_to_process), | |
'total': total_pages | |
} | |
# Set confidence score from OCR | |
result['confidence_score'] = confidence_score | |
# Add processing method info | |
result['pdf_processing_method'] = 'api' | |
result['total_pages'] = total_pages | |
result['processed_pages'] = len(pages_to_process) | |
# Store serialized OCR response for rendering | |
serialized_response = serialize_ocr_response(pdf_response) | |
result['raw_response_data'] = serialized_response | |
# Check if there are images to include | |
has_images = hasattr(pdf_response, 'pages') and any( | |
hasattr(page, 'images') and page.images for page in pdf_response.pages | |
) | |
result['has_images'] = has_images | |
# Include image data for rendering if available | |
if has_images: | |
# Prepare pages data with image references | |
result['pages_data'] = [] | |
# Get serialized pages - handle different formats | |
serialized_pages = None | |
try: | |
if hasattr(serialized_response, 'pages'): | |
serialized_pages = serialized_response.pages | |
elif isinstance(serialized_response, dict) and 'pages' in serialized_response: | |
serialized_pages = serialized_response.get('pages', []) | |
else: | |
# No pages found in response | |
logger.warning("No pages found in OCR response") | |
serialized_pages = [] | |
except Exception as pages_err: | |
logger.warning(f"Error extracting pages from OCR response: {str(pages_err)}") | |
serialized_pages = [] | |
# Process each page to extract images | |
for page_idx, page in enumerate(serialized_pages): | |
try: | |
# Skip processing pages not in our selection | |
if limited_pages and page_idx >= len(pages_to_process): | |
continue | |
# Extract page data with careful error handling | |
markdown = "" | |
images = [] | |
# Handle different page formats safely | |
if isinstance(page, dict): | |
markdown = page.get('markdown', '') | |
images = page.get('images', []) | |
else: | |
# Try attribute access | |
if hasattr(page, 'markdown'): | |
markdown = page.markdown | |
if hasattr(page, 'images'): | |
images = page.images | |
# Create page data record | |
page_data = { | |
'page_number': page_idx + 1, | |
'markdown': markdown, | |
'images': [] | |
} | |
# Process images with careful error handling | |
for img_idx, img in enumerate(images): | |
try: | |
# Extract image ID and base64 data | |
img_id = None | |
img_base64 = None | |
if isinstance(img, dict): | |
img_id = img.get('id') | |
img_base64 = img.get('image_base64') | |
else: | |
# Try attribute access | |
if hasattr(img, 'id'): | |
img_id = img.id | |
if hasattr(img, 'image_base64'): | |
img_base64 = img.image_base64 | |
# Only add if we have valid image data | |
if img_base64 and isinstance(img_base64, str): | |
# Ensure ID exists | |
safe_id = img_id if img_id else f"img_{page_idx}_{img_idx}" | |
page_data['images'].append({ | |
'id': safe_id, | |
'image_base64': img_base64 | |
}) | |
except Exception as img_err: | |
logger.warning(f"Error processing image {img_idx} on page {page_idx+1}: {str(img_err)}") | |
continue # Skip this image | |
# Add page data if it has content | |
if page_data['markdown'] or page_data['images']: | |
result['pages_data'].append(page_data) | |
except Exception as page_err: | |
logger.warning(f"Error processing page {page_idx+1}: {str(page_err)}") | |
continue # Skip this page | |
# Record final processing time | |
total_time = time.time() - start_time | |
result['processing_time'] = total_time | |
logger.info(f"PDF API processing completed in {total_time:.2f}s") | |
return result | |
except Exception as api_e: | |
logger.error(f"Error in API-based PDF processing: {str(api_e)}") | |
# Re-raise to be caught by outer exception handler | |
raise | |
except Exception as e: | |
# Log the error and return a helpful error result | |
logger.error(f"Error processing PDF: {str(e)}") | |
# Return basic result on error | |
return { | |
"file_name": file_path.name, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"confidence_score": 0.0, | |
"error": str(e), | |
"ocr_contents": { | |
"error": f"Failed to process PDF: {str(e)}", | |
"partial_text": "Document could not be fully processed." | |
}, | |
"processing_time": time.time() - start_time | |
} | |
def _process_image(self, file_path, use_vision=True, custom_prompt=None):
    """Process an image file with OCR and return a structured result dict.

    Pipeline:
      1. In test mode, return a fixed placeholder result immediately.
      2. Encode the image as a base64 data URL, preferring the project's
         ``preprocessing.preprocess_image``; when that is unavailable, fall
         back to a PIL-based resize for files over 8 MB, and finally to
         encoding the original file untouched.
      3. Send the data URL to the OCR model, retrying connection and
         rate-limit errors with exponential backoff.
      4. Hand the OCR markdown to the vision or text-only extractor and
         attach serialized OCR response / image data to the result.

    Args:
        file_path: Path-like object for the image; ``.name``, ``.stat()``
            and ``.read_bytes()`` are used on it.
        use_vision: When true, structured extraction goes through
            ``_extract_structured_data_with_vision``; otherwise through
            ``_extract_structured_data_text_only``.
        custom_prompt: Optional extra instructions forwarded to the
            extractor. May be set or extended here when the OCR response
            contains images but very little text.

    Returns:
        dict: The extraction result. Any exception raised after the
        test-mode check is caught and reported as a dict carrying
        ``"error"`` and ``"ocr_contents"["error"]`` instead of being raised.
    """
    logger = logging.getLogger("image_processor")
    logger.info(f"Processing image: {file_path}")

    # Check if we're in test mode
    if self.test_mode:
        # Return a placeholder document response
        return {
            "file_name": file_path.name,
            "topics": ["Document"],
            "languages": ["English"],
            "ocr_contents": {
                "title": "Document",
                "content": "Please set up API key to process documents."
            },
            "processing_time": 0.5,
            "confidence_score": 0.0
        }

    def encode_original():
        """Encode the untouched file as a data URL with MIME type detection."""
        # Imported lazily, as in every fallback path, because the helper
        # module is optional at import time.
        from utils.image_utils import encode_image_for_api
        return encode_image_for_api(file_path)

    # No automatic document type detection - rely on the document type specified in the custom prompt
    # The document type is passed from the UI through the custom prompt in ocr_processing.py
    try:
        # Check file size
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        logger.info(f"Original image size: {file_size_mb:.2f} MB")

        # Use enhanced preprocessing functions when they can be imported
        try:
            from preprocessing import preprocess_image
            from utils.file_utils import get_base64_from_bytes

            logger.info("Applying image preprocessing for OCR")

            # Get preprocessing settings from config
            max_size_mb = IMAGE_PREPROCESSING.get("max_size_mb", 8.0)
            if file_size_mb > max_size_mb:
                logger.info(f"Image is large ({file_size_mb:.2f} MB), optimizing for API submission")

            # Use standard preprocessing - document type will be handled by preprocessing.py
            # based on the options passed from the UI
            base64_data_url = get_base64_from_bytes(
                preprocess_image(
                    file_path.read_bytes(),
                    {"document_type": "standard",
                     "grayscale": True,
                     "denoise": True,
                     "contrast": 0}
                )
            )
            logger.info("Image preprocessing completed successfully")
        except (ImportError, AttributeError) as preprocess_err:
            # Fallback to basic processing if advanced functions not available
            logger.warning(f"Advanced preprocessing not available: {str(preprocess_err)}. Using basic image processing.")

            # If image is larger than 8MB, resize it to reduce API payload size
            if file_size_mb > 8:
                logger.info("Image is large, resizing before API submission")
                try:
                    import io
                    from PIL import Image, ImageEnhance

                    # Open and process the image
                    with Image.open(file_path) as img:
                        # Convert to RGB if not already (prevents mode errors)
                        if img.mode != 'RGB':
                            img = img.convert('RGB')

                        # Calculate new dimensions (maintain aspect ratio),
                        # capping the longest side at 2000 pixels
                        width, height = img.size
                        max_dimension = max(width, height)
                        target_dimension = 2000
                        if max_dimension > target_dimension:
                            scale_factor = target_dimension / max_dimension
                            resized_width = int(width * scale_factor)
                            resized_height = int(height * scale_factor)
                            # Use LANCZOS instead of BILINEAR for better quality
                            img = img.resize((resized_width, resized_height), Image.LANCZOS)

                        # Enhance contrast for better text recognition
                        # NOTE(review): applied whether or not a resize happened -
                        # confirm against the upstream indentation.
                        enhancer = ImageEnhance.Contrast(img)
                        img = enhancer.enhance(1.3)

                        # Save to bytes with compression
                        buffer = io.BytesIO()
                        img.save(buffer, format="JPEG", quality=92, optimize=True)
                        buffer.seek(0)

                        # Get the base64
                        encoded_image = base64.b64encode(buffer.getvalue()).decode()
                        base64_data_url = f"data:image/jpeg;base64,{encoded_image}"

                        # Log the new size
                        new_size_mb = len(buffer.getvalue()) / (1024 * 1024)
                        logger.info(f"Resized image to {new_size_mb:.2f} MB")
                except ImportError:
                    logger.warning("PIL not available for resizing. Using original image.")
                    base64_data_url = encode_original()
                except Exception as resize_err:
                    logger.warning(f"Image resize failed: {str(resize_err)}. Using original image.")
                    base64_data_url = encode_original()
            else:
                # For smaller images, use as-is with proper MIME type
                base64_data_url = encode_original()
        except Exception as preprocess_err:
            # Fallback to original image if any preprocessing fails
            logger.warning(f"Image preprocessing failed: {str(preprocess_err)}. Using original image.")
            base64_data_url = encode_original()

        # Process the image with OCR
        logger.info(f"Processing image with OCR using {OCR_MODEL}")

        # Retry budget is kept small to avoid aggravating rate limiting
        max_retries = 2
        retry_delay = 1
        for retry in range(max_retries):
            try:
                image_response = self.client.ocr.process(
                    document=ImageURLChunk(image_url=base64_data_url),
                    model=OCR_MODEL,
                    include_image_base64=True,
                    timeout_ms=45000  # 45 second timeout
                )
                break  # Success, exit retry loop
            except Exception as api_err:
                error_msg = str(api_err)
                logger.warning(f"API error on attempt {retry+1}/{max_retries}: {error_msg}")

                # Errors are classified by substring match on the message
                error_lower = error_msg.lower()

                # Authentication errors - no point in retrying
                if "unauthorized" in error_lower or "401" in error_lower:
                    logger.error("API authentication failed. Check your API key.")
                    raise ValueError(f"Authentication failed with API key. Please verify your Mistral API key is correct and active: {error_msg}") from api_err
                # Connection errors - worth retrying
                elif "connection" in error_lower or "timeout" in error_lower or "520" in error_msg or "server error" in error_lower:
                    if retry < max_retries - 1:
                        # Exponential backoff before retrying
                        wait_time = retry_delay * (2 ** retry)
                        logger.info(f"Connection issue detected. Waiting {wait_time}s before retry...")
                        time.sleep(wait_time)
                    else:
                        # Last retry failed
                        logger.error("Maximum retries reached, API connection error persists.")
                        raise ValueError(f"Could not connect to Mistral API after {max_retries} attempts: {error_msg}") from api_err
                # Rate limit errors
                elif "rate limit" in error_lower or "429" in error_lower or "requests rate limit exceeded" in error_lower:
                    # Check specifically for token exhaustion vs temporary rate limit
                    if "quota" in error_lower or "credit" in error_lower or "subscription" in error_lower:
                        logger.error("API quota or credit limit reached. No retry will help.")
                        raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") from api_err
                    elif retry < max_retries - 1:
                        # More aggressive backoff for rate limits (5x longer)
                        wait_time = retry_delay * (2 ** retry) * 5
                        logger.info(f"Rate limit exceeded. Waiting {wait_time}s before retry...")
                        time.sleep(wait_time)
                    else:
                        # Last retry failed, try local OCR as fallback
                        logger.error("Maximum retries reached, rate limit error persists.")
                        try:
                            # Try to import the local OCR fallback function
                            from utils.image_utils import try_local_ocr_fallback

                            # Attempt local OCR fallback
                            ocr_text = try_local_ocr_fallback(file_path, base64_data_url)
                            if ocr_text:
                                logger.info("Successfully used local OCR fallback")
                                # Return a basic result with the local OCR text
                                return {
                                    "file_name": file_path.name,
                                    "topics": ["Document"],
                                    "languages": ["English"],
                                    "ocr_contents": {
                                        "title": "Document (Local OCR)",
                                        "content": "This document was processed with local OCR due to API rate limiting.",
                                        "raw_text": ocr_text
                                    },
                                    "processing_method": "local_fallback",
                                    "processing_note": "Used local OCR due to API rate limit"
                                }
                        except Exception as local_err:
                            # Covers both a missing fallback module and a failing fallback
                            logger.warning(f"Local OCR fallback failed: {str(local_err)}")

                        # If we get here, both API and local OCR failed
                        raise ValueError(f"Mistral API rate limit exceeded. Please try again later: {error_msg}") from api_err
                # Other errors - no retry
                else:
                    logger.error(f"Unrecoverable API error: {error_msg}")
                    raise

        # Get the OCR markdown from the first page
        image_ocr_markdown = image_response.pages[0].markdown if image_response.pages else ""

        # Check if the OCR response has images. Coerced to bool so that the
        # value stored in the result is a flag, not the list of image objects
        # the bare `and` chain would evaluate to.
        has_images = bool(
            hasattr(image_response, 'pages')
            and image_response.pages
            and hasattr(image_response.pages[0], 'images')
            and image_response.pages[0].images
        )

        # Check for language information directly from the OCR model
        detected_languages = set()

        # First check direct attributes on the response object
        if hasattr(image_response, 'languages') and image_response.languages:
            for lang in image_response.languages:
                detected_languages.add(str(lang))
                logger.info(f"Found language in OCR response: {lang}")
        # Then check if it's in the response as a dictionary format
        elif hasattr(image_response, '__dict__'):
            response_dict = image_response.__dict__
            if 'languages' in response_dict and response_dict['languages']:
                for lang in response_dict['languages']:
                    detected_languages.add(str(lang))
                    logger.info(f"Found language in OCR response dict: {lang}")

        # Check for languages in individual pages
        if hasattr(image_response, 'pages') and image_response.pages:
            for page in image_response.pages:
                if hasattr(page, 'languages') and page.languages:
                    for lang in page.languages:
                        detected_languages.add(str(lang))
                        logger.info(f"Found language in page: {lang}")

        # Optimize: Skip the extraction step if ocr_markdown is very small or empty
        # BUT make an exception if custom_prompt is provided
        # OR if the image has visual content worth preserving
        if (not custom_prompt and not has_images) and (not image_ocr_markdown or len(image_ocr_markdown) < 50):
            logger.warning("OCR produced minimal text with no images. Returning basic result.")
            return {
                "file_name": file_path.name,
                "topics": ["Document"],
                # Prefer languages reported by the OCR response over the default
                "languages": list(detected_languages) if detected_languages else ["English"],
                "ocr_contents": {
                    "raw_text": image_ocr_markdown if image_ocr_markdown else "No text could be extracted from the image."
                },
                "processing_note": "OCR produced minimal text content",
                # Include raw response data for images
                "raw_response_data": serialize_ocr_response(image_response)
            }
        # For images with minimal text but visual content, enhance the prompt
        elif has_images and (not image_ocr_markdown or len(image_ocr_markdown) < 100):
            logger.info("Document with images but minimal text detected. Using enhanced prompt for mixed media.")
            if not custom_prompt:
                custom_prompt = "This is a mixed media document with both text and important visual elements. Please carefully describe the image content and extract all visible text, preserving the relationship between text and visuals."
            elif "visual" not in custom_prompt.lower() and "image" not in custom_prompt.lower():
                custom_prompt += " The document contains important visual elements that should be described along with the text content."

        # Extract structured data using the appropriate model, with a single API call
        if use_vision:
            logger.info(f"Using vision model: {VISION_MODEL}")
            result = self._extract_structured_data_with_vision(base64_data_url, image_ocr_markdown, file_path.name, custom_prompt)
        else:
            logger.info(f"Using text-only model: {TEXT_MODEL}")
            result = self._extract_structured_data_text_only(image_ocr_markdown, file_path.name, custom_prompt)

        # If we have detected languages directly from the OCR model, use them
        if detected_languages:
            logger.info(f"Using languages detected by OCR model: {', '.join(detected_languages)}")
            result['languages'] = list(detected_languages)
            # Add flag to indicate source of language detection
            result['language_detection_source'] = 'mistral-ocr-latest'

        # Store the serialized OCR response for image rendering.
        # The raw response object is not stored because it is not JSON serializable.
        serialized_response = serialize_ocr_response(image_response)
        result['raw_response_data'] = serialized_response
        result['has_images'] = has_images

        if has_images:
            # Build a JSON-friendly per-page list of markdown and image payloads,
            # reusing the serialization computed above.
            result['pages_data'] = []
            if hasattr(serialized_response, 'pages'):
                serialized_pages = serialized_response.pages
            else:
                # Handle case where serialization returns a dict instead of an object
                serialized_pages = serialized_response.get('pages', [])

            for page_idx, page in enumerate(serialized_pages):
                # Handle both object and dict forms
                if isinstance(page, dict):
                    markdown = page.get('markdown', '')
                    images = page.get('images', [])
                else:
                    markdown = page.markdown if hasattr(page, 'markdown') else ''
                    images = page.images if hasattr(page, 'images') else []

                page_data = {
                    'page_number': page_idx + 1,
                    'markdown': markdown,
                    'images': []
                }

                # Extract images if present
                for img_idx, img in enumerate(images):
                    if isinstance(img, dict):
                        img_id = img.get('id')
                        img_base64 = img.get('image_base64')
                    else:
                        img_id = img.id if hasattr(img, 'id') else None
                        img_base64 = img.image_base64 if hasattr(img, 'image_base64') else None

                    if img_base64:
                        page_data['images'].append({
                            # Synthesize an ID when the response did not provide one
                            'id': img_id if img_id else f"img_{page_idx}_{img_idx}",
                            'image_base64': img_base64
                        })

                result['pages_data'].append(page_data)

        logger.info("Image processing completed successfully")
        return result
    except Exception as e:
        logger.error(f"Error processing image: {str(e)}")
        # Return basic result on error
        return {
            "file_name": file_path.name,
            "topics": ["Document"],
            "languages": ["English"],
            "error": str(e),
            "ocr_contents": {
                "error": f"Failed to process image: {str(e)}",
                "partial_text": "Image could not be processed."
            }
        }
def _extract_structured_data_with_vision(self, image_base64, ocr_markdown, filename, custom_prompt=None):
    """
    Extract structured data using vision model with detailed historical context prompting
    Optimized for speed, accuracy, and resilience

    Processing order:
      1. If the OCR text is longer than 300 characters (after stripping), return it
         directly together with detected languages - no model call is made.
      2. In test mode or without an API key, delegate to the text-only extractor.
      3. Otherwise call the vision model, retrying with a simplified and then a
         minimal prompt; if every attempt fails, delegate to the text-only extractor.

    Args:
        image_base64: Value passed as ``image_url`` to ``ImageURLChunk``
            (presumably a base64 data URL - confirm against the caller).
        ocr_markdown: OCR text already extracted for the image. ``None`` is
            treated as empty text.
        filename: Name of the source document; checked for newspaper keywords
            and echoed back as ``file_name`` on the fast path.
        custom_prompt: Optional user instructions added to the model prompt and
            forwarded to the text-only extractor on fallback.

    Returns:
        dict: Structured result with at least ``file_name``, ``topics``,
        ``languages`` and ``ocr_contents`` on the fast path; model-produced
        results additionally carry ``processing_info`` and ``confidence_score``.
    """
    logger = logging.getLogger("vision_processor")
    # Keep the caller's own instructions: the newspaper branch below may replace
    # custom_prompt with a vision-specific hint that should not leak into the
    # text-only fallback.
    user_prompt = custom_prompt
    # Missing OCR output is handled as empty text instead of relying on an
    # AttributeError being swallowed by the outer handler.
    ocr_markdown = ocr_markdown or ""
    try:
        # Check if this is a newspaper or document with columns by filename
        newspaper_keywords = ["newspaper", "gazette", "herald", "times", "journal",
                              "chronicle", "post", "tribune", "news", "press", "gender"]
        filename_lower = filename.lower()
        is_likely_newspaper = any(keyword in filename_lower for keyword in newspaper_keywords)
        if is_likely_newspaper:
            logger.info(f"Likely newspaper document detected in vision processing: {filename}")

        # Fast path: Skip vision API if OCR already produced reasonable text
        # We'll define "reasonable" as having at least 300 characters
        if len(ocr_markdown.strip()) > 300:
            logger.info("Sufficient OCR text detected, analyzing language before using OCR text directly")
            # Perform language detection on the OCR text before returning
            if LANG_DETECTOR_AVAILABLE and self.language_detector:
                detected_languages = self.language_detector.detect_languages(
                    ocr_markdown,
                    filename=getattr(self, 'current_filename', None)
                )
            else:
                # If language detector is not available, use default English
                detected_languages = ["English"]
            return {
                "file_name": filename,
                "topics": ["Document"],
                "languages": detected_languages,
                "ocr_contents": {
                    "raw_text": ocr_markdown
                }
            }

        # Only use vision model for minimal OCR text or when document has columns
        if is_likely_newspaper and (not ocr_markdown or len(ocr_markdown.strip()) < 300):
            logger.info("Using vision model for newspaper with minimal OCR text")
            if not custom_prompt:
                custom_prompt = "Document has columns. Extract text by reading each column top to bottom."

        # Fast path: Skip if in test mode or no API key
        if self.test_mode or not self.api_key:
            logger.info("Test mode or no API key, using text-only processing")
            return self._extract_structured_data_text_only(ocr_markdown, filename, custom_prompt=user_prompt)

        # Use only the first part of OCR text to keep prompts small and processing fast
        if len(ocr_markdown) > 1000:
            truncated_ocr = ocr_markdown[:1000]
            logger.info(f"Truncated OCR text from {len(ocr_markdown)} to 1000 chars for faster processing")
        else:
            truncated_ocr = ocr_markdown

        # Build a comprehensive prompt with OCR text and detailed instructions for title detection and language handling
        enhanced_prompt = f"This is a document's OCR text:\n<BEGIN_OCR>\n{truncated_ocr}\n<END_OCR>\n\n"
        # Add custom prompt if provided
        if custom_prompt:
            enhanced_prompt += f"User instructions: {custom_prompt}\n\n"
        # Primary focus on document structure and title detection
        enhanced_prompt += "You are analyzing a historical document. Follow these extraction priorities:\n"
        enhanced_prompt += "1. FIRST PRIORITY: Identify and extract the TITLE of the document. Look for large text at the top, decorative typography, or centered text that appears to be a title. The title is often one of the first elements in historical documents.\n"
        enhanced_prompt += "2. SECOND: Extract all text content accurately from this document, including any text visible in the image that may not have been captured by OCR.\n\n"
        enhanced_prompt += "Document Title Guidelines:\n"
        enhanced_prompt += "- For printed historical works: Look for primary heading at top of the document, all-caps text, or larger font size text\n"
        enhanced_prompt += "- For newspapers/periodicals: Extract both newspaper name and article title if present\n"
        enhanced_prompt += "- For handwritten documents: Look for centered text at the top or underlined headings\n"
        enhanced_prompt += "- For engravings/illustrations: Include the title or caption, which often appears below the image\n\n"
        # Language detection guidance
        enhanced_prompt += "IMPORTANT: After extracting the title and text content, determine the languages present.\n"
        enhanced_prompt += "Precisely identify and list ALL languages present in the document separately. Look closely for multiple languages that might appear together.\n"
        enhanced_prompt += "For language detection, examine these specific indicators:\n"
        enhanced_prompt += "- French: accents (é, è, ê, à, ç, â, î, ô, û), words like 'le', 'la', 'les', 'et', 'en', 'de', 'du', 'des', 'dans', 'ce', 'cette', 'ces', 'par', 'pour', 'qui', 'que', 'où', 'avec'\n"
        enhanced_prompt += "- Portuguese: accents (ã, õ, á, é, ê, ó, ç), words like 'e', 'o', 'de', 'da', 'do', 'em', 'para', 'que', 'não', 'com'\n"
        enhanced_prompt += "- Spanish: ñ, inverted punctuation (¿, ¡), accents (á, é, í, ó, ú), words like 'el', 'la', 'los', 'las', 'y', 'en', 'por', 'que', 'con'\n"
        enhanced_prompt += "- German: umlauts (ä, ö, ü), sharp s (ß), words like 'und', 'der', 'die', 'das', 'in', 'mit'\n"
        enhanced_prompt += "- Italian: accents (à, è, é, ì, ò, ù), words like 'il', 'la', 'e', 'di', 'che', 'per', 'con'\n"
        enhanced_prompt += "- Chinese: hanzi characters (汉字), lack of spaces between words, markers like 的, 是, 了, 在, 和, 有\n"
        enhanced_prompt += "- Latin: words like 'et', 'in', 'ad', 'est', 'sunt', 'non', 'cum', 'sed'\n\n"
        enhanced_prompt += "If the document contains multiple columns or sections, process each section independently and then combine them logically.\n"
        enhanced_prompt += "Return ALL detected languages as separate entries in the languages array, never combine them.\n"
        enhanced_prompt += "CRITICAL: Do NOT default to English unless absolutely certain. If you see French characteristics like 'é', 'è', 'ê', 'ç' or French words, prioritize French in your language detection."

        def call_vision_model(prompt_text, timeout_ms):
            """Send the image plus one text prompt to the vision model."""
            return self.client.chat.parse(
                model=VISION_MODEL,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            ImageURLChunk(image_url=image_base64),
                            TextChunk(text=prompt_text)
                        ],
                    },
                ],
                response_format=StructuredOCRModel,
                temperature=0,
                timeout_ms=timeout_ms
            )

        # Measure API call time for optimization feedback
        start_time = time.time()
        try:
            # Use a fixed, shorter timeout for single-page documents
            timeout_ms = 45000  # 45 seconds is optimal for most single-page documents
            logger.info(f"Calling vision model with {timeout_ms}ms timeout")
            chat_response = call_vision_model(enhanced_prompt, timeout_ms)
            api_time = time.time() - start_time
            logger.info(f"Vision model completed in {api_time:.2f}s")
        except Exception as e:
            # If there's an error with the enhanced prompt, try progressively simpler approaches
            logger.warning(f"Enhanced prompt failed after {time.time() - start_time:.2f}s: {str(e)}")
            # Try a very simplified approach with minimal context
            try:
                # Ultra-short prompt for faster processing
                simplified_prompt = (
                    f"Extract text from this document image. "
                    f"<BEGIN_OCR>\n{truncated_ocr[:500]}\n<END_OCR>\n"
                    f"Return a JSON with file_name, topics, languages, and ocr_contents fields."
                )
                # Only add minimal custom prompt if provided
                if custom_prompt and len(custom_prompt) < 100:
                    simplified_prompt += f"\n{custom_prompt}"
                logger.info("Trying simplified prompt approach")
                # Very short timeout for simplified approach (30 seconds)
                chat_response = call_vision_model(simplified_prompt, 30000)
                logger.info("Simplified prompt approach succeeded")
            except Exception as second_e:
                # If that fails, try with minimal prompt and just image analysis
                logger.warning(f"Simplified prompt failed: {str(second_e)}. Trying minimal prompt.")
                try:
                    # Minimal prompt focusing only on OCR task
                    minimal_prompt = (
                        "Extract the text from this image. "
                        "Return JSON with file_name, topics, languages, and ocr_contents.raw_text fields."
                    )
                    logger.info("Trying minimal prompt with image-only focus")
                    # Minimal timeout for last attempt (25 seconds)
                    chat_response = call_vision_model(minimal_prompt, 25000)
                    logger.info("Minimal prompt approach succeeded")
                except Exception as third_e:
                    # If all vision attempts fail, fall back to text-only model
                    logger.warning(f"All vision model attempts failed, falling back to text-only model: {str(third_e)}")
                    return self._extract_structured_data_text_only(ocr_markdown, filename, custom_prompt=user_prompt)

        # Convert the response to a dictionary
        result = json.loads(chat_response.choices[0].message.parsed.json())
        # Ensure languages is a list of strings, not Language enum objects
        # (a null value from the model becomes an empty list instead of raising)
        if 'languages' in result:
            result['languages'] = [str(lang) for lang in (result.get('languages') or [])]
        # Add simplified metadata about processing
        result['processing_info'] = {
            'method': 'vision_model',
            'ocr_text_length': len(ocr_markdown),
            'api_response_time': time.time() - start_time
        }
        # Note if custom prompt was applied
        if custom_prompt:
            result['custom_prompt_applied'] = 'vision_model'
        # Add confidence score if not present
        if 'confidence_score' not in result:
            result['confidence_score'] = 0.92  # Vision model typically has higher confidence
        # Let the language detector refine the model's answer using the OCR text
        if ocr_markdown and 'languages' in result:
            if LANG_DETECTOR_AVAILABLE and self.language_detector:
                result['languages'] = self.language_detector.detect_languages(
                    ocr_markdown,
                    filename=getattr(self, 'current_filename', None),
                    current_languages=result['languages']
                )
    except Exception as e:
        # Fall back to text-only model if vision model fails
        logger.warning(f"Vision model processing failed, falling back to text-only model: {str(e)}")
        result = self._extract_structured_data_text_only(ocr_markdown, filename, custom_prompt=user_prompt)
    return result
def _extract_structured_data_text_only(self, ocr_markdown, filename, custom_prompt=None):
    """
    Extract structured data using text-only model with detailed historical context prompting
    and improved error handling with enhanced language detection

    Processing order:
      1. OCR text shorter than 50 characters (after stripping) is returned as-is
         with a keyword/accent based language guess; no model call is made.
      2. In test mode or without an API key, the raw OCR text (first 10000
         characters) is returned with English as the language.
      3. Otherwise the text model is called (up to 2 attempts, with backoff on
         rate-limit and transient errors).
      4. Any failure yields a "fallback" result carrying the raw OCR text and
         the error message; this method does not raise for model errors.

    Args:
        ocr_markdown: OCR text to analyze; may be empty or None.
        filename: Name of the source document, echoed back as ``file_name``
            in the locally built results.
        custom_prompt: Optional user instructions inserted into the model prompt.

    Returns:
        dict: Structured result; locally built results contain ``file_name``,
        ``topics``, ``languages`` and ``ocr_contents``.
    """
    logger = logging.getLogger("text_processor")
    start_time = time.time()
    try:
        # Fast path: Skip for minimal OCR text
        if not ocr_markdown or len(ocr_markdown.strip()) < 50:
            logger.info("Minimal OCR text - returning basic result")
            # Attempt comprehensive language detection even for minimal text
            detected_languages = []
            # Simple language detection based on indicator characters and words
            if ocr_markdown and len(ocr_markdown.strip()) > 10:
                # Define indicators for all supported languages
                language_indicators = {
                    "Portuguese": {
                        "chars": ['ã', 'õ', 'á', 'é', 'ê', 'í', 'ó', 'ú', 'ç'],
                        "words": {'e', 'o', 'de', 'da', 'do', 'em', 'para', 'que', 'não', 'com'}
                    },
                    "Spanish": {
                        "chars": ['ñ', 'á', 'é', 'í', 'ó', 'ú', '¿', '¡'],
                        "words": {'el', 'la', 'los', 'las', 'y', 'en', 'por', 'que', 'con', 'del'}
                    },
                    "French": {
                        "chars": ['é', 'è', 'ê', 'à', 'ç', 'ù', 'â', 'î', 'ô', 'û'],
                        "words": {'le', 'la', 'les', 'et', 'en', 'de', 'du', 'des', 'un', 'une', 'ce', 'cette', 'qui', 'que', 'pour', 'dans', 'par', 'sur'}
                    },
                    "German": {
                        "chars": ['ä', 'ö', 'ü', 'ß'],
                        "words": {'der', 'die', 'das', 'und', 'ist', 'von', 'mit', 'für', 'sich'}
                    },
                    "Italian": {
                        "chars": ['à', 'è', 'é', 'ì', 'ò', 'ù'],
                        "words": {'il', 'la', 'e', 'di', 'che', 'per', 'con', 'sono', 'non'}
                    },
                    "Latin": {
                        "chars": [],
                        "words": {'et', 'in', 'ad', 'est', 'sunt', 'non', 'cum', 'sed', 'qui', 'quod'}
                    }
                }
                # Compare against lowercased text so capitalised accents
                # (e.g. headings in all caps) still count as indicators.
                lowered_text = ocr_markdown.lower()
                # Strip punctuation and markdown markers from token edges so
                # "und," or "der." still match the indicator words.
                edge_punctuation = ".,;:!?¿¡\"'()[]{}«»“”‘’*_#>-"
                words = [token.strip(edge_punctuation) for token in lowered_text.split()]
                # Check for indicators of each language
                for language, indicators in language_indicators.items():
                    has_chars = any(char in lowered_text for char in indicators["chars"])
                    word_count = sum(1 for word in words if word in indicators["words"])
                    # Add language if strong enough indicators are present
                    if has_chars or word_count >= 2:
                        detected_languages.append(language)
                # Check for English separately
                english_words = {'the', 'and', 'of', 'to', 'in', 'a', 'is', 'that', 'for', 'it'}
                english_count = sum(1 for word in words if word in english_words)
                if english_count >= 2:
                    detected_languages.append("English")
            # If no languages detected, default to English
            if not detected_languages:
                detected_languages = ["English"]
            return {
                "file_name": filename,
                "topics": ["Document"],
                "languages": detected_languages,
                "ocr_contents": {
                    "raw_text": ocr_markdown if ocr_markdown else "No text could be extracted"
                },
                "processing_method": "minimal_text"
            }

        # Check for API key to avoid unnecessary processing
        if self.test_mode or not self.api_key:
            logger.info("Test mode or no API key - returning basic result")
            return {
                "file_name": filename,
                "topics": ["Document"],
                "languages": ["English"],
                "ocr_contents": {
                    "raw_text": ocr_markdown[:10000] if ocr_markdown else "No text could be extracted",
                    "note": "API key not provided - showing raw OCR text only"
                },
                "processing_method": "test_mode"
            }

        # If OCR text is very large, truncate it to avoid API limits
        truncated_text = ocr_markdown
        if len(ocr_markdown) > 25000:
            # Keep first 15000 chars and last 5000 chars
            truncated_text = ocr_markdown[:15000] + "\n...[content truncated]...\n" + ocr_markdown[-5000:]
            logger.info(f"OCR text truncated from {len(ocr_markdown)} to {len(truncated_text)} chars")

        # Build a prompt with enhanced title detection and language detection instructions
        enhanced_prompt = f"This is a document's OCR text:\n<BEGIN_OCR>\n{truncated_text}\n<END_OCR>\n\n"
        # Add custom prompt if provided
        if custom_prompt:
            enhanced_prompt += f"User instructions: {custom_prompt}\n\n"
        # Add title detection focus
        enhanced_prompt += "You are analyzing a historical document. Please follow these extraction priorities:\n"
        enhanced_prompt += "1. FIRST PRIORITY: Identify and extract the TITLE of the document. Look for prominent text at the top, decorative typography, or centered text that appears to be a title.\n"
        enhanced_prompt += " - For historical documents with prominent headings at the top\n"
        enhanced_prompt += " - For newspapers or periodicals, extract both the publication name and article title\n"
        enhanced_prompt += " - For manuscripts or letters, identify any heading or subject line\n"
        enhanced_prompt += "2. SECOND PRIORITY: Extract all text content accurately and return structured data with the document's contents.\n\n"
        enhanced_prompt += "IMPORTANT: Precisely identify and list ALL languages present in the document separately. Look closely for multiple languages that might appear together.\n"
        enhanced_prompt += "For language detection, examine these specific indicators:\n"
        enhanced_prompt += "- French: accents (é, è, ê, à, ç), words like 'le', 'la', 'les', 'et', 'en', 'de', 'du'\n"
        enhanced_prompt += "- German: umlauts (ä, ö, ü), sharp s (ß), words like 'und', 'der', 'die', 'das', 'in', 'mit'\n"
        enhanced_prompt += "- Spanish: ñ, inverted punctuation (¿, ¡), accents (á, é, í, ó, ú), words like 'el', 'la', 'los', 'las', 'y', 'en'\n"
        enhanced_prompt += "- Italian: words like 'il', 'la', 'e', 'di', 'che', 'per', 'con'\n"
        enhanced_prompt += "- Chinese: hanzi characters (汉字), lack of spaces between words, markers like 的, 是, 了, 在, 和, 有\n"
        enhanced_prompt += "- Latin: words like 'et', 'in', 'ad', 'est', 'sunt', 'non', 'cum', 'sed'\n"
        enhanced_prompt += "Do NOT classify text as English unless you can positively confirm it contains specifically English words and phrases.\n\n"
        enhanced_prompt += "Return ALL detected languages as separate entries in the languages array. If multiple languages are present, list them ALL separately."

        # Use enhanced prompt with text-only model - with retry logic
        max_retries = 2
        retry_delay = 1
        for retry in range(max_retries):
            try:
                logger.info(f"Calling text model ({TEXT_MODEL})")
                api_start = time.time()
                # Set appropriate timeout based on text length
                timeout_ms = min(120000, max(30000, len(truncated_text) * 5))  # 30-120s based on length
                # Make API call with appropriate timeout
                chat_response = self.client.chat.parse(
                    model=TEXT_MODEL,
                    messages=[
                        {
                            "role": "user",
                            "content": enhanced_prompt
                        },
                    ],
                    response_format=StructuredOCRModel,
                    temperature=0,
                    timeout_ms=timeout_ms
                )
                api_time = time.time() - api_start
                logger.info(f"Text model API call completed in {api_time:.2f}s")
                # Convert the response to a dictionary
                result = json.loads(chat_response.choices[0].message.parsed.json())
                # Ensure languages is a list of strings, not Language enum objects
                # (a null value from the model becomes an empty list instead of raising)
                if 'languages' in result:
                    result['languages'] = [str(lang) for lang in (result.get('languages') or [])]
                # Add simplified processing metadata
                result['processing_method'] = 'text_model'
                result['model_used'] = TEXT_MODEL
                result['processing_time'] = time.time() - start_time
                # Flag when custom prompt has been successfully applied
                if custom_prompt:
                    result['custom_prompt_applied'] = 'text_model'
                # Add raw text for reference if not already present; the dict check
                # keeps a null ocr_contents from being misreported as an API error
                ocr_contents = result.get('ocr_contents')
                if isinstance(ocr_contents, dict) and 'raw_text' not in ocr_contents:
                    # Add truncated raw text if very large
                    if len(ocr_markdown) > 50000:
                        ocr_contents['raw_text'] = ocr_markdown[:50000] + "\n...[content truncated]..."
                    else:
                        ocr_contents['raw_text'] = ocr_markdown
                return result
            except Exception as api_error:
                error_msg = str(api_error).lower()
                logger.warning(f"API error on attempt {retry+1}/{max_retries}: {str(api_error)}")
                # Last retry failed - hand the error to the outer fallback handler
                if retry >= max_retries - 1:
                    raise
                # Rate limit errors - special handling with longer wait
                if any(term in error_msg for term in ["rate limit", "429", "too many requests", "requests rate limit exceeded"]):
                    # Check specifically for token exhaustion vs temporary rate limit
                    if any(term in error_msg for term in ["quota", "credit", "subscription"]):
                        logger.error("API quota or credit limit reached. No retry will help.")
                        raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") from api_error
                    # Longer backoff for rate limit errors
                    wait_time = retry_delay * (2 ** retry) * 6.0  # 6x longer wait for rate limits
                    logger.info(f"Rate limit exceeded. Waiting {wait_time:.1f}s before retry...")
                    time.sleep(wait_time)
                # Other transient errors
                elif any(term in error_msg for term in ["timeout", "connection", "500", "503", "504"]):
                    # Wait before retrying
                    wait_time = retry_delay * (2 ** retry)
                    logger.info(f"Transient error, retrying in {wait_time}s")
                    time.sleep(wait_time)
                else:
                    # Non-retryable error
                    raise
        # This shouldn't be reached due to raise in the loop, but just in case
        raise Exception("All retries failed for text model")
    except Exception as e:
        logger.error(f"Text model failed: {str(e)}. Creating basic result.")
        # Create a basic result with available OCR text
        try:
            # Create a more informative fallback result
            result = {
                "file_name": filename,
                "topics": ["Document"],
                "languages": ["English"],
                "ocr_contents": {
                    "raw_text": ocr_markdown[:50000] if ocr_markdown else "No text could be extracted",
                    "error": "AI processing failed: " + str(e).replace('"', '\\"')
                },
                "processing_method": "fallback",
                "processing_error": str(e),
                "processing_time": time.time() - start_time
            }
            # No topic detection to avoid issue with document misclassification
        except Exception as inner_e:
            logger.error(f"Error creating basic result: {str(inner_e)}")
            result = {
                "file_name": str(filename) if filename else "unknown",
                "topics": ["Document"],
                "languages": ["English"],
                "ocr_contents": {
                    "error": "Processing failed completely",
                    "partial_text": ocr_markdown[:1000] if ocr_markdown else "Document could not be processed."
                }
            }
    return result
# Command-line entry point for quick manual testing:
# runs one file through StructuredOCR and prints the result as indented JSON.
if __name__ == "__main__":
    import sys

    # Everything after the script name; a file path is required.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python structured_ocr.py <file_path>")
        sys.exit(1)

    file_path = cli_args[0]
    processor = StructuredOCR()
    result = processor.process_file(file_path)
    print(json.dumps(result, indent=2))