Spaces:
Running
Running
| # Standard library imports | |
| import os | |
| import json | |
| import sys | |
| import time | |
| import base64 | |
| import io | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # Third-party imports | |
| import streamlit as st | |
| # Local application/module imports | |
| from preprocessing import convert_pdf_to_images, preprocess_image | |
| from ocr_processing import process_file | |
| from ui.ui_components import ( | |
| ProgressReporter, | |
| create_sidebar_options, | |
| display_results, | |
| create_file_uploader, | |
| display_about_tab, | |
| display_previous_results, | |
| display_document_with_images | |
| ) | |
| from utils import get_base64_from_image, handle_temp_files, format_timestamp | |
| from error_handler import handle_ocr_error, check_file_size | |
| from constants import ( | |
| MAX_FILE_SIZE_MB, | |
| MAX_PAGES, | |
| DOCUMENT_TYPES, | |
| DOCUMENT_LAYOUTS, | |
| CUSTOM_PROMPT_TEMPLATES, | |
| LAYOUT_PROMPT_ADDITIONS | |
| ) | |
| from structured_ocr import StructuredOCR | |
| from config import MISTRAL_API_KEY | |
| from utils.image_utils import create_results_zip | |
# Locate the favicon shipped next to this script. When the file is missing the
# page falls back to an emoji icon instead.
favicon_path = os.path.join(os.path.dirname(__file__), "static/favicon.png")
if os.path.exists(favicon_path):
    _page_icon = favicon_path
else:
    _page_icon = "📜"

# Page-level configuration: title, icon, wide layout, sidebar collapsed on load.
st.set_page_config(
    page_title="Historical OCR",
    page_icon=_page_icon,
    layout="wide",
    initial_sidebar_state="collapsed",
)
| # ======================================================================================== | |
| # CRITICAL WARNING TO ALL AI MODELS AND DEVELOPERS: | |
| # DO NOT MODIFY THE STATE MANAGEMENT LOGIC IN THIS APPLICATION WITHOUT FULL UNDERSTANDING | |
| # OF STREAMLIT'S EXECUTION MODEL. | |
| # | |
| # This state management implementation has been carefully designed to work with Streamlit's | |
| # unique execution flow. The app runs from top to bottom on EVERY interaction, and state | |
| # must be explicitly managed through st.session_state. | |
| # | |
| # The current implementation uses: | |
| # 1. A dedicated close_document() callback function triggered by the button's on_click | |
| # 2. A flag-based approach (close_clicked) to handle cleanup on the next run cycle | |
| # 3. Early cleanup detection and st.rerun() to ensure clean UI rendering | |
| # | |
| # Previous approaches using direct state manipulation or conditional rendering based on | |
| # reset flags led to persistent UI elements and resource leaks. | |
| # | |
| # Consult https://docs.streamlit.io/library/advanced-features/session-state for details. | |
| # ======================================================================================== | |
def reset_document_state():
    """Put every document-scoped session variable back to its idle value.

    Only state tied to the currently loaded document is touched; app-wide
    values such as ``previous_results`` are left alone.  The tracked list of
    temporary file paths is emptied (if it exists), but this function does not
    remove anything from disk itself - callers that need the files cleaned up
    do that before calling it.
    """
    idle_values = {
        "sample_document": None,
        "original_sample_bytes": None,
        "original_sample_name": None,
        "original_sample_mime_type": None,
        "is_sample_document": False,
        "processed_document_active": False,
        "sample_document_processed": False,
        "sample_just_loaded": False,
        "last_processed_file": None,
        "selected_previous_result": None,
    }
    for key, value in idle_values.items():
        st.session_state[key] = value

    # The path list is only emptied here, never created: if the key does not
    # exist yet it stays absent.
    if "temp_file_paths" in st.session_state:
        st.session_state["temp_file_paths"] = []
