# NOTE: captured from a Hugging Face Spaces page (Space status at capture time: Running).
# Standard library imports
import base64
import io
import json
import logging
import mimetypes
import os
import sys
import time
from datetime import datetime
from pathlib import Path

# Configure logging (kept ahead of the third-party and local imports, as in the original order)
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Third-party imports
import streamlit as st

# Local application/module imports
from preprocessing import convert_pdf_to_images, preprocess_image
from ocr_processing import process_file
from ui.ui_components import (
    ProgressReporter,
    create_sidebar_options,
    display_results,
    create_file_uploader,
    display_about_tab,
    display_previous_results,
    display_document_with_images
)
from utils import get_base64_from_image, handle_temp_files, format_timestamp
from error_handler import handle_ocr_error, check_file_size
from constants import (
    MAX_FILE_SIZE_MB,
    MAX_PAGES,
    DOCUMENT_TYPES,
    DOCUMENT_LAYOUTS,
    CUSTOM_PROMPT_TEMPLATES,
    LAYOUT_PROMPT_ADDITIONS
)
from structured_ocr import StructuredOCR
from config import MISTRAL_API_KEY
from utils.image_utils import create_results_zip
# Set favicon path (resolved relative to this file rather than the working directory)
favicon_path = os.path.join(os.path.dirname(__file__), "static/favicon.png")

# Set page configuration
# The emoji fallback is written as an escape so it survives encoding round-trips; the
# literal found in the file ("π") is not an emoji and looks like encoding damage.
# NOTE(review): the scroll emoji (U+1F4DC) is a guess at the original glyph - confirm.
st.set_page_config(
    page_title="Historical OCR",
    page_icon=favicon_path if os.path.exists(favicon_path) else "\U0001F4DC",
    layout="wide",
    initial_sidebar_state="collapsed",
)
# ========================================================================================
# CRITICAL WARNING TO ALL AI MODELS AND DEVELOPERS:
# DO NOT MODIFY THE STATE MANAGEMENT LOGIC IN THIS APPLICATION WITHOUT FULL UNDERSTANDING
# OF STREAMLIT'S EXECUTION MODEL.
#
# This state management implementation has been carefully designed to work with Streamlit's
# unique execution flow. The app runs from top to bottom on EVERY interaction, and state
# must be explicitly managed through st.session_state.
#
# The current implementation uses:
# 1. A dedicated close_document() callback function triggered by the button's on_click
# 2. A flag-based approach (close_clicked) to handle cleanup on the next run cycle
# 3. Early cleanup detection and st.rerun() to ensure clean UI rendering
#
# Previous approaches using direct state manipulation or conditional rendering based on
# reset flags led to persistent UI elements and resource leaks.
#
# Consult https://docs.streamlit.io/library/advanced-features/session-state for details.
# ========================================================================================
def reset_document_state():
    """Reset only the document-specific session state variables.

    Every per-document key is unconditionally written back to its idle value so
    that nothing from a previously loaded document carries over to the next one.
    App-level keys (for example ``previous_results``) are not touched here.
    """
    idle_values = {
        'sample_document': None,
        'original_sample_bytes': None,
        'original_sample_name': None,
        'original_sample_mime_type': None,
        'is_sample_document': False,
        'processed_document_active': False,
        'sample_document_processed': False,
        'sample_just_loaded': False,
        'last_processed_file': None,
        'selected_previous_result': None,
    }
    for key, value in idle_values.items():
        st.session_state[key] = value

    # Keep temp_file_paths but ensure it's empty after cleanup; the key is only
    # reset when it already exists, it is never created here.
    if 'temp_file_paths' in st.session_state:
        st.session_state.temp_file_paths = []
