diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -4,15 +4,33 @@ import json import sys import time import base64 -# Updated import section from pathlib import Path -import tempfile import io -from pdf2image import convert_from_bytes -from PIL import Image, ImageEnhance, ImageFilter -import cv2 -import numpy as np from datetime import datetime +import logging + +# Import modules +from preprocessing import convert_pdf_to_images, preprocess_image +from ocr_processing import process_file +from ui_components import ( + ProgressReporter, + create_sidebar_options, + display_results, + create_file_uploader, + display_about_tab, + display_previous_results, + display_document_with_images +) +from utils import get_base64_from_image, handle_temp_files, format_timestamp +from error_handler import handle_ocr_error, check_file_size +from constants import ( + MAX_FILE_SIZE_MB, + MAX_PAGES, + DOCUMENT_TYPES, + DOCUMENT_LAYOUTS, + CUSTOM_PROMPT_TEMPLATES, + LAYOUT_PROMPT_ADDITIONS +) # Import the StructuredOCR class and config from the local files from structured_ocr import StructuredOCR @@ -21,10 +39,10 @@ from config import MISTRAL_API_KEY # Import utilities for handling previous results from ocr_utils import create_results_zip -def get_base64_from_image(image_path): - """Get base64 string from image file""" - with open(image_path, "rb") as img_file: - return base64.b64encode(img_file.read()).decode('utf-8') +# Configure logging +logging.basicConfig(level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger("app") # Set favicon path favicon_path = os.path.join(os.path.dirname(__file__), "static/favicon.png") @@ -37,2573 +55,462 @@ st.set_page_config( initial_sidebar_state="expanded" ) -# Enable caching for expensive operations with longer TTL for better performance -@st.cache_data(ttl=24*3600, show_spinner=False) # Cache for 24 hours instead of 1 hour -def convert_pdf_to_images(pdf_bytes, dpi=150, rotation=0): - """Convert PDF bytes to a list of images with caching""" - try: - images = convert_from_bytes(pdf_bytes, dpi=dpi) - - # Apply rotation if specified - if rotation != 0 and images: - rotated_images = [] - for img in images: - rotated_img = img.rotate(rotation, expand=True, resample=Image.BICUBIC) - rotated_images.append(rotated_img) - return rotated_images - - return images - except Exception as e: - st.error(f"Error converting PDF: {str(e)}") - return [] - -# Cache preprocessed images for better performance -@st.cache_data(ttl=24*3600, show_spinner=False, hash_funcs={dict: lambda x: str(sorted(x.items()))}) # Cache for 24 hours -def preprocess_image(image_bytes, preprocessing_options): - """Preprocess image with selected options optimized for historical document OCR quality""" - # Setup basic console logging - import logging - logger = logging.getLogger("image_preprocessor") - logger.setLevel(logging.INFO) - - # Log which preprocessing options are being applied - logger.info(f"Preprocessing image with options: {preprocessing_options}") - - # Convert bytes to PIL Image - image = Image.open(io.BytesIO(image_bytes)) - - # Check for alpha channel (RGBA) and convert to RGB if needed - if image.mode == 'RGBA': - # Convert RGBA to RGB by compositing the image onto a white background - background = Image.new('RGB', image.size, (255, 255, 255)) - background.paste(image, mask=image.split()[3]) # 3 is the alpha channel - image = background - logger.info("Converted RGBA image to RGB") - elif image.mode not in ('RGB', 'L'): - # Convert other modes to RGB as well - image = image.convert('RGB') - logger.info(f"Converted {image.mode} image to RGB") - - # Apply rotation if specified - if preprocessing_options.get("rotation", 0) != 0: - rotation_degrees = preprocessing_options.get("rotation") - image = image.rotate(rotation_degrees, expand=True, resample=Image.BICUBIC) - - # Resize large images while preserving details important for OCR - width, height = image.size - max_dimension = max(width, height) - - # Less aggressive resizing to preserve document details - if max_dimension > 2500: - scale_factor = 2500 / max_dimension - new_width = int(width * scale_factor) - new_height = int(height * scale_factor) - # Use LANCZOS for better quality preservation - image = image.resize((new_width, new_height), Image.LANCZOS) - - img_array = np.array(image) - - # Apply preprocessing based on selected options with settings optimized for historical documents - document_type = preprocessing_options.get("document_type", "standard") - - # Process grayscale option first as it's a common foundation - if preprocessing_options.get("grayscale", False): - if len(img_array.shape) == 3: # Only convert if it's not already grayscale - if document_type == "handwritten": - # Enhanced grayscale processing for handwritten documents - img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) - # Apply adaptive histogram equalization to enhance handwriting - clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) - img_array = clahe.apply(img_array) - else: - # Standard grayscale for printed documents - img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) - - # Convert back to RGB for further processing - img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB) - - if preprocessing_options.get("contrast", 0) != 0: - contrast_factor = 1 + (preprocessing_options.get("contrast", 0) / 10) - image = Image.fromarray(img_array) - enhancer = ImageEnhance.Contrast(image) - image = enhancer.enhance(contrast_factor) - img_array = np.array(image) - - if preprocessing_options.get("denoise", False): - try: - # Apply appropriate denoising based on document type - if document_type == "handwritten": - # Very light denoising for handwritten documents to preserve pen strokes - if len(img_array.shape) == 3 and img_array.shape[2] == 3: # Color image - img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 3, 3, 5, 9) - else: # Grayscale image - img_array = cv2.fastNlMeansDenoising(img_array, None, 3, 7, 21) - else: - # Standard denoising for printed documents - if len(img_array.shape) == 3 and img_array.shape[2] == 3: # Color image - img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 5, 5, 7, 21) - else: # Grayscale image - img_array = cv2.fastNlMeansDenoising(img_array, None, 5, 7, 21) - except Exception as e: - print(f"Denoising error: {str(e)}, falling back to standard processing") - - # Convert back to PIL Image - processed_image = Image.fromarray(img_array) - - # Higher quality for OCR processing - byte_io = io.BytesIO() - try: - # Make sure the image is in RGB mode before saving as JPEG - if processed_image.mode not in ('RGB', 'L'): - processed_image = processed_image.convert('RGB') - - processed_image.save(byte_io, format='JPEG', quality=92, optimize=True) - byte_io.seek(0) - - logger.info(f"Preprocessing complete. Original image mode: {image.mode}, processed mode: {processed_image.mode}") - logger.info(f"Original size: {len(image_bytes)/1024:.1f}KB, processed size: {len(byte_io.getvalue())/1024:.1f}KB") - - return byte_io.getvalue() - except Exception as e: - logger.error(f"Error saving processed image: {str(e)}") - # Fallback to original image - logger.info("Using original image as fallback") - image_io = io.BytesIO() - image.save(image_io, format='JPEG', quality=92) - image_io.seek(0) - return image_io.getvalue() - -# Cache OCR results in memory to speed up repeated processing -@st.cache_data(ttl=24*3600, max_entries=20, show_spinner=False) -def process_file_cached(file_path, file_type, use_vision, file_size_mb, cache_key, preprocessing_options_hash=None): - """Cached version of OCR processing to reuse results""" - # Initialize OCR processor - processor = StructuredOCR() - - # Process the file - result = processor.process_file( - file_path, - file_type=file_type, - use_vision=use_vision, - file_size_mb=file_size_mb - ) - - return result +def initialize_session_state(): + """Initialize all session state variables""" + # Initialize session state for storing previous results if not already present + if 'previous_results' not in st.session_state: + st.session_state.previous_results = [] -# Define functions -def process_file(uploaded_file, use_vision=True, preprocessing_options=None, progress_container=None): - """Process the uploaded file and return the OCR results - - Args: - uploaded_file: The uploaded file to process - use_vision: Whether to use vision model - preprocessing_options: Dictionary of preprocessing options - progress_container: Optional container for progress indicators - """ - if preprocessing_options is None: - preprocessing_options = {} - - # Create a container for progress indicators if not provided - if progress_container is None: - progress_container = st.empty() + # Initialize temp file tracking + if 'temp_file_paths' not in st.session_state: + st.session_state.temp_file_paths = [] - with progress_container.container(): - progress_bar = st.progress(0) - status_text = st.empty() - status_text.markdown('
Preparing file for processing...
', unsafe_allow_html=True) + # Initialize last processed file tracking to fix "Process Document Again" button + if 'last_processed_file' not in st.session_state: + st.session_state.last_processed_file = None - try: - # Check if API key is available - if not MISTRAL_API_KEY: - # Return dummy data if no API key - progress_bar.progress(100) - status_text.empty() - return { - "file_name": uploaded_file.name, - "topics": ["Document"], - "languages": ["English"], - "ocr_contents": { - "title": "API Key Required", - "content": "Please set the MISTRAL_API_KEY environment variable to process documents." - } - } - - # Update progress - more granular steps - progress_bar.progress(10) - status_text.markdown('
Initializing OCR processor...
', unsafe_allow_html=True) - - # Determine file type from extension - file_ext = Path(uploaded_file.name).suffix.lower() - file_type = "pdf" if file_ext == ".pdf" else "image" - file_bytes = uploaded_file.getvalue() - - # Create a temporary file for processing - with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp: - tmp.write(file_bytes) - temp_path = tmp.name - # Track temporary file for cleanup - st.session_state.temp_file_paths.append(temp_path) + # Important: Initialize the reset flag + if 'perform_reset' not in st.session_state: + st.session_state.perform_reset = False - # Get PDF rotation value if available and file is a PDF - pdf_rotation_value = pdf_rotation if 'pdf_rotation' in locals() and file_type == "pdf" else 0 - - progress_bar.progress(15) - - # For PDFs, we need to handle differently - if file_type == "pdf": - status_text.markdown('
Converting PDF to images...
', unsafe_allow_html=True) - progress_bar.progress(20) - - # Convert PDF to images - try: - # Use the PDF processing pipeline directly from the StructuredOCR class - processor = StructuredOCR() - - # Process the file with direct PDF handling - progress_bar.progress(30) - status_text.markdown('
Processing PDF with OCR...
', unsafe_allow_html=True) - - # Get file size in MB for API limits - file_size_mb = os.path.getsize(temp_path) / (1024 * 1024) - - # Check if file exceeds API limits (50 MB) - if file_size_mb > 50: - os.unlink(temp_path) # Clean up temp file - progress_bar.progress(100) - status_text.empty() - progress_container.empty() - return { - "file_name": uploaded_file.name, - "topics": ["Document"], - "languages": ["English"], - "error": f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB", - "ocr_contents": { - "error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB", - "partial_text": "Document could not be processed due to size limitations." - } - } - - # Generate cache key - import hashlib - file_hash = hashlib.md5(file_bytes).hexdigest() - - # Include preprocessing options in cache key if available - preprocessing_options_hash = "" - if 'preprocessing_options' in locals() and preprocessing_options: - # Add pdf_rotation to preprocessing options to ensure it's part of the cache key - if pdf_rotation_value != 0: - preprocessing_options_with_rotation = preprocessing_options.copy() - preprocessing_options_with_rotation['pdf_rotation'] = pdf_rotation_value - preprocessing_str = str(sorted(preprocessing_options_with_rotation.items())) - else: - preprocessing_str = str(sorted(preprocessing_options.items())) - preprocessing_options_hash = hashlib.md5(preprocessing_str.encode()).hexdigest() - elif pdf_rotation_value != 0: - # If no preprocessing options but we have rotation, include that in the hash - preprocessing_options_hash = hashlib.md5(f"pdf_rotation_{pdf_rotation_value}".encode()).hexdigest() - - cache_key = f"{file_hash}_{file_type}_{use_vision}_{preprocessing_options_hash}" - - # Check if we have custom prompt to include in cache key - has_custom_prompt = 'custom_prompt' in locals() and custom_prompt and len(str(custom_prompt).strip()) > 0 - if has_custom_prompt: - # Update cache key to include custom prompt hash - custom_prompt_hash = hashlib.md5(str(custom_prompt).encode()).hexdigest() - cache_key = f"{cache_key}_{custom_prompt_hash}" - - # Process with cached function if possible - try: - result = process_file_cached(temp_path, file_type, use_vision, file_size_mb, cache_key, preprocessing_options_hash) - progress_bar.progress(90) - status_text.markdown('
Finalizing results...
', unsafe_allow_html=True) - except Exception as e: - status_text.markdown(f'
Processing error: {str(e)}. Retrying...
', unsafe_allow_html=True) - progress_bar.progress(60) - # If caching fails, process directly - result = processor.process_file( - temp_path, - file_type=file_type, - use_vision=use_vision, - file_size_mb=file_size_mb, - ) - progress_bar.progress(90) - status_text.markdown('
Finalizing results...
', unsafe_allow_html=True) - - except Exception as e: - os.unlink(temp_path) # Clean up temp file - progress_bar.progress(100) - status_text.empty() - progress_container.empty() - raise ValueError(f"Error processing PDF: {str(e)}") - - else: - # For image files, apply preprocessing if needed - # Check if any preprocessing options with boolean values are True, or if any non-boolean values are non-default - has_preprocessing = ( - preprocessing_options.get("grayscale", False) or - preprocessing_options.get("denoise", False) or - preprocessing_options.get("contrast", 0) != 0 or - preprocessing_options.get("rotation", 0) != 0 or - preprocessing_options.get("document_type", "standard") != "standard" - ) - - # Add document type hints to custom prompt if available from document type selector - with safety checks - if ('custom_prompt' in locals() and custom_prompt and - 'selected_doc_type' in locals() and selected_doc_type != "Auto-detect (standard processing)" and - "This is a" not in str(custom_prompt)): - # Extract just the document type from the selector - doc_type_hint = selected_doc_type.split(" or ")[0].lower() - # Prepend to the custom prompt - custom_prompt = f"This is a {doc_type_hint}. {custom_prompt}" - - if has_preprocessing: - status_text.markdown('
Applying image preprocessing...
', unsafe_allow_html=True) - progress_bar.progress(20) - processed_bytes = preprocess_image(file_bytes, preprocessing_options) - progress_bar.progress(25) - - # Save processed image to temp file - with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as proc_tmp: - proc_tmp.write(processed_bytes) - # Clean up original temp file and use the processed one - if os.path.exists(temp_path): - os.unlink(temp_path) - # Remove original temp path from tracking list - if temp_path in st.session_state.temp_file_paths: - st.session_state.temp_file_paths.remove(temp_path) - temp_path = proc_tmp.name - # Track new temporary file for cleanup - st.session_state.temp_file_paths.append(temp_path) - progress_bar.progress(30) - else: - progress_bar.progress(30) - - # Get file size in MB for API limits - file_size_mb = os.path.getsize(temp_path) / (1024 * 1024) - - # Check if file exceeds API limits (50 MB) - if file_size_mb > 50: - os.unlink(temp_path) # Clean up temp file - progress_bar.progress(100) - status_text.empty() - progress_container.empty() - return { - "file_name": uploaded_file.name, - "topics": ["Document"], - "languages": ["English"], - "error": f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB", - "ocr_contents": { - "error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB", - "partial_text": "Document could not be processed due to size limitations." - } - } - - # Update progress - more granular steps - progress_bar.progress(40) - status_text.markdown('
Preparing document for OCR analysis...
', unsafe_allow_html=True) - - # Generate a cache key based on file content, type and settings - import hashlib - # Add pdf_rotation to cache key if present - pdf_rotation_value = pdf_rotation if 'pdf_rotation' in locals() else 0 - file_hash = hashlib.md5(open(temp_path, 'rb').read()).hexdigest() - - # Include preprocessing options in cache key to ensure reprocessing when options change - preprocessing_options_hash = "" - if preprocessing_options: - # Add pdf_rotation to preprocessing options to ensure it's part of the cache key - if pdf_rotation_value != 0: - preprocessing_options_with_rotation = preprocessing_options.copy() - preprocessing_options_with_rotation['pdf_rotation'] = pdf_rotation_value - preprocessing_str = str(sorted(preprocessing_options_with_rotation.items())) - else: - preprocessing_str = str(sorted(preprocessing_options.items())) - preprocessing_options_hash = hashlib.md5(preprocessing_str.encode()).hexdigest() - - cache_key = f"{file_hash}_{file_type}_{use_vision}_{preprocessing_options_hash}" - - progress_bar.progress(50) - # Check if we have custom instructions - has_custom_prompt = 'custom_prompt' in locals() and custom_prompt and len(str(custom_prompt).strip()) > 0 - - # If we have custom instructions, include them in cache key - if has_custom_prompt: - status_text.markdown('
Processing document with custom instructions...
', unsafe_allow_html=True) - # Update cache key to include custom prompt hash - custom_prompt_hash = hashlib.md5(str(custom_prompt).encode()).hexdigest() - cache_key = f"{cache_key}_{custom_prompt_hash}" - else: - status_text.markdown('
Processing document with OCR...
', unsafe_allow_html=True) - - # Process the file using cached function if possible - try: - result = process_file_cached(temp_path, file_type, use_vision, file_size_mb, cache_key, preprocessing_options_hash) - progress_bar.progress(80) - status_text.markdown('
Analyzing document structure...
', unsafe_allow_html=True) - progress_bar.progress(90) - status_text.markdown('
Finalizing results...
', unsafe_allow_html=True) - except Exception as e: - progress_bar.progress(60) - status_text.markdown(f'
Processing error: {str(e)}. Retrying...
', unsafe_allow_html=True) - # If caching fails, process directly - processor = StructuredOCR() - result = processor.process_file(temp_path, file_type=file_type, use_vision=use_vision, file_size_mb=file_size_mb) - progress_bar.progress(90) - status_text.markdown('
Finalizing results...
', unsafe_allow_html=True) - - # Complete progress - progress_bar.progress(100) - status_text.markdown('
Processing complete!
', unsafe_allow_html=True) - time.sleep(0.8) # Brief pause to show completion - status_text.empty() - progress_container.empty() # Remove progress indicators when done - - # Clean up the temporary file - if os.path.exists(temp_path): - try: - os.unlink(temp_path) - except: - pass # Ignore errors when cleaning up temporary files - - return result - except Exception as e: - progress_bar.progress(100) - error_message = str(e) - - # Check for specific error types and provide helpful user-facing messages - if "rate limit" in error_message.lower() or "429" in error_message or "requests rate limit exceeded" in error_message.lower(): - friendly_message = "The AI service is currently experiencing high demand. Please try again in a few minutes." - logger = logging.getLogger("app") - logger.error(f"Rate limit error: {error_message}") - status_text.markdown(f'
Rate Limit: {friendly_message}
', unsafe_allow_html=True) - elif "quota" in error_message.lower() or "credit" in error_message.lower() or "subscription" in error_message.lower(): - friendly_message = "The API usage quota has been reached. Please check your API key and subscription limits." - status_text.markdown(f'
API Quota: {friendly_message}
', unsafe_allow_html=True) - else: - status_text.markdown(f'
Error: {error_message}
', unsafe_allow_html=True) - - time.sleep(1.5) # Show error briefly - status_text.empty() - progress_container.empty() - - # Display an appropriate error message based on the exception type - if "rate limit" in error_message.lower() or "429" in error_message or "requests rate limit exceeded" in error_message.lower(): - st.warning(f"API Rate Limit: {friendly_message} This is a temporary issue and does not indicate any problem with your document.") - elif "quota" in error_message.lower() or "credit" in error_message.lower() or "subscription" in error_message.lower(): - st.error(f"API Quota Exceeded: {friendly_message}") - else: - st.error(f"Error during processing: {error_message}") + # Initialize other session state variables + if 'auto_process_sample' not in st.session_state: + st.session_state.auto_process_sample = False + if 'sample_just_loaded' not in st.session_state: + st.session_state.sample_just_loaded = False + if 'processed_document_active' not in st.session_state: + st.session_state.processed_document_active = False + if 'sample_document_processed' not in st.session_state: + st.session_state.sample_document_processed = False + if 'sample_document' not in st.session_state: + st.session_state.sample_document = None + if 'original_sample_bytes' not in st.session_state: + st.session_state.original_sample_bytes = None + if 'original_sample_name' not in st.session_state: + st.session_state.original_sample_name = None + if 'is_sample_document' not in st.session_state: + st.session_state.is_sample_document = False + + # Check if we need to perform a complete reset (coming from "Close Document" button) + if 'perform_reset' in st.session_state and st.session_state.perform_reset: + # Save previous results + previous_results = st.session_state.previous_results + + # Clean up any temporary files + if 'temp_file_paths' in st.session_state and st.session_state.temp_file_paths: + handle_temp_files(st.session_state.temp_file_paths) + + # Clear all session state variables except previous_results + for key in list(st.session_state.keys()): + if key not in ['previous_results']: + # We will manually reset the perform_reset flag at the end + if key != 'perform_reset': + st.session_state.pop(key, None) + + # Restore previous results + st.session_state.previous_results = previous_results + + # Reinitialize session state variables + st.session_state.temp_file_paths = [] + st.session_state.last_processed_file = None + st.session_state.auto_process_sample = False + st.session_state.sample_just_loaded = False + st.session_state.processed_document_active = False + st.session_state.sample_document_processed = False + st.session_state.sample_document = None + st.session_state.original_sample_bytes = None + st.session_state.original_sample_name = None + st.session_state.is_sample_document = False - # Clean up the temporary file - try: - if 'temp_path' in locals() and os.path.exists(temp_path): - os.unlink(temp_path) - except: - pass # Ignore errors when cleaning up temporary files + # Turn off reset flag - this must be done last + st.session_state.perform_reset = False - raise - -# App title and description -favicon_base64 = get_base64_from_image(os.path.join(os.path.dirname(__file__), "static/favicon.png")) -st.markdown(f'
Scroll Icon

