"""Streamlit entry point for the Historical OCR application.

Wires together the sidebar options, the file uploader / sample-document
loader, the OCR processing call and the "previous results" history that is
kept in ``st.session_state``.
"""

import os
import streamlit as st
import json
import sys
import time
import base64
from pathlib import Path
import io
import mimetypes
from datetime import datetime
import logging

# Import modules
from preprocessing import convert_pdf_to_images, preprocess_image
from ocr_processing import process_file
from ui_components import (
    ProgressReporter,
    create_sidebar_options,
    display_results,
    create_file_uploader,
    display_about_tab,
    display_previous_results,
    display_document_with_images
)
from utils import get_base64_from_image, handle_temp_files, format_timestamp
from error_handler import handle_ocr_error, check_file_size
from constants import (
    MAX_FILE_SIZE_MB,
    MAX_PAGES,
    DOCUMENT_TYPES,
    DOCUMENT_LAYOUTS,
    CUSTOM_PROMPT_TEMPLATES,
    LAYOUT_PROMPT_ADDITIONS
)

# Import the StructuredOCR class and config from the local files
from structured_ocr import StructuredOCR
from config import MISTRAL_API_KEY

# Import utilities for handling previous results
from ocr_utils import create_results_zip

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("app")

# Set favicon path
favicon_path = os.path.join(os.path.dirname(__file__), "static/favicon.png")