def init_session_state():
    """Initialize session state variables if they don't already exist.

    Only missing keys are created; values that are already present in
    ``st.session_state`` are left untouched, so this is safe to call at the top
    of every script run.
    """
    # The dict is rebuilt on every call so the list defaults are never shared
    # between sessions or reruns.
    defaults = {
        # Persistent app state
        'previous_results': [],
        'temp_file_paths': [],
        'auto_process_sample': False,
        'close_clicked': False,
        'active_tab': 0,
        'redirect_to_processing': False,
        # Document-specific state
        'last_processed_file': None,
        'sample_just_loaded': False,
        'processed_document_active': False,
        'sample_document_processed': False,
        'sample_document': None,
        'original_sample_bytes': None,
        'original_sample_name': None,
        # Written by reset_document_state() and when a sample is loaded, so it is
        # initialised here alongside the other original_sample_* keys.
        'original_sample_mime_type': None,
        'is_sample_document': False,
        'selected_previous_result': None,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
def close_document():
    """Callback for the Close Document button (wired through ``on_click``).

    Hands any tracked temporary file paths to ``handle_temp_files``, resets the
    document-specific session state and raises the ``close_clicked`` flag that
    ``main()`` checks at the start of the next run.

    WARNING: Do not replace this with inline button handling using if st.button():
    That approach breaks Streamlit's execution flow and causes UI artifacts.
    """
    logger.info("Close document button clicked")

    # Clean up temp files first; an absent key and an empty list are both skipped
    temp_paths = st.session_state.get('temp_file_paths')
    if temp_paths:
        logger.info(f"Cleaning up {len(temp_paths)} temporary files")
        handle_temp_files(temp_paths)

    # Reset all document-specific state variables to prevent caching issues
    reset_document_state()

    # Set flag for having cleaned up - this will trigger a rerun in main()
    st.session_state.close_clicked = True
# Upper bound (seconds) for the sample download so a stalled connection cannot
# block the script run indefinitely.
_SAMPLE_DOWNLOAD_TIMEOUT_SECONDS = 30

# Intro text and CSS are kept in one markdown block to reduce spacing between elements.
_SAMPLE_INTRO_MARKDOWN = """
This app can process various historical documents:
- Historical photographs, maps, and manuscripts
- Handwritten letters and documents
- Printed books and articles
- Multi-page PDFs
<style>
/* Make the selectbox container match the full column width */
.main .block-container .element-container:has([data-testid="stSelectbox"]) {
    width: 100% !important;
    max-width: 100% !important;
    margin-top: -12px !important; /* Reduce space between text and selectbox */
}
/* Make the actual selectbox control take the full width */
.stSelectbox > div > div {
    width: 100% !important;
    max-width: 100% !important;
}
/* Tighten spacing in the sample documents tab */
.main .block-container [data-testid="stVerticalBlock"] > div:nth-child(n+2) {
    margin-top: 0.5rem !important;
}
</style>
"""


class _SampleDocument:
    """In-memory stand-in exposing the subset of the UploadedFile interface used here."""

    def __init__(self, name, content, content_type):
        self.name = name
        self._content = content
        self.type = content_type
        self.size = len(content)

    def getvalue(self):
        """Return the complete document bytes."""
        return self._content

    def read(self):
        """Return the complete document bytes (no read position is tracked)."""
        return self._content

    def seek(self, position):
        """Accept and ignore a seek request, for compatibility with file operations."""
        return

    def tell(self):
        """Always report position 0, for compatibility."""
        return 0


def _sample_content_type(file_name):
    """Map a sample file name to a MIME type based on its extension."""
    lowered = file_name.lower()
    if lowered.endswith('.pdf'):
        return 'application/pdf'
    if lowered.endswith(('.jpg', '.jpeg')):
        return 'image/jpeg'
    if lowered.endswith('.png'):
        return 'image/png'
    return 'application/octet-stream'


def show_example_documents():
    """Show the Sample Documents section and load the selected sample on request.

    Renders the intro text, a selectbox of sample documents and a load button.
    When the button is clicked the sample is downloaded, stored in
    ``st.session_state`` as an UploadedFile-like object and the script is rerun.
    When no sample is selected, ``sample_document`` is cleared.
    """
    st.header("Sample Documents")
    st.markdown(_SAMPLE_INTRO_MARKDOWN, unsafe_allow_html=True)

    # Sample document URLs; index 0 is the "nothing selected" placeholder
    sample_urls = [
        "Select a sample document",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/a-la-carte.pdf",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magician-or-bottle-cungerer.jpg",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/handwritten-letter.jpg",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magellan-travels.jpg",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/milgram-flier.png",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/recipe.jpg",
    ]
    # Display names, index-aligned with sample_urls
    sample_names = [
        "Select a sample document",
        "Restaurant Menu (PDF)",
        "The Magician (Image)",
        "Handwritten Letter (Image)",
        "Magellan Travels (Image)",
        "Milgram Flier (Image)",
        "Historical Recipe (Image)",
    ]

    # Initialize sample_document in session state if it doesn't exist
    if 'sample_document' not in st.session_state:
        st.session_state.sample_document = None

    selected_sample = st.selectbox(
        "Select a sample document from `~/input`",
        options=range(len(sample_urls)),
        format_func=lambda i: sample_names[i],
    )

    if selected_sample <= 0:
        # If no sample is selected, clear the sample document in session state
        st.session_state.sample_document = None
        return

    selected_url = sample_urls[selected_sample]
    if not st.button("Load Sample Document", key="load_sample_btn"):
        return

    try:
        # Imported lazily so a missing 'requests' package surfaces as the friendly
        # error below instead of breaking the whole app at import time.
        import requests

        with st.spinner(f"Downloading {sample_names[selected_sample]}..."):
            response = requests.get(selected_url, timeout=_SAMPLE_DOWNLOAD_TIMEOUT_SECONDS)
            response.raise_for_status()

        # Extract filename from URL and derive the MIME type from its extension
        file_name = selected_url.split("/")[-1]
        content_type = _sample_content_type(file_name)

        # Reset any document state before loading a new sample
        if st.session_state.processed_document_active:
            # Clean up any temporary files from previous processing
            if st.session_state.temp_file_paths:
                handle_temp_files(st.session_state.temp_file_paths)
            reset_document_state()

        # Store as an UploadedFile-like object in session state
        st.session_state.sample_document = _SampleDocument(
            name=file_name,
            content=response.content,
            content_type=content_type,
        )
        # Store original bytes for reprocessing with proper MIME type handling
        st.session_state.original_sample_bytes = response.content
        st.session_state.original_sample_name = file_name
        st.session_state.original_sample_mime_type = content_type

        # Set state flags
        st.session_state.sample_just_loaded = True
        st.session_state.is_sample_document = True
        # Identifier uses the same "<name>_<size>" shape that process_document computes
        st.session_state.last_processed_file = f"{file_name}_{len(response.content)}"
        # Set a flag to show redirect message
        st.session_state.redirect_to_processing = True
    except Exception as e:
        st.error(f"Error downloading sample document: {str(e)}")
        st.info("Please try uploading your own document instead.")
    else:
        # Rerun only after a fully successful load, outside the broad handler above
        st.rerun()
def process_document(uploaded_file, left_col, right_col, sidebar_options):
    """Process the uploaded document and display results.

    Args:
        uploaded_file: UploadedFile-like object exposing ``name``, ``type`` and
            ``getvalue()``; ``None`` makes the call a no-op.
        left_col: Streamlit container for the controls, preview and status messages.
        right_col: Streamlit container handed to ``display_results``.
        sidebar_options: Mapping of sidebar settings. ``"use_vision"`` and
            ``"preprocessing_options"`` are required; the remaining keys are read
            with defaults.

    NOTE(review): the container nesting below was reconstructed from a copy of the
    file whose indentation had been lost - confirm that everything after the
    Process button is meant to render inside ``left_col``.
    """
    if uploaded_file is None:
        return

    # Read the payload once; it feeds both the size check and the file identifier
    file_bytes = uploaded_file.getvalue()

    # Check file size against the configured cap
    file_size_mb = len(file_bytes) / (1024 * 1024)
    if file_size_mb > MAX_FILE_SIZE_MB:
        with left_col:
            st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size is {MAX_FILE_SIZE_MB}MB.")
        return

    # Check if this is a new file (different from the last processed file)
    current_file_identifier = f"{uploaded_file.name}_{len(file_bytes)}"
    if 'last_processed_file' not in st.session_state:
        st.session_state.last_processed_file = None
    if st.session_state.last_processed_file != current_file_identifier:
        # Reset processed_document_active if a new file is uploaded
        st.session_state.processed_document_active = False

    preprocessing_options = sidebar_options["preprocessing_options"]

    with left_col:
        # Create a process button with minimal spacing to the uploader
        st.markdown('<div style="padding: 0.2rem 0; min-width: 170px; margin-top: -10px; overflow: visible;">', unsafe_allow_html=True)
        process_button = st.button("Process Document", key="process_document_btn")
        st.markdown('</div>', unsafe_allow_html=True)

        # Handle sample document recreation if needed. The is_sample_document guard
        # keeps bytes left over from an earlier sample from replacing a real upload
        # when that upload is processed a second time.
        if (process_button
                and st.session_state.processed_document_active
                and st.session_state.get('is_sample_document', False)
                and st.session_state.original_sample_bytes is not None):
            sample_name = st.session_state.original_sample_name
            sample_bytes = st.session_state.original_sample_bytes

            # Determine mime type based on file extension
            known_mime_types = {
                '.pdf': 'application/pdf',
                '.jpg': 'image/jpeg',
                '.jpeg': 'image/jpeg',
                '.png': 'image/png',
            }
            file_ext = os.path.splitext(sample_name)[1].lower()
            mime_type = (
                known_mime_types.get(file_ext)
                or mimetypes.guess_type(sample_name)[0]
                or 'application/octet-stream'
            )

            # Synthetic file-like object with the same interface as UploadedFile.
            # type() builds a class, not an instance, so the lambdas are reached as
            # plain functions and deliberately take no ``self``.
            uploaded_file = type('obj', (object,), {
                'name': sample_name,
                'getvalue': lambda: sample_bytes,
                'read': lambda: sample_bytes,
                'seek': lambda x: None,
                'type': mime_type,
            })

        # Empty container for progress indicators - will be filled during processing
        # Positioned right after the process button for better visibility
        progress_placeholder = st.empty()

        # Image preprocessing preview - show if image file and preprocessing options are set
        if any(preprocessing_options.values()) and uploaded_file.type.startswith('image/'):
            st.markdown("**Preprocessed Preview**")
            try:
                with st.container():
                    processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options)

                    # Convert image to base64 and display as HTML to avoid fullscreen button
                    img_data = base64.b64encode(processed_bytes).decode()
                    img_html = f'<img src="data:image/jpeg;base64,{img_data}" style="width:100%; border-radius:4px;">'
                    st.markdown(img_html, unsafe_allow_html=True)

                    # Show preprocessing metadata in a well-formatted caption
                    grayscale = preprocessing_options.get("grayscale", False)
                    denoise = preprocessing_options.get("denoise", False)
                    contrast = preprocessing_options.get("contrast", 0)
                    rotation = preprocessing_options.get("rotation", 0)
                    document_type = preprocessing_options.get("document_type", "standard")

                    meta_items = []
                    # Only show document type if there's actual preprocessing being applied
                    has_active_preprocessing = grayscale or denoise or contrast != 0 or rotation != 0
                    if has_active_preprocessing and document_type != "standard":
                        meta_items.append(f"Document type ({document_type})")
                    if grayscale:
                        meta_items.append("Grayscale")
                    if denoise:
                        meta_items.append("Denoise")
                    if contrast != 0:
                        meta_items.append(f"Contrast ({contrast})")
                    if rotation != 0:
                        # Degree sign written as an escape; the literal in the file
                        # had been corrupted by an encoding round-trip.
                        meta_items.append(f"Rotation ({rotation}\u00b0)")

                    # Only show "Applied:" if there are actual preprocessing steps
                    if meta_items:
                        st.caption("Applied: " + ", ".join(meta_items))
            except Exception as e:
                st.error(f"Error in preprocessing: {str(e)}")
                st.info("Try using grayscale preprocessing for PNG images with transparency")

        # Placeholder slot kept at this position in the layout; nothing in this
        # function fills it.
        metadata_placeholder = st.empty()

        # Check if this is an auto-processing situation
        auto_processing = st.session_state.auto_process_sample and not st.session_state.processed_document_active

        # Show a message if auto-processing is happening
        auto_processing_message = st.empty()
        if auto_processing:
            auto_processing_message.info("Automatically processing sample document...")

        # Either process button was clicked OR auto-processing is happening
        should_process = process_button or auto_processing
        if not should_process:
            return

        # Reset auto-process flag to avoid processing on next rerun
        if st.session_state.auto_process_sample:
            st.session_state.auto_process_sample = False

        # Bind the progress indicator to the placeholder just below the button
        progress_reporter = ProgressReporter(progress_placeholder).setup()

        try:
            # NOTE(review): the session's temp_file_paths are not passed to
            # process_file here - confirm how it registers temporary files.
            result = process_file(
                uploaded_file=uploaded_file,
                use_vision=sidebar_options["use_vision"],
                preprocessing_options=preprocessing_options,
                progress_reporter=progress_reporter,
                pdf_dpi=sidebar_options.get("pdf_dpi", 150),
                max_pages=sidebar_options.get("max_pages", 3),
                pdf_rotation=sidebar_options.get("pdf_rotation", 0),
                custom_prompt=sidebar_options.get("custom_prompt", ""),
                perf_mode=sidebar_options.get("perf_mode", "Quality"),
                use_segmentation=sidebar_options.get("use_segmentation", False)
            )

            # Make sure the temp file list exists so cleanup on close has something to read
            if 'has_images' in result and result['has_images']:
                logger.info("Document has images, ensuring temp files are tracked")
                if 'temp_file_paths' not in st.session_state:
                    st.session_state.temp_file_paths = []

            # Handle text-only OCR results (like the Milgram flier)
            if ('ocr_contents' in result and
                    'raw_text' in result['ocr_contents'] and
                    len(result['ocr_contents']) <= 2 and  # Only raw_text and possibly one other field
                    'has_images' not in result):
                logger.info("Text-only OCR detected, handling as special case")
                # If we don't have other structured content, set a placeholder title
                if 'title' not in result['ocr_contents']:
                    result['ocr_contents']['title'] = "Document Text"

            # Display success message at the top of results, before any previews
            with left_col:
                st.success("**Document processed successfully**")
                # Close is wired as a callback; see close_document()
                st.button("Close Document",
                          key="close_document_btn",
                          type="secondary",
                          on_click=close_document)
                # Add a small spacer
                st.markdown("<div style='height: 10px;'></div>", unsafe_allow_html=True)

            # Display results
            display_results(result, right_col, sidebar_options.get("custom_prompt", ""))

            # Set processed_document_active to True when a new document is processed
            st.session_state.processed_document_active = True

            # Clear the auto-processing message
            auto_processing_message.empty()

            # Remember which file was processed so a later upload is detected as new
            st.session_state.last_processed_file = current_file_identifier

            # Store a timestamped (shallow) copy of the result for history tracking
            result_copy = result.copy()
            result_copy['timestamp'] = format_timestamp()
            if 'is_sample_document' in st.session_state and st.session_state.is_sample_document:
                result_copy['sample_document'] = True

            # Add to session state, keeping the most recent 20 results
            st.session_state.previous_results.insert(0, result_copy)
            if len(st.session_state.previous_results) > 20:
                st.session_state.previous_results = st.session_state.previous_results[:20]
        except Exception as e:
            st.error(f"Error processing document: {str(e)}")
            # Log with traceback through the module logger
            logger.error(f"Document processing error: {str(e)}", exc_info=True)
