# historical-ocr / app.py
# milwright - commit 71cd5b2: Fix sample document processing and close functionality
# File size: 137 kB
import os
import streamlit as st
import json
import sys
import time
import base64
# Updated import section
from pathlib import Path
import tempfile
import io
from pdf2image import convert_from_bytes
from PIL import Image, ImageEnhance, ImageFilter
import cv2
import numpy as np
from datetime import datetime
# Import the StructuredOCR class and config from the local files
from structured_ocr import StructuredOCR
from config import MISTRAL_API_KEY
# Import utilities for handling previous results
from ocr_utils import create_results_zip
def get_base64_from_image(image_path):
    """Return the contents of the file at ``image_path`` as base64 text.

    Args:
        image_path: Path of the file to read; it is opened in binary mode.

    Returns:
        The file's bytes, base64-encoded and decoded to a ``str``.
    """
    with open(image_path, "rb") as source:
        raw_bytes = source.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
# Set favicon path
favicon_path = os.path.join(os.path.dirname(__file__), "static/favicon.png")
# Set page configuration. The bundled favicon is used when it exists; otherwise
# fall back to the scroll emoji. The fallback has to be a real emoji: the
# previous value was mis-encoded text, which is not a usable page icon.
st.set_page_config(
    page_title="Historical OCR",
    page_icon=favicon_path if os.path.exists(favicon_path) else "📜",
    layout="wide",
    initial_sidebar_state="expanded",
)
# Converted pages are cached for 24 hours so reruns skip the PDF rasterization
@st.cache_data(ttl=24*3600, show_spinner=False)
def convert_pdf_to_images(pdf_bytes, dpi=150, rotation=0):
    """Rasterize PDF bytes into a list of page images.

    Args:
        pdf_bytes: Raw bytes of the PDF document.
        dpi: Resolution handed to ``convert_from_bytes``.
        rotation: Angle passed to ``Image.rotate`` for every page; 0 leaves
            the pages untouched.

    Returns:
        The list of page images (rotated when ``rotation`` is non-zero), or an
        empty list when conversion raised; in that case the error is shown
        with ``st.error``.
    """
    try:
        pages = convert_from_bytes(pdf_bytes, dpi=dpi)
        # Nothing to rotate: hand back the converted pages unchanged
        if rotation == 0 or not pages:
            return pages
        return [
            page.rotate(rotation, expand=True, resample=Image.BICUBIC)
            for page in pages
        ]
    except Exception as e:
        st.error(f"Error converting PDF: {str(e)}")
        return []
# Cache preprocessed images for better performance
@st.cache_data(ttl=24*3600, show_spinner=False)  # Cache for 24 hours
def preprocess_image(image_bytes, preprocessing_options):
    """Preprocess an image for OCR and return it re-encoded as JPEG bytes.

    Steps, in order: normalise the colour mode to RGB or L (RGBA is composited
    onto a white background), rotate, downscale so the longest side is at most
    2500 px, then apply the optional grayscale, contrast and denoise passes.

    Args:
        image_bytes: Encoded image data readable by ``PIL.Image.open``.
        preprocessing_options: Dict of settings. Keys read here:
            ``rotation`` (angle, 0/None = no rotation), ``document_type``
            (``"handwritten"`` selects CLAHE and lighter denoising, any other
            value takes the standard path), ``grayscale`` (bool), ``contrast``
            (number; the enhancement factor is ``1 + contrast / 10``) and
            ``denoise`` (bool).

    Returns:
        JPEG bytes (quality 92) of the processed image. If encoding the
        processed image fails, the normalised/rotated/resized image is encoded
        and returned instead. A failing denoise pass is logged and skipped.
    """
    import logging
    logger = logging.getLogger("image_preprocessor")
    logger.setLevel(logging.INFO)
    logger.info(f"Preprocessing image with options: {preprocessing_options}")

    image = Image.open(io.BytesIO(image_bytes))
    # Remember the mode before any conversion so the log messages report the
    # real source mode rather than the already-converted one.
    source_mode = image.mode
    if source_mode == 'RGBA':
        # Composite onto white using the alpha channel (band index 3) as mask
        background = Image.new('RGB', image.size, (255, 255, 255))
        background.paste(image, mask=image.split()[3])
        image = background
        logger.info("Converted RGBA image to RGB")
    elif source_mode not in ('RGB', 'L'):
        image = image.convert('RGB')
        logger.info(f"Converted {source_mode} image to RGB")

    # Treat a missing or None rotation as "no rotation"
    rotation_degrees = preprocessing_options.get("rotation", 0) or 0
    if rotation_degrees != 0:
        image = image.rotate(rotation_degrees, expand=True, resample=Image.BICUBIC)

    # Downscale large images, keeping enough detail for OCR
    width, height = image.size
    max_dimension = max(width, height)
    if max_dimension > 2500:
        scale_factor = 2500 / max_dimension
        # Clamp to 1 px so extreme aspect ratios never produce a zero-sized side
        new_width = max(1, int(width * scale_factor))
        new_height = max(1, int(height * scale_factor))
        image = image.resize((new_width, new_height), Image.LANCZOS)

    img_array = np.array(image)
    document_type = preprocessing_options.get("document_type", "standard")
    is_handwritten = document_type == "handwritten"

    # Grayscale first: only colour (3-dimensional) arrays need converting
    if preprocessing_options.get("grayscale", False) and img_array.ndim == 3:
        gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        if is_handwritten:
            # Adaptive histogram equalization to bring out pen strokes
            clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
            gray = clahe.apply(gray)
        # Back to three channels for the remaining steps
        img_array = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)

    contrast = preprocessing_options.get("contrast", 0)
    if contrast != 0:
        contrast_factor = 1 + (contrast / 10)
        # Kept in a separate name so ``image`` stays available as the fallback
        enhanced = ImageEnhance.Contrast(Image.fromarray(img_array)).enhance(contrast_factor)
        img_array = np.array(enhanced)

    if preprocessing_options.get("denoise", False):
        try:
            is_color = img_array.ndim == 3 and img_array.shape[2] == 3
            if is_handwritten:
                # Very light denoising to preserve pen strokes
                if is_color:
                    img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 3, 3, 5, 9)
                else:
                    img_array = cv2.fastNlMeansDenoising(img_array, None, 3, 7, 21)
            else:
                # Standard denoising for printed documents
                if is_color:
                    img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 5, 5, 7, 21)
                else:
                    img_array = cv2.fastNlMeansDenoising(img_array, None, 5, 7, 21)
        except Exception as e:
            # Denoising is optional: report through the logger and continue
            logger.warning(f"Denoising error: {str(e)}, falling back to standard processing")

    processed_image = Image.fromarray(img_array)
    byte_io = io.BytesIO()
    try:
        # JPEG cannot store other modes (e.g. alpha), so normalise first
        if processed_image.mode not in ('RGB', 'L'):
            processed_image = processed_image.convert('RGB')
        processed_image.save(byte_io, format='JPEG', quality=92, optimize=True)
        processed_bytes = byte_io.getvalue()
        logger.info(f"Preprocessing complete. Original image mode: {source_mode}, processed mode: {processed_image.mode}")
        logger.info(f"Original size: {len(image_bytes)/1024:.1f}KB, processed size: {len(processed_bytes)/1024:.1f}KB")
        return processed_bytes
    except Exception as e:
        logger.error(f"Error saving processed image: {str(e)}")
        logger.info("Using original image as fallback")
        image_io = io.BytesIO()
        image.save(image_io, format='JPEG', quality=92)
        return image_io.getvalue()
# Cache OCR results in memory to speed up repeated processing
@st.cache_data(ttl=24*3600, max_entries=20, show_spinner=False)
def _process_file_cached_impl(_file_path, file_type, use_vision, file_size_mb, cache_key):
    """Run OCR on ``_file_path``; the result is cached on the other arguments.

    The leading underscore keeps ``_file_path`` out of the cache key. Callers
    in this file pass a freshly created temporary file whose name differs on
    every run, so hashing it would make every lookup a miss. ``cache_key``
    (content hash plus settings) identifies the document instead.
    """
    processor = StructuredOCR()
    return processor.process_file(
        _file_path,
        file_type=file_type,
        use_vision=use_vision,
        file_size_mb=file_size_mb,
    )


def process_file_cached(file_path, file_type, use_vision, file_size_mb, cache_key):
    """Cached OCR processing that reuses results for identical documents.

    Args:
        file_path: Path of the file to process (not part of the cache key).
        file_type: File type string forwarded to ``StructuredOCR.process_file``.
        use_vision: Forwarded to ``StructuredOCR.process_file``.
        file_size_mb: File size in megabytes, forwarded to the processor.
        cache_key: String identifying the document content and settings;
            together with the three arguments above it decides whether a
            cached result can be reused.

    Returns:
        Whatever ``StructuredOCR.process_file`` returns for the file.
    """
    return _process_file_cached_impl(file_path, file_type, use_vision, file_size_mb, cache_key)


# Keep the cache-clearing hook reachable under the public name
process_file_cached.clear = _process_file_cached_impl.clear
# Define functions
def _status_html(message, border_color=None):
    """Wrap ``message`` in the status-box markup used by the progress area."""
    style = f' style="border-left-color: {border_color};"' if border_color else ''
    return f'<div class="processing-status-container"{style}>{message}</div>'


def _size_limit_result(file_name, file_size_mb):
    """Build the result returned when a file exceeds the 50 MB API limit."""
    reason = f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB"
    return {
        "file_name": file_name,
        "topics": ["Document"],
        "languages": ["English"],
        "error": reason,
        "ocr_contents": {
            "error": f"Failed to process file: {reason}",
            "partial_text": "Document could not be processed due to size limitations."
        }
    }