Historical Document OCR

', unsafe_allow_html=True) -st.subheader("Made possible by Mistral AI") - -# Check if pytesseract is available for fallback -try: - import pytesseract - has_pytesseract = True -except ImportError: - has_pytesseract = False + # Force this to be a complete reset cycle + return -# Initialize session state for storing previous results if not already present -if 'previous_results' not in st.session_state: - st.session_state.previous_results = [] - -# Initialize temp file tracking -if 'temp_file_paths' not in st.session_state: - st.session_state.temp_file_paths = [] - -# Initialize last processed file tracking to fix "Process Document Again" button -if 'last_processed_file' not in st.session_state: - st.session_state.last_processed_file = None - -# Ensure perform_reset flag is initialized -if 'perform_reset' not in st.session_state: - st.session_state.perform_reset = False - -# Check if we need to perform a complete reset (coming from "X Close" button) -if 'perform_reset' in st.session_state and st.session_state.perform_reset: - # List of all session state keys that should be reset, except previous_results - reset_keys = [key for key in list(st.session_state.keys()) - if key != 'previous_results'] - - # Remove all keys except previous_results - for key in reset_keys: - if key == 'perform_reset': - st.session_state[key] = False # Clear this flag - else: - st.session_state.pop(key, None) - - # Reinitialize required session state variables - st.session_state.auto_process_sample = False - st.session_state.sample_just_loaded = False - st.session_state.processed_document_active = False - st.session_state.sample_document_processed = False - st.session_state.last_processed_file = None +def show_example_documents(): + """Show example documents section""" + st.subheader("Example Documents") - # Explicitly reset document-related variables - st.session_state.sample_document = None - st.session_state.original_sample_bytes = None - st.session_state.original_sample_name = None - st.session_state.is_sample_document = False - -# Create main layout with tabs and columns -main_tab1, main_tab2, main_tab3 = st.tabs(["Document Processing", "Previous Results", "About"]) - -with main_tab1: - # Create a two-column layout for file upload and results - left_col, right_col = st.columns([1, 1]) - - # File uploader in the left column - with left_col: - # Simple CSS just to fix vertical text in drag and drop area - st.markdown(""" - - """, unsafe_allow_html=True) - - # Add heading for the file uploader (just text, no container) - st.markdown('### Upload Document') - - # Model info with clearer instructions - st.markdown("Using the latest `mistral-ocr-latest` model for advanced document understanding. To get started upload your own document, use an example document, or explore the 'About' tab for more info.") - - # Enhanced file uploader with better help text - uploaded_file = st.file_uploader("Drag and drop PDFs or images here", type=["pdf", "png", "jpg", "jpeg"], - help="Limit 200MB per file • PDF, PNG, JPG, JPEG") - - # Removed seed prompt instructions from here, moving to sidebar - -# Sidebar with options - moved up with equal spacing -with st.sidebar: - # Options title with reduced top margin - st.markdown("