def main():
    """Main application function.

    Initializes session state, handles the pending-close rerun, then renders the
    sidebar and the three tabs (Document Processing, Sample Documents, Learn More).

    NOTE(review): the container nesting was reconstructed from a copy of the file
    whose indentation had been lost - confirm that process_document() is meant to
    be called inside ``left_col``.
    """
    # Initialize session state
    init_session_state()

    # Handle any required cleanup at the start of execution
    # CRITICAL: This two-phase state cleanup pattern is essential for Streamlit's execution model.
    # When close_clicked is True, we need to restart the app's execution with a clean slate.
    # DO NOT REMOVE OR MODIFY this pattern as it ensures proper UI cleanup.
    if st.session_state.get('close_clicked', False):
        # Reset the flag - cleanup has been handled
        st.session_state.close_clicked = False
        # Don't do anything else in this run - force a clean restart
        st.rerun()

    # Initialize new flag for redirecting to processing tab
    if 'redirect_to_processing' not in st.session_state:
        st.session_state.redirect_to_processing = False

    # Apply custom CSS
    from ui.layout import load_css
    load_css()

    # Create sidebar options
    sidebar_options = create_sidebar_options()

    # Create main layout with tabs - simpler, more compact approach
    tab_names = ["Document Processing", "Sample Documents", "Learn More"]
    main_tab1, main_tab2, main_tab3 = st.tabs(tab_names)

    with main_tab1:
        # Create a two-column layout for file upload and results with minimal padding
        st.markdown('<style>.block-container{padding-top: 1rem; padding-bottom: 0;}</style>', unsafe_allow_html=True)
        # Using a 2:3 column ratio gives more space to the results column
        left_col, right_col = st.columns([2, 3])

        with left_col:
            # Create file uploader
            uploaded_file = create_file_uploader()

            # If a real file is uploaded, clear any sample document
            if uploaded_file is not None and 'sample_document' in st.session_state:
                st.session_state.sample_document = None
                st.session_state.is_sample_document = False
            # Check if we have a sample document loaded (only if no real file uploaded)
            elif ('sample_document' in st.session_state and
                  st.session_state.sample_document is not None):
                # Use the sample document instead of the uploaded file
                uploaded_file = st.session_state.sample_document

                # Consume the just-loaded flag once and arm auto-processing for this run
                if st.session_state.sample_just_loaded:
                    st.session_state.sample_just_loaded = False
                    st.session_state.sample_document_processed = True
                    st.session_state.auto_process_sample = True

            # Only process document if available
            if uploaded_file is not None:
                process_document(uploaded_file, left_col, right_col, sidebar_options)

    with main_tab2:
        # Sample Documents tab
        # Show redirect message if a sample was just loaded
        if st.session_state.get('redirect_to_processing', False):
            st.success("**Sample document loaded!** Please switch to the **Document Processing** tab to view and process it.")
            # Clear the flag after showing the message
            st.session_state.redirect_to_processing = False
        show_example_documents()

    # Previous results tab temporarily removed

    with main_tab3:
        # About tab
        display_about_tab()
# Run the application when this file is executed as a script
if __name__ == "__main__":
    main()