# Set page configuration
st.set_page_config(
    page_title="Historical OCR",
    page_icon=favicon_path if os.path.exists(favicon_path) else "📜",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Upper bound (seconds) for downloading a sample document. Without a timeout
# a stalled connection would block the Streamlit script run indefinitely.
SAMPLE_DOWNLOAD_TIMEOUT_SECONDS = 30

# Maximum number of entries kept in the "previous results" history.
MAX_PREVIOUS_RESULTS = 20

# Extensions with a fixed MIME type; anything else falls back to `mimetypes`.
_KNOWN_MIME_TYPES = {
    '.pdf': 'application/pdf',
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
}


class SampleDocument:
    """In-memory stand-in exposing the UploadedFile members this app uses.

    Instances are stored in ``st.session_state`` and passed to the same code
    paths as a real upload. ``read`` always returns the complete content, so
    ``seek`` and ``tell`` are intentionally no-ops that exist only for
    interface compatibility.
    """

    def __init__(self, name, content, content_type):
        self.name = name
        self._content = content
        self.type = content_type
        self.size = len(content)

    def getvalue(self):
        """Return the full document content as bytes."""
        return self._content

    def read(self):
        """Return the full document content (position is not tracked)."""
        return self._content

    def seek(self, position):
        """Accept and ignore a seek request (compatibility shim)."""
        return None

    def tell(self):
        """Always report position 0 (compatibility shim)."""
        return 0


def _guess_mime_type(file_name):
    """Return the MIME type for *file_name* based on its extension.

    PDF/JPEG/PNG are mapped explicitly; other extensions are looked up with
    ``mimetypes`` and default to ``application/octet-stream``.
    """
    extension = os.path.splitext(file_name)[1].lower()
    if extension in _KNOWN_MIME_TYPES:
        return _KNOWN_MIME_TYPES[extension]
    return mimetypes.guess_type(file_name)[0] or 'application/octet-stream'


def _session_defaults():
    """Return a fresh mapping of session-state keys to their initial values.

    A new dict (and new lists) is built on every call so that mutable
    defaults are never shared between sessions or reset cycles.
    """
    return {
        # History of processed documents
        'previous_results': [],
        # Temp file tracking
        'temp_file_paths': [],
        # Last processed file tracking, used by the "Process Document Again" button
        'last_processed_file': None,
        # Set by the "Close Document" button to request a full reset
        'perform_reset': False,
        'auto_process_sample': False,
        'sample_just_loaded': False,
        'processed_document_active': False,
        'sample_document_processed': False,
        'sample_document': None,
        'original_sample_bytes': None,
        'original_sample_name': None,
        'is_sample_document': False,
    }


def initialize_session_state():
    """Initialize all session state variables.

    Missing keys are created with their defaults. If ``perform_reset`` is set
    (requested by the "Close Document" button), temporary files are cleaned
    up and every key except ``previous_results`` is cleared and re-created
    with its default value.
    """
    for key, default in _session_defaults().items():
        if key not in st.session_state:
            st.session_state[key] = default

    if not st.session_state.perform_reset:
        return

    # Save previous results so the history survives the reset
    previous_results = st.session_state.previous_results

    # Clean up any temporary files
    if st.session_state.temp_file_paths:
        handle_temp_files(st.session_state.temp_file_paths)

    # Clear everything (widget keys included) except the history and the
    # reset flag itself, which is turned off explicitly at the very end.
    preserved_keys = ('previous_results', 'perform_reset')
    for key in list(st.session_state.keys()):
        if key not in preserved_keys:
            st.session_state.pop(key, None)

    # Restore previous results
    st.session_state.previous_results = previous_results

    # Reinitialize session state variables
    for key, default in _session_defaults().items():
        if key not in preserved_keys:
            st.session_state[key] = default

    # Turn off reset flag - this must be done last
    st.session_state.perform_reset = False


def show_example_documents():
    """Show the example documents section and handle loading a sample.

    When the user picks a sample and clicks "Load Sample Document", the file
    is downloaded, wrapped in a ``SampleDocument``, stored in session state
    and the script is rerun so the main flow picks it up.
    """
    st.subheader("Example Documents")

    # Add a simplified info message about examples
    st.markdown("""
    This app can process various historical documents:
    - Historical photographs, maps, and manuscripts
    - Handwritten letters and documents
    - Printed books and articles
    - Multi-page PDFs
    """)

    # Add CSS to make the dropdown match the column width
    # NOTE(review): the CSS payload is empty in this copy of the file -
    # confirm whether a <style> block is supposed to be here.
    st.markdown(""" """, unsafe_allow_html=True)

    # Sample document URLs dropdown with clearer label
    sample_urls = [
        "Select a sample document",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/a-la-carte.pdf",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magician-or-bottle-cungerer.jpg",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/handwritten-letter.jpg",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magellan-travels.jpg",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/milgram-flier.png",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/baldwin-15th-north.jpg"
    ]

    sample_names = [
        "Select a sample document",
        "Restaurant Menu (PDF)",
        "The Magician (Image)",
        "Handwritten Letter (Image)",
        "Magellan Travels (Image)",
        "Milgram Flier (Image)",
        "Baldwin Street (Image)"
    ]

    # Initialize sample_document in session state if it doesn't exist
    if 'sample_document' not in st.session_state:
        st.session_state.sample_document = None

    # Index 0 is the placeholder entry; real samples start at index 1.
    selected_sample = st.selectbox(
        "Select a sample document from `~/input`",
        options=range(len(sample_urls)),
        format_func=lambda i: sample_names[i]
    )

    if selected_sample > 0:
        selected_url = sample_urls[selected_sample]

        # Add process button for the sample document
        if st.button("Load Sample Document"):
            try:
                import requests

                with st.spinner(f"Downloading {sample_names[selected_sample]}..."):
                    response = requests.get(
                        selected_url,
                        timeout=SAMPLE_DOWNLOAD_TIMEOUT_SECONDS
                    )
                    response.raise_for_status()

                # Extract filename from URL
                file_name = selected_url.split("/")[-1]

                # Determine content type based on file extension
                content_type = _guess_mime_type(file_name)

                # Reset any document state before loading a new sample
                if st.session_state.processed_document_active:
                    # Clear previous document state
                    st.session_state.processed_document_active = False
                    st.session_state.last_processed_file = None

                    # Clean up any temporary files from previous processing
                    if st.session_state.temp_file_paths:
                        handle_temp_files(st.session_state.temp_file_paths)
                        st.session_state.temp_file_paths = []

                # Save download info in session state
                st.session_state.sample_document = SampleDocument(
                    name=file_name,
                    content=response.content,
                    content_type=content_type
                )

                # Store original bytes for reprocessing
                st.session_state.original_sample_bytes = response.content
                st.session_state.original_sample_name = file_name

                # Set state flags
                st.session_state.sample_just_loaded = True
                st.session_state.is_sample_document = True

                # Generate a unique identifier for the sample document
                st.session_state.last_processed_file = f"{file_name}_{len(response.content)}"
            except Exception as e:
                # UI boundary: report the failure instead of crashing the page
                logger.exception("Sample document download failed: %s", e)
                st.error(f"Error downloading sample document: {str(e)}")
                st.info("Please try uploading your own document instead.")
            else:
                # Force rerun to load the document. Kept outside the try
                # block so rerun control flow never passes through the
                # download error handler.
                st.rerun()
    else:
        # If no sample is selected, clear the sample document in session state
        st.session_state.sample_document = None


def _describe_preprocessing(preprocessing_options):
    """Return human-readable labels for the non-default preprocessing options."""
    labels = []
    if preprocessing_options.get("document_type", "standard") != "standard":
        labels.append(f"Document type ({preprocessing_options['document_type']})")
    if preprocessing_options.get("grayscale", False):
        labels.append("Grayscale")
    if preprocessing_options.get("denoise", False):
        labels.append("Denoise")
    if preprocessing_options.get("contrast", 0) != 0:
        labels.append(f"Contrast ({preprocessing_options['contrast']})")
    if preprocessing_options.get("rotation", 0) != 0:
        labels.append(f"Rotation ({preprocessing_options['rotation']}°)")
    return labels


def _render_preprocessing_preview(uploaded_file, preprocessing_options):
    """Render the preprocessed image and a caption listing the applied steps."""
    st.markdown("**Preprocessed Preview**")
    try:
        # Create a container for the preview to better control layout
        with st.container():
            processed_bytes = preprocess_image(uploaded_file.getvalue(),
                                               preprocessing_options)

            # Use use_container_width=True for responsive design
            st.image(io.BytesIO(processed_bytes), use_container_width=True)

            # Only show "Applied:" if there are actual preprocessing steps
            meta_items = _describe_preprocessing(preprocessing_options)
            if meta_items:
                st.caption("Applied: " + ", ".join(meta_items))
    except Exception as e:
        st.error(f"Error in preprocessing: {str(e)}")
        st.info("Try using grayscale preprocessing for PNG images with transparency")


def _clear_document_state():
    """Button callback: drop the active document and request a full reset."""
    # Reset all document-related session state
    st.session_state.processed_document_active = False
    st.session_state.sample_document = None
    st.session_state.last_processed_file = None

    # Picked up by initialize_session_state() at the start of the next run
    st.session_state.perform_reset = True


def _remember_result(result):
    """Insert a timestamped copy of *result* at the front of the history."""
    # Add timestamp to result for history tracking
    result_copy = result.copy()
    result_copy['timestamp'] = format_timestamp()

    # Store if this was a sample document
    if 'is_sample_document' in st.session_state and st.session_state.is_sample_document:
        result_copy['sample_document'] = True

    # Add to session state, keeping only the most recent results
    st.session_state.previous_results.insert(0, result_copy)
    if len(st.session_state.previous_results) > MAX_PREVIOUS_RESULTS:
        st.session_state.previous_results = (
            st.session_state.previous_results[:MAX_PREVIOUS_RESULTS]
        )


def process_document(uploaded_file, left_col, right_col, sidebar_options):
    """Process the uploaded document and display results.

    Args:
        uploaded_file: File-like object with ``name``, ``type`` and
            ``getvalue()`` (a Streamlit upload or a ``SampleDocument``).
            ``None`` makes the function return immediately.
        left_col: Streamlit column that holds the controls, preview and
            status messages.
        right_col: Streamlit column handed to ``display_results``.
        sidebar_options: Mapping of options from the sidebar; must contain
            ``"preprocessing_options"`` and ``"use_vision"``.
    """
    if uploaded_file is None:
        return

    # Check file size against the configured limit (MAX_FILE_SIZE_MB)
    file_size_bytes = len(uploaded_file.getvalue())
    file_size_mb = file_size_bytes / (1024 * 1024)

    if file_size_mb > MAX_FILE_SIZE_MB:
        with left_col:
            st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size is {MAX_FILE_SIZE_MB}MB.")
        return

    # Check if this is a new file (different from the last processed file)
    current_file_identifier = f"{uploaded_file.name}_{file_size_bytes}"

    if st.session_state.last_processed_file != current_file_identifier:
        # Reset processed_document_active if a new file is uploaded
        st.session_state.processed_document_active = False

    preprocessing_options = sidebar_options["preprocessing_options"]

    # Process button - flush left with similar padding as file browser
    with left_col:
        # Key and label depend on state so the button is re-created when the
        # document switches between "new" and "already processed".
        already_processed = st.session_state.processed_document_active
        button_key = "process_again" if already_processed else "process_initial"
        button_text = "Process Document Again" if already_processed else "Process Document"

        # Create the button
        process_button = st.button(button_text, key=button_key)

        # Handle sample document recreation if needed
        if (process_button
                and st.session_state.processed_document_active
                and st.session_state.original_sample_bytes is not None):
            # Rebuild the file object from the stored bytes, using the same
            # stand-in class as the initial sample load.
            uploaded_file = SampleDocument(
                name=st.session_state.original_sample_name,
                content=st.session_state.original_sample_bytes,
                content_type=_guess_mime_type(st.session_state.original_sample_name)
            )

        # Empty container for progress indicators - will be filled during processing
        # Positioned right after the process button for better visibility
        progress_placeholder = st.empty()

        # Image preprocessing preview - automatically show only the preprocessed version
        if any(preprocessing_options.values()) and uploaded_file.type.startswith('image/'):
            _render_preprocessing_preview(uploaded_file, preprocessing_options)

        # Container for success message (will be filled after processing)
        metadata_placeholder = st.empty()

        # Check if this is an auto-processing situation
        auto_processing = (st.session_state.auto_process_sample
                           and not st.session_state.processed_document_active)

        # Show a message if auto-processing is happening
        auto_processing_message = st.empty()
        if auto_processing:
            auto_processing_message.info("Automatically processing sample document...")

        # Either process button was clicked OR auto-processing is happening
        if not (process_button or auto_processing):
            return

        # Reset auto-process flag to avoid processing on next rerun
        st.session_state.auto_process_sample = False

        # Move the progress indicator reference to just below the button
        progress_reporter = ProgressReporter(progress_placeholder).setup()

        try:
            # Process the document
            result = process_file(
                uploaded_file=uploaded_file,
                use_vision=sidebar_options["use_vision"],
                preprocessing_options=preprocessing_options,
                progress_reporter=progress_reporter,
                pdf_dpi=sidebar_options.get("pdf_dpi", 150),
                max_pages=sidebar_options.get("max_pages", 3),
                pdf_rotation=sidebar_options.get("pdf_rotation", 0),
                custom_prompt=sidebar_options.get("custom_prompt", ""),
                perf_mode=sidebar_options.get("perf_mode", "Quality")
            )

            # Display results
            display_results(result, right_col, sidebar_options.get("custom_prompt", ""))

            # Set processed_document_active to True when a new document is processed
            st.session_state.processed_document_active = True

            # Clear the auto-processing message
            auto_processing_message.empty()

            # Store information about this processed file to track when new files are uploaded
            if uploaded_file is not None:
                st.session_state.last_processed_file = current_file_identifier

            # Display success message with close button for dismissing processed documents
            success_cols = st.columns([5, 1])
            with success_cols[0]:
                metadata_placeholder.success("**Document processed successfully**")
            with success_cols[1]:
                # Create the close button with a callback
                st.button("✕ Close Document",
                          key="close_document_button",
                          help="Clear current document and start over",
                          on_click=_clear_document_state)

            # Store the result in the previous results list
            _remember_result(result)
        except Exception as e:
            # UI boundary: show the failure to the user and log the traceback
            st.error(f"Error processing document: {str(e)}")
            logger.exception("Document processing error: %s", e)


def main():
    """Main application function: build the tabs and drive the document flow."""
    # Initialize session state
    initialize_session_state()

    # Apply custom CSS
    from ui.layout import load_css
    load_css()

    # Create sidebar options
    sidebar_options = create_sidebar_options()

    # Create main layout with tabs
    main_tab1, main_tab2, main_tab3 = st.tabs(
        ["Document Processing", "Previous Results", "About"]
    )

    with main_tab1:
        # Create a two-column layout for file upload and results
        left_col, right_col = st.columns([1, 1])

        with left_col:
            # Create file uploader
            uploaded_file = create_file_uploader()

            # Check if we have a sample document loaded
            if ('sample_document' in st.session_state
                    and st.session_state.sample_document is not None):
                # Use the sample document instead of the uploaded file
                uploaded_file = st.session_state.sample_document

                # Just reset the sample document loading flags after it's been used
                if st.session_state.sample_just_loaded:
                    st.session_state.sample_just_loaded = False
                    st.session_state.sample_document_processed = True
                    st.session_state.auto_process_sample = True

            # Only process document if available
            if uploaded_file is not None:
                process_document(uploaded_file, left_col, right_col, sidebar_options)
            else:
                # Clear any remaining state flag if we're showing examples
                st.session_state.processed_document_active = False

                # Show example documents section
                show_example_documents()

    with main_tab2:
        # Previous results tab
        display_previous_results()

    with main_tab3:
        # About tab
        display_about_tab()


# Run the application
if __name__ == "__main__":
    main()