def _remove_temp_file(path):
    """Delete a temporary file if a path was recorded; removal is best-effort."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError:
        # Already gone or not removable; cleanup must never mask the real outcome
        pass


def process_file(uploaded_file, use_vision=True, preprocessing_options=None, progress_container=None):
    """Process the uploaded file and return the OCR results.

    Args:
        uploaded_file: The uploaded file to process; ``.name`` and
            ``.getvalue()`` are read from it.
        use_vision: Whether to use the vision model.
        preprocessing_options: Dictionary of preprocessing options. Only used
            for images; see ``preprocess_image`` for the keys.
        progress_container: Optional container for progress indicators; a new
            ``st.empty()`` placeholder is created when omitted.

    Returns:
        The OCR result from ``StructuredOCR``. A placeholder dict is returned
        instead when no API key is configured, and a dict with an ``"error"``
        entry when the file is larger than 50 MB.

    Raises:
        Exception: Any processing failure is re-raised after a message has
            been shown in the UI. PDF failures are wrapped in ``ValueError``.
    """
    import hashlib
    import html
    import logging

    if preprocessing_options is None:
        preprocessing_options = {}
    # Create a container for progress indicators if not provided
    if progress_container is None:
        progress_container = st.empty()
    with progress_container.container():
        progress_bar = st.progress(0)
        status_text = st.empty()

    def show_status(message, border_color=None):
        status_text.markdown(_status_html(message, border_color), unsafe_allow_html=True)

    show_status("Preparing file for processing...")

    # Sidebar widget values are module-level names in this script, so they are
    # read from globals(); a locals() check inside this function never finds them.
    sidebar_values = globals()
    prompt = sidebar_values.get("custom_prompt")
    doc_type = sidebar_values.get("selected_doc_type")

    temp_path = None  # set once a temp file exists; removed in ``finally``
    try:
        # Check if API key is available
        if not MISTRAL_API_KEY:
            # Return dummy data if no API key
            progress_bar.progress(100)
            status_text.empty()
            return {
                "file_name": uploaded_file.name,
                "topics": ["Document"],
                "languages": ["English"],
                "ocr_contents": {
                    "title": "API Key Required",
                    "content": "Please set the MISTRAL_API_KEY environment variable to process documents."
                }
            }

        progress_bar.progress(10)
        show_status("Initializing OCR processor...")

        # Determine file type from extension
        file_ext = Path(uploaded_file.name).suffix.lower()
        file_type = "pdf" if file_ext == ".pdf" else "image"
        file_bytes = uploaded_file.getvalue()

        # Create a temporary file for processing
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp:
            tmp.write(file_bytes)
            temp_path = tmp.name

        # The PDF rotation setting only contributes to the cache key of PDFs
        pdf_rotation_value = sidebar_values.get("pdf_rotation", 0) if file_type == "pdf" else 0
        progress_bar.progress(15)

        if file_type == "pdf":
            show_status("Converting PDF to images...")
            progress_bar.progress(20)
            try:
                # Use the PDF processing pipeline directly from the StructuredOCR class
                processor = StructuredOCR()
                progress_bar.progress(30)
                show_status("Processing PDF with OCR...")

                # Check the file against the API limit (50 MB)
                file_size_mb = os.path.getsize(temp_path) / (1024 * 1024)
                if file_size_mb > 50:
                    progress_bar.progress(100)
                    status_text.empty()
                    progress_container.empty()
                    return _size_limit_result(uploaded_file.name, file_size_mb)

                file_hash = hashlib.md5(file_bytes).hexdigest()
                cache_key = f"{file_hash}_{file_type}_{use_vision}_{pdf_rotation_value}"
                try:
                    result = process_file_cached(temp_path, file_type, use_vision, file_size_mb, cache_key)
                except Exception as e:
                    # Escape the exception text: it is rendered as raw HTML
                    show_status(f"Processing error: {html.escape(str(e))}. Retrying...")
                    progress_bar.progress(60)
                    # If caching fails, process directly
                    result = processor.process_file(
                        temp_path,
                        file_type=file_type,
                        use_vision=use_vision,
                        file_size_mb=file_size_mb,
                    )
                progress_bar.progress(90)
                show_status("Finalizing results...")
            except Exception as e:
                progress_bar.progress(100)
                status_text.empty()
                progress_container.empty()
                raise ValueError(f"Error processing PDF: {str(e)}") from e
        else:
            # For image files, apply preprocessing if any option is non-default
            has_preprocessing = (
                preprocessing_options.get("grayscale", False) or
                preprocessing_options.get("denoise", False) or
                preprocessing_options.get("contrast", 0) != 0 or
                preprocessing_options.get("rotation", 0) != 0 or
                preprocessing_options.get("document_type", "standard") != "standard"
            )

            # Prepend a document type hint to the custom prompt when one is selected.
            # NOTE(review): the prompt is not forwarded to the OCR call in this
            # function; it only selects the status message below - confirm intent.
            if (prompt and doc_type and
                    doc_type != "Auto-detect (standard processing)" and
                    "This is a" not in str(prompt)):
                doc_type_hint = doc_type.split(" or ")[0].lower()
                prompt = f"This is a {doc_type_hint}. {prompt}"

            if has_preprocessing:
                show_status("Applying image preprocessing...")
                progress_bar.progress(20)
                processed_bytes = preprocess_image(file_bytes, preprocessing_options)
                progress_bar.progress(25)
                # NOTE(review): the processed file keeps the upload's extension;
                # confirm the OCR backend does not rely on the suffix if
                # preprocess_image changes the encoding.
                with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as proc_tmp:
                    proc_tmp.write(processed_bytes)
                # Use the processed file from here on and drop the original
                original_path, temp_path = temp_path, proc_tmp.name
                _remove_temp_file(original_path)
            progress_bar.progress(30)

            # Check the file against the API limit (50 MB)
            file_size_mb = os.path.getsize(temp_path) / (1024 * 1024)
            if file_size_mb > 50:
                progress_bar.progress(100)
                status_text.empty()
                progress_container.empty()
                return _size_limit_result(uploaded_file.name, file_size_mb)

            progress_bar.progress(40)
            show_status("Preparing document for OCR analysis...")

            # Cache key based on the bytes actually sent to OCR, type and settings
            with open(temp_path, 'rb') as prepared_file:
                file_hash = hashlib.md5(prepared_file.read()).hexdigest()
            cache_key = f"{file_hash}_{file_type}_{use_vision}_{pdf_rotation_value}"
            progress_bar.progress(50)

            has_custom_prompt = bool(prompt and str(prompt).strip())
            if has_custom_prompt:
                show_status("Processing document with custom instructions...")
            else:
                show_status("Processing document with OCR...")

            try:
                result = process_file_cached(temp_path, file_type, use_vision, file_size_mb, cache_key)
                progress_bar.progress(80)
                show_status("Analyzing document structure...")
                progress_bar.progress(90)
                show_status("Finalizing results...")
            except Exception as e:
                progress_bar.progress(60)
                show_status(f"Processing error: {html.escape(str(e))}. Retrying...")
                # If caching fails, process directly
                processor = StructuredOCR()
                result = processor.process_file(temp_path, file_type=file_type, use_vision=use_vision, file_size_mb=file_size_mb)
                progress_bar.progress(90)
                show_status("Finalizing results...")

        # Complete progress
        progress_bar.progress(100)
        show_status("Processing complete!")
        time.sleep(0.8)  # Brief pause to show completion
        status_text.empty()
        progress_container.empty()  # Remove progress indicators when done
        return result
    except Exception as e:
        progress_bar.progress(100)
        error_message = str(e)
        lowered = error_message.lower()
        # Classify once; the same category drives both the status box and the alert
        is_rate_limited = "rate limit" in lowered or "429" in error_message
        is_quota = "quota" in lowered or "credit" in lowered or "subscription" in lowered

        if is_rate_limited:
            friendly_message = "The AI service is currently experiencing high demand. Please try again in a few minutes."
            logging.getLogger("app").error(f"Rate limit error: {error_message}")
            show_status(f"Rate Limit: {friendly_message}", "#ff9800")
        elif is_quota:
            friendly_message = "The API usage quota has been reached. Please check your API key and subscription limits."
            show_status(f"API Quota: {friendly_message}", "#ef5350")
        else:
            show_status(f"Error: {html.escape(error_message)}", "#ef5350")
        time.sleep(1.5)  # Show error briefly
        status_text.empty()
        progress_container.empty()

        if is_rate_limited:
            st.warning(f"API Rate Limit: {friendly_message} This is a temporary issue and does not indicate any problem with your document.")
        elif is_quota:
            st.error(f"API Quota Exceeded: {friendly_message}")
        else:
            st.error(f"Error during processing: {error_message}")
        raise
    finally:
        # Single cleanup point for every exit path (return, error, size limit)
        _remove_temp_file(temp_path)
# App title and description
# The icon is embedded only when the favicon file exists, matching the
# existence check used for the page icon; a missing file must not stop the app.
title_icon_path = os.path.join(os.path.dirname(__file__), "static/favicon.png")
favicon_base64 = get_base64_from_image(title_icon_path) if os.path.exists(title_icon_path) else None
title_icon_html = (
    f'<img src="data:image/png;base64,{favicon_base64}" width="36" height="36" alt="Scroll Icon"/> '
    if favicon_base64 else ''
)
st.markdown(f'<div style="display: flex; align-items: center; gap: 10px;">{title_icon_html}<div><h1 style="margin: 0; padding: 20px 0 0 0;">Historical Document OCR</h1></div></div>', unsafe_allow_html=True)
st.subheader("Made possible by Mistral AI")

# Check if pytesseract is available for fallback
try:
    import pytesseract
    has_pytesseract = True
except ImportError:
    has_pytesseract = False

# Initialize session state for storing previous results if not already present
if 'previous_results' not in st.session_state:
    st.session_state.previous_results = []
# Create main layout with tabs and columns
main_tab1, main_tab2, main_tab3 = st.tabs(["Document Processing", "Previous Results", "About"])
with main_tab1:
    # Two-column layout: file upload on the left, results on the right
    left_col, right_col = st.columns([1, 1])
    # File uploader in the left column
    with left_col:
        # Simple CSS just to fix vertical text in drag and drop area
        st.markdown("""
<style>
/* Reset all file uploader styling */
.uploadedFile, .uploadedFileData, .stFileUploader {
color: inherit !important;
}
/* Fix vertical text orientation */
.stFileUploader p,
.stFileUploader span,
.stFileUploader div p,
.stFileUploader div span,
.stFileUploader label p,
.stFileUploader label span,
.stFileUploader div[data-testid="stFileUploadDropzone"] p,
.stFileUploader div[data-testid="stFileUploadDropzone"] span {
writing-mode: horizontal-tb !important;
}
/* Simplify the drop zone appearance */
.stFileUploader > section > div,
.stFileUploader div[data-testid="stFileUploadDropzone"] {
min-height: 100px !important;
}
</style>
""", unsafe_allow_html=True)
        # Add heading for the file uploader (just text, no container)
        st.markdown('### Upload Document')
        # Model info with clearer instructions
        st.markdown("Using the latest `mistral-ocr-latest` model for advanced document understanding. To get started upload your own document, use an example document, or explore the 'About' tab for more info.")
        # File uploader; the help text separator is a bullet (it was mis-encoded before)
        uploaded_file = st.file_uploader(
            "Drag and drop PDFs or images here",
            type=["pdf", "png", "jpg", "jpeg"],
            help="Limit 200MB per file • PDF, PNG, JPG, JPEG",
        )
        # Removed seed prompt instructions from here, moving to sidebar
# Sidebar with options - moved up with equal spacing
with st.sidebar:
    # Options title with reduced top margin
    st.markdown("<h2 style='margin-top:-25px; margin-bottom:5px; padding:0;'>Options</h2>", unsafe_allow_html=True)
    # Comprehensive CSS for optimal sidebar spacing and layout
    st.markdown("""