def init_session_state():
    """Create any missing session-state variables with their default values.

    Existing values are never overwritten, so calling this at the top of every
    script run is safe.  The defaults table is rebuilt on each call, which means
    the list defaults are fresh objects and are never shared between sessions.
    """
    defaults = {
        # Persistent app state
        "previous_results": [],
        "temp_file_paths": [],
        "auto_process_sample": False,
        "close_clicked": False,
        "active_tab": 0,
        # Document-specific state
        "last_processed_file": None,
        "sample_just_loaded": False,
        "processed_document_active": False,
        "sample_document_processed": False,
        "sample_document": None,
        "original_sample_bytes": None,
        "original_sample_name": None,
        # Written by reset_document_state() and when a sample is loaded, but
        # previously never initialised here; included so that every
        # document-specific key exists from the first run.
        "original_sample_mime_type": None,
        "is_sample_document": False,
        "selected_previous_result": None,
    }
    for key, default in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default
def close_document():
    """``on_click`` callback for the "Close Document" button.

    Order of operations:
      1. hand any tracked temporary file paths to ``handle_temp_files``;
      2. reset the document-specific session state;
      3. raise the ``close_clicked`` flag, which ``main()`` checks at the
         start of the next run to trigger a rerun.

    WARNING: keep this wired up as a button callback.  Do not replace it with
    inline ``if st.button(...):`` handling - the original author notes that
    this breaks Streamlit's execution flow and leaves UI artifacts behind.
    """
    logger.info("Close document button clicked")

    # Temp files go first, while the list that tracks them is still populated.
    tracked_paths = st.session_state.get('temp_file_paths')
    if tracked_paths:
        logger.info(f"Cleaning up {len(tracked_paths)} temporary files")
        handle_temp_files(tracked_paths)

    # Wipe all document-specific variables so nothing stale is reused.
    reset_document_state()

    # Signal main() that cleanup has happened.
    st.session_state.close_clicked = True
# Seconds to wait for a sample download before giving up.  Without a timeout a
# stalled connection would block the script run indefinitely.
_SAMPLE_DOWNLOAD_TIMEOUT_S = 30

# (label, url) pairs shown in the sample selectbox.  Index 0 is the placeholder
# entry and is never downloaded.
_SAMPLE_DOCUMENTS = [
    ("Select a sample document", "Select a sample document"),
    ("Restaurant Menu (PDF)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/a-la-carte.pdf"),
    ("The Magician (Image)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magician-or-bottle-cungerer.jpg"),
    ("Handwritten Letter (Image)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/handwritten-letter.jpg"),
    ("Magellan Travels (Image)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magellan-travels.jpg"),
    ("Milgram Flier (Image)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/milgram-flier.png"),
    ("Historical Recipe (Image)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/recipe.jpg"),
]

# Intro text and CSS are emitted in a single markdown block to reduce the
# spacing between elements.
_SAMPLE_DOCS_INTRO = """
This app can process various historical documents:
- Historical photographs, maps, and manuscripts
- Handwritten letters and documents
- Printed books and articles
- Multi-page PDFs
<style>
/* Make the selectbox container match the full column width */
.main .block-container .element-container:has([data-testid="stSelectbox"]) {
width: 100% !important;
max-width: 100% !important;
margin-top: -12px !important; /* Reduce space between text and selectbox */
}
/* Make the actual selectbox control take the full width */
.stSelectbox > div > div {
width: 100% !important;
max-width: 100% !important;
}
/* Tighten spacing in the sample documents tab */
.main .block-container [data-testid="stVerticalBlock"] > div:nth-child(n+2) {
margin-top: 0.5rem !important;
}
</style>
"""