Options

", unsafe_allow_html=True) + # Add a simplified info message about examples + st.markdown(""" + This app can process various historical documents: + - Historical photographs, maps, and manuscripts + - Handwritten letters and documents + - Printed books and articles + - Multi-page PDFs + """) - # Comprehensive CSS for optimal sidebar spacing and layout + # Add CSS to make the dropdown match the column width st.markdown(""" """, unsafe_allow_html=True) - # Model options - use_vision = st.checkbox("Use Vision Model", value=True, - help="Use vision model for improved analysis (may be slower)") - - # Add spacing between sections - st.markdown("
", unsafe_allow_html=True) - - # Document Processing section - st.markdown("##### OCR Instructions", help="Optimize text extraction") - - # Document type selector - document_types = [ - "Auto-detect (standard processing)", - "Newspaper or Magazine", - "Letter or Correspondence", - "Book or Publication", - "Form or Legal Document", - "Recipe", - "Handwritten Document", - "Map or Illustration", - "Table or Spreadsheet", - "Other (specify in instructions)" + # Sample document URLs dropdown with clearer label + sample_urls = [ + "Select a sample document", + "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/a-la-carte.pdf", + "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magician-or-bottle-cungerer.jpg", + "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/handwritten-letter.jpg", + "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magellan-travels.jpg", + "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/milgram-flier.png", + "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/baldwin-15th-north.jpg" ] - selected_doc_type = st.selectbox( - "Document Type", - options=document_types, - index=0, - help="Select document type to optimize OCR processing for specific document formats and layouts. For documents with specialized features, also provide details in the instructions field below." - ) - - # Document layout selector - document_layouts = [ - "Standard layout", - "Multiple columns", - "Table/grid format", - "Mixed layout with images" + sample_names = [ + "Select a sample document", + "Restaurant Menu (PDF)", + "The Magician (Image)", + "Handwritten Letter (Image)", + "Magellan Travels (Image)", + "Milgram Flier (Image)", + "Baldwin Street (Image)" ] - selected_layout = st.selectbox( - "Document Layout", - options=document_layouts, - index=0, - help="Select the document's text layout for better OCR" - ) - - # Generate dynamic prompt based on both document type and layout - custom_prompt_text = "" - - # First add document type specific instructions (simplified) - if selected_doc_type != "Auto-detect (standard processing)": - if selected_doc_type == "Newspaper or Magazine": - custom_prompt_text = "This is a newspaper/magazine. Process columns from top to bottom, capture headlines, bylines, article text and captions." - elif selected_doc_type == "Letter or Correspondence": - custom_prompt_text = "This is a letter/correspondence. Capture letterhead, date, greeting, body, closing and signature. Note any handwritten annotations." - elif selected_doc_type == "Book or Publication": - custom_prompt_text = "This is a book/publication. Extract titles, headers, footnotes, page numbers and body text. Preserve paragraph structure and any special formatting." - elif selected_doc_type == "Form or Legal Document": - custom_prompt_text = "This is a form/legal document. Extract all field labels and values, preserving the structure. Pay special attention to signature lines, dates, and any official markings." - elif selected_doc_type == "Recipe": - custom_prompt_text = "This is a recipe. Extract title, ingredients list with measurements, and preparation instructions. Maintain the distinction between ingredients and preparation steps." - elif selected_doc_type == "Handwritten Document": - custom_prompt_text = "This is a handwritten document. Carefully transcribe all handwritten text, preserving line breaks. Note any unclear sections or annotations." - elif selected_doc_type == "Map or Illustration": - custom_prompt_text = "This is a map or illustration. Transcribe all labels, legends, captions, and annotations. Note any scale indicators or directional markings." - elif selected_doc_type == "Table or Spreadsheet": - custom_prompt_text = "This is a table/spreadsheet. Preserve row and column structure, maintaining alignment of data. Extract headers and all cell values." - elif selected_doc_type == "Other (specify in instructions)": - custom_prompt_text = "Please describe the document type and any special processing requirements here." + # Initialize sample_document in session state if it doesn't exist + if 'sample_document' not in st.session_state: + st.session_state.sample_document = None - # Then add layout specific instructions if needed - if selected_layout != "Standard layout" and not custom_prompt_text: - if selected_layout == "Multiple columns": - custom_prompt_text = "Document has multiple columns. Read each column from top to bottom, then move to the next column." - elif selected_layout == "Table/grid format": - custom_prompt_text = "Document contains table data. Preserve row and column structure during extraction." - elif selected_layout == "Mixed layout with images": - custom_prompt_text = "Document has mixed text layout with images. Extract text in proper reading order." - # If both document type and non-standard layout are selected, add layout info - elif selected_layout != "Standard layout" and custom_prompt_text: - if selected_layout == "Multiple columns": - custom_prompt_text += " Document has multiple columns." - elif selected_layout == "Table/grid format": - custom_prompt_text += " Contains table/grid formatting." - elif selected_layout == "Mixed layout with images": - custom_prompt_text += " Has mixed text layout with images." + selected_sample = st.selectbox("Select a sample document from `~/input`", options=range(len(sample_urls)), format_func=lambda i: sample_names[i]) - # Add spacing between sections - st.markdown("
", unsafe_allow_html=True) - - custom_prompt = st.text_area( - "Additional OCR Instructions", - value=custom_prompt_text, - placeholder="Example: Small text at bottom needs special attention", - height=100, - max_chars=300, - key="custom_analysis_instructions", - help="Specify document type and special OCR requirements. Detailed instructions activate Mistral AI's advanced document analysis." - ) - - # Custom instructions expander - with st.expander("Custom Instruction Examples"): - st.markdown(""" - **Document Format Instructions:** - - "This newspaper has multiple columns - read each column from top to bottom" - - "This letter has a formal heading, main body, and signature section at bottom" - - "This form has fields with labels and filled-in values that should be paired" - - "This recipe has ingredient list at top and preparation steps below" + if selected_sample > 0: + selected_url = sample_urls[selected_sample] - **Special Processing Instructions:** - - "Pay attention to footnotes at the bottom of each page" - - "Some text is faded - please attempt to reconstruct unclear passages" - - "There are handwritten annotations in the margins that should be included" - - "Document has table data that should preserve row and column alignment" - - "Text continues across pages and should be connected into a single flow" - - "This document uses special symbols and mathematical notation" - """) - - # Add spacing between sections - st.markdown("
", unsafe_allow_html=True) - - # Image preprocessing options with reduced spacing - st.markdown("##### Image Processing", help="Options for enhancing images") - with st.expander("Preprocessing Options", expanded=False): - preprocessing_options = {} - - # Document type selector - doc_type_options = ["standard", "handwritten", "typed", "printed"] - preprocessing_options["document_type"] = st.selectbox( - "Document Type", - options=doc_type_options, - index=0, - format_func=lambda x: x.capitalize(), - help="Select document type for optimized processing" - ) - - preprocessing_options["grayscale"] = st.checkbox("Convert to Grayscale", - help="Convert image to grayscale before OCR") - preprocessing_options["denoise"] = st.checkbox("Denoise Image", - help="Remove noise from the image") - preprocessing_options["contrast"] = st.slider("Adjust Contrast", -5, 5, 0, - help="Adjust image contrast (-5 to +5)") - - # Add rotation options - rotation_options = [0, 90, 180, 270] - preprocessing_options["rotation"] = st.select_slider( - "Rotate Document", - options=rotation_options, - value=0, - format_func=lambda x: f"{x}° {'(No rotation)' if x == 0 else ''}", - help="Rotate the document to correct orientation" - ) - - # Add spacing between sections - st.markdown("
", unsafe_allow_html=True) - - # PDF options with consistent formatting - st.markdown("##### PDF Settings", help="Options for PDF documents") - with st.expander("PDF Options", expanded=False): - pdf_dpi = st.slider("Resolution (DPI)", 72, 300, 100, - help="Higher DPI = better quality but slower") - max_pages = st.number_input("Max Pages", 1, 20, 3, - help="Limit number of pages to process") - - # Add PDF rotation option - pdf_rotation = st.select_slider( - "Rotation", - options=rotation_options, - value=0, - format_func=lambda x: f"{x}°", - help="Rotate PDF pages" - ) - -# Previous Results tab content -with main_tab2: - st.markdown('

Previous Results

', unsafe_allow_html=True) - - # Load custom CSS for Previous Results tab - from ui.layout import load_css - load_css() - - # Display previous results if available - if not st.session_state.previous_results: - st.markdown(""" -
-
📄
-

No Previous Results

-

Process a document to see your results history saved here.