<style>
/* Core sidebar spacing fixes */
.block-container {padding-top: 0;}
.stSidebar .block-container {padding-top: 0 !important;}
.stSidebar [data-testid='stSidebarNav'] {margin-bottom: 0 !important;}
.stSidebar [data-testid='stMarkdownContainer'] {margin-bottom: 0 !important; margin-top: 0 !important;}
.stSidebar [data-testid='stVerticalBlock'] {gap: 0 !important;}
/* Input element optimization */
.stSidebar .stCheckbox {margin: 0 !important; padding: 0 !important;}
.stSidebar .stSelectbox {margin: 0 0 3px !important; padding: 0 !important;}
.stSidebar .stSlider {margin: 0 0 5px !important; padding: 0 !important;}
.stSidebar .stNumberInput {margin: 0 0 5px !important; padding: 0 !important;}
.stSidebar .stTextArea {margin: 0 0 5px !important; padding: 0 !important;}
.stSidebar .stTextInput {margin: 0 0 5px !important; padding: 0 !important;}
/* Heading and label optimization */
.stSidebar h1, .stSidebar h2, .stSidebar h3, .stSidebar h4, .stSidebar h5 {
margin: 2px 0 !important;
padding: 0 !important;
line-height: 1.2 !important;
}
/* Label text optimization */
.stSidebar label {margin: 0 !important; line-height: 1.2 !important;}
.stSidebar .stTextArea label, .stSidebar .stSelectbox label {margin-top: 2px !important;}
/* Help text optimization */
.stSidebar .stTooltipIcon {margin: 0 !important; height: 1em !important;}
/* Slider optimization */
.stSidebar [data-baseweb="slider"] {margin: 10px 0 0 !important;}
/* Expander optimization */
.stSidebar .stExpander {margin: 0 0 8px !important;}
.stSidebar .streamlit-expanderHeader {font-size: 0.9em !important;}
.stSidebar .streamlit-expanderContent {padding-top: 5px !important;}
/* Remove unnecessary margins in form elements */
.stSidebar .stForm > div {margin: 0 !important;}
</style>
""", unsafe_allow_html=True)

    # Model options
    use_vision = st.checkbox("Use Vision Model", value=True,
                             help="Use vision model for improved analysis (may be slower)")
    # Add spacing between sections
    st.markdown("<div style='margin: 10px 0;'></div>", unsafe_allow_html=True)

    # Document Processing section
    st.markdown("##### OCR Instructions", help="Optimize text extraction")
    # Document type selector
    document_types = [
        "Auto-detect (standard processing)",
        "Newspaper or Magazine",
        "Letter or Correspondence",
        "Book or Publication",
        "Form or Legal Document",
        "Recipe",
        "Handwritten Document",
        "Map or Illustration",
        "Table or Spreadsheet",
        "Other (specify in instructions)"
    ]
    selected_doc_type = st.selectbox(
        "Document Type",
        options=document_types,
        index=0,
        help="Select document type to optimize OCR processing for specific document formats and layouts. For documents with specialized features, also provide details in the instructions field below."
    )
    # Document layout selector
    document_layouts = [
        "Standard layout",
        "Multiple columns",
        "Table/grid format",
        "Mixed layout with images"
    ]
    selected_layout = st.selectbox(
        "Document Layout",
        options=document_layouts,
        index=0,
        help="Select the document's text layout for better OCR"
    )

    # Default instructions per document type; "Auto-detect" has no entry and
    # therefore starts from an empty prompt.
    doc_type_prompts = {
        "Newspaper or Magazine": "This is a newspaper/magazine. Process columns from top to bottom, capture headlines, bylines, article text and captions.",
        "Letter or Correspondence": "This is a letter/correspondence. Capture letterhead, date, greeting, body, closing and signature. Note any handwritten annotations.",
        "Book or Publication": "This is a book/publication. Extract titles, headers, footnotes, page numbers and body text. Preserve paragraph structure and any special formatting.",
        "Form or Legal Document": "This is a form/legal document. Extract all field labels and values, preserving the structure. Pay special attention to signature lines, dates, and any official markings.",
        "Recipe": "This is a recipe. Extract title, ingredients list with measurements, and preparation instructions. Maintain the distinction between ingredients and preparation steps.",
        "Handwritten Document": "This is a handwritten document. Carefully transcribe all handwritten text, preserving line breaks. Note any unclear sections or annotations.",
        "Map or Illustration": "This is a map or illustration. Transcribe all labels, legends, captions, and annotations. Note any scale indicators or directional markings.",
        "Table or Spreadsheet": "This is a table/spreadsheet. Preserve row and column structure, maintaining alignment of data. Extract headers and all cell values.",
        "Other (specify in instructions)": "Please describe the document type and any special processing requirements here.",
    }
    # Layout instructions used when no document type prompt exists...
    layout_prompts = {
        "Multiple columns": "Document has multiple columns. Read each column from top to bottom, then move to the next column.",
        "Table/grid format": "Document contains table data. Preserve row and column structure during extraction.",
        "Mixed layout with images": "Document has mixed text layout with images. Extract text in proper reading order.",
    }
    # ...and the shorter notes appended after a document type prompt
    layout_suffixes = {
        "Multiple columns": " Document has multiple columns.",
        "Table/grid format": " Contains table/grid formatting.",
        "Mixed layout with images": " Has mixed text layout with images.",
    }

    # Generate dynamic prompt based on both document type and layout
    custom_prompt_text = doc_type_prompts.get(selected_doc_type, "")
    if selected_layout != "Standard layout":
        if custom_prompt_text:
            custom_prompt_text += layout_suffixes.get(selected_layout, "")
        else:
            custom_prompt_text = layout_prompts.get(selected_layout, "")

    # Add spacing between sections
    st.markdown("<div style='margin: 10px 0;'></div>", unsafe_allow_html=True)
    custom_prompt = st.text_area(
        "Additional OCR Instructions",
        value=custom_prompt_text,
        placeholder="Example: Small text at bottom needs special attention",
        height=100,
        max_chars=300,
        key="custom_analysis_instructions",
        help="Specify document type and special OCR requirements. Detailed instructions activate Mistral AI's advanced document analysis."
    )
    # Custom instructions expander
    with st.expander("Custom Instruction Examples"):
        st.markdown("""
**Document Format Instructions:**
- "This newspaper has multiple columns - read each column from top to bottom"
- "This letter has a formal heading, main body, and signature section at bottom"
- "This form has fields with labels and filled-in values that should be paired"
- "This recipe has ingredient list at top and preparation steps below"
**Special Processing Instructions:**
- "Pay attention to footnotes at the bottom of each page"
- "Some text is faded - please attempt to reconstruct unclear passages"
- "There are handwritten annotations in the margins that should be included"
- "Document has table data that should preserve row and column alignment"
- "Text continues across pages and should be connected into a single flow"
- "This document uses special symbols and mathematical notation"
""")
    # Add spacing between sections
    st.markdown("<div style='margin: 10px 0;'></div>", unsafe_allow_html=True)

    # Image preprocessing options with reduced spacing
    st.markdown("##### Image Processing", help="Options for enhancing images")
    with st.expander("Preprocessing Options", expanded=False):
        preprocessing_options = {}
        # Document type selector
        doc_type_options = ["standard", "handwritten", "typed", "printed"]
        preprocessing_options["document_type"] = st.selectbox(
            "Document Type",
            options=doc_type_options,
            index=0,
            format_func=lambda x: x.capitalize(),
            help="Select document type for optimized processing"
        )
        preprocessing_options["grayscale"] = st.checkbox("Convert to Grayscale",
                                                         help="Convert image to grayscale before OCR")
        preprocessing_options["denoise"] = st.checkbox("Denoise Image",
                                                       help="Remove noise from the image")
        preprocessing_options["contrast"] = st.slider("Adjust Contrast", -5, 5, 0,
                                                      help="Adjust image contrast (-5 to +5)")
        # Rotation options; labels carry a degree sign (it was mis-encoded before)
        rotation_options = [0, 90, 180, 270]
        preprocessing_options["rotation"] = st.select_slider(
            "Rotate Document",
            options=rotation_options,
            value=0,
            format_func=lambda x: f"{x}° {'(No rotation)' if x == 0 else ''}",
            help="Rotate the document to correct orientation"
        )
    # Add spacing between sections
    st.markdown("<div style='margin: 10px 0;'></div>", unsafe_allow_html=True)

    # PDF options with consistent formatting
    st.markdown("##### PDF Settings", help="Options for PDF documents")
    with st.expander("PDF Options", expanded=False):
        pdf_dpi = st.slider("Resolution (DPI)", 72, 300, 100,
                            help="Higher DPI = better quality but slower")
        max_pages = st.number_input("Max Pages", 1, 20, 3,
                                    help="Limit number of pages to process")
        # PDF rotation reuses the rotation choices defined for images above
        pdf_rotation = st.select_slider(
            "Rotation",
            options=rotation_options,
            value=0,
            format_func=lambda x: f"{x}°",
            help="Rotate PDF pages"
        )
# Previous Results tab content
with main_tab2:
st.markdown('<h2>Previous Results</h2>', unsafe_allow_html=True)
# Load custom CSS for Previous Results tab
from ui.layout import load_css
load_css()
# Display previous results if available
if not st.session_state.previous_results:
st.markdown("""
<div class="previous-results-container" style="text-align: center; padding: 40px 20px; background-color: #f0f2f6; border-radius: 8px;">
<div style="font-size: 48px; margin-bottom: 20px;">πŸ“„</div>
<h3 style="margin-bottom: 10px; font-weight: 600;">No Previous Results</h3>
<p style="font-size: 16px;">Process a document to see your results history saved here.</p>
</div>
""", unsafe_allow_html=True)
else:
# Create a container for the results list
st.markdown('<div class="previous-results-container">', unsafe_allow_html=True)
st.markdown(f'<h3>{len(st.session_state.previous_results)} Previous Results</h3>', unsafe_allow_html=True)
# Lay out the type filter (wide column) beside the bulk-download button.
filter_col, download_col = st.columns([2, 1])

# Count each document family once; the counts drive both the filter choices
# and the descriptive ZIP file name. A missing/None file name counts as "".
_history = st.session_state.previous_results
_history_names = [(r.get("file_name") or "").lower() for r in _history]
pdf_count = sum(name.endswith(".pdf") for name in _history_names)
img_count = sum(name.endswith((".jpg", ".jpeg", ".png")) for name in _history_names)

with filter_col:
    # Only offer a filter for document types that are actually present.
    filter_options = ["All Types"]
    if pdf_count:
        filter_options.append("PDF Documents")
    if img_count:
        filter_options.append("Images")
    selected_filter = st.selectbox("Filter by Type:", filter_options)

with download_col:
    if _history:
        try:
            # NOTE: deliberately no function-scope "import io" here. The module
            # already imports io at the top, and a local import would turn `io`
            # into a local name of the enclosing function, breaking any
            # `io.` use that executes on a path where this branch did not run.
            from ocr_utils import create_results_zip_in_memory

            # Build the archive in memory rather than on disk.
            zip_data = create_results_zip_in_memory(_history)

            # Imported locally and before use: other parts of this code also
            # import datetime at function scope, which makes the name local.
            from datetime import datetime
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

            # Pick a file name that describes what the archive contains.
            if pdf_count and img_count:
                zip_filename = f"historical_ocr_mixed_{pdf_count}pdf_{img_count}img_{timestamp}.zip"
            elif pdf_count:
                zip_filename = f"historical_ocr_pdf_documents_{pdf_count}_{timestamp}.zip"
            elif img_count:
                zip_filename = f"historical_ocr_images_{img_count}_{timestamp}.zip"
            else:
                zip_filename = f"historical_ocr_results_{timestamp}.zip"

            st.download_button(
                label="Download All Results",
                data=zip_data,
                file_name=zip_filename,
                mime="application/zip",
                help="Download all previous results as a ZIP file containing HTML and JSON files"
            )
        except Exception as e:
            # Best effort: a failed archive must not take down the results tab.
            st.error(f"Error creating download: {str(e)}")
            st.info("Try with fewer results or individual downloads")
# Narrow the history to the type chosen in the "Filter by Type" selectbox.
# Any other selection (i.e. "All Types") keeps the full list.
_suffixes_by_filter = {
    "PDF Documents": (".pdf",),
    "Images": (".jpg", ".jpeg", ".png"),
}
_wanted_suffixes = _suffixes_by_filter.get(selected_filter)
if _wanted_suffixes is None:
    filtered_results = st.session_state.previous_results
else:
    filtered_results = [
        r for r in st.session_state.previous_results
        if (r.get("file_name") or "").lower().endswith(_wanted_suffixes)
    ]

# Show a message if no results match the filter
if not filtered_results:
    st.markdown("""