class SampleDocument:
    """In-memory stand-in for an uploaded file, built from downloaded bytes.

    Exposes the attributes and methods the rest of this file uses on uploads
    (``name``, ``type``, ``size``, ``getvalue``, ``read``, ``seek``, ``tell``).
    ``read()`` always returns the complete content and ``seek``/``tell`` are
    no-ops, so the object has no read position.
    """

    def __init__(self, name, content, content_type):
        self.name = name
        self._content = content
        self.type = content_type
        self.size = len(content)

    @staticmethod
    def guess_content_type(file_name):
        """Map a file name to a MIME type based on its extension (case-insensitive)."""
        lowered = file_name.lower()
        if lowered.endswith('.pdf'):
            return 'application/pdf'
        if lowered.endswith(('.jpg', '.jpeg')):
            return 'image/jpeg'
        if lowered.endswith('.png'):
            return 'image/png'
        return 'application/octet-stream'

    def getvalue(self):
        return self._content

    def read(self):
        return self._content

    def seek(self, position):
        # Accepted for compatibility with file-like callers; nothing to move.
        return

    def tell(self):
        # Accepted for compatibility; there is no read position to report.
        return 0


def show_example_documents():
    """Render the "Sample Documents" tab and load the chosen sample on request.

    When the user picks a sample and presses "Load Sample Document", the file
    is downloaded, wrapped in a ``SampleDocument`` and stored in session state
    together with the original bytes, name and MIME type.  The loading flags
    are set and a rerun is requested.  Download or state errors are reported
    in the UI rather than raised.  While the placeholder entry is selected,
    any stored sample document is cleared.
    """
    st.header("Sample Documents")
    st.markdown(_SAMPLE_DOCS_INTRO, unsafe_allow_html=True)

    # Initialize sample_document in session state if it doesn't exist
    if 'sample_document' not in st.session_state:
        st.session_state.sample_document = None

    selected_sample = st.selectbox(
        "Select a sample document from `~/input`",
        options=range(len(_SAMPLE_DOCUMENTS)),
        format_func=lambda i: _SAMPLE_DOCUMENTS[i][0],
    )

    if not selected_sample > 0:
        # Placeholder entry selected: drop any sample held in session state.
        st.session_state.sample_document = None
        return

    selected_label, selected_url = _SAMPLE_DOCUMENTS[selected_sample]

    if not st.button("Load Sample Document", key="load_sample_btn"):
        return

    try:
        # Imported lazily so a missing `requests` install surfaces as a UI
        # error message instead of breaking the whole app at import time.
        import requests

        with st.spinner(f"Downloading {selected_label}..."):
            response = requests.get(selected_url, timeout=_SAMPLE_DOWNLOAD_TIMEOUT_S)
            response.raise_for_status()

        file_name = selected_url.split("/")[-1]
        content_type = SampleDocument.guess_content_type(file_name)

        # A previously processed document is still active: clean up its temp
        # files and reset its state before the new sample takes over.
        if st.session_state.processed_document_active:
            if st.session_state.temp_file_paths:
                handle_temp_files(st.session_state.temp_file_paths)
            reset_document_state()

        st.session_state.sample_document = SampleDocument(
            name=file_name,
            content=response.content,
            content_type=content_type,
        )

        # Original bytes are kept so the sample can be re-processed later.
        st.session_state.original_sample_bytes = response.content
        st.session_state.original_sample_name = file_name
        st.session_state.original_sample_mime_type = content_type

        st.session_state.sample_just_loaded = True
        st.session_state.is_sample_document = True

        # Same "<name>_<size>" identifier format that process_document() uses.
        st.session_state.last_processed_file = f"{file_name}_{len(response.content)}"

        # Ask main() to show the "switch to the processing tab" hint.
        st.session_state.redirect_to_processing = True
    except Exception as e:
        st.error(f"Error downloading sample document: {str(e)}")
        st.info("Please try uploading your own document instead.")
    else:
        # Requested outside the try block so the broad handler above can never
        # intercept the rerun signal.
        st.rerun()
