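# Spaces: Running
# NOTE(review): the line above looks like a status banner captured from the
# hosting page rather than program text - confirm and drop if so.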
import os | |
import streamlit as st | |
import json | |
import sys | |
import time | |
import base64 | |
from pathlib import Path | |
import io | |
from datetime import datetime | |
import logging | |
# Import modules | |
from preprocessing import convert_pdf_to_images, preprocess_image | |
from ocr_processing import process_file | |
from ui_components import ( | |
ProgressReporter, | |
create_sidebar_options, | |
display_results, | |
create_file_uploader, | |
display_about_tab, | |
display_previous_results, | |
display_document_with_images | |
) | |
from utils import get_base64_from_image, handle_temp_files, format_timestamp | |
from error_handler import handle_ocr_error, check_file_size | |
from constants import ( | |
MAX_FILE_SIZE_MB, | |
MAX_PAGES, | |
DOCUMENT_TYPES, | |
DOCUMENT_LAYOUTS, | |
CUSTOM_PROMPT_TEMPLATES, | |
LAYOUT_PROMPT_ADDITIONS | |
) | |
# Import the StructuredOCR class and config from the local files | |
from structured_ocr import StructuredOCR | |
from config import MISTRAL_API_KEY | |
# Import utilities for handling previous results | |
from ocr_utils import create_results_zip | |
# --- Logging ---------------------------------------------------------------
# Root logging is configured once at import time; each record carries the
# timestamp, logger name, severity and message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("app")

# --- Page configuration ----------------------------------------------------
# The favicon is looked up next to this module.  When the file is absent a
# text glyph is used as the page icon instead.
# NOTE(review): the fallback glyph may be a mis-encoded emoji - confirm.
favicon_path = os.path.join(os.path.dirname(__file__), "static/favicon.png")
if os.path.exists(favicon_path):
    _page_icon = favicon_path
else:
    _page_icon = "π"

st.set_page_config(
    page_title="Historical OCR",
    page_icon=_page_icon,
    layout="wide",
    initial_sidebar_state="expanded",
)
def initialize_session_state():
    """Make sure every session-state key used by this module exists.

    Missing keys are created with their default value; keys that are already
    present are left untouched.

    When ``perform_reset`` is set (the "Close Document" callback sets it), a
    full reset is carried out instead: tracked temporary files are handed to
    ``handle_temp_files``, every key except ``previous_results`` and
    ``perform_reset`` is removed, the document-related keys are recreated with
    their defaults, and ``perform_reset`` is switched off last.
    """
    state = st.session_state

    def document_defaults():
        # Built afresh on each call so the list value is never shared
        # between two initialisations.
        return {
            'temp_file_paths': [],
            'last_processed_file': None,
            'auto_process_sample': False,
            'sample_just_loaded': False,
            'processed_document_active': False,
            'sample_document_processed': False,
            'sample_document': None,
            'original_sample_bytes': None,
            'original_sample_name': None,
            'is_sample_document': False,
        }

    # First-run initialisation: only fill in what is not there yet.
    if 'previous_results' not in state:
        state.previous_results = []
    if 'perform_reset' not in state:
        state.perform_reset = False
    for key, default in document_defaults().items():
        if key not in state:
            state[key] = default

    # Nothing more to do unless a complete reset was requested.
    if not state.perform_reset:
        return

    # The results history is the one thing that survives a reset.
    kept_results = state.previous_results

    # Clean up any temporary files left over from earlier processing.
    if state.temp_file_paths:
        handle_temp_files(state.temp_file_paths)

    # Drop everything except the history and the reset flag itself; the
    # flag is cleared explicitly at the very end.
    survivors = ('previous_results', 'perform_reset')
    for key in [name for name in state.keys() if name not in survivors]:
        state.pop(key, None)

    state.previous_results = kept_results
    for key, default in document_defaults().items():
        state[key] = default

    # Turning the flag off must be the final step of the reset cycle.
    state.perform_reset = False
    return
class SampleDocument:
    """UploadedFile-like wrapper around the bytes of a downloaded sample.

    Provides ``name``, ``type``, ``size``, ``getvalue()``, ``read()``,
    ``seek()`` and ``tell()``.  The payload is kept as a single bytes object
    and there is no read cursor: ``read()`` always returns the complete
    payload, ``seek()`` does nothing and ``tell()`` always returns 0.
    """

    # (suffixes, MIME type) pairs, matched against the lower-cased file name.
    _CONTENT_TYPES = (
        (('.pdf',), 'application/pdf'),
        (('.jpg', '.jpeg'), 'image/jpeg'),
        (('.png',), 'image/png'),
    )
    _FALLBACK_CONTENT_TYPE = 'application/octet-stream'

    def __init__(self, name, content, content_type):
        self.name = name
        self._content = content
        self.type = content_type
        self.size = len(content)

    @classmethod
    def content_type_for(cls, file_name):
        """Return the MIME type implied by *file_name*'s extension.

        Matching is case-insensitive; unknown extensions map to
        ``application/octet-stream``.
        """
        lowered = file_name.lower()
        for suffixes, content_type in cls._CONTENT_TYPES:
            if lowered.endswith(suffixes):
                return content_type
        return cls._FALLBACK_CONTENT_TYPE

    def getvalue(self):
        return self._content

    def read(self):
        return self._content

    def seek(self, position):
        # Accepted for compatibility with file-style callers; no cursor is kept.
        return

    def tell(self):
        # Accepted for compatibility; always reports the start of the payload.
        return 0


def show_example_documents(download_timeout=30):
    """Render the "Example Documents" section and load a chosen sample.

    Shows an introductory blurb, a select box listing the bundled sample
    documents and, once a sample is chosen, a "Load Sample Document" button.
    Clicking the button downloads the sample, stores it in
    ``st.session_state.sample_document`` (plus the raw bytes and name for
    re-processing), sets the sample-related flags and triggers a rerun.
    While the placeholder entry is selected, ``sample_document`` is cleared.

    Args:
        download_timeout: Value passed as ``timeout`` to ``requests.get``
            for the sample download, so a stalled connection cannot block
            the script indefinitely.

    Any exception raised while downloading or storing the sample is shown
    to the user as an error message rather than propagated.
    """
    st.subheader("Example Documents")

    # Short description of what kinds of documents can be processed.
    st.markdown(
        "\n"
        "This app can process various historical documents:\n"
        "- Historical photographs, maps, and manuscripts\n"
        "- Handwritten letters and documents\n"
        "- Printed books and articles\n"
        "- Multi-page PDFs\n"
    )

    # CSS so the dropdown spans the full width of its column.
    st.markdown("""
        <style>
        /* Make the selectbox container match the full column width */
        .main .block-container .element-container:has([data-testid="stSelectbox"]) {
            width: 100% !important;
            max-width: 100% !important;
        }
        /* Make the actual selectbox control take the full width */
        .stSelectbox > div > div {
            width: 100% !important;
            max-width: 100% !important;
        }
        </style>
        """, unsafe_allow_html=True)

    # (label, URL) pairs kept together so the two can never drift apart.
    # Entry 0 is the "nothing selected" placeholder and has no URL.
    samples = [
        ("Select a sample document", None),
        ("Restaurant Menu (PDF)",
         "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/a-la-carte.pdf"),
        ("The Magician (Image)",
         "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magician-or-bottle-cungerer.jpg"),
        ("Handwritten Letter (Image)",
         "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/handwritten-letter.jpg"),
        ("Magellan Travels (Image)",
         "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magellan-travels.jpg"),
        ("Milgram Flier (Image)",
         "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/milgram-flier.png"),
        ("Baldwin Street (Image)",
         "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/baldwin-15th-north.jpg"),
    ]

    if 'sample_document' not in st.session_state:
        st.session_state.sample_document = None

    # The widget value is the index into ``samples``; labels are display-only.
    selected_sample = st.selectbox(
        "Select a sample document from `~/input`",
        options=range(len(samples)),
        format_func=lambda i: samples[i][0],
    )

    if selected_sample <= 0:
        # Placeholder selected: make sure no stale sample stays active.
        st.session_state.sample_document = None
        return

    sample_label, selected_url = samples[selected_sample]
    if not st.button("Load Sample Document"):
        return

    try:
        import requests

        with st.spinner(f"Downloading {sample_label}..."):
            response = requests.get(selected_url, timeout=download_timeout)
            response.raise_for_status()

        content = response.content
        file_name = selected_url.split("/")[-1]

        # Reset state left over from a previously processed document.
        if st.session_state.processed_document_active:
            st.session_state.processed_document_active = False
            st.session_state.last_processed_file = None
            if st.session_state.temp_file_paths:
                handle_temp_files(st.session_state.temp_file_paths)
                st.session_state.temp_file_paths = []

        st.session_state.sample_document = SampleDocument(
            name=file_name,
            content=content,
            content_type=SampleDocument.content_type_for(file_name),
        )

        # Raw bytes and name are kept so the sample can be re-processed.
        st.session_state.original_sample_bytes = content
        st.session_state.original_sample_name = file_name

        st.session_state.sample_just_loaded = True
        st.session_state.is_sample_document = True

        # Same "<name>_<size>" identifier format used for uploaded files.
        st.session_state.last_processed_file = f"{file_name}_{len(content)}"
    except Exception as e:
        st.error(f"Error downloading sample document: {str(e)}")
        st.info("Please try uploading your own document instead.")
    else:
        # Kept outside the try block so the rerun request can never be
        # reported as a download error.
        st.rerun()
# Number of entries kept in ``st.session_state.previous_results``.
_MAX_SAVED_RESULTS = 20


def _rebuild_sample_upload(file_name, file_bytes):
    """Return an upload-like object wrapping a stored sample's bytes.

    The object offers ``name``, ``type``, ``size``, ``getvalue()``,
    ``read()``, ``seek()`` and ``tell()``.  It is a real instance (not a
    class used as a namespace), so attribute access behaves the same way it
    does on any other file-like object.
    """
    import mimetypes
    from types import SimpleNamespace

    known_types = {
        '.pdf': 'application/pdf',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
    }
    file_ext = os.path.splitext(file_name)[1].lower()
    mime_type = known_types.get(file_ext)
    if mime_type is None:
        mime_type = mimetypes.guess_type(file_name)[0] or 'application/octet-stream'

    return SimpleNamespace(
        name=file_name,
        type=mime_type,
        size=len(file_bytes),
        getvalue=lambda: file_bytes,
        read=lambda: file_bytes,
        seek=lambda *_args: None,
        tell=lambda: 0,
    )


def _show_preprocessing_preview(image_bytes, preprocessing_options):
    """Render the preprocessed image and a caption listing the applied steps."""
    st.markdown("**Preprocessed Preview**")
    try:
        # A dedicated container keeps the image and its caption together.
        with st.container():
            processed_bytes = preprocess_image(image_bytes, preprocessing_options)
            st.image(io.BytesIO(processed_bytes), use_container_width=True)

            meta_items = []
            if preprocessing_options.get("document_type", "standard") != "standard":
                meta_items.append(f"Document type ({preprocessing_options['document_type']})")
            if preprocessing_options.get("grayscale", False):
                meta_items.append("Grayscale")
            if preprocessing_options.get("denoise", False):
                meta_items.append("Denoise")
            if preprocessing_options.get("contrast", 0) != 0:
                meta_items.append(f"Contrast ({preprocessing_options['contrast']})")
            if preprocessing_options.get("rotation", 0) != 0:
                meta_items.append(f"Rotation ({preprocessing_options['rotation']}°)")

            # Only show "Applied:" when at least one step is listed.
            if meta_items:
                st.caption("Applied: " + ", ".join(meta_items))
    except Exception as e:
        st.error(f"Error in preprocessing: {str(e)}")
        st.info("Try using grayscale preprocessing for PNG images with transparency")


def _request_document_reset():
    """Button callback: drop the active document and request a full reset."""
    st.session_state.processed_document_active = False
    st.session_state.sample_document = None
    st.session_state.last_processed_file = None
    # Picked up by initialize_session_state() on the next script run.
    st.session_state.perform_reset = True


def _remember_result(result):
    """Prepend a timestamped copy of *result* to the results history."""
    result_copy = result.copy()
    result_copy['timestamp'] = format_timestamp()
    if 'is_sample_document' in st.session_state and st.session_state.is_sample_document:
        result_copy['sample_document'] = True

    st.session_state.previous_results.insert(0, result_copy)
    if len(st.session_state.previous_results) > _MAX_SAVED_RESULTS:
        st.session_state.previous_results = st.session_state.previous_results[:_MAX_SAVED_RESULTS]


def process_document(uploaded_file, left_col, right_col, sidebar_options):
    """Show the processing controls for a document and run OCR on request.

    Renders the process button, an optional preprocessing preview and the
    progress/success placeholders.  The document is processed when the
    button is clicked or when ``auto_process_sample`` is set and no document
    is currently marked as processed.  On success the results are displayed,
    the session state is updated, a "Close Document" button is offered and a
    timestamped copy of the result is added to the results history.

    Args:
        uploaded_file: Upload-like object providing ``name``, ``type`` and
            ``getvalue()``; ``None`` makes the function return immediately.
        left_col: Container (usable in a ``with`` statement) that receives
            the button, preview and placeholders.
        right_col: Container passed on to ``display_results``.
        sidebar_options: Mapping with ``"preprocessing_options"`` and
            ``"use_vision"``; ``"pdf_dpi"``, ``"max_pages"``,
            ``"pdf_rotation"``, ``"custom_prompt"`` and ``"perf_mode"`` are
            optional and fall back to 150, 3, 0, "" and "Quality".

    Returns:
        None.  Files larger than ``MAX_FILE_SIZE_MB`` are rejected with an
        error message.  Exceptions raised during processing are reported in
        the UI and logged, not propagated.
    """
    if uploaded_file is None:
        return

    # Read the payload once; it is needed for the size check and identifier.
    file_bytes = uploaded_file.getvalue()

    # Reject files above the configured limit (MAX_FILE_SIZE_MB).
    file_size_mb = len(file_bytes) / (1024 * 1024)
    if file_size_mb > MAX_FILE_SIZE_MB:
        with left_col:
            st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size is {MAX_FILE_SIZE_MB}MB.")
        return

    # A different name/size pair means a new file: forget the processed flag.
    current_file_identifier = f"{uploaded_file.name}_{len(file_bytes)}"
    if st.session_state.last_processed_file != current_file_identifier:
        st.session_state.processed_document_active = False

    with left_col:
        # Key and label both depend on whether this document was already
        # processed, so the button is re-created when the state flips.
        already_processed = st.session_state.processed_document_active
        button_key = "process_again" if already_processed else "process_initial"
        button_text = "Process Document Again" if already_processed else "Process Document"
        process_button = st.button(button_text, key=button_key)

        # Re-processing a sample: rebuild the upload from the stored bytes.
        if (process_button and already_processed
                and st.session_state.original_sample_bytes is not None):
            uploaded_file = _rebuild_sample_upload(
                st.session_state.original_sample_name,
                st.session_state.original_sample_bytes,
            )

        # Filled by the progress reporter while processing runs.
        progress_placeholder = st.empty()

        # Preview only for images, and only when some preprocessing is set.
        preprocessing_options = sidebar_options["preprocessing_options"]
        is_image = (uploaded_file.type or "").startswith('image/')
        if any(preprocessing_options.values()) and is_image:
            _show_preprocessing_preview(uploaded_file.getvalue(), preprocessing_options)

        # Filled with the success message after processing.
        metadata_placeholder = st.empty()

    auto_processing = (st.session_state.auto_process_sample
                       and not st.session_state.processed_document_active)

    auto_processing_message = st.empty()
    if auto_processing:
        auto_processing_message.info("Automatically processing sample document...")

    # Process on an explicit click or when auto-processing applies.
    if not (process_button or auto_processing):
        return

    # Clear the flag so the next rerun does not process again by itself.
    if st.session_state.auto_process_sample:
        st.session_state.auto_process_sample = False

    progress_reporter = ProgressReporter(progress_placeholder).setup()
    custom_prompt = sidebar_options.get("custom_prompt", "")

    try:
        result = process_file(
            uploaded_file=uploaded_file,
            use_vision=sidebar_options["use_vision"],
            preprocessing_options=sidebar_options["preprocessing_options"],
            progress_reporter=progress_reporter,
            pdf_dpi=sidebar_options.get("pdf_dpi", 150),
            max_pages=sidebar_options.get("max_pages", 3),
            pdf_rotation=sidebar_options.get("pdf_rotation", 0),
            custom_prompt=custom_prompt,
            perf_mode=sidebar_options.get("perf_mode", "Quality")
        )

        display_results(result, right_col, custom_prompt)

        st.session_state.processed_document_active = True
        auto_processing_message.empty()

        # Remember which file was processed so a new upload can be detected.
        st.session_state.last_processed_file = current_file_identifier

        # Success message next to a button that dismisses the document.
        success_cols = st.columns([5, 1])
        with success_cols[0]:
            metadata_placeholder.success("**Document processed successfully**")
        with success_cols[1]:
            # NOTE(review): the leading glyph of the label looks mis-encoded;
            # confirm the intended symbol before changing it.
            st.button("β Close Document",
                      key="close_document_button",
                      help="Clear current document and start over",
                      on_click=_request_document_reset)

        _remember_result(result)
    except Exception as e:
        st.error(f"Error processing document: {str(e)}")
        # Logged with traceback through the module logger.
        logger.exception("Document processing error: %s", e)
def main():
    """Build the page: session state, CSS, sidebar and the three tabs.

    The "Document Processing" tab holds a two-column layout.  The left
    column shows the file uploader; a sample document stored in session
    state takes the place of the uploader's value.  With a document at hand
    ``process_document`` is called, otherwise the example-documents section
    is shown.  The other two tabs render the results history and the About
    page.
    """
    initialize_session_state()

    # Function-scope import, kept local to main().
    from ui.layout import load_css
    load_css()

    sidebar_options = create_sidebar_options()

    processing_tab, history_tab, about_tab = st.tabs(
        ["Document Processing", "Previous Results", "About"]
    )

    with processing_tab:
        # Left column: upload and controls; right column: results.
        left_col, right_col = st.columns([1, 1])

        with left_col:
            document = create_file_uploader()

            # A loaded sample document replaces whatever the uploader holds.
            sample = st.session_state.get('sample_document')
            if sample is not None:
                document = sample

                # The "just loaded" marker is consumed once and turned into
                # a request to process the sample automatically.
                if st.session_state.sample_just_loaded:
                    st.session_state.sample_just_loaded = False
                    st.session_state.sample_document_processed = True
                    st.session_state.auto_process_sample = True

            if document is None:
                # No document: clear the processed flag and offer examples.
                st.session_state.processed_document_active = False
                show_example_documents()
            else:
                process_document(document, left_col, right_col, sidebar_options)

    with history_tab:
        display_previous_results()

    with about_tab:
        display_about_tab()
# Run the application only when this file is executed as the main script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()