-
- """, unsafe_allow_html=True) - else: - # Create a container for the results list - st.markdown('
', unsafe_allow_html=True) - st.markdown(f'

{len(st.session_state.previous_results)} Previous Results

', unsafe_allow_html=True) - - # Create two columns for filters and download buttons - filter_col, download_col = st.columns([2, 1]) - - with filter_col: - # Add filter options - filter_options = ["All Types"] - if any(result.get("file_name", "").lower().endswith(".pdf") for result in st.session_state.previous_results): - filter_options.append("PDF Documents") - if any(result.get("file_name", "").lower().endswith((".jpg", ".jpeg", ".png")) for result in st.session_state.previous_results): - filter_options.append("Images") + # Add process button for the sample document + if st.button("Load Sample Document"): + try: + import requests + from io import BytesIO - selected_filter = st.selectbox("Filter by Type:", filter_options) - - with download_col: - # Add download all button for results - if len(st.session_state.previous_results) > 0: - try: - # Create buffer in memory instead of file on disk - import io - from ocr_utils import create_results_zip_in_memory + with st.spinner(f"Downloading {sample_names[selected_sample]}..."): + response = requests.get(selected_url) + response.raise_for_status() - # Get zip data directly in memory - zip_data = create_results_zip_in_memory(st.session_state.previous_results) + # Extract filename from URL + file_name = selected_url.split("/")[-1] - # Create more informative ZIP filename with timestamp - from datetime import datetime - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + # Create a BytesIO object from the downloaded content + file_content = BytesIO(response.content) - # Count document types for a more descriptive filename - pdf_count = sum(1 for r in st.session_state.previous_results if r.get('file_name', '').lower().endswith('.pdf')) - img_count = sum(1 for r in st.session_state.previous_results if r.get('file_name', '').lower().endswith(('.jpg', '.jpeg', '.png'))) - - # Create more descriptive filename - if pdf_count > 0 and img_count > 0: - zip_filename = f"historical_ocr_mixed_{pdf_count}pdf_{img_count}img_{timestamp}.zip" - elif pdf_count > 0: - zip_filename = f"historical_ocr_pdf_documents_{pdf_count}_{timestamp}.zip" - elif img_count > 0: - zip_filename = f"historical_ocr_images_{img_count}_{timestamp}.zip" - else: - zip_filename = f"historical_ocr_results_{timestamp}.zip" - - st.download_button( - label="Download All Results", - data=zip_data, - file_name=zip_filename, - mime="application/zip", - help="Download all previous results as a ZIP file containing HTML and JSON files" - ) - except Exception as e: - st.error(f"Error creating download: {str(e)}") - st.info("Try with fewer results or individual downloads") - - # Filter results based on selection - filtered_results = st.session_state.previous_results - if selected_filter == "PDF Documents": - filtered_results = [r for r in st.session_state.previous_results if r.get("file_name", "").lower().endswith(".pdf")] - elif selected_filter == "Images": - filtered_results = [r for r in st.session_state.previous_results if r.get("file_name", "").lower().endswith((".jpg", ".jpeg", ".png"))] - - # Show a message if no results match the filter - if not filtered_results: - st.markdown(""" -
-

No results match the selected filter.