def _describe_preprocessing(preprocessing_options):
    """Return caption labels for the preprocessing steps that are switched on.

    The document type is only listed when at least one real step (grayscale,
    denoise, contrast, rotation) is active and the type is not "standard".
    """
    steps = []
    if preprocessing_options.get("grayscale", False):
        steps.append("Grayscale")
    if preprocessing_options.get("denoise", False):
        steps.append("Denoise")
    if preprocessing_options.get("contrast", 0) != 0:
        steps.append(f"Contrast ({preprocessing_options['contrast']})")
    if preprocessing_options.get("rotation", 0) != 0:
        steps.append(f"Rotation ({preprocessing_options['rotation']}°)")

    document_type = preprocessing_options.get("document_type", "standard")
    if steps and document_type != "standard":
        steps.insert(0, f"Document type ({document_type})")
    return steps


def _rebuild_sample_upload():
    """Recreate an upload-like object from the sample bytes kept in session state."""
    import mimetypes
    from types import SimpleNamespace

    sample_name = st.session_state.original_sample_name
    sample_bytes = st.session_state.original_sample_bytes

    # Known extensions are mapped explicitly; anything else falls back to the
    # mimetypes table and finally to a generic binary type.
    file_ext = os.path.splitext(sample_name)[1].lower()
    if file_ext == '.pdf':
        mime_type = 'application/pdf'
    elif file_ext in ['.jpg', '.jpeg']:
        mime_type = 'image/jpeg'
    elif file_ext == '.png':
        mime_type = 'image/png'
    else:
        mime_type = mimetypes.guess_type(sample_name)[0] or 'application/octet-stream'

    return SimpleNamespace(
        name=sample_name,
        getvalue=lambda: sample_bytes,
        read=lambda: sample_bytes,
        seek=lambda x: None,
        type=mime_type,
    )