<div style="text-align: center; padding: 20px; background-color: #f9f9f9; border-radius: 5px; margin: 20px 0;">
<p>No results match the selected filter.</p>
</div>
""", unsafe_allow_html=True)

# File names, topics and languages come from uploads / OCR output, so they are
# escaped before being placed into markup rendered with unsafe_allow_html.
from html import escape as _esc

# Display each result as a card
for i, result in enumerate(filtered_results):
    file_name = result.get("file_name") or f"Document {i+1}"
    file_type_lower = file_name.lower()
    if file_type_lower.endswith(".pdf"):
        icon = "📄"
    elif file_type_lower.endswith((".jpg", ".jpeg", ".png", ".gif")):
        icon = "🖼️"
    else:
        icon = "📝"

    # Drop None placeholders so str.join cannot raise; fall back to "Unknown"
    # when nothing usable is recorded.
    _card_languages = [str(v) for v in (result.get("languages") or []) if v is not None] or ["Unknown"]
    _card_topics = [str(v) for v in (result.get("topics") or []) if v is not None]
    _shown_topics = _card_topics[:5] or ["Unknown"]
    _more_topics = f" + {len(_card_topics) - 5} more" if len(_card_topics) > 5 else ""

    _display_name = _esc(str(result.get("descriptive_file_name") or file_name))
    _display_date = _esc(str(result.get("timestamp", "Unknown")))

    # Card header and metadata. The card <div> is closed by the trailing
    # st.markdown('</div>') calls below, as in the original layout.
    st.markdown(
        '<div class="result-card">'
        '<div class="result-header">'
        f'<div class="result-filename">{icon} {_display_name}</div>'
        f'<div class="result-date">{_display_date}</div>'
        '</div>'
        '<div class="result-metadata">'
        f'<div class="result-tag">Languages: {_esc(", ".join(_card_languages))}</div>'
        f'<div class="result-tag">Topics: {_esc(", ".join(_shown_topics))}{_more_topics}</div>'
        '</div>',
        unsafe_allow_html=True,
    )

    # Add view button inside the card with proper styling
    st.markdown('<div class="result-action-button">', unsafe_allow_html=True)
    if st.button("View Document", key=f"view_{i}"):
        # `i` indexes filtered_results, not previous_results: store the card's
        # own result so the right document opens when a filter is active.
        st.session_state.selected_previous_result = result
        # Force a rerun to show the selected result
        st.rerun()
    st.markdown('</div>', unsafe_allow_html=True)
    # Close the result card
    st.markdown('</div>', unsafe_allow_html=True)
# Close the container
st.markdown('</div>', unsafe_allow_html=True)
# Display the selected result if available
if 'selected_previous_result' in st.session_state and st.session_state.selected_previous_result:
selected_result = st.session_state.selected_previous_result
# Create a styled container for the selected result
st.markdown(f"""
<div class="selected-result-container">
<div class="result-header" style="margin-bottom: 20px;">
<div class="selected-result-title">Selected Document: {selected_result.get('file_name', 'Unknown')}</div>
<div class="result-date">{selected_result.get('timestamp', '')}</div>
</div>
""", unsafe_allow_html=True)
# Two side-by-side columns: document metadata on the left, processing
# information on the right.
meta_col1, meta_col2 = st.columns(2)

with meta_col1:
    # Languages: ignore None placeholders (and a None list) before joining.
    if 'languages' in selected_result:
        languages = [str(lang) for lang in (selected_result['languages'] or []) if lang is not None]
        if languages:
            st.write(f"**Languages:** {', '.join(languages)}")

    if 'topics' in selected_result and selected_result['topics']:
        # Topic text originates from OCR/LLM output; escape it before it is
        # placed inside markup rendered with unsafe_allow_html.
        from html import escape as _esc

        st.markdown("**Subject Tags:**")

        # (colour, substrings) pairs, checked in order; first match wins.
        _badge_rules = (
            ("#1565c0", ("century", "pre-", "era", "historical")),  # time periods
            ("#00695c", ("language", "english", "french", "german", "latin")),  # languages
            ("#6a1b9a", ("letter", "newspaper", "book", "form", "document", "recipe")),  # document types
            ("#2e7d32", ("travel", "military", "science", "medicine", "education", "art", "literature")),  # subject domains
            ("#e65100", ("preprocessed", "enhanced", "grayscale", "denoised", "contrast", "rotated")),  # preprocessing tags
        )

        _badges = []
        for topic in selected_result['topics']:
            _label = str(topic)
            _lowered = _label.lower()
            badge_color = "#546e7a"  # Default color
            for _color, _terms in _badge_rules:
                if any(term in _lowered for term in _terms):
                    badge_color = _color
                    break
            _badges.append(
                f'<span style="background-color: {badge_color}; color: white; padding: 3px 8px; '
                f'border-radius: 12px; font-size: 0.85em; display: inline-block; margin-bottom: 5px;">{_esc(_label)}</span>'
            )

        # Emit the flex container and all badges as ONE element: markup opened
        # in one st.markdown call does not wrap the output of later calls, so
        # the wrapper only takes effect when the badges are inside it.
        st.markdown(
            '<div style="display: flex; flex-wrap: wrap; gap: 5px; margin-top: 5px;">'
            + "".join(_badges)
            + '</div>',
            unsafe_allow_html=True
        )

with meta_col2:
    # Display processing metadata
    if 'limited_pages' in selected_result:
        st.info(f"Processed {selected_result['limited_pages']['processed']} of {selected_result['limited_pages']['total']} pages")
    if 'processing_time' in selected_result:
        proc_time = selected_result['processing_time']
        # Only format values that support the numeric ".1f" format spec.
        if isinstance(proc_time, (int, float)):
            st.write(f"**Processing Time:** {proc_time:.1f}s")
# Build the tab strip; the third tab exists only for results flagged as
# containing images.
has_images = selected_result.get('has_images', False)
_tab_labels = ["Structured View", "Raw JSON"]
if has_images:
    _tab_labels.append("With Images")
    view_tab1, view_tab2, view_tab3 = st.tabs(_tab_labels)
else:
    view_tab1, view_tab2 = st.tabs(_tab_labels)

with view_tab1:
    # Render each non-empty OCR section under its own heading, skipping the
    # error / raw-text sections.
    _sections = selected_result.get('ocr_contents')
    if isinstance(_sections, dict):
        _hidden_sections = ('error', 'raw_text', 'partial_text')
        for section, content in _sections.items():
            if not content or section in _hidden_sections:
                continue
            st.markdown(f"#### {section.replace('_', ' ').title()}")
            if isinstance(content, str):
                st.write(content)
            elif isinstance(content, list):
                # One bullet per entry; non-string entries are stringified.
                for item in content:
                    st.write(f"- {item!s}")
            elif isinstance(content, dict):
                for k, v in content.items():
                    st.write(f"**{k}:** {v}")
with view_tab2:
# Show the raw JSON with an option to download it
try:
st.json(selected_result)
except Exception as e:
st.error(f"Error displaying JSON: {str(e)}")
# Try a safer approach with string representation
st.code(str(selected_result))
# Create more informative JSON download button with better naming
try:
json_str = json.dumps(selected_result, indent=2)
# Use the descriptive filename if available, otherwise build one
if 'descriptive_file_name' in selected_result:
# Get base name without extension
base_filename = Path(selected_result['descriptive_file_name']).stem
else:
# Fall back to old method of building filename
base_filename = selected_result.get('file_name', 'document').split('.')[0]
# Add document type if available
if 'topics' in selected_result and selected_result['topics']:
topic = selected_result['topics'][0].lower().replace(' ', '_')
base_filename = f"{base_filename}_{topic}"
# Add language if available
if 'languages' in selected_result and selected_result['languages']:
lang = selected_result['languages'][0].lower()
# Only add if it's not already in the filename
if lang not in base_filename.lower():
base_filename = f"{base_filename}_{lang}"
# For PDFs, add page information
if 'total_pages' in selected_result and 'processed_pages' in selected_result:
base_filename = f"{base_filename}_p{selected_result['processed_pages']}of{selected_result['total_pages']}"
# File-name timestamp: reuse the result's own "YYYY-MM-DD HH:MM" timestamp when
# it is present and parses, otherwise stamp the export with the current time.
# datetime is imported here, at function scope and ahead of every use, because
# other branches of this code import it locally too, which makes the name local.
from datetime import datetime
try:
    dt = datetime.strptime(selected_result['timestamp'], "%Y-%m-%d %H:%M")
    timestamp = dt.strftime("%Y%m%d_%H%M%S")
except (KeyError, TypeError, ValueError):
    # KeyError: no timestamp stored; TypeError: not a string;
    # ValueError: stored in a different format.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Create final filename
json_filename = f"{base_filename}_{timestamp}.json"
st.download_button(
label="Download JSON",
data=json_str,
file_name=json_filename,
mime="application/json"
)
except Exception as e:
st.error(f"Error creating JSON download: {str(e)}")
# Fallback to string representation for download with simple naming
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
st.download_button(
label="Download as Text",
data=str(selected_result),
file_name=f"document_{timestamp}.txt",
mime="text/plain"
)
if has_images and 'pages_data' in selected_result:
    with view_tab3:
        # Walk the stored pages, showing each page's images followed by its text.
        pages_data = selected_result.get('pages_data', [])
        _is_multi_page = len(pages_data) > 1
        _final_index = len(pages_data) - 1

        for page_idx, page in enumerate(pages_data):
            # Page headers are only useful when there is more than one page.
            if _is_multi_page:
                st.markdown(f"### Page {page_idx + 1}")

            text_content = page.get('markdown', '')
            if page.get('images'):
                # Images first, then the page text inside an open expander.
                images = page.get('images', [])
                for img in images:
                    if 'image_base64' in img:
                        st.image(img['image_base64'], width=600)
                if text_content:
                    with st.expander("View Page Text", expanded=True):
                        st.markdown(text_content)
            elif text_content:
                # No images on this page: show the text directly.
                st.markdown(text_content)

            # Horizontal rule between pages, but not after the last one.
            if page_idx < _final_index:
                st.markdown("---")
# Add HTML download button with improved, more descriptive filename
from ocr_utils import create_html_with_images
html_content = create_html_with_images(selected_result)
# Use the descriptive filename if available, otherwise build one
if 'descriptive_file_name' in selected_result:
# Get base name without extension
base_filename = Path(selected_result['descriptive_file_name']).stem
else:
# Fall back to old method of building filename
base_filename = selected_result.get('file_name', 'document').split('.')[0]
# Add document type if available
if 'topics' in selected_result and selected_result['topics']:
topic = selected_result['topics'][0].lower().replace(' ', '_')
base_filename = f"{base_filename}_{topic}"
# Add language if available
if 'languages' in selected_result and selected_result['languages']:
lang = selected_result['languages'][0].lower()
# Only add if it's not already in the filename
if lang not in base_filename.lower():
base_filename = f"{base_filename}_{lang}"
# For PDFs, add page information
if 'total_pages' in selected_result and 'processed_pages' in selected_result:
base_filename = f"{base_filename}_p{selected_result['processed_pages']}of{selected_result['total_pages']}"
# File-name timestamp: reuse the result's own "YYYY-MM-DD HH:MM" timestamp when
# it is present and parses, otherwise stamp the export with the current time.
# datetime is imported here, at function scope and ahead of every use, because
# other branches of this code import it locally too, which makes the name local.
from datetime import datetime
try:
    dt = datetime.strptime(selected_result['timestamp'], "%Y-%m-%d %H:%M")
    timestamp = dt.strftime("%Y%m%d_%H%M%S")
except (KeyError, TypeError, ValueError):
    # KeyError: no timestamp stored; TypeError: not a string;
    # ValueError: stored in a different format.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Create final filename
html_filename = f"{base_filename}_{timestamp}_with_images.html"
st.download_button(
label="Download as HTML with Images",
data=html_content,
file_name=html_filename,
mime="text/html"
)
# Close the container
st.markdown('</div>', unsafe_allow_html=True)
# Add clear button outside the container with proper styling
col1, col2, col3 = st.columns([1, 1, 1])
with col2:
st.markdown('<div class="result-action-button" style="text-align: center;">', unsafe_allow_html=True)
if st.button("Close Selected Document", key="close_selected"):
# Clear the selected result from session state
del st.session_state.selected_previous_result
# Force a rerun to update the view
st.rerun()
st.markdown('</div>', unsafe_allow_html=True)
# About tab content
with main_tab3:
# Add a notice about local OCR fallback if available
fallback_notice = ""
if 'has_pytesseract' in locals() and has_pytesseract:
fallback_notice = """
**Local OCR Fallback:**
- Local OCR fallback using Tesseract is available if API rate limits are reached
- Provides basic text extraction when cloud OCR is unavailable
"""
st.markdown(f"""
### About Historical Document OCR
This application specializes in processing historical documents using [Mistral AI's Document OCR](https://docs.mistral.ai/capabilities/document/), which is particularly effective for handling challenging textual materials.
#### Document Processing Capabilities
- **Historical Images**: Process vintage photographs, scanned historical papers, manuscripts
- **Handwritten Documents**: Extract text from letters, journals, notes, and records
- **Multi-Page PDFs**: Process historical books, articles, and longer documents
- **Mixed Content**: Handle documents with both text and imagery
#### Key Features
- **Advanced Image Preprocessing**
- Grayscale conversion optimized for historical documents
- Denoising to remove artifacts and improve clarity
- Contrast adjustment to enhance faded text
- Document rotation for proper orientation
- **Document Analysis**
- Text extraction with `mistral-ocr-latest`
- Structured data extraction: dates, names, places, topics
- Multi-language support with automatic detection
- Handling of period-specific terminology and obsolete language
- **Flexible Output Formats**
- Structured view with organized content sections
- Developer JSON for integration with other applications
- Visual representation preserving original document layout
- Downloadable results in various formats
#### Historical Context
Add period-specific context to improve analysis:
- Historical period selection
- Document purpose identification
- Custom instructions for specialized terminology
#### Data Privacy
- All document processing happens through secure AI processing
- No documents are permanently stored on the server
- Results are only saved in your current session
{fallback_notice}
""")
with main_tab1:
# Seed every processing flag up front so later code can read them without
# first checking that they exist. Flags already present keep their value.
for _flag_name in (
    'auto_process_sample',
    'sample_just_loaded',
    'processed_document_active',
    'sample_document_processed',
):
    if _flag_name not in st.session_state:
        st.session_state[_flag_name] = False
# Use uploaded_file or sample_document if available
if 'sample_document' in st.session_state and st.session_state.sample_document is not None:
# Use the sample document
uploaded_file = st.session_state.sample_document
# Add a notice about using sample document with better style
st.markdown(
f"""
<div style="background-color: #D4EDDA; color: #155724; padding: 10px;
border-radius: 4px; border-left: 5px solid #155724; margin-bottom: 10px;">
<div style="display: flex; justify-content: space-between; align-items: center;">
<span style="font-weight: bold;">Sample Document: {uploaded_file.name}</span>
</div>
</div>
""",
unsafe_allow_html=True
)
# Set auto-process flag in session state if this is a newly loaded sample
if st.session_state.sample_just_loaded:
st.session_state.auto_process_sample = True
# Mark that this is a sample document being processed
st.session_state.sample_document_processed = True
st.session_state.sample_just_loaded = False
# Clear sample document after use to avoid interference with future uploads
st.session_state.sample_document = None
if uploaded_file is not None:
# Check file size (cap at 50MB)
file_size_mb = len(uploaded_file.getvalue()) / (1024 * 1024)
if file_size_mb > 50:
with left_col:
st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size is 50MB.")
st.stop()
file_ext = Path(uploaded_file.name).suffix.lower()
# Process button - flush left with similar padding as file browser
with left_col:
# Make the button more clear about its function
if st.session_state.processed_document_active:
process_button = st.button("Process Document Again")
else:
process_button = st.button("Process Document")
# Empty container for progress indicators - will be filled during processing
# Positioned right after the process button for better visibility
progress_placeholder = st.empty()
# Image preprocessing preview - automatically show only the preprocessed version
# Runs only when at least one preprocessing option value is truthy and the
# upload's reported MIME type starts with "image/".
if any(preprocessing_options.values()) and uploaded_file.type.startswith('image/'):
st.markdown("**Preprocessed Preview**")
try:
# Create a container for the preview to better control layout
with st.container():
# preprocess_image takes the raw upload bytes plus the options dict and
# returns the processed bytes that are displayed below.
processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options)
# Use use_container_width=True for responsive design
st.image(io.BytesIO(processed_bytes), use_container_width=True)
# Show preprocessing metadata in a well-formatted caption
# Each entry names one option that differs from its neutral value
# ("standard" document type, False flags, 0 contrast, 0 rotation).
meta_items = []
if preprocessing_options.get("document_type", "standard") != "standard":
meta_items.append(f"Document type ({preprocessing_options['document_type']})")
if preprocessing_options.get("grayscale", False):
meta_items.append("Grayscale")
if preprocessing_options.get("denoise", False):
meta_items.append("Denoise")
if preprocessing_options.get("contrast", 0) != 0:
meta_items.append(f"Contrast ({preprocessing_options['contrast']})")
if preprocessing_options.get("rotation", 0) != 0:
meta_items.append(f"Rotation ({preprocessing_options['rotation']}Β°)")
# Only show "Applied:" if there are actual preprocessing steps
if meta_items:
meta_text = "Applied: " + ", ".join(meta_items)
st.caption(meta_text)
except Exception as e:
# Any failure while building the preview is reported in place together with
# a hint; it is not re-raised.
st.error(f"Error in preprocessing: {str(e)}")
st.info("Try using grayscale preprocessing for PNG images with transparency")
# Container for success message (will be filled after processing)
# No extra spacing needed as it will be managed programmatically
metadata_placeholder = st.empty()
# We now have a close button next to the success message, so we don't need one here
# auto_process_sample is already initialized at the top of the function
# processed_document_active is already initialized at the top of the function
# We'll determine processing logic below
# Check if this is an auto-processing situation
auto_processing = st.session_state.auto_process_sample and not st.session_state.processed_document_active
# Show a message if auto-processing is happening
if auto_processing:
st.info("Automatically processing sample document...")
# Determine if we should process the document
# Either process button was clicked OR auto-processing is happening
should_process = process_button or auto_processing
if should_process:
# Reset auto-process flag to avoid processing on next rerun
if st.session_state.auto_process_sample:
st.session_state.auto_process_sample = False
# Move the progress indicator reference to just below the button
progress_container = progress_placeholder
try:
# Get max_pages or default if not available
max_pages_value = max_pages if 'max_pages' in locals() else None
# Apply performance mode settings
if 'perf_mode' in locals():
if perf_mode == "Speed":
# Override settings for faster processing
if 'preprocessing_options' in locals():
preprocessing_options["denoise"] = False # Skip denoising for speed
if 'pdf_dpi' in locals() and file_ext.lower() == '.pdf':
pdf_dpi = min(pdf_dpi, 100) # Lower DPI for speed
# Process file with or without custom prompt
if custom_prompt and custom_prompt.strip():
# Process with custom instructions for the AI
with progress_placeholder.container():
progress_bar = st.progress(0)
status_text = st.empty()
status_text.markdown('<div class="processing-status-container">Processing with custom instructions...</div>', unsafe_allow_html=True)
progress_bar.progress(30)
# Special handling for PDF files with custom prompts
if file_ext.lower() == ".pdf":
# For PDFs with custom prompts, we use a special two-step process
with progress_placeholder.container():
status_text.markdown('<div class="processing-status-container">Using special PDF processing for custom instructions...</div>', unsafe_allow_html=True)
progress_bar.progress(40)
# Path of the on-disk copy of the upload; stays None until the copy exists so
# the cleanup in `finally` knows whether there is anything to remove.
_pdf_tmp_path = None
try:
    # Process directly in one step for better performance
    processor = StructuredOCR()

    # The OCR processor is given a file path, so write the upload to disk.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
        tmp.write(uploaded_file.getvalue())
        temp_path = tmp.name
    _pdf_tmp_path = temp_path

    # Apply PDF rotation if specified
    pdf_rotation_value = pdf_rotation if 'pdf_rotation' in locals() else 0

    # Prepend a document-type hint taken from the type selector, unless the
    # selector is on auto-detect or the prompt already carries such a hint.
    if (
        custom_prompt
        and 'selected_doc_type' in locals()
        and selected_doc_type != "Auto-detect (standard processing)"
        and "This is a" not in str(custom_prompt)
    ):
        # Extract just the document type from the selector
        doc_type_hint = selected_doc_type.split(" or ")[0].lower()
        custom_prompt = f"This is a {doc_type_hint}. {custom_prompt}"

    if custom_prompt:
        # Infer a coarse document type from keywords in the prompt.
        # Rules are checked in order; the first match wins.
        _doc_type_keywords = (
            ("newspaper", ("newspaper", "column", "article", "magazine")),
            ("letter", ("letter", "correspondence", "handwritten")),
            ("book", ("book", "publication")),
            ("form", ("form", "certificate", "legal")),
            ("recipe", ("recipe", "ingredients")),
        )
        _prompt_lower = custom_prompt.lower()
        doc_type = "general"
        for _label, _keywords in _doc_type_keywords:
            if any(keyword in _prompt_lower for keyword in _keywords):
                doc_type = _label
                break

        # Long prompts are cut to their first 250 characters.
        if len(custom_prompt) > 250:
            simplified_prompt = f"DOCUMENT TYPE: {doc_type}\nINSTRUCTIONS: {custom_prompt[:250]}..."
        else:
            simplified_prompt = f"DOCUMENT TYPE: {doc_type}\nINSTRUCTIONS: {custom_prompt}"
    else:
        simplified_prompt = custom_prompt

    progress_bar.progress(50)

    # Check if we have custom instructions
    has_custom_prompt = custom_prompt is not None and len(str(custom_prompt).strip()) > 0
    if has_custom_prompt:
        status_text.markdown('<div class="processing-status-container">Processing PDF with custom instructions...</div>', unsafe_allow_html=True)
    else:
        status_text.markdown('<div class="processing-status-container">Processing PDF with optimized settings...</div>', unsafe_allow_html=True)

    # Process directly with optimized settings
    result = processor.process_file(
        file_path=temp_path,
        file_type="pdf",
        use_vision=use_vision,
        custom_prompt=simplified_prompt,
        file_size_mb=len(uploaded_file.getvalue()) / (1024 * 1024),
        pdf_rotation=pdf_rotation_value
    )

    progress_bar.progress(90)
    status_text.markdown('<div class="processing-status-container">Finalizing results...</div>', unsafe_allow_html=True)
except Exception as e:
    # If anything fails, revert to standard processing (called with an empty
    # preprocessing-options dict).
    st.warning(f"Special PDF processing failed. Falling back to standard method: {str(e)}")
    result = process_file(uploaded_file, use_vision, {}, progress_container=progress_placeholder)
finally:
    # Remove the temporary copy on success AND on failure; previously it was
    # only deleted when processing succeeded.
    if _pdf_tmp_path and os.path.exists(_pdf_tmp_path):
        try:
            os.unlink(_pdf_tmp_path)
        except OSError:
            # Best effort: a leftover temp file must not fail the request.
            pass
else:
# For non-PDF files, use normal processing with custom prompt
# Save the uploaded file to a temporary file with preprocessing
with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp:
# Apply preprocessing if any options are selected
if any(preprocessing_options.values()):
# Apply performance mode settings
if 'perf_mode' in locals() and perf_mode == "Speed":
# Skip denoising for speed in preprocessing
speed_preprocessing = preprocessing_options.copy()
speed_preprocessing["denoise"] = False
processed_bytes = preprocess_image(uploaded_file.getvalue(), speed_preprocessing)
else:
processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options)
tmp.write(processed_bytes)
else:
tmp.write(uploaded_file.getvalue())
temp_path = tmp.name
# Show progress
with progress_placeholder.container():
progress_bar.progress(50)
status_text.markdown('<div class="processing-status-container">Analyzing with custom instructions...</div>', unsafe_allow_html=True)
# Initialize OCR processor and process with custom prompt
processor = StructuredOCR()
# Detect document type from custom prompt
doc_type = "general"
if any(keyword in custom_prompt.lower() for keyword in ["newspaper", "column", "article", "magazine"]):
doc_type = "newspaper"
elif any(keyword in custom_prompt.lower() for keyword in ["letter", "correspondence", "handwritten"]):
doc_type = "letter"
elif any(keyword in custom_prompt.lower() for keyword in ["book", "publication"]):
doc_type = "book"
elif any(keyword in custom_prompt.lower() for keyword in ["form", "certificate", "legal"]):
doc_type = "form"
elif any(keyword in custom_prompt.lower() for keyword in ["recipe", "ingredients"]):
doc_type = "recipe"
# Format the custom prompt for better Mistral processing
formatted_prompt = f"DOCUMENT TYPE: {doc_type}\nUSER INSTRUCTIONS: {custom_prompt.strip()}\nPay special attention to these instructions and respond accordingly."
try:
result = processor.process_file(
file_path=temp_path,
file_type="image", # Always use image for non-PDFs
use_vision=use_vision,
custom_prompt=formatted_prompt,
file_size_mb=len(uploaded_file.getvalue()) / (1024 * 1024)
)
except Exception as e:
# For any error, fall back to standard processing
st.warning(f"Custom prompt processing failed. Falling back to standard processing: {str(e)}")
result = process_file(uploaded_file, use_vision, preprocessing_options, progress_container=progress_placeholder)
# Complete progress
with progress_placeholder.container():
progress_bar.progress(100)
status_text.markdown('<div class="processing-status-container">Processing complete!</div>', unsafe_allow_html=True)
time.sleep(0.8)
progress_placeholder.empty()
# Clean up temporary file
if os.path.exists(temp_path):
try:
os.unlink(temp_path)
except:
pass
else:
# Standard processing without custom prompt
result = process_file(uploaded_file, use_vision, preprocessing_options, progress_container=progress_placeholder)
# Document results will be shown in the right column
with right_col:
# Values shown in the card come from the upload and from OCR output, so they
# are escaped before being placed into markup rendered with unsafe_allow_html.
from html import escape as _esc

# Add Document Metadata section header
st.subheader("Document Metadata")

# The card is assembled from fragments and rendered with one st.markdown call.
_card_rows = ['<div class="metadata-card" style="padding:15px; margin-bottom:20px;">']

# File info
_card_rows.append(
    f'<p><strong>File Name:</strong> {_esc(str(result.get("file_name", uploaded_file.name)))}</p>'
)

# Info about limited pages
if 'limited_pages' in result:
    _card_rows.append(
        f'<p style="padding:8px; border-radius:4px;"><strong>Pages:</strong> '
        f'{_esc(str(result["limited_pages"]["processed"]))} of '
        f'{_esc(str(result["limited_pages"]["total"]))} processed</p>'
    )

# Languages: ignore None placeholders (and a None list) before joining.
if 'languages' in result:
    languages = [str(lang) for lang in (result['languages'] or []) if lang is not None]
    if languages:
        _card_rows.append(f'<p><strong>Languages:</strong> {_esc(", ".join(languages))}</p>')

# Topics - show at most 8 subject tags, then a "+ N more" indicator.
if 'topics' in result and result['topics']:
    topics_display = [str(topic) for topic in result['topics'][:8]]
    topics_str = ", ".join(topics_display)
    if len(result['topics']) > 8:
        topics_str += f" + {len(result['topics']) - 8} more"
    _card_rows.append(f'<p><strong>Subject Tags:</strong> {_esc(topics_str)}</p>')

# Document type - shown without a leading "historical " and in title-style case.
if result.get('detected_document_type'):
    doc_type = str(result['detected_document_type']).lower()
    if doc_type.startswith("historical "):
        doc_type = doc_type[len("historical "):]
    doc_type = ' '.join(word.capitalize() for word in doc_type.split())
    _card_rows.append(f'<p><strong>Document Type:</strong> {_esc(doc_type)}</p>')

# Processing time (only values that support the numeric ".1f" format spec).
if 'processing_time' in result:
    proc_time = result['processing_time']
    if isinstance(proc_time, (int, float)):
        _card_rows.append(f'<p><strong>Processing Time:</strong> {proc_time:.1f}s</p>')

# Custom prompt indicator: shown when a non-blank custom_prompt exists in this
# scope, or when the result itself carries the 'custom_prompt_applied' key.
has_instructions = ('custom_prompt' in locals() and custom_prompt and len(str(custom_prompt).strip()) > 0)
if has_instructions or 'custom_prompt_applied' in result:
    _card_rows.append('<p><strong>Advanced Analysis:</strong> Custom instructions applied</p>')

# Close the metadata card and render it.
_card_rows.append('</div>')
metadata_html = "".join(_card_rows)
st.markdown(metadata_html, unsafe_allow_html=True)
# Add content section heading - using standard subheader
st.subheader("Document Content")
# Start document content div with consistent styling class
st.markdown('<div class="document-content" style="margin-top:10px;">', unsafe_allow_html=True)
if 'ocr_contents' in result:
# Check for has_images in the result
has_images = result.get('has_images', False)
# Create tabs for different views
if has_images:
view_tab1, view_tab2, view_tab3 = st.tabs(["Structured View", "Raw JSON", "With Images"])
else:
view_tab1, view_tab2 = st.tabs(["Structured View", "Raw JSON"])
with view_tab1:
    # Renders every non-empty OCR section on screen and, in parallel, builds
    # the HTML offered under "Export Content". Export text is HTML-escaped and
    # contains the full section text (the on-screen view collapses long text
    # into an expander; the export is not truncated).
    from html import escape as _esc

    _emphasis_marker = "... that"   # text after the first marker is shown in bold
    _long_text_chars = 1000         # longer strings are collapsed in an expander
    _long_list_items = 20           # longer lists are collapsed in an expander
    _p_plain = '<p style="font-size:16px; font-weight:normal;">'
    _p_emph = '<p style="font-size:16px;">'

    def _render_text(text):
        """Write one text section; return its HTML for the export."""
        head, found, tail = text.partition(_emphasis_marker)
        if found:
            st.markdown(f"{head}{_emphasis_marker}**{tail}**")
            return f"{_p_emph}{_esc(head)}{_emphasis_marker}<strong>{_esc(tail)}</strong></p>"
        st.write(text)
        return f"{_p_plain}{_esc(text)}</p>"

    def _render_list(items):
        """Write every list entry; return the matching <li> fragments."""
        fragments = []
        for item in items:
            if isinstance(item, dict):
                try:
                    st.json(item)
                except Exception as e:
                    st.error(f"Error displaying JSON: {str(e)}")
                    st.code(str(item))
                fragments.append(f"<li>{_esc(json.dumps(item, default=str))}</li>")
            else:
                # Strings and any other value (numbers, nested lists, ...) are
                # shown as a bullet instead of being silently dropped.
                st.write(f"- {item!s}")
                fragments.append(f"<li>{_esc(str(item))}</li>")
        return fragments

    _html_parts = []
    if isinstance(result['ocr_contents'], dict):
        for section, content in result['ocr_contents'].items():
            if not content:  # Only display non-empty sections
                continue

            _heading = str(section).replace('_', ' ').title()
            _html_parts.append(
                '<h4 style="font-family: Georgia, serif; font-size: 18px; '
                f'margin-top: 20px; margin-bottom: 10px;">{_esc(_heading)}</h4>'
            )
            st.markdown(f"#### {_heading}")

            if isinstance(content, str):
                if len(content) > _long_text_chars:
                    with st.expander("Show full content"):
                        _html_parts.append(_render_text(content))
                else:
                    _html_parts.append(_render_text(content))
            elif isinstance(content, list):
                if len(content) > _long_list_items:
                    with st.expander(f"Show all {len(content)} items"):
                        _items_html = _render_list(content)
                else:
                    _items_html = _render_list(content)
                _html_parts.append("<ul>" + "".join(_items_html) + "</ul>")
            elif isinstance(content, dict):
                _rows = []
                for k, v in content.items():
                    st.write(f"**{k}:** {v}")
                    _rows.append(f"<dt>{_esc(str(k))}</dt><dd>{_esc(str(v))}</dd>")
                _html_parts.append("<dl>" + "".join(_rows) + "</dl>")
            else:
                # Scalars such as numbers: show them rather than skipping them.
                st.write(content)
                _html_parts.append(f"{_p_plain}{_esc(str(content))}</p>")

    html_content = "".join(_html_parts)

    # Offer the export only when there is something to export.
    if html_content:
        with st.expander("Export Content"):
            # Get original filename without extension
            original_name = Path(result.get('file_name', uploaded_file.name)).stem
            html_bytes = html_content.encode()
            st.download_button(
                label="Download as HTML",
                data=html_bytes,
                file_name=f"{original_name}_processed.html",
                mime="text/html"
            )
with view_tab2:
    # Raw JSON for developers; results whose serialised form exceeds 5000
    # characters are tucked into an expander.

    def _show_raw_json(payload):
        """Display payload as JSON, falling back to its string form."""
        try:
            st.json(payload)
        except Exception as e:
            st.error(f"Error displaying JSON: {str(e)}")
            # Fallback to string representation
            st.code(str(payload))

    # The size probe must not crash the tab: values json cannot encode are
    # stringified for measuring, and if measuring still fails the result is
    # treated as large so it is collapsed by default.
    try:
        _is_large = len(json.dumps(result, default=str)) > 5000
    except (TypeError, ValueError, RecursionError):
        _is_large = True

    if _is_large:
        with st.expander("View full JSON"):
            _show_raw_json(result)
    else:
        _show_raw_json(result)
if has_images and 'pages_data' in result:
with view_tab3:
# Use pages_data directly instead of raw_response
try:
# Use the serialized pages data
pages_data = result.get('pages_data', [])
if not pages_data:
    st.warning("No image data found in the document.")
    st.stop()
# Construct markdown from pages_data directly
from ocr_utils import replace_images_in_markdown
combined_markdown = ""
for page in pages_data:
    page_markdown = page.get('markdown', '')
    if not page_markdown:
        # Skip empty pages: appending only the separator for them would make
        # combined_markdown non-empty and defeat the emptiness check below.
        continue
    # 'images' may be present but None; treat that the same as no images
    images = page.get('images') or []
    # Map image id -> base64 payload for the images that carry both fields
    image_dict = {
        img['id']: img['image_base64']
        for img in images
        if 'id' in img and 'image_base64' in img
    }
    # Replace image references in markdown
    if image_dict:
        page_markdown = replace_images_in_markdown(page_markdown, image_dict)
    combined_markdown += page_markdown + "\n\n---\n\n"
if not combined_markdown:
    st.warning("No content with images found.")
    st.stop()
# Add CSS for better image handling
# Injects a <style> block into the page via st.markdown (raw HTML enabled).
# The rules target the class names used by the per-page HTML assembled further
# down in this tab: .markdown-text-container (outer wrapper), .page-text-content,
# .text-block, .image-container and .page-break.
st.markdown("""
<style>
.image-container {
margin: 20px 0;
text-align: center;
}
.markdown-text-container {
padding: 10px;
background-color: #f9f9f9;
border-radius: 5px;
}
.markdown-text-container img {
margin: 15px auto;
max-width: 90%;
max-height: 500px;
object-fit: contain;
border: 1px solid #ddd;
border-radius: 4px;
display: block;
}
.markdown-text-container p {
margin-bottom: 16px;
line-height: 1.6;
font-family: Georgia, serif;
}
.page-break {
border-top: 1px solid #ddd;
margin: 20px 0;
padding-top: 20px;
}
.page-text-content {
margin-bottom: 20px;
}
.text-block {
background-color: #fff;
padding: 15px;
border-radius: 4px;
border-left: 3px solid #546e7a;
margin-bottom: 15px;
color: #333;
}
.text-block p {
margin: 8px 0;
color: #333;
}
</style>
""", unsafe_allow_html=True)
# Process and display content with images properly.
# For every page with markdown this builds an HTML fragment (text first, then
# the page's images), joins the fragments into html_content and renders it.
import re
from html import escape as _escape_html

# Matches a markdown image reference: ![alt text](image id)
image_ref_pattern = re.compile(r'!\[(.*?)\]\((.*?)\)')
# Sections rendered first, in exactly this order; all remaining sections
# follow in alphabetical order.
priority_sections = ['title', 'subtitle', 'header', 'publication', 'date', 'content', 'main_text']
# Bookkeeping fields that are never rendered as sections
hidden_sections = ('raw_text', 'error', 'partial_text')


def _safe_text(value):
    """Return *value* as HTML-escaped text (OCR output is untrusted input)."""
    return _escape_html(str(value), quote=False)


# Process each page separately
pages_content = []
# Check if this is from a PDF processed through pdf2image
is_pdf2image = result.get('pdf_processing_method') == 'pdf2image'
for i, page in enumerate(pages_data):
    page_markdown = page.get('markdown', '')
    # 'images' may be present but None; treat that the same as no images
    images = page.get('images') or []
    if not page_markdown:
        continue
    # Map image id -> base64 payload for the images that carry both fields
    image_dict = {
        img['id']: img['image_base64']
        for img in images
        if 'id' in img and 'image_base64' in img
    }
    markdown_lines = page_markdown.split("\n")

    # Fragments of this page's HTML, joined once at the end
    parts = [f"<h3>Page {i+1}</h3>" if i > 0 else ""]
    # Display the text content first to ensure it's visible
    parts.append("<div class='page-text-content'>")

    if is_pdf2image and i == 0 and 'ocr_contents' in result:
        # Special handling for PDF2image processed documents: the structured
        # OCR sections are shown once, on the first page.
        parts.append("<div class='text-block pdf-content'>")
        # Check if custom prompt was applied
        if result.get('custom_prompt_applied') == 'text_only':
            parts.append("<div class='prompt-info'><i>Custom analysis applied using text-only processing</i></div>")
        ocr_contents = result.get('ocr_contents') or {}
        # Priority sections first (in their listed order), then the rest
        # alphabetically. Building the list directly avoids the reversed
        # order that repeated insert(0, ...) calls produce.
        section_keys = [name for name in priority_sections if name in ocr_contents]
        section_keys += sorted(name for name in ocr_contents if name not in priority_sections)
        for section in section_keys:
            if section in hidden_sections:
                continue  # Skip these fields
            content = ocr_contents[section]
            section_title = section.replace('_', ' ').title()
            parts.append(f"<h4>{_safe_text(section_title)}</h4>")
            if isinstance(content, str):
                # Escape first, then convert newlines to <br> tags
                content_html = _safe_text(content).replace('\n', '<br>')
                parts.append(f"<p>{content_html}</p>")
            elif isinstance(content, list):
                parts.append("<ul>")
                for item in content:
                    if isinstance(item, dict):
                        fields_html = "".join(
                            f"<strong>{_safe_text(k)}:</strong> {_safe_text(v)}<br>"
                            for k, v in item.items()
                        )
                        parts.append(f"<li>{fields_html}</li>")
                    else:
                        parts.append(f"<li>{_safe_text(item)}</li>")
                parts.append("</ul>")
            elif isinstance(content, dict):
                for k, v in content.items():
                    if isinstance(v, list):
                        parts.append(f"<p><strong>{_safe_text(k)}:</strong></p><ul>")
                        parts.extend(f"<li>{_safe_text(entry)}</li>" for entry in v)
                        parts.append("</ul>")
                    else:
                        parts.append(f"<p><strong>{_safe_text(k)}:</strong> {_safe_text(v)}</p>")
        parts.append("</div>")
    else:
        # Standard processing for regular documents: every non-blank line
        # that holds no image reference goes into one text block.
        text_content = [
            line for line in markdown_lines
            if line.strip() and not image_ref_pattern.search(line)
        ]
        if text_content:
            parts.append("<div class='text-block'>")
            parts.extend(f"<p>{_safe_text(line)}</p>" for line in text_content)
            parts.append("</div>")
    # Close the page-text-content wrapper
    parts.append("</div>")

    # Then add images separately. A single line may hold several references.
    for line in markdown_lines:
        for img_match in image_ref_pattern.finditer(line):
            alt_text = img_match.group(1)
            img_ref = img_match.group(2)
            # Get the base64 data for this image ID
            img_data = image_dict.get(img_ref, "")
            if img_data:
                # Attribute values are escaped with quotes included
                parts.append(
                    f'<div class="image-container"><img src="{_escape_html(str(img_data))}" '
                    f'alt="{_escape_html(alt_text)}"></div>'
                )

    # Add page separator if not the last page
    if i < len(pages_data) - 1:
        parts.append('<div class="page-break"></div>')
    page_html = "".join(parts)
    pages_content.append(page_html)

# Combine all pages HTML
html_content = "\n".join(pages_content)
# Wrap the content in a div with the class for styling
st.markdown(f"""
<div class="markdown-text-container">
{html_content}
</div>
""", unsafe_allow_html=True)
# Create download HTML content: a standalone document that wraps html_content
# with its own copy of the styling. The doctype and the explicit UTF-8 charset
# matter because the file is opened locally, with no HTTP header to declare
# the encoding; without them non-ASCII text (accents, historical characters)
# can be displayed garbled.
download_html = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{
font-family: Georgia, serif;
line-height: 1.7;
margin: 0 auto;
max-width: 800px;
padding: 20px;
}}
img {{
max-width: 90%;
max-height: 500px;
object-fit: contain;
margin: 20px auto;
display: block;
border: 1px solid #ddd;
border-radius: 4px;
}}
.image-container {{
margin: 20px 0;
text-align: center;
}}
.page-break {{
border-top: 1px solid #ddd;
margin: 40px 0;
padding-top: 40px;
}}
h3 {{
color: #333;
border-bottom: 1px solid #eee;
padding-bottom: 10px;
}}
p {{
margin: 12px 0;
}}
.page-text-content {{
margin-bottom: 20px;
}}
.text-block {{
background-color: #f9f9f9;
padding: 15px;
border-radius: 4px;
border-left: 3px solid #546e7a;
margin-bottom: 15px;
color: #333;
}}
.text-block p {{
margin: 8px 0;
color: #333;
}}
</style>
</head>
<body>
<div class="markdown-text-container">
{html_content}
</div>
</body>
</html>
"""
# Create a more descriptive filename for the download:
# <original stem>[_<first topic>][_<first language>]_<timestamp>_with_images.html
original_name = Path(result.get('file_name', uploaded_file.name)).stem
# Add document type if available
if 'topics' in result and result['topics']:
    topic = result['topics'][0].lower().replace(' ', '_')
    original_name = f"{original_name}_{topic}"
# Add language if available
if 'languages' in result and result['languages']:
    lang = result['languages'][0].lower()
    # Only add if it's not already in the filename
    if lang not in original_name.lower():
        original_name = f"{original_name}_{lang}"
# Get current date for uniqueness.
# NOTE: `datetime` is the class imported at the top of the file. It is
# deliberately not re-imported here: if this code runs inside a function, a
# local `from datetime import datetime` would make the name local to the whole
# function and leave it unbound for the later datetime.now() call on runs
# that skip this branch.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Create final filename
download_filename = f"{original_name}_{timestamp}_with_images.html"
# Add download button as an expander to prevent page reset
with st.expander("Download Document with Images"):
    st.markdown("Click the button below to download the document with embedded images")
    st.download_button(
        label="Download as HTML",
        data=download_html,
        file_name=download_filename,
        mime="text/html",
        key="download_with_images_button"
    )
except Exception as e:
st.error(f"Could not display document with images: {str(e)}")
st.info("Try refreshing or processing the document again.")
if 'ocr_contents' not in result:
    st.error("No OCR content was extracted from the document.")
else:
    # Decide whether to hint at the preprocessing options: the hint is shown
    # for image uploads that yielded almost no text while no preprocessing
    # option was switched on.
    total_text_length = 0
    has_minimal_text = False
    # Image files are recognised by the extension of the result's file name
    is_image = result.get('file_name', '').lower().endswith(('.jpg', '.jpeg', '.png', '.gif'))
    if is_image and 'ocr_contents' in result:
        ocr_contents = result['ocr_contents']
        has_raw_text_only = False
        if 'raw_text' in ocr_contents:
            raw_text = ocr_contents['raw_text']
            total_text_length += len(raw_text.strip())
            # Non-empty string/list fields other than the bookkeeping ones
            other_content_fields = []
            for field_name, field_value in ocr_contents.items():
                if field_name in ['raw_text', 'error', 'partial_text']:
                    continue
                if isinstance(field_value, (str, list)) and field_value:
                    other_content_fields.append(field_name)
            # raw_text alone, or raw_text plus at most one other field
            has_raw_text_only = len(other_content_fields) <= 1
        # "Minimal" means fewer than 50 characters of raw text
        has_minimal_text = total_text_length < 50 and has_raw_text_only
    # Any non-default preprocessing setting counts as "preprocessing used"
    preprocessing_used = any((
        preprocessing_options.get("document_type", "standard") != "standard",
        preprocessing_options.get("grayscale", False),
        preprocessing_options.get("denoise", False),
        preprocessing_options.get("contrast", 0) != 0,
        preprocessing_options.get("rotation", 0) != 0,
    ))
    show_hint = (
        has_minimal_text
        and not preprocessing_used
        and uploaded_file.type.startswith('image/')
    )
    if show_hint:
        st.warning("""
**Limited text extracted from this image.**
Try using preprocessing options in the sidebar to improve results:
- Convert to grayscale for clearer text
- Use denoising for aged or degraded documents
- Adjust contrast for faded text
- Try different rotation if text orientation is unclear
Click the "Preprocessing Options" section in the sidebar under "Image Processing".
""")
# Close document content div
# (emits a bare closing tag; the matching opening <div> is presumably written
# earlier in the page, outside this section - TODO confirm)
st.markdown('</div>', unsafe_allow_html=True)
# Set processed_document_active to True when a new document is processed
st.session_state.processed_document_active = True
# Add CSS for styling close buttons
# NOTE(review): no element in the code visible here carries the
# "close-button" class - confirm these rules are still used.
st.markdown("""
<style>
.close-button {
background-color: #e7e7e7;
color: #666;
border: none;
border-radius: 4px;
padding: 0.25rem 0.5rem;
font-size: 0.8rem;
cursor: pointer;
transition: all 0.2s ease;
}
.close-button:hover {
background-color: #d7d7d7;
color: #333;
}
</style>
""", unsafe_allow_html=True)
# Display success message with close button for dismissing processed documents
success_cols = st.columns([5, 1])
with success_cols[0]:
    # The message is written into the metadata placeholder created elsewhere
    metadata_placeholder.success("**Document processed successfully**")
with success_cols[1]:
    # Use a more visually distinctive close button.
    # NOTE(review): this selector is not scoped to the close button; it
    # matches every element with data-testid="stButton" on the page.
    st.markdown("""
<style>
div[data-testid="stButton"] button {
background-color: #f8f9fa;
border: 1px solid #dee2e6;
padding: 0.25rem 0.5rem;
font-size: 0.875rem;
border-radius: 0.2rem;
}
</style>
""", unsafe_allow_html=True)
    # The label starts with U+2715 (multiplication X); it had been stored as
    # mis-decoded bytes and showed up garbled on the button.
    if st.button("✕ Close Document", key="close_document_button", help="Clear current document and start over"):
        # Clear the session state
        st.session_state.processed_document_active = False
        # Reset any active document data
        if 'current_result' in st.session_state:
            del st.session_state.current_result
        # Rerun to reset the page
        st.rerun()
# Prepare the entry that will be stored in the previous-results list:
# a shallow copy of the result, stamped with the time of processing.
result_copy = result.copy()
processed_at = datetime.now()
result_copy['timestamp'] = processed_at.strftime("%Y-%m-%d %H:%M")
# Remember whether this run came from a sample document, then clear the
# one-shot flag so the next run starts clean.
if st.session_state.get('sample_document_processed'):
    result_copy['sample_document'] = True
    st.session_state.sample_document_processed = False
# Base name (no extension) used for the descriptive file name built later
source_file_name = result.get('file_name', uploaded_file.name)
original_name = Path(source_file_name).stem
# Extract subject tags from content.
# If the result already carries at least three topics they are used as-is.
# Otherwise tags are derived from: existing topics, the detected document
# type, keyword themes and the earliest year found in the text, the detected
# languages, and the preprocessing options that were applied. The list is
# finally padded with generic tags so it always has at least three entries.


def _add_tag(tags, tag):
    """Append *tag* to *tags* unless it is already present."""
    if tag not in tags:
        tags.append(tag)


subject_tags = []
# First check if we already have topics in the result
if 'topics' in result and result['topics'] and len(result['topics']) >= 3:
    subject_tags = result['topics']
else:
    # Generate tags based on document content
    try:
        # Extract text from OCR contents
        raw_text = ""
        if 'ocr_contents' in result:
            if 'raw_text' in result['ocr_contents']:
                raw_text = result['ocr_contents']['raw_text']
            elif 'content' in result['ocr_contents']:
                raw_text = result['ocr_contents']['content']
        # The text analysis below needs a string. Sections are not always
        # strings (other code in this file handles list and dict sections),
        # so normalise here instead of failing halfway through tag generation.
        if isinstance(raw_text, (list, tuple)):
            raw_text = "\n".join(str(part) for part in raw_text)
        elif raw_text is None:
            raw_text = ""
        elif not isinstance(raw_text, str):
            raw_text = str(raw_text)

        # Use existing topics as starting point if available
        if 'topics' in result and result['topics']:
            subject_tags = list(result['topics'])

        # Add document type if detected
        if 'detected_document_type' in result:
            _add_tag(subject_tags, result['detected_document_type'].capitalize())

        # Analyze content for common themes based on keywords
        content_themes = {
            "Historical": ["century", "ancient", "historical", "history", "vintage", "archive", "heritage"],
            "Travel": ["travel", "journey", "expedition", "exploration", "voyage", "map", "location"],
            "Science": ["experiment", "research", "study", "analysis", "scientific", "laboratory"],
            "Literature": ["book", "novel", "poetry", "author", "literary", "chapter", "story"],
            "Art": ["painting", "illustration", "drawing", "artist", "exhibit", "gallery", "portrait"],
            "Education": ["education", "school", "university", "college", "learning", "student", "teach"],
            "Politics": ["government", "political", "policy", "administration", "election", "legislature"],
            "Business": ["business", "company", "corporation", "market", "industry", "commercial", "trade"],
            "Social": ["society", "community", "social", "culture", "tradition", "customs"],
            "Technology": ["technology", "invention", "device", "mechanical", "machine", "technical"],
            "Military": ["military", "army", "navy", "war", "battle", "soldier", "weapon"],
            "Religion": ["religion", "church", "temple", "spiritual", "sacred", "ritual"],
            "Medicine": ["medical", "medicine", "health", "hospital", "treatment", "disease", "doctor"],
            "Legal": ["legal", "law", "court", "justice", "attorney", "judicial", "statute"],
            "Correspondence": ["letter", "mail", "correspondence", "message", "communication"]
        }

        if raw_text:
            # Search for keywords in content (plain substring match)
            raw_text_lower = raw_text.lower()
            for theme, keywords in content_themes.items():
                if any(keyword in raw_text_lower for keyword in keywords):
                    _add_tag(subject_tags, theme)

            # Add document period tag if date patterns are detected:
            # four-digit years 1000-1999 and 2000-2019.
            import re
            year_matches = re.findall(r'\b1[0-9]{3}\b|\b20[0-1][0-9]\b', raw_text)
            if year_matches:
                # The period is chosen from the earliest year mentioned
                earliest = min(int(y) for y in year_matches)
                period_bounds = (
                    (1800, "Pre-1800s"),
                    (1850, "Early 19th Century"),
                    (1900, "Late 19th Century"),
                    (1950, "Early 20th Century"),
                )
                period_tag = next(
                    (label for upper, label in period_bounds if earliest < upper),
                    "Modern Era",
                )
                _add_tag(subject_tags, period_tag)

        # Add languages as topics if available
        if 'languages' in result and result['languages']:
            for lang in result['languages']:
                if lang:
                    # Deduplicate on the tag that is actually stored
                    # ("<lang> Language"), not on the bare language name.
                    _add_tag(subject_tags, f"{lang} Language")

        # Add preprocessing information as tags if preprocessing was applied
        if uploaded_file.type.startswith('image/'):
            # Check if meaningful preprocessing options were used
            if preprocessing_options.get("document_type", "standard") != "standard":
                doc_type = preprocessing_options["document_type"].capitalize()
                _add_tag(subject_tags, f"Enhanced ({doc_type})")

            preprocessing_methods = []
            if preprocessing_options.get("grayscale", False):
                preprocessing_methods.append("Grayscale")
            if preprocessing_options.get("denoise", False):
                preprocessing_methods.append("Denoised")
            contrast_val = preprocessing_options.get("contrast", 0)
            if contrast_val != 0:
                preprocessing_methods.append(
                    "Contrast Enhanced" if contrast_val > 0 else "Contrast Reduced"
                )
            if preprocessing_options.get("rotation", 0) != 0:
                preprocessing_methods.append("Rotated")

            # Add a combined preprocessing tag if methods were applied
            if preprocessing_methods:
                _add_tag(subject_tags, "Preprocessed")
                # Add the specific method as a tag if only one was used
                if len(preprocessing_methods) == 1:
                    _add_tag(subject_tags, preprocessing_methods[0])
    except Exception as e:
        # Best effort: keep whatever tags were collected so far.
        # `logger` is expected to be defined elsewhere in this file.
        logger.warning(f"Error generating subject tags: {str(e)}")

# Ensure we have at least 3 tags, filling up with generic ones in a fixed
# order (this also covers the case where no tag could be extracted at all).
for filler_tag in ("Document", "Historical", "Text", "Archive", "Content", "Record"):
    if len(subject_tags) >= 3:
        break
    _add_tag(subject_tags, filler_tag)

# Update the result with enhanced tags
result_copy['topics'] = subject_tags
# Create a more descriptive file name for the history entry:
# <original stem>[_<document type>][_<period>]<original extension>
import re

file_type = Path(result.get('file_name', uploaded_file.name)).suffix.lower()
doc_type_tag = ""
# Add document type to filename if detected
if 'detected_document_type' in result:
    doc_type = result['detected_document_type'].lower()
    doc_type_tag = f"_{doc_type}"
elif len(subject_tags) > 0:
    # Use first tag as document type if not explicitly detected
    doc_type_tag = f"_{subject_tags[0].lower().replace(' ', '_')}"

# Add period tag for historical context if available.
# "century" and "era" must be whole words: a plain substring test for "era"
# also matches unrelated tags such as "Literature" or "General".
period_marker = re.compile(r"\b(?:century|era)\b|\bpre-")
period_tag = ""
for tag in subject_tags:
    if period_marker.search(tag.lower()):
        period_tag = f"_{tag.lower().replace(' ', '_')}"
        break

# Generate final descriptive file name
descriptive_name = f"{original_name}{doc_type_tag}{period_tag}{file_type}"
result_copy['descriptive_file_name'] = descriptive_name

# Add to session state (newest first), keeping the most recent 20 results
st.session_state.previous_results.insert(0, result_copy)
if len(st.session_state.previous_results) > 20:
    st.session_state.previous_results = st.session_state.previous_results[:20]
except Exception as e:
st.error(f"Error processing document: {str(e)}")
else:
# Example Documents section after file uploader
# (this is the branch taken when there is no document to process)
st.subheader("Example Documents")
# Add a simplified info message about examples
st.markdown("""
This app can process various historical documents:
- Historical photographs, maps, and manuscripts
- Handwritten letters and documents
- Printed books and articles
- Multi-page PDFs
""")
# Add CSS to make the dropdown match the column width
# Both rules force width/max-width to 100% with !important: the first on the
# element container that holds a selectbox, the second on the selectbox's
# inner control.
st.markdown("""
<style>
/* Make the selectbox container match the full column width */
.main .block-container .element-container:has([data-testid="stSelectbox"]) {
width: 100% !important;
max-width: 100% !important;
}
/* Make the actual selectbox control take the full width */
.stSelectbox > div > div {
width: 100% !important;
max-width: 100% !important;
}
</style>
""", unsafe_allow_html=True)
# Sample documents offered in the dropdown, as (label, URL) pairs.
# Entry 0 is the placeholder shown before anything is selected.
sample_documents = [
    ("Select a sample document", "Select a sample document"),
    ("Restaurant Menu (PDF)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/a-la-carte.pdf"),
    ("The Magician (Image)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magician-or-bottle-cungerer.jpg"),
    ("Handwritten Letter (Image)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/handwritten-letter.jpg"),
    ("Magellan Travels (Image)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magellan-travels.jpg"),
    ("Milgram Flier (Image)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/milgram-flier.png"),
    ("Baldwin Street (Image)", "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/baldwin-15st-north.jpg"),
]
# Parallel lists, index-aligned with the selectbox options
sample_urls = [url for _, url in sample_documents]
sample_names = [label for label, _ in sample_documents]
# Initialize sample_document in session state if it doesn't exist
if 'sample_document' not in st.session_state:
    st.session_state.sample_document = None
# The selectbox works on indices and displays the matching label
selected_sample = st.selectbox(
    "Select a sample document from `~/input`",
    options=range(len(sample_urls)),
    format_func=sample_names.__getitem__,
)
if selected_sample > 0:
    selected_url = sample_urls[selected_sample]
    # Add process button for the sample document
    if st.button("Load Sample Document"):
        try:
            import requests

            class SampleDocument:
                """In-memory stand-in for an uploaded file.

                Exposes the attributes and methods this app reads from an
                uploaded file: ``name``, ``type``, ``size``, ``getvalue()``
                and ``read()``. ``seek()`` and ``tell()`` exist only for
                call compatibility and do not track a position.
                """

                def __init__(self, name, content, content_type):
                    self.name = name
                    self._content = content
                    self.type = content_type
                    self.size = len(content)

                def getvalue(self):
                    """Return the complete content."""
                    return self._content

                def read(self):
                    """Return the complete content on every call."""
                    return self._content

                def seek(self, position):
                    """Accept and ignore *position* (compatibility no-op)."""
                    return

                def tell(self):
                    """Always report position 0 (compatibility stub)."""
                    return 0

            with st.spinner(f"Downloading {sample_names[selected_sample]}..."):
                # Bound the wait (connect, read) so an unresponsive server
                # cannot hang the app; a timeout raises and is reported by
                # the handler below.
                response = requests.get(selected_url, timeout=(10, 60))
                response.raise_for_status()

            # Extract filename from URL
            file_name = selected_url.split("/")[-1]
            # Determine content type based on file extension
            lowered_name = file_name.lower()
            if lowered_name.endswith('.pdf'):
                content_type = 'application/pdf'
            elif lowered_name.endswith(('.jpg', '.jpeg')):
                content_type = 'image/jpeg'
            elif lowered_name.endswith('.png'):
                content_type = 'image/png'
            else:
                content_type = 'application/octet-stream'

            # Save download info in session state for more reliable handling
            st.session_state.sample_document = SampleDocument(
                name=file_name,
                content=response.content,
                content_type=content_type
            )
            # Set a flag to indicate this is a newly loaded sample
            st.session_state.sample_just_loaded = True
            # Force rerun to load the document
            st.rerun()
        except Exception as e:
            st.error(f"Error downloading sample document: {str(e)}")
            st.info("Please try uploading your own document instead.")