-
- """, unsafe_allow_html=True) - - # Display each result as a card - for i, result in enumerate(filtered_results): - # Determine file type icon - file_name = result.get("file_name", f"Document {i+1}") - file_type_lower = file_name.lower() - - if file_type_lower.endswith(".pdf"): - icon = "📄" - elif file_type_lower.endswith((".jpg", ".jpeg", ".png", ".gif")): - icon = "🖼️" - else: - icon = "📝" - - # Create a card for each result - st.markdown(f""" -
-
-
{icon} {result.get('descriptive_file_name', file_name)}
-
{result.get('timestamp', 'Unknown')}
-
- - """, unsafe_allow_html=True) - - # Add view button inside the card with proper styling - st.markdown('
', unsafe_allow_html=True) - if st.button(f"View Document", key=f"view_{i}"): - # Set the selected result in the session state - st.session_state.selected_previous_result = st.session_state.previous_results[i] - # Force a rerun to show the selected result - st.rerun() - st.markdown('
', unsafe_allow_html=True) - - # Close the result card - st.markdown('
', unsafe_allow_html=True) - - # Close the container - st.markdown('
', unsafe_allow_html=True) - - # Display the selected result if available - if 'selected_previous_result' in st.session_state and st.session_state.selected_previous_result: - selected_result = st.session_state.selected_previous_result - - # Create a styled container for the selected result - st.markdown(f""" -
-
-
Selected Document: {selected_result.get('file_name', 'Unknown')}
-
{selected_result.get('timestamp', '')}
-
- """, unsafe_allow_html=True) - - # Display metadata in a styled way - meta_col1, meta_col2 = st.columns(2) - - with meta_col1: - # Display document metadata - if 'languages' in selected_result: - languages = [lang for lang in selected_result['languages'] if lang is not None] - if languages: - st.write(f"**Languages:** {', '.join(languages)}") - - if 'topics' in selected_result and selected_result['topics']: - # Show topics in a more organized way with badges - st.markdown("**Subject Tags:**") - # Create a container with flex display for the tags - st.markdown('
', unsafe_allow_html=True) - - # Generate a badge for each tag - for topic in selected_result['topics']: - # Create colored badge based on tag category - badge_color = "#546e7a" # Default color + # Store as a UploadedFile-like object in session state + class SampleDocument: + def __init__(self, name, content, content_type): + self.name = name + self._content = content + self.type = content_type + self.size = len(content) - # Assign colors by category - if any(term in topic.lower() for term in ["century", "pre-", "era", "historical"]): - badge_color = "#1565c0" # Blue for time periods - elif any(term in topic.lower() for term in ["language", "english", "french", "german", "latin"]): - badge_color = "#00695c" # Teal for languages - elif any(term in topic.lower() for term in ["letter", "newspaper", "book", "form", "document", "recipe"]): - badge_color = "#6a1b9a" # Purple for document types - elif any(term in topic.lower() for term in ["travel", "military", "science", "medicine", "education", "art", "literature"]): - badge_color = "#2e7d32" # Green for subject domains - elif any(term in topic.lower() for term in ["preprocessed", "enhanced", "grayscale", "denoised", "contrast", "rotated"]): - badge_color = "#e65100" # Orange for preprocessing-related tags + def getvalue(self): + return self._content - st.markdown( - f'{topic}', - unsafe_allow_html=True - ) - - # Close the container - st.markdown('
', unsafe_allow_html=True) - - with meta_col2: - # Display processing metadata - if 'limited_pages' in selected_result: - st.info(f"Processed {selected_result['limited_pages']['processed']} of {selected_result['limited_pages']['total']} pages") - - if 'processing_time' in selected_result: - proc_time = selected_result['processing_time'] - st.write(f"**Processing Time:** {proc_time:.1f}s") - - # Create tabs for content display - has_images = selected_result.get('has_images', False) - if has_images: - view_tab1, view_tab2, view_tab3 = st.tabs(["Structured View", "Raw JSON", "With Images"]) - else: - view_tab1, view_tab2 = st.tabs(["Structured View", "Raw JSON"]) - - with view_tab1: - # Display structured content - if 'ocr_contents' in selected_result and isinstance(selected_result['ocr_contents'], dict): - for section, content in selected_result['ocr_contents'].items(): - if content and section not in ['error', 'raw_text', 'partial_text']: # Skip error and raw text sections - st.markdown(f"#### {section.replace('_', ' ').title()}") - - if isinstance(content, str): - st.write(content) - elif isinstance(content, list): - for item in content: - if isinstance(item, str): - st.write(f"- {item}") - else: - st.write(f"- {str(item)}") - elif isinstance(content, dict): - for k, v in content.items(): - st.write(f"**{k}:** {v}") - - with view_tab2: - # Show the raw JSON with an option to download it - try: - st.json(selected_result) - except Exception as e: - st.error(f"Error displaying JSON: {str(e)}") - # Try a safer approach with string representation - st.code(str(selected_result)) - - # Create more informative JSON download button with better naming - try: - json_str = json.dumps(selected_result, indent=2) - - # Use the descriptive filename if available, otherwise build one - if 'descriptive_file_name' in selected_result: - # Get base name without extension - base_filename = Path(selected_result['descriptive_file_name']).stem - else: - # Fall back to old method of building filename - base_filename = selected_result.get('file_name', 'document').split('.')[0] - - # Add document type if available - if 'topics' in selected_result and selected_result['topics']: - topic = selected_result['topics'][0].lower().replace(' ', '_') - base_filename = f"{base_filename}_{topic}" - - # Add language if available - if 'languages' in selected_result and selected_result['languages']: - lang = selected_result['languages'][0].lower() - # Only add if it's not already in the filename - if lang not in base_filename.lower(): - base_filename = f"{base_filename}_{lang}" - - # For PDFs, add page information - if 'total_pages' in selected_result and 'processed_pages' in selected_result: - base_filename = f"{base_filename}_p{selected_result['processed_pages']}of{selected_result['total_pages']}" - - # Get date from timestamp if available - timestamp = "" - if 'timestamp' in selected_result: - try: - # Try to parse the timestamp and reformat it - from datetime import datetime - dt = datetime.strptime(selected_result['timestamp'], "%Y-%m-%d %H:%M") - timestamp = dt.strftime("%Y%m%d_%H%M%S") - except: - # If parsing fails, create a new timestamp - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - else: - # No timestamp in the result, create a new one - from datetime import datetime - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + def read(self): + return self._content - # Create final filename - json_filename = f"{base_filename}_{timestamp}.json" - - st.download_button( - label="Download JSON", - data=json_str, - file_name=json_filename, - mime="application/json" - ) - except Exception as e: - st.error(f"Error creating JSON download: {str(e)}") - # Fallback to string representation for download with simple naming - from datetime import datetime - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - st.download_button( - label="Download as Text", - data=str(selected_result), - file_name=f"document_{timestamp}.txt", - mime="text/plain" - ) - - if has_images and 'pages_data' in selected_result: - with view_tab3: - # Display content with images in a nicely formatted way - pages_data = selected_result.get('pages_data', []) - - # Process and display each page - for page_idx, page in enumerate(pages_data): - # Add a page header if multi-page - if len(pages_data) > 1: - st.markdown(f"### Page {page_idx + 1}") - - # Create columns for better layout - if page.get('images'): - # Extract images for this page - images = page.get('images', []) - for img in images: - if 'image_base64' in img: - st.image(img['image_base64'], width=600) + def seek(self, position): + # Implement seek for compatibility with some file operations + return - # Display text content if available - text_content = page.get('markdown', '') - if text_content: - with st.expander("View Page Text", expanded=True): - st.markdown(text_content) - else: - # Just display text if no images - text_content = page.get('markdown', '') - if text_content: - st.markdown(text_content) - - # Add page separator - if page_idx < len(pages_data) - 1: - st.markdown("---") - - # Add HTML download button with improved, more descriptive filename - from ocr_utils import create_html_with_images - html_content = create_html_with_images(selected_result) - - # Use the descriptive filename if available, otherwise build one - if 'descriptive_file_name' in selected_result: - # Get base name without extension - base_filename = Path(selected_result['descriptive_file_name']).stem + def tell(self): + # Implement tell for compatibility + return 0 + + # Determine content type based on file extension + if file_name.lower().endswith('.pdf'): + content_type = 'application/pdf' + elif file_name.lower().endswith(('.jpg', '.jpeg')): + content_type = 'image/jpeg' + elif file_name.lower().endswith('.png'): + content_type = 'image/png' else: - # Fall back to old method of building filename - base_filename = selected_result.get('file_name', 'document').split('.')[0] + content_type = 'application/octet-stream' - # Add document type if available - if 'topics' in selected_result and selected_result['topics']: - topic = selected_result['topics'][0].lower().replace(' ', '_') - base_filename = f"{base_filename}_{topic}" + # Reset any document state before loading a new sample + if st.session_state.processed_document_active: + # Clear previous document state + st.session_state.processed_document_active = False + st.session_state.last_processed_file = None + + # Clean up any temporary files from previous processing + if st.session_state.temp_file_paths: + handle_temp_files(st.session_state.temp_file_paths) + st.session_state.temp_file_paths = [] - # Add language if available - if 'languages' in selected_result and selected_result['languages']: - lang = selected_result['languages'][0].lower() - # Only add if it's not already in the filename - if lang not in base_filename.lower(): - base_filename = f"{base_filename}_{lang}" + # Save download info in session state + st.session_state.sample_document = SampleDocument( + name=file_name, + content=response.content, + content_type=content_type + ) - # For PDFs, add page information - if 'total_pages' in selected_result and 'processed_pages' in selected_result: - base_filename = f"{base_filename}_p{selected_result['processed_pages']}of{selected_result['total_pages']}" + # Store original bytes for reprocessing + st.session_state.original_sample_bytes = response.content + st.session_state.original_sample_name = file_name - # Get date from timestamp if available - timestamp = "" - if 'timestamp' in selected_result: - try: - # Try to parse the timestamp and reformat it - from datetime import datetime - dt = datetime.strptime(selected_result['timestamp'], "%Y-%m-%d %H:%M") - timestamp = dt.strftime("%Y%m%d_%H%M%S") - except: - # If parsing fails, create a new timestamp - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - else: - # No timestamp in the result, create a new one - from datetime import datetime - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - - # Create final filename - html_filename = f"{base_filename}_{timestamp}_with_images.html" + # Set state flags + st.session_state.sample_just_loaded = True + st.session_state.is_sample_document = True + # Generate a unique identifier for the sample document + st.session_state.last_processed_file = f"{file_name}_{len(response.content)}" - st.download_button( - label="Download as HTML with Images", - data=html_content, - file_name=html_filename, - mime="text/html" - ) - - # Close the container - st.markdown('
', unsafe_allow_html=True) - - # Add clear button outside the container with proper styling - col1, col2, col3 = st.columns([1, 1, 1]) - with col2: - st.markdown('
', unsafe_allow_html=True) - if st.button("Close Selected Document", key="close_selected"): - # Clear the selected result from session state - del st.session_state.selected_previous_result - # Force a rerun to update the view + # Force rerun to load the document st.rerun() - st.markdown('
', unsafe_allow_html=True) - -# About tab content -with main_tab3: - # Add a notice about local OCR fallback if available - fallback_notice = "" - if 'has_pytesseract' in locals() and has_pytesseract: - fallback_notice = """ - **Local OCR Fallback:** - - Local OCR fallback using Tesseract is available if API rate limits are reached - - Provides basic text extraction when cloud OCR is unavailable - """ - - st.markdown(f""" - ### About Historical Document OCR - - This application specializes in processing historical documents using [Mistral AI's Document OCR](https://docs.mistral.ai/capabilities/document/), which is particularly effective for handling challenging textual materials. - - #### Document Processing Capabilities - - **Historical Images**: Process vintage photographs, scanned historical papers, manuscripts - - **Handwritten Documents**: Extract text from letters, journals, notes, and records - - **Multi-Page PDFs**: Process historical books, articles, and longer documents - - **Mixed Content**: Handle documents with both text and imagery - - #### Key Features - - **Advanced Image Preprocessing** - - Grayscale conversion optimized for historical documents - - Denoising to remove artifacts and improve clarity - - Contrast adjustment to enhance faded text - - Document rotation for proper orientation - - - **Document Analysis** - - Text extraction with `mistral-ocr-latest` - - Structured data extraction: dates, names, places, topics - - Multi-language support with automatic detection - - Handling of period-specific terminology and obsolete language - - - **Flexible Output Formats** - - Structured view with organized content sections - - Developer JSON for integration with other applications - - Visual representation preserving original document layout - - Downloadable results in various formats - - #### Historical Context - Add period-specific context to improve analysis: - - Historical period selection - - Document purpose identification - - Custom instructions for specialized terminology - - #### Data Privacy - - All document processing happens through secure AI processing - - No documents are permanently stored on the server - - Results are only saved in your current session - {fallback_notice} - """) + except Exception as e: + st.error(f"Error downloading sample document: {str(e)}") + st.info("Please try uploading your own document instead.") + else: + # If no sample is selected, clear the sample document in session state + st.session_state.sample_document = None -with main_tab1: - # Initialize all session state variables in one place at the beginning - # This ensures they exist before being accessed anywhere in the code - if 'auto_process_sample' not in st.session_state: - st.session_state.auto_process_sample = False - if 'sample_just_loaded' not in st.session_state: - st.session_state.sample_just_loaded = False - if 'processed_document_active' not in st.session_state: - st.session_state.processed_document_active = False - if 'sample_document_processed' not in st.session_state: - st.session_state.sample_document_processed = False +def process_document(uploaded_file, left_col, right_col, sidebar_options): + """Process the uploaded document and display results""" + if uploaded_file is None: + return - # Add global CSS to ensure consistent button styling throughout the app - st.markdown(""" - - """, unsafe_allow_html=True) + # Check if this is an auto-processing situation + auto_processing = st.session_state.auto_process_sample and not st.session_state.processed_document_active + + # Show a message if auto-processing is happening + auto_processing_message = st.empty() + if auto_processing: + auto_processing_message.info("Automatically processing sample document...") - # Check if we're using a sample document (either newly loaded or from session state) - using_sample_document = False - sample_document_name = None + # Determine if we should process the document + # Either process button was clicked OR auto-processing is happening + should_process = process_button or auto_processing - # Check for newly loaded sample document - if 'sample_document' in st.session_state and st.session_state.sample_document is not None: - # Use the sample document - uploaded_file = st.session_state.sample_document - using_sample_document = True - sample_document_name = uploaded_file.name + if should_process: + # Reset auto-process flag to avoid processing on next rerun + if st.session_state.auto_process_sample: + st.session_state.auto_process_sample = False + + # Move the progress indicator reference to just below the button + progress_reporter = ProgressReporter(progress_placeholder).setup() - # Set auto-process flag in session state if this is a newly loaded sample - if st.session_state.sample_just_loaded: - st.session_state.auto_process_sample = True - # Mark that this is a sample document being processed - st.session_state.sample_document_processed = True - st.session_state.sample_just_loaded = False + try: + # Process the document + result = process_file( + uploaded_file=uploaded_file, + use_vision=sidebar_options["use_vision"], + preprocessing_options=sidebar_options["preprocessing_options"], + progress_reporter=progress_reporter, + pdf_dpi=sidebar_options.get("pdf_dpi", 150), + max_pages=sidebar_options.get("max_pages", 3), + pdf_rotation=sidebar_options.get("pdf_rotation", 0), + custom_prompt=sidebar_options.get("custom_prompt", ""), + perf_mode=sidebar_options.get("perf_mode", "Quality") + ) + + # Display results + display_results(result, right_col, sidebar_options.get("custom_prompt", "")) + + # Set processed_document_active to True when a new document is processed + st.session_state.processed_document_active = True + + # Clear the auto-processing message + auto_processing_message.empty() + + # Store information about this processed file to track when new files are uploaded + if uploaded_file is not None: + st.session_state.last_processed_file = current_file_identifier + + # Display success message with close button for dismissing processed documents + success_cols = st.columns([5, 1]) + with success_cols[0]: + metadata_placeholder.success("**Document processed successfully**") + with success_cols[1]: + # Define a function to clear document state + def clear_document_state(): + # Reset all document-related session state + st.session_state.processed_document_active = False + st.session_state.sample_document = None + st.session_state.last_processed_file = None + + # Clear any remaining state flag if we're showing examples + st.session_state.perform_reset = True + + # Create the close button with a callback + st.button("✕ Close Document", + key="close_document_button", + help="Clear current document and start over", + on_click=clear_document_state) + + # Store the result in the previous results list + # Add timestamp to result for history tracking + result_copy = result.copy() + result_copy['timestamp'] = format_timestamp() + + # Store if this was a sample document + if 'is_sample_document' in st.session_state and st.session_state.is_sample_document: + result_copy['sample_document'] = True + + # Add to session state, keeping the most recent 20 results + st.session_state.previous_results.insert(0, result_copy) + if len(st.session_state.previous_results) > 20: + st.session_state.previous_results = st.session_state.previous_results[:20] + + except Exception as e: + st.error(f"Error processing document: {str(e)}") - # Store sample document bytes in a separate session state variable for potential reprocessing - st.session_state.original_sample_bytes = uploaded_file.getvalue() - st.session_state.original_sample_name = uploaded_file.name - st.session_state.is_sample_document = True + # Log the error + import logging + logging.error(f"Document processing error: {str(e)}", exc_info=True) + +def main(): + """Main application function""" + # Initialize session state + initialize_session_state() - # Check for reprocessing of previously loaded sample - elif 'is_sample_document' in st.session_state and st.session_state.is_sample_document: - using_sample_document = True - sample_document_name = st.session_state.original_sample_name if 'original_sample_name' in st.session_state else "Sample Document" + # Apply custom CSS + from ui.layout import load_css + load_css() - # Display sample document notice if using a sample document - if using_sample_document: - st.markdown( - f""" -
-
- Sample Document: {sample_document_name} -
-
- """, - unsafe_allow_html=True - ) + # Create sidebar options + sidebar_options = create_sidebar_options() - if uploaded_file is not None: - # Check file size (cap at 50MB) - file_size_mb = len(uploaded_file.getvalue()) / (1024 * 1024) - - if file_size_mb > 50: - with left_col: - st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size is 50MB.") - st.stop() - - # Check if this is a new file (different from the last processed file) - current_file_identifier = f"{uploaded_file.name}_{len(uploaded_file.getvalue())}" - if st.session_state.last_processed_file != current_file_identifier: - # Reset processed_document_active if a new file is uploaded - st.session_state.processed_document_active = False - - file_ext = Path(uploaded_file.name).suffix.lower() + # Create main layout with tabs + main_tab1, main_tab2, main_tab3 = st.tabs(["Document Processing", "Previous Results", "About"]) + + with main_tab1: + # Create a two-column layout for file upload and results + left_col, right_col = st.columns([1, 1]) - # Process button - flush left with similar padding as file browser with left_col: - # Process button styling is now handled by global CSS + # Create file uploader + uploaded_file = create_file_uploader() - # Use a key for the button based on state to force re-creation - button_key = "process_again" if st.session_state.processed_document_active else "process_initial" - - # Show appropriate button text based on state - button_text = "Process Document Again" if st.session_state.processed_document_active else "Process Document" - - # Create the button - process_button = st.button(button_text, key=button_key) - - # Handle sample document recreation if needed - if process_button and st.session_state.processed_document_active and st.session_state.original_sample_bytes is not None: - # Recreate the uploaded file from stored bytes - from io import BytesIO - import mimetypes + # Check if we have a sample document loaded + if ('sample_document' in st.session_state and + st.session_state.sample_document is not None): - # Determine mime type based on file extension - file_ext = os.path.splitext(st.session_state.original_sample_name)[1].lower() - if file_ext == '.pdf': - mime_type = 'application/pdf' - elif file_ext in ['.jpg', '.jpeg']: - mime_type = 'image/jpeg' - elif file_ext == '.png': - mime_type = 'image/png' - else: - mime_type = mimetypes.guess_type(st.session_state.original_sample_name)[0] or 'application/octet-stream' + # Use the sample document instead of the uploaded file + uploaded_file = st.session_state.sample_document - # Create a synthetic file-like object with the same interface as UploadedFile - uploaded_file = type('obj', (object,), { - 'name': st.session_state.original_sample_name, - 'getvalue': lambda: st.session_state.original_sample_bytes, - 'read': lambda: st.session_state.original_sample_bytes, - 'seek': lambda x: None, - 'type': mime_type - }) - - # Empty container for progress indicators - will be filled during processing - # Positioned right after the process button for better visibility - progress_placeholder = st.empty() + # Just reset the sample document loading flags after it's been used + if st.session_state.sample_just_loaded: + st.session_state.sample_just_loaded = False + st.session_state.sample_document_processed = True + st.session_state.auto_process_sample = True - # Image preprocessing preview - automatically show only the preprocessed version - if any(preprocessing_options.values()) and uploaded_file.type.startswith('image/'): - st.markdown("**Preprocessed Preview**") - try: - # Create a container for the preview to better control layout - with st.container(): - processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options) - # Use use_container_width=True for responsive design - st.image(io.BytesIO(processed_bytes), use_container_width=True) - - # Show preprocessing metadata in a well-formatted caption - meta_items = [] - if preprocessing_options.get("document_type", "standard") != "standard": - meta_items.append(f"Document type ({preprocessing_options['document_type']})") - if preprocessing_options.get("grayscale", False): - meta_items.append("Grayscale") - if preprocessing_options.get("denoise", False): - meta_items.append("Denoise") - if preprocessing_options.get("contrast", 0) != 0: - meta_items.append(f"Contrast ({preprocessing_options['contrast']})") - if preprocessing_options.get("rotation", 0) != 0: - meta_items.append(f"Rotation ({preprocessing_options['rotation']}°)") - - # Only show "Applied:" if there are actual preprocessing steps - if meta_items: - meta_text = "Applied: " + ", ".join(meta_items) - st.caption(meta_text) - except Exception as e: - st.error(f"Error in preprocessing: {str(e)}") - st.info("Try using grayscale preprocessing for PNG images with transparency") - - # Container for success message (will be filled after processing) - # No extra spacing needed as it will be managed programmatically - metadata_placeholder = st.empty() - - # We now have a close button next to the success message, so we don't need one here - - # auto_process_sample is already initialized at the top of the function - - # processed_document_active is already initialized at the top of the function - - # We'll determine processing logic below - - # Check if this is an auto-processing situation - auto_processing = st.session_state.auto_process_sample and not st.session_state.processed_document_active - - # Show a message if auto-processing is happening - if auto_processing: - st.info("Automatically processing sample document...") - - # Determine if we should process the document - # Either process button was clicked OR auto-processing is happening - should_process = process_button or auto_processing - - if should_process: - # Reset auto-process flag to avoid processing on next rerun - if st.session_state.auto_process_sample: - st.session_state.auto_process_sample = False - # Move the progress indicator reference to just below the button - progress_container = progress_placeholder - try: - # Get max_pages or default if not available - max_pages_value = max_pages if 'max_pages' in locals() else None - - # Apply performance mode settings - if 'perf_mode' in locals(): - if perf_mode == "Speed": - # Override settings for faster processing - if 'preprocessing_options' in locals(): - preprocessing_options["denoise"] = False # Skip denoising for speed - if 'pdf_dpi' in locals() and file_ext.lower() == '.pdf': - pdf_dpi = min(pdf_dpi, 100) # Lower DPI for speed - - # Process file with or without custom prompt - if custom_prompt and custom_prompt.strip(): - # Process with custom instructions for the AI - with progress_placeholder.container(): - progress_bar = st.progress(0) - status_text = st.empty() - status_text.markdown('
Processing with custom instructions...
', unsafe_allow_html=True) - progress_bar.progress(30) - - # Special handling for PDF files with custom prompts - if file_ext.lower() == ".pdf": - # For PDFs with custom prompts, we use a special two-step process - with progress_placeholder.container(): - status_text.markdown('
Using special PDF processing for custom instructions...
', unsafe_allow_html=True) - progress_bar.progress(40) - - try: - # Process directly in one step for better performance - processor = StructuredOCR() - - # First save the PDF to a temp file - with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp: - tmp.write(uploaded_file.getvalue()) - temp_path = tmp.name - # Track temporary file for cleanup - st.session_state.temp_file_paths.append(temp_path) - - # Apply PDF rotation if specified - pdf_rotation_value = pdf_rotation if 'pdf_rotation' in locals() else 0 - - # Add document type hints to custom prompt if available from document type selector - if custom_prompt and custom_prompt is not None and 'selected_doc_type' in locals() and selected_doc_type != "Auto-detect (standard processing)" and "This is a" not in str(custom_prompt): - # Extract just the document type from the selector - doc_type_hint = selected_doc_type.split(" or ")[0].lower() - # Prepend to the custom prompt - custom_prompt = f"This is a {doc_type_hint}. {custom_prompt}" - - # Process in a single step with simplified custom prompt - if custom_prompt: - # Detect document type from custom prompt - doc_type = "general" - if any(keyword in custom_prompt.lower() for keyword in ["newspaper", "column", "article", "magazine"]): - doc_type = "newspaper" - elif any(keyword in custom_prompt.lower() for keyword in ["letter", "correspondence", "handwritten"]): - doc_type = "letter" - elif any(keyword in custom_prompt.lower() for keyword in ["book", "publication"]): - doc_type = "book" - elif any(keyword in custom_prompt.lower() for keyword in ["form", "certificate", "legal"]): - doc_type = "form" - elif any(keyword in custom_prompt.lower() for keyword in ["recipe", "ingredients"]): - doc_type = "recipe" - - # Format the custom prompt for better Mistral processing - if len(custom_prompt) > 250: - # Truncate long custom prompts but preserve essential info - simplified_prompt = f"DOCUMENT TYPE: {doc_type}\nINSTRUCTIONS: {custom_prompt[:250]}..." - else: - simplified_prompt = f"DOCUMENT TYPE: {doc_type}\nINSTRUCTIONS: {custom_prompt}" - else: - simplified_prompt = custom_prompt - - progress_bar.progress(50) - # Check if we have custom instructions - has_custom_prompt = custom_prompt is not None and len(str(custom_prompt).strip()) > 0 - if has_custom_prompt: - status_text.markdown('
Processing PDF with custom instructions...
', unsafe_allow_html=True) - else: - status_text.markdown('
Processing PDF with optimized settings...
', unsafe_allow_html=True) - - # Process directly with optimized settings - result = processor.process_file( - file_path=temp_path, - file_type="pdf", - use_vision=use_vision, - custom_prompt=simplified_prompt, - file_size_mb=len(uploaded_file.getvalue()) / (1024 * 1024), - pdf_rotation=pdf_rotation_value - ) - - progress_bar.progress(90) - status_text.markdown('
Finalizing results...
', unsafe_allow_html=True) - - # Clean up temp file - if os.path.exists(temp_path): - os.unlink(temp_path) - # Remove from tracking list - if temp_path in st.session_state.temp_file_paths: - st.session_state.temp_file_paths.remove(temp_path) - - except Exception as e: - # If anything fails, revert to standard processing - st.warning(f"Special PDF processing failed. Falling back to standard method: {str(e)}") - result = process_file(uploaded_file, use_vision, {}, progress_container=progress_placeholder) - else: - # For non-PDF files, use normal processing with custom prompt - # Save the uploaded file to a temporary file with preprocessing - with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp: - temp_path = tmp.name - # Track temporary file for cleanup - st.session_state.temp_file_paths.append(temp_path) - # Apply preprocessing if any options are selected - if any(preprocessing_options.values()): - # Apply performance mode settings - if 'perf_mode' in locals() and perf_mode == "Speed": - # Skip denoising for speed in preprocessing - speed_preprocessing = preprocessing_options.copy() - speed_preprocessing["denoise"] = False - processed_bytes = preprocess_image(uploaded_file.getvalue(), speed_preprocessing) - else: - processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options) - tmp.write(processed_bytes) - else: - tmp.write(uploaded_file.getvalue()) - - # Show progress - with progress_placeholder.container(): - progress_bar.progress(50) - status_text.markdown('
Analyzing with custom instructions...
', unsafe_allow_html=True) - - # Initialize OCR processor and process with custom prompt - processor = StructuredOCR() - - # Detect document type from custom prompt - doc_type = "general" - if any(keyword in custom_prompt.lower() for keyword in ["newspaper", "column", "article", "magazine"]): - doc_type = "newspaper" - elif any(keyword in custom_prompt.lower() for keyword in ["letter", "correspondence", "handwritten"]): - doc_type = "letter" - elif any(keyword in custom_prompt.lower() for keyword in ["book", "publication"]): - doc_type = "book" - elif any(keyword in custom_prompt.lower() for keyword in ["form", "certificate", "legal"]): - doc_type = "form" - elif any(keyword in custom_prompt.lower() for keyword in ["recipe", "ingredients"]): - doc_type = "recipe" - - # Format the custom prompt for better Mistral processing - formatted_prompt = f"DOCUMENT TYPE: {doc_type}\nUSER INSTRUCTIONS: {custom_prompt.strip()}\nPay special attention to these instructions and respond accordingly." - - try: - result = processor.process_file( - file_path=temp_path, - file_type="image", # Always use image for non-PDFs - use_vision=use_vision, - custom_prompt=formatted_prompt, - file_size_mb=len(uploaded_file.getvalue()) / (1024 * 1024) - ) - except Exception as e: - # For any error, fall back to standard processing - st.warning(f"Custom prompt processing failed. Falling back to standard processing: {str(e)}") - result = process_file(uploaded_file, use_vision, preprocessing_options, progress_container=progress_placeholder) - - # Complete progress - with progress_placeholder.container(): - progress_bar.progress(100) - status_text.markdown('
Processing complete!
', unsafe_allow_html=True) - time.sleep(0.8) - progress_placeholder.empty() - - # Clean up temporary file - if os.path.exists(temp_path): - try: - # Remove from tracking list - if temp_path in st.session_state.temp_file_paths: - st.session_state.temp_file_paths.remove(temp_path) - os.unlink(temp_path) - except: - pass - else: - # Standard processing without custom prompt - result = process_file(uploaded_file, use_vision, preprocessing_options, progress_container=progress_placeholder) + # Only process document if available + if uploaded_file is not None: + process_document(uploaded_file, left_col, right_col, sidebar_options) + else: + # Clear any remaining state flag if we're showing examples + st.session_state.processed_document_active = False - # Document results will be shown in the right column - with right_col: - - # Add Document Metadata section header - st.subheader("Document Metadata") - - # Create metadata card with standard styling - metadata_html = '
' - - # File info - metadata_html += f'

File Name: {result.get("file_name", uploaded_file.name)}

' - - # Info about limited pages - if 'limited_pages' in result: - metadata_html += f'

Pages: {result["limited_pages"]["processed"]} of {result["limited_pages"]["total"]} processed

' - - # Languages - if 'languages' in result: - languages = [lang for lang in result['languages'] if lang is not None] - if languages: - metadata_html += f'

Languages: {", ".join(languages)}

' - - # Topics - show all subject tags with max of 8 - if 'topics' in result and result['topics']: - topics_display = result['topics'][:8] - topics_str = ", ".join(topics_display) - - # Add indicator if there are more tags - if len(result['topics']) > 8: - topics_str += f" + {len(result['topics']) - 8} more" - - metadata_html += f'

Subject Tags: {topics_str}

' - - # Document type - using simplified labeling consistent with user instructions - if 'detected_document_type' in result: - # Get clean document type label - removing "historical" prefix if present - doc_type = result['detected_document_type'].lower() - if doc_type.startswith("historical "): - doc_type = doc_type[len("historical "):] - # Capitalize first letter of each word for display - doc_type = ' '.join(word.capitalize() for word in doc_type.split()) - metadata_html += f'

Document Type: {doc_type}

' - - # Processing time - if 'processing_time' in result: - proc_time = result['processing_time'] - metadata_html += f'

Processing Time: {proc_time:.1f}s

' - - # Custom prompt indicator with special styling - simplified and only showing when there are actual instructions - # Only show when custom_prompt exists in the session AND has content, or when the result explicitly states it was applied - has_instructions = ('custom_prompt' in locals() and custom_prompt and len(str(custom_prompt).strip()) > 0) - if has_instructions or 'custom_prompt_applied' in result: - # Use consistent styling with other metadata fields - metadata_html += f'

Advanced Analysis: Custom instructions applied

' - - # Close the metadata card - metadata_html += '
' - - # Render the metadata HTML - st.markdown(metadata_html, unsafe_allow_html=True) - - # Add content section heading - using standard subheader - st.subheader("Document Content") - - # Start document content div with consistent styling class - st.markdown('
', unsafe_allow_html=True) - if 'ocr_contents' in result: - # Check for has_images in the result - has_images = result.get('has_images', False) - - # Create tabs for different views - if has_images: - view_tab1, view_tab2, view_tab3 = st.tabs(["Structured View", "Raw JSON", "With Images"]) - else: - view_tab1, view_tab2 = st.tabs(["Structured View", "Raw JSON"]) - - with view_tab1: - # Display in a more user-friendly format based on the content structure - html_content = "" - if isinstance(result['ocr_contents'], dict): - for section, content in result['ocr_contents'].items(): - if content: # Only display non-empty sections - # Add consistent styling for each section - section_title = f'

{section.replace("_", " ").title()}

' - html_content += section_title - - if isinstance(content, str): - # Optimize by using a expander for very long content - if len(content) > 1000: - # Format content for long text - bold everything after "... that" - preview_content = content[:1000] + "..." if len(content) > 1000 else content - - if "... that" in content: - # For the preview (first 1000 chars) - if "... that" in preview_content: - parts = preview_content.split("... that", 1) - formatted_preview = f"{parts[0]}... that{parts[1]}" - html_content += f"

{formatted_preview}

" - else: - html_content += f"

{preview_content}

" - - # For the full content in expander - parts = content.split("... that", 1) - formatted_full = f"{parts[0]}... that**{parts[1]}**" - - st.markdown(f"#### {section.replace('_', ' ').title()}") - with st.expander("Show full content"): - st.markdown(formatted_full) - else: - html_content += f"

{preview_content}

" - st.markdown(f"#### {section.replace('_', ' ').title()}") - with st.expander("Show full content"): - st.write(content) - else: - # Format content - bold everything after "... that" - if "... that" in content: - parts = content.split("... that", 1) - formatted_content = f"{parts[0]}... that{parts[1]}" - html_content += f"

{formatted_content}

" - st.markdown(f"#### {section.replace('_', ' ').title()}") - st.markdown(f"{parts[0]}... that**{parts[1]}**") - else: - html_content += f"

{content}

" - st.markdown(f"#### {section.replace('_', ' ').title()}") - st.write(content) - elif isinstance(content, list): - html_list = "" - html_content += html_list - elif isinstance(content, dict): - html_dict = "
" - st.markdown(f"#### {section.replace('_', ' ').title()}") - for k, v in content.items(): - html_dict += f"
{k}
{v}
" - st.write(f"**{k}:** {v}") - html_dict += "
" - html_content += html_dict - - # Add download button in a smaller section - with st.expander("Export Content"): - # Get original filename without extension - original_name = Path(result.get('file_name', uploaded_file.name)).stem - # HTML download button - html_bytes = html_content.encode() - st.download_button( - label="Download as HTML", - data=html_bytes, - file_name=f"{original_name}_processed.html", - mime="text/html" - ) - - with view_tab2: - # Show the raw JSON for developers, with an expander for large results - if len(json.dumps(result)) > 5000: - with st.expander("View full JSON"): - try: - st.json(result) - except Exception as e: - st.error(f"Error displaying JSON: {str(e)}") - # Fallback to string representation - st.code(str(result)) - else: - try: - st.json(result) - except Exception as e: - st.error(f"Error displaying JSON: {str(e)}") - # Fallback to string representation - st.code(str(result)) - - if has_images and 'pages_data' in result: - with view_tab3: - # Use pages_data directly instead of raw_response - try: - # Use the serialized pages data - pages_data = result.get('pages_data', []) - if not pages_data: - st.warning("No image data found in the document.") - st.stop() - - # Construct markdown from pages_data directly - from ocr_utils import replace_images_in_markdown - combined_markdown = "" - - for page in pages_data: - page_markdown = page.get('markdown', '') - images = page.get('images', []) - - # Create image dictionary - image_dict = {} - for img in images: - if 'id' in img and 'image_base64' in img: - image_dict[img['id']] = img['image_base64'] - - # Replace image references in markdown - if page_markdown and image_dict: - page_markdown = replace_images_in_markdown(page_markdown, image_dict) - combined_markdown += page_markdown + "\n\n---\n\n" - - if not combined_markdown: - st.warning("No content with images found.") - st.stop() - - # Add CSS for better image handling - st.markdown(""" - - """, unsafe_allow_html=True) - - # Process and display content with images properly - import re + # Show example documents section + show_example_documents() + + with main_tab2: + # Previous results tab + display_previous_results() + + with main_tab3: + # About tab + display_about_tab() - # Process each page separately - pages_content = [] - - # Check if this is from a PDF processed through pdf2image - is_pdf2image = result.get('pdf_processing_method') == 'pdf2image' - - for i, page in enumerate(pages_data): - page_markdown = page.get('markdown', '') - images = page.get('images', []) - - if not page_markdown: - continue - - # Create image dictionary - image_dict = {} - for img in images: - if 'id' in img and 'image_base64' in img: - image_dict[img['id']] = img['image_base64'] - - # Create HTML content for this page - page_html = f"

Page {i+1}

" if i > 0 else "" - - # Display the raw text content first to ensure it's visible - page_html += f"
" - - # Special handling for PDF2image processed documents - if is_pdf2image and i == 0 and 'ocr_contents' in result: - # Display all structured content from OCR for PDFs - page_html += "
" - - # Check if custom prompt was applied - if result.get('custom_prompt_applied') == 'text_only': - page_html += "
Custom analysis applied using text-only processing
" - - ocr_contents = result.get('ocr_contents', {}) - # Get a sorted list of sections to ensure consistent order - section_keys = sorted(ocr_contents.keys()) - - # Place important sections first - priority_sections = ['title', 'subtitle', 'header', 'publication', 'date', 'content', 'main_text'] - for important in priority_sections: - if important in ocr_contents and important in section_keys: - section_keys.remove(important) - section_keys.insert(0, important) - - for section in section_keys: - content = ocr_contents[section] - if section in ['raw_text', 'error', 'partial_text']: - continue # Skip these fields - - section_title = section.replace('_', ' ').title() - page_html += f"

{section_title}

" - - if isinstance(content, str): - # Convert newlines to
tags - content_html = content.replace('\n', '
') - page_html += f"

{content_html}

" - elif isinstance(content, list): - page_html += "
    " - for item in content: - if isinstance(item, str): - page_html += f"
  • {item}
  • " - elif isinstance(item, dict): - page_html += "
  • " - for k, v in item.items(): - page_html += f"{k}: {v}
    " - page_html += "
  • " - else: - page_html += f"
  • {str(item)}
  • " - page_html += "
" - elif isinstance(content, dict): - for k, v in content.items(): - if isinstance(v, str): - page_html += f"

{k}: {v}

" - elif isinstance(v, list): - page_html += f"

{k}:

    " - for item in v: - page_html += f"
  • {item}
  • " - page_html += "
" - else: - page_html += f"

{k}: {str(v)}

" - - page_html += "
" - else: - # Standard processing for regular documents - # Get all text content that isn't an image and add it first - text_content = [] - for line in page_markdown.split("\n"): - if not re.search(r'!\[(.*?)\]\((.*?)\)', line) and line.strip(): - text_content.append(line) - - # Add the text content as a block - if text_content: - page_html += f"
" - for line in text_content: - page_html += f"

{line}

" - page_html += "
" - - page_html += "
" - - # Then add images separately - for line in page_markdown.split("\n"): - # Handle image lines - img_match = re.search(r'!\[(.*?)\]\((.*?)\)', line) - if img_match: - alt_text = img_match.group(1) - img_ref = img_match.group(2) - - # Get the base64 data for this image ID - img_data = image_dict.get(img_ref, "") - if img_data: - img_html = f'
{alt_text}
' - page_html += img_html - - # Add page separator if not the last page - if i < len(pages_data) - 1: - page_html += '
' - - pages_content.append(page_html) - - # Combine all pages HTML - html_content = "\n".join(pages_content) - - # Wrap the content in a div with the class for styling - st.markdown(f""" -
- {html_content} -
- """, unsafe_allow_html=True) - - # Create download HTML content - download_html = f""" - - - - - -
- {html_content} -
- - - """ - - # Create a more descriptive filename - original_name = Path(result.get('file_name', uploaded_file.name)).stem - - # Add document type if available - if 'topics' in result and result['topics']: - topic = result['topics'][0].lower().replace(' ', '_') - original_name = f"{original_name}_{topic}" - - # Add language if available - if 'languages' in result and result['languages']: - lang = result['languages'][0].lower() - # Only add if it's not already in the filename - if lang not in original_name.lower(): - original_name = f"{original_name}_{lang}" - - # Get current date for uniqueness - from datetime import datetime - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - - # Create final filename - download_filename = f"{original_name}_{timestamp}_with_images.html" - - # Add download button as an expander to prevent page reset - with st.expander("Download Document with Images"): - st.markdown("Click the button below to download the document with embedded images") - st.download_button( - label="Download as HTML", - data=download_html, - file_name=download_filename, - mime="text/html", - key="download_with_images_button" - ) - - except Exception as e: - st.error(f"Could not display document with images: {str(e)}") - st.info("Try refreshing or processing the document again.") - - if 'ocr_contents' not in result: - st.error("No OCR content was extracted from the document.") - else: - # Check for minimal text content in OCR results - has_minimal_text = False - total_text_length = 0 - - # Check if the document is an image (not a PDF) - is_image = result.get('file_name', '').lower().endswith(('.jpg', '.jpeg', '.png', '.gif')) - - # If image file with raw_text only - if is_image and 'ocr_contents' in result: - ocr_contents = result['ocr_contents'] - - # Check if only raw_text exists with minimal content - has_raw_text_only = False - if 'raw_text' in ocr_contents: - raw_text = ocr_contents['raw_text'] - total_text_length += len(raw_text.strip()) - - # Check if raw_text is the only significant field - other_content_fields = [k for k in ocr_contents.keys() - if k not in ['raw_text', 'error', 'partial_text'] - and isinstance(ocr_contents[k], (str, list)) - and ocr_contents[k]] - - if len(other_content_fields) <= 1: # Only raw_text or one other field - has_raw_text_only = True - - # Check if minimal text was extracted (less than 50 characters) - if total_text_length < 50 and has_raw_text_only: - has_minimal_text = True - - # Check if any meaningful preprocessing options were used - preprocessing_used = False - if preprocessing_options.get("document_type", "standard") != "standard": - preprocessing_used = True - if preprocessing_options.get("grayscale", False): - preprocessing_used = True - if preprocessing_options.get("denoise", False): - preprocessing_used = True - if preprocessing_options.get("contrast", 0) != 0: - preprocessing_used = True - if preprocessing_options.get("rotation", 0) != 0: - preprocessing_used = True - - # If minimal text was found and preprocessing options weren't used - if has_minimal_text and not preprocessing_used and uploaded_file.type.startswith('image/'): - st.warning(""" - **Limited text extracted from this image.** - - Try using preprocessing options in the sidebar to improve results: - - Convert to grayscale for clearer text - - Use denoising for aged or degraded documents - - Adjust contrast for faded text - - Try different rotation if text orientation is unclear - - Click the "Preprocessing Options" section in the sidebar under "Image Processing". - """) - - # Close document content div - st.markdown('
', unsafe_allow_html=True) - - # Set processed_document_active to True when a new document is processed - st.session_state.processed_document_active = True - - # Store information about this processed file to track when new files are uploaded - if uploaded_file is not None: - st.session_state.last_processed_file = f"{uploaded_file.name}_{len(uploaded_file.getvalue())}" - - # Button styling is now handled by global CSS - - # Display success message with close button for dismissing processed documents - success_cols = st.columns([5, 1]) - with success_cols[0]: - metadata_placeholder.success("**Document processed successfully**") - with success_cols[1]: - # Close button styling is now handled by global CSS - - # Define a function to clear document state - def clear_document_state(): - # Reset all document-related session state - st.session_state.processed_document_active = False - st.session_state.sample_document = None - st.session_state.last_processed_file = None - - # Clear all sample document state - st.session_state.original_sample_bytes = None - st.session_state.original_sample_name = None - st.session_state.sample_just_loaded = False - st.session_state.sample_document_processed = False - st.session_state.auto_process_sample = False - st.session_state.is_sample_document = False - - # Clean up any temporary files - if 'temp_file_paths' in st.session_state: - for temp_path in st.session_state.temp_file_paths: - try: - if os.path.exists(temp_path): - os.remove(temp_path) - except Exception: - pass # Ignore errors in cleanup - # Clear the temp files list - st.session_state.temp_file_paths = [] - - # Create the close button with a callback - st.button("X Close", - key="close_document_button", - help="Clear current document and start over", - on_click=clear_document_state) - - # Store the result in the previous results list - # Add timestamp to result for history tracking - result_copy = result.copy() - result_copy['timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M") - - # Store if this was a sample document - if 'sample_document_processed' in st.session_state and st.session_state.sample_document_processed: - result_copy['sample_document'] = True - # Reset the flag - st.session_state.sample_document_processed = False - - # Generate more descriptive file name for the result - original_name = Path(result.get('file_name', uploaded_file.name)).stem - - # Extract subject tags from content - subject_tags = [] - - # First check if we already have topics in the result - if 'topics' in result and result['topics'] and len(result['topics']) >= 3: - subject_tags = result['topics'] - else: - # Generate tags based on document content - try: - # Extract text from OCR contents - raw_text = "" - if 'ocr_contents' in result: - if 'raw_text' in result['ocr_contents']: - raw_text = result['ocr_contents']['raw_text'] - elif 'content' in result['ocr_contents']: - raw_text = result['ocr_contents']['content'] - - # Use existing topics as starting point if available - if 'topics' in result and result['topics']: - subject_tags = list(result['topics']) - - # Add document type if detected - if 'detected_document_type' in result: - doc_type = result['detected_document_type'].capitalize() - if doc_type not in subject_tags: - subject_tags.append(doc_type) - - # Analyze content for common themes based on keywords - content_themes = { - "Historical": ["century", "ancient", "historical", "history", "vintage", "archive", "heritage"], - "Travel": ["travel", "journey", "expedition", "exploration", "voyage", "map", "location"], - "Science": ["experiment", "research", "study", "analysis", "scientific", "laboratory"], - "Literature": ["book", "novel", "poetry", "author", "literary", "chapter", "story"], - "Art": ["painting", "illustration", "drawing", "artist", "exhibit", "gallery", "portrait"], - "Education": ["education", "school", "university", "college", "learning", "student", "teach"], - "Politics": ["government", "political", "policy", "administration", "election", "legislature"], - "Business": ["business", "company", "corporation", "market", "industry", "commercial", "trade"], - "Social": ["society", "community", "social", "culture", "tradition", "customs"], - "Technology": ["technology", "invention", "device", "mechanical", "machine", "technical"], - "Military": ["military", "army", "navy", "war", "battle", "soldier", "weapon"], - "Religion": ["religion", "church", "temple", "spiritual", "sacred", "ritual"], - "Medicine": ["medical", "medicine", "health", "hospital", "treatment", "disease", "doctor"], - "Legal": ["legal", "law", "court", "justice", "attorney", "judicial", "statute"], - "Correspondence": ["letter", "mail", "correspondence", "message", "communication"] - } - - # Search for keywords in content - if raw_text: - raw_text_lower = raw_text.lower() - for theme, keywords in content_themes.items(): - if any(keyword in raw_text_lower for keyword in keywords): - if theme not in subject_tags: - subject_tags.append(theme) - - # Add document period tag if date patterns are detected - if raw_text: - # Look for years in content - import re - year_matches = re.findall(r'\b1[0-9]{3}\b|\b20[0-1][0-9]\b', raw_text) - if year_matches: - # Convert to integers - years = [int(y) for y in year_matches] - # Get earliest and latest years - earliest = min(years) - - # Add period tag based on earliest year - if earliest < 1800: - period_tag = "Pre-1800s" - elif earliest < 1850: - period_tag = "Early 19th Century" - elif earliest < 1900: - period_tag = "Late 19th Century" - elif earliest < 1950: - period_tag = "Early 20th Century" - else: - period_tag = "Modern Era" - - if period_tag not in subject_tags: - subject_tags.append(period_tag) - - # Add languages as topics if available - if 'languages' in result and result['languages']: - for lang in result['languages']: - if lang and lang not in subject_tags: - lang_tag = f"{lang} Language" - subject_tags.append(lang_tag) - - # Add preprocessing information as tags if preprocessing was applied - if uploaded_file.type.startswith('image/'): - # Check if meaningful preprocessing options were used - if preprocessing_options.get("document_type", "standard") != "standard": - doc_type = preprocessing_options["document_type"].capitalize() - preprocessing_tag = f"Enhanced ({doc_type})" - if preprocessing_tag not in subject_tags: - subject_tags.append(preprocessing_tag) - - preprocessing_methods = [] - if preprocessing_options.get("grayscale", False): - preprocessing_methods.append("Grayscale") - if preprocessing_options.get("denoise", False): - preprocessing_methods.append("Denoised") - if preprocessing_options.get("contrast", 0) != 0: - contrast_val = preprocessing_options.get("contrast", 0) - if contrast_val > 0: - preprocessing_methods.append("Contrast Enhanced") - else: - preprocessing_methods.append("Contrast Reduced") - if preprocessing_options.get("rotation", 0) != 0: - preprocessing_methods.append("Rotated") - - # Add a combined preprocessing tag if methods were applied - if preprocessing_methods: - prep_tag = "Preprocessed" - if prep_tag not in subject_tags: - subject_tags.append(prep_tag) - - # Add the specific method as a tag if only one was used - if len(preprocessing_methods) == 1: - method_tag = preprocessing_methods[0] - if method_tag not in subject_tags: - subject_tags.append(method_tag) - - except Exception as e: - logger.warning(f"Error generating subject tags: {str(e)}") - # Fallback tags if extraction fails - if not subject_tags: - subject_tags = ["Document", "Historical", "Text"] - - # Ensure we have at least 3 tags - while len(subject_tags) < 3: - if "Document" not in subject_tags: - subject_tags.append("Document") - elif "Historical" not in subject_tags: - subject_tags.append("Historical") - elif "Text" not in subject_tags: - subject_tags.append("Text") - else: - # If we still need tags, add generic ones - generic_tags = ["Archive", "Content", "Record"] - for tag in generic_tags: - if tag not in subject_tags: - subject_tags.append(tag) - break - - # Update the result with enhanced tags - result_copy['topics'] = subject_tags - - # Create a more descriptive file name - file_type = Path(result.get('file_name', uploaded_file.name)).suffix.lower() - doc_type_tag = "" - - # Add document type to filename if detected - if 'detected_document_type' in result: - doc_type = result['detected_document_type'].lower() - doc_type_tag = f"_{doc_type}" - elif len(subject_tags) > 0: - # Use first tag as document type if not explicitly detected - doc_type_tag = f"_{subject_tags[0].lower().replace(' ', '_')}" - - # Add period tag for historical context if available - period_tag = "" - for tag in subject_tags: - if "century" in tag.lower() or "pre-" in tag.lower() or "era" in tag.lower(): - period_tag = f"_{tag.lower().replace(' ', '_')}" - break - - # Generate final descriptive file name - descriptive_name = f"{original_name}{doc_type_tag}{period_tag}{file_type}" - result_copy['descriptive_file_name'] = descriptive_name - - # Add to session state, keeping the most recent 20 results - st.session_state.previous_results.insert(0, result_copy) - if len(st.session_state.previous_results) > 20: - st.session_state.previous_results = st.session_state.previous_results[:20] - - except Exception as e: - st.error(f"Error processing document: {str(e)}") - else: - # Example Documents section after file uploader - st.subheader("Example Documents") - - # Add a simplified info message about examples - st.markdown(""" - This app can process various historical documents: - - Historical photographs, maps, and manuscripts - - Handwritten letters and documents - - Printed books and articles - - Multi-page PDFs - """) - - # Add CSS to make the dropdown match the column width - st.markdown(""" - - """, unsafe_allow_html=True) - - # Sample document URLs dropdown with clearer label - sample_urls = [ - "Select a sample document", - "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magician-or-bottle-cungerer.jpg", - "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/handwritten-letter.jpg", - "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magellan-travels.jpg", - "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/milgram-flier.png" - ] - - sample_names = [ - "Select a sample document", - "The Magician (Image)", - "Handwritten Letter (Image)", - "Magellan Travels (Image)", - "Milgram Flier (Image)" - ] - - # Initialize sample_document in session state if it doesn't exist - if 'sample_document' not in st.session_state: - st.session_state.sample_document = None - - selected_sample = st.selectbox("Select a sample document from `~/input`", options=range(len(sample_urls)), format_func=lambda i: sample_names[i]) - - if selected_sample > 0: - selected_url = sample_urls[selected_sample] - - # Load Sample Document button styling is now handled by global CSS - - # Add process button for the sample document - if st.button("Load Sample Document"): - try: - import requests - from io import BytesIO - - with st.spinner(f"Downloading {sample_names[selected_sample]}..."): - response = requests.get(selected_url) - response.raise_for_status() - - # Extract filename from URL - file_name = selected_url.split("/")[-1] - - # Create a BytesIO object from the downloaded content - file_content = BytesIO(response.content) - - # Store as a UploadedFile-like object in session state - class SampleDocument: - def __init__(self, name, content, content_type): - self.name = name - self._content = content - self.type = content_type - self.size = len(content) - - def getvalue(self): - return self._content - - def read(self): - return self._content - - def seek(self, position): - # Implement seek for compatibility with some file operations - return - - def tell(self): - # Implement tell for compatibility - return 0 - - # Determine content type based on file extension - if file_name.lower().endswith('.pdf'): - content_type = 'application/pdf' - elif file_name.lower().endswith(('.jpg', '.jpeg')): - content_type = 'image/jpeg' - elif file_name.lower().endswith('.png'): - content_type = 'image/png' - else: - content_type = 'application/octet-stream' - - # Save download info in session state for more reliable handling - st.session_state.sample_document = SampleDocument( - name=file_name, - content=response.content, - content_type=content_type - ) - - # Set a flag to indicate this is a newly loaded sample - st.session_state.sample_just_loaded = True - - # Force rerun to load the document - st.rerun() - except Exception as e: - st.error(f"Error downloading sample document: {str(e)}") - st.info("Please try uploading your own document instead.") +# Run the application +if __name__ == "__main__": + main()