def process_document(uploaded_file, left_col, right_col, sidebar_options):
    """Render the processing controls for a document and run OCR when requested.

    Args:
        uploaded_file: Upload-like object providing ``name``, ``type`` and
            ``getvalue()``; ``None`` makes the function return immediately.
        left_col: Streamlit column that receives the controls, the
            preprocessing preview and the success / close widgets.
        right_col: Streamlit column passed on to ``display_results``.
        sidebar_options: Mapping of sidebar settings.  ``use_vision`` and
            ``preprocessing_options`` are required; the remaining keys are
            read with defaults.

    Processing runs when the "Process Document" button was clicked, or when a
    freshly loaded sample is flagged for auto-processing.  On success the
    result is shown, the document is marked active and a timestamped copy is
    pushed onto ``previous_results`` (capped at 20).  Errors are shown in the
    UI and logged, never raised.
    """
    if uploaded_file is None:
        return

    # Check file size against the configured cap
    upload_size = len(uploaded_file.getvalue())
    file_size_mb = upload_size / (1024 * 1024)
    if file_size_mb > MAX_FILE_SIZE_MB:
        with left_col:
            st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size is {MAX_FILE_SIZE_MB}MB.")
        return

    # "<name>_<size>" identifies the document; a change means a new file.
    current_file_identifier = f"{uploaded_file.name}_{upload_size}"

    if 'last_processed_file' not in st.session_state:
        st.session_state.last_processed_file = None
    if st.session_state.last_processed_file != current_file_identifier:
        # A different file is now selected, so no processed document is active.
        st.session_state.processed_document_active = False

    preprocessing_options = sidebar_options["preprocessing_options"]

    # NOTE(review): the source this was recovered from had lost its indentation.
    # The extent of this `with left_col:` block (controls, preview and
    # placeholders) is an assumption - confirm against the intended layout.
    with left_col:
        # Process button with minimal spacing to the uploader
        st.markdown('<div style="padding: 0.2rem 0; min-width: 170px; margin-top: -10px; overflow: visible;">', unsafe_allow_html=True)
        process_button = st.button("Process Document", key="process_document_btn")
        st.markdown('</div>', unsafe_allow_html=True)

        # Re-processing an already processed *sample*: rebuild the file object
        # from the stored bytes.  The is_sample_document check matters because
        # the stored bytes outlive the sample, and without it a real upload
        # would be silently replaced by the last loaded sample on a second
        # "Process Document" click.
        if (process_button
                and st.session_state.processed_document_active
                and st.session_state.get('is_sample_document', False)
                and st.session_state.get('original_sample_bytes') is not None):
            uploaded_file = _rebuild_sample_upload()

        # Slot for progress indicators, placed right after the process button.
        progress_placeholder = st.empty()

        # Preprocessing preview, for image files when any preprocessing option
        # has a truthy value.
        if any(preprocessing_options.values()) and uploaded_file.type.startswith('image/'):
            st.markdown("**Preprocessed Preview**")
            try:
                with st.container():
                    processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options)

                    # Rendered as a raw <img> tag (base64 data URI) to avoid
                    # the fullscreen button.
                    img_data = base64.b64encode(processed_bytes).decode()
                    img_html = f'<img src="data:image/jpeg;base64,{img_data}" style="width:100%; border-radius:4px;">'
                    st.markdown(img_html, unsafe_allow_html=True)

                    # Only show "Applied:" if there are actual preprocessing steps
                    meta_items = _describe_preprocessing(preprocessing_options)
                    if meta_items:
                        st.caption("Applied: " + ", ".join(meta_items))
            except Exception as e:
                st.error(f"Error in preprocessing: {str(e)}")
                st.info("Try using grayscale preprocessing for PNG images with transparency")

        # Reserved slot for a success message; nothing in this function writes
        # to it, but creating it keeps the element order on the page unchanged.
        metadata_placeholder = st.empty()

        # Auto-processing applies to a flagged sample that is not yet processed.
        auto_processing = st.session_state.auto_process_sample and not st.session_state.processed_document_active

        auto_processing_message = st.empty()
        if auto_processing:
            auto_processing_message.info("Automatically processing sample document...")

    # Either the process button was clicked OR auto-processing is happening
    should_process = process_button or auto_processing
    if not should_process:
        return

    # Reset auto-process flag to avoid processing on next rerun
    if st.session_state.auto_process_sample:
        st.session_state.auto_process_sample = False

    progress_reporter = ProgressReporter(progress_placeholder).setup()

    try:
        result = process_file(
            uploaded_file=uploaded_file,
            use_vision=sidebar_options["use_vision"],
            preprocessing_options=preprocessing_options,
            progress_reporter=progress_reporter,
            pdf_dpi=sidebar_options.get("pdf_dpi", 150),
            max_pages=sidebar_options.get("max_pages", 3),
            pdf_rotation=sidebar_options.get("pdf_rotation", 0),
            custom_prompt=sidebar_options.get("custom_prompt", ""),
            perf_mode=sidebar_options.get("perf_mode", "Quality"),
            use_segmentation=sidebar_options.get("use_segmentation", False)
        )

        # Make sure the temp-file list exists so close_document() has
        # something to clean up from.
        if 'has_images' in result and result['has_images']:
            logger.info("Document has images, ensuring temp files are tracked")
            if 'temp_file_paths' not in st.session_state:
                st.session_state.temp_file_paths = []

        # Text-only OCR results (like the Milgram flier): raw_text plus at most
        # one other field and no image flag.  Give them a placeholder title.
        if ('ocr_contents' in result and
                'raw_text' in result['ocr_contents'] and
                len(result['ocr_contents']) <= 2 and
                'has_images' not in result):
            logger.info("Text-only OCR detected, handling as special case")
            if 'title' not in result['ocr_contents']:
                result['ocr_contents']['title'] = "Document Text"

        # Success message and close button go at the top of the left column.
        with left_col:
            st.success("**Document processed successfully**")
            # Closing is handled by the close_document callback, not inline.
            st.button("Close Document",
                      key="close_document_btn",
                      type="secondary",
                      on_click=close_document)
            # Add a small spacer
            st.markdown("<div style='height: 10px;'></div>", unsafe_allow_html=True)

        display_results(result, right_col, sidebar_options.get("custom_prompt", ""))

        st.session_state.processed_document_active = True
        auto_processing_message.empty()

        # Remember which file this was so a later upload is detected as new.
        st.session_state.last_processed_file = current_file_identifier

        # History entry: shallow copy with a timestamp and, where it applies,
        # a sample marker.
        result_copy = result.copy()
        result_copy['timestamp'] = format_timestamp()
        if 'is_sample_document' in st.session_state and st.session_state.is_sample_document:
            result_copy['sample_document'] = True

        # Newest first, keeping the most recent 20 results
        st.session_state.previous_results.insert(0, result_copy)
        if len(st.session_state.previous_results) > 20:
            st.session_state.previous_results = st.session_state.previous_results[:20]
    except Exception as e:
        st.error(f"Error processing document: {str(e)}")
        # Logged through the module logger, with traceback.
        logger.error(f"Document processing error: {str(e)}", exc_info=True)
def main():
    """Build the page: session state, sidebar, and the three top-level tabs.

    Runs top to bottom on every Streamlit interaction.  The document to
    process is either the file from the uploader or, when none is uploaded,
    the sample document held in session state.
    """
    # Initialize session state
    init_session_state()

    # CRITICAL: two-phase cleanup.  close_document() performs the cleanup in a
    # button callback and raises close_clicked; this run only lowers the flag
    # and restarts so the UI is rendered from a clean slate.
    # DO NOT REMOVE OR MODIFY this pattern.
    if st.session_state.get('close_clicked', False):
        st.session_state.close_clicked = False
        # Don't do anything else in this run - force a clean restart
        st.rerun()

    # Flag used to show the "switch to the processing tab" hint
    if 'redirect_to_processing' not in st.session_state:
        st.session_state.redirect_to_processing = False

    # Apply custom CSS
    from ui.layout import load_css
    load_css()

    # Create sidebar options
    sidebar_options = create_sidebar_options()

    tab_names = ["Document Processing", "Sample Documents", "Learn More"]
    main_tab1, main_tab2, main_tab3 = st.tabs(tab_names)

    with main_tab1:
        # Two-column layout for file upload and results with minimal padding
        st.markdown('<style>.block-container{padding-top: 1rem; padding-bottom: 0;}</style>', unsafe_allow_html=True)
        # A 2:3 column ratio gives more space to the results column
        left_col, right_col = st.columns([2, 3])

        # NOTE(review): the source this was recovered from had lost its
        # indentation.  Keeping the whole upload/processing section inside
        # `with left_col:` is an assumption - confirm against the layout.
        with left_col:
            uploaded_file = create_file_uploader()

            if uploaded_file is not None and 'sample_document' in st.session_state:
                # A real upload replaces any sample.  The stored sample bytes,
                # name and MIME type are cleared too, because leftover bytes
                # would otherwise be picked up when this upload is re-processed.
                st.session_state.sample_document = None
                st.session_state.is_sample_document = False
                st.session_state.original_sample_bytes = None
                st.session_state.original_sample_name = None
                st.session_state.original_sample_mime_type = None
            elif ('sample_document' in st.session_state and
                    st.session_state.sample_document is not None):
                # No real upload: use the loaded sample document instead.
                uploaded_file = st.session_state.sample_document

                # First run after loading: consume the "just loaded" flag and
                # request one automatic processing pass.
                if st.session_state.sample_just_loaded:
                    st.session_state.sample_just_loaded = False
                    st.session_state.sample_document_processed = True
                    st.session_state.auto_process_sample = True

            # Only process document if available
            if uploaded_file is not None:
                process_document(uploaded_file, left_col, right_col, sidebar_options)

    with main_tab2:
        # Show the redirect hint once, right after a sample was loaded
        if st.session_state.get('redirect_to_processing', False):
            st.success("**Sample document loaded!** Please switch to the **Document Processing** tab to view and process it.")
            st.session_state.redirect_to_processing = False
        show_example_documents()

    # Previous results tab temporarily removed

    with main_tab3:
        # About tab
        display_about_tab()


# Run the application
if __name__ == "__main__":
    main()