# (Hugging Face Spaces page header residue removed: "Spaces: Running / Running")
# Standard library imports
import base64
import hashlib
import io
import json
import logging
import os
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path

# Third-party imports
import cv2
import numpy as np
import streamlit as st
from pdf2image import convert_from_bytes
from PIL import Image, ImageEnhance, ImageFilter

# Local imports: the StructuredOCR class, config, and result utilities
from structured_ocr import StructuredOCR
from config import MISTRAL_API_KEY
from ocr_utils import create_results_zip
def get_base64_from_image(image_path):
    """Read an image file and return its contents as a base64-encoded string."""
    raw_bytes = Path(image_path).read_bytes()
    return base64.b64encode(raw_bytes).decode('utf-8')
# Resolve the favicon bundled with the app; fall back to a text icon if the
# file is missing.  Page config must be the first Streamlit call in the app.
favicon_path = os.path.join(os.path.dirname(__file__), "static/favicon.png")
_page_icon = favicon_path if os.path.exists(favicon_path) else "π"
st.set_page_config(
    page_title="Historical OCR",
    page_icon=_page_icon,
    layout="wide",
    initial_sidebar_state="expanded",
)
# NOTE(review): the original comments here mention 24-hour caching, but no
# @st.cache_data decorator is present in this revision — confirm whether the
# decorator was lost in an edit.
def convert_pdf_to_images(pdf_bytes, dpi=150, rotation=0):
    """Render PDF bytes into a list of PIL page images.

    Args:
        pdf_bytes: Raw contents of a PDF file.
        dpi: Rendering resolution handed to pdf2image.
        rotation: Degrees to rotate every page (0 means no rotation).

    Returns:
        List of PIL.Image pages; an empty list when conversion fails (the
        error is surfaced to the user via st.error).
    """
    try:
        pages = convert_from_bytes(pdf_bytes, dpi=dpi)
        if rotation != 0 and pages:
            # Rotate every page; expand so nothing is cropped by the rotation.
            pages = [page.rotate(rotation, expand=True, resample=Image.BICUBIC) for page in pages]
        return pages
    except Exception as e:
        st.error(f"Error converting PDF: {str(e)}")
        return []
# Cache preprocessed images for better performance
# Cache for 24 hours
def preprocess_image(image_bytes, preprocessing_options):
    """Preprocess image with selected options optimized for historical document OCR quality

    Args:
        image_bytes: Raw image file contents (any PIL-readable format).
        preprocessing_options: Dict with optional keys "document_type",
            "grayscale", "denoise", "contrast" (-5..5) and "rotation" (degrees).

    Returns:
        JPEG-encoded bytes of the processed image; if saving the processed
        image fails, the (converted/rotated/resized) source image is
        re-encoded and returned instead.
    """
    # Setup basic console logging
    import logging
    logger = logging.getLogger("image_preprocessor")
    logger.setLevel(logging.INFO)
    # Log which preprocessing options are being applied
    logger.info(f"Preprocessing image with options: {preprocessing_options}")
    # Convert bytes to PIL Image
    image = Image.open(io.BytesIO(image_bytes))
    # Check for alpha channel (RGBA) and convert to RGB if needed
    if image.mode == 'RGBA':
        # Convert RGBA to RGB by compositing the image onto a white background
        background = Image.new('RGB', image.size, (255, 255, 255))
        background.paste(image, mask=image.split()[3])  # 3 is the alpha channel
        image = background
        logger.info("Converted RGBA image to RGB")
    elif image.mode not in ('RGB', 'L'):
        # Convert other modes to RGB as well
        image = image.convert('RGB')
        # NOTE(review): this logs the mode *after* conversion, so it always reports "RGB"
        logger.info(f"Converted {image.mode} image to RGB")
    # Apply rotation if specified
    if preprocessing_options.get("rotation", 0) != 0:
        rotation_degrees = preprocessing_options.get("rotation")
        image = image.rotate(rotation_degrees, expand=True, resample=Image.BICUBIC)
    # Resize large images while preserving details important for OCR
    width, height = image.size
    max_dimension = max(width, height)
    # Less aggressive resizing to preserve document details
    if max_dimension > 2500:
        scale_factor = 2500 / max_dimension
        new_width = int(width * scale_factor)
        new_height = int(height * scale_factor)
        # Use LANCZOS for better quality preservation
        image = image.resize((new_width, new_height), Image.LANCZOS)
    img_array = np.array(image)
    # Apply preprocessing based on selected options with settings optimized for historical documents
    document_type = preprocessing_options.get("document_type", "standard")
    # Process grayscale option first as it's a common foundation
    if preprocessing_options.get("grayscale", False):
        if len(img_array.shape) == 3:  # Only convert if it's not already grayscale
            if document_type == "handwritten":
                # Enhanced grayscale processing for handwritten documents
                img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
                # Apply adaptive histogram equalization to enhance handwriting
                clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
                img_array = clahe.apply(img_array)
            else:
                # Standard grayscale for printed documents
                img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
            # Convert back to RGB for further processing
            img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB)
    if preprocessing_options.get("contrast", 0) != 0:
        # Map the -5..5 UI slider onto a PIL enhancement factor around 1.0
        contrast_factor = 1 + (preprocessing_options.get("contrast", 0) / 10)
        image = Image.fromarray(img_array)
        enhancer = ImageEnhance.Contrast(image)
        image = enhancer.enhance(contrast_factor)
        img_array = np.array(image)
    if preprocessing_options.get("denoise", False):
        try:
            # Apply appropriate denoising based on document type
            if document_type == "handwritten":
                # Very light denoising for handwritten documents to preserve pen strokes
                if len(img_array.shape) == 3 and img_array.shape[2] == 3:  # Color image
                    img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 3, 3, 5, 9)
                else:  # Grayscale image
                    img_array = cv2.fastNlMeansDenoising(img_array, None, 3, 7, 21)
            else:
                # Standard denoising for printed documents
                if len(img_array.shape) == 3 and img_array.shape[2] == 3:  # Color image
                    img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 5, 5, 7, 21)
                else:  # Grayscale image
                    img_array = cv2.fastNlMeansDenoising(img_array, None, 5, 7, 21)
        except Exception as e:
            print(f"Denoising error: {str(e)}, falling back to standard processing")
    # Convert back to PIL Image
    processed_image = Image.fromarray(img_array)
    # Higher quality for OCR processing
    byte_io = io.BytesIO()
    try:
        # Make sure the image is in RGB mode before saving as JPEG
        if processed_image.mode not in ('RGB', 'L'):
            processed_image = processed_image.convert('RGB')
        processed_image.save(byte_io, format='JPEG', quality=92, optimize=True)
        byte_io.seek(0)
        logger.info(f"Preprocessing complete. Original image mode: {image.mode}, processed mode: {processed_image.mode}")
        logger.info(f"Original size: {len(image_bytes)/1024:.1f}KB, processed size: {len(byte_io.getvalue())/1024:.1f}KB")
        return byte_io.getvalue()
    except Exception as e:
        logger.error(f"Error saving processed image: {str(e)}")
        # Fallback to original image
        logger.info("Using original image as fallback")
        image_io = io.BytesIO()
        image.save(image_io, format='JPEG', quality=92)
        image_io.seek(0)
        return image_io.getvalue()
# Cache OCR results in memory to speed up repeated processing
def process_file_cached(file_path, file_type, use_vision, file_size_mb, cache_key):
    """Run StructuredOCR on a file and return the result.

    cache_key exists so a caching decorator can key on file content/settings.
    NOTE(review): no caching decorator is attached in this revision, so
    cache_key is currently unused — confirm whether @st.cache_data was lost.
    """
    ocr = StructuredOCR()
    return ocr.process_file(
        file_path,
        file_type=file_type,
        use_vision=use_vision,
        file_size_mb=file_size_mb,
    )
# Define functions
def process_file(uploaded_file, use_vision=True, preprocessing_options=None, progress_container=None):
    """Process the uploaded file and return the OCR results.

    Args:
        uploaded_file: The uploaded file to process (Streamlit UploadedFile).
        use_vision: Whether to use the vision model for analysis.
        preprocessing_options: Dictionary of image preprocessing options.
        progress_container: Optional Streamlit container for progress indicators.

    Returns:
        dict: Structured OCR result, or an explanatory error payload when the
        API key is missing or the file exceeds the 50 MB API limit.

    Raises:
        ValueError: If PDF processing fails.
        Exception: Any other processing error is re-raised after being
            surfaced in the UI.
    """
    if preprocessing_options is None:
        preprocessing_options = {}
    # Create a container for progress indicators if not provided
    if progress_container is None:
        progress_container = st.empty()
    with progress_container.container():
        progress_bar = st.progress(0)
        status_text = st.empty()
        status_text.markdown('<div class="processing-status-container">Preparing file for processing...</div>', unsafe_allow_html=True)

    def _size_limit_error(file_size_mb):
        # Shared error payload for files exceeding the Mistral API's 50 MB limit
        # (previously duplicated in the PDF and image branches).
        return {
            "file_name": uploaded_file.name,
            "topics": ["Document"],
            "languages": ["English"],
            "error": f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB",
            "ocr_contents": {
                "error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB",
                "partial_text": "Document could not be processed due to size limitations."
            }
        }

    try:
        # Check if API key is available
        if not MISTRAL_API_KEY:
            # Return dummy data if no API key
            progress_bar.progress(100)
            status_text.empty()
            return {
                "file_name": uploaded_file.name,
                "topics": ["Document"],
                "languages": ["English"],
                "ocr_contents": {
                    "title": "API Key Required",
                    "content": "Please set the MISTRAL_API_KEY environment variable to process documents."
                }
            }
        # Update progress - more granular steps
        progress_bar.progress(10)
        status_text.markdown('<div class="processing-status-container">Initializing OCR processor...</div>', unsafe_allow_html=True)
        # Determine file type from extension
        file_ext = Path(uploaded_file.name).suffix.lower()
        file_type = "pdf" if file_ext == ".pdf" else "image"
        file_bytes = uploaded_file.getvalue()
        # Create a temporary file for processing
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp:
            tmp.write(file_bytes)
            temp_path = tmp.name
        # NOTE(review): 'pdf_rotation' is assigned at module level by the sidebar
        # widget, so it is never in this function's locals() and this always
        # evaluates to 0 — confirm whether globals() was intended.
        pdf_rotation_value = pdf_rotation if 'pdf_rotation' in locals() and file_type == "pdf" else 0
        progress_bar.progress(15)
        # For PDFs, we need to handle differently
        if file_type == "pdf":
            status_text.markdown('<div class="processing-status-container">Converting PDF to images...</div>', unsafe_allow_html=True)
            progress_bar.progress(20)
            # Convert PDF to images
            try:
                # Use the PDF processing pipeline directly from the StructuredOCR class
                processor = StructuredOCR()
                # Process the file with direct PDF handling
                progress_bar.progress(30)
                status_text.markdown('<div class="processing-status-container">Processing PDF with OCR...</div>', unsafe_allow_html=True)
                # Get file size in MB for API limits
                file_size_mb = os.path.getsize(temp_path) / (1024 * 1024)
                # Check if file exceeds API limits (50 MB)
                if file_size_mb > 50:
                    os.unlink(temp_path)  # Clean up temp file
                    progress_bar.progress(100)
                    status_text.empty()
                    progress_container.empty()
                    return _size_limit_error(file_size_mb)
                # Generate cache key from file content and processing settings
                file_hash = hashlib.md5(file_bytes).hexdigest()
                cache_key = f"{file_hash}_{file_type}_{use_vision}_{pdf_rotation_value}"
                # Process with cached function if possible
                try:
                    result = process_file_cached(temp_path, file_type, use_vision, file_size_mb, cache_key)
                    progress_bar.progress(90)
                    status_text.markdown('<div class="processing-status-container">Finalizing results...</div>', unsafe_allow_html=True)
                except Exception as e:
                    status_text.markdown(f'<div class="processing-status-container">Processing error: {str(e)}. Retrying...</div>', unsafe_allow_html=True)
                    progress_bar.progress(60)
                    # If caching fails, process directly
                    result = processor.process_file(
                        temp_path,
                        file_type=file_type,
                        use_vision=use_vision,
                        file_size_mb=file_size_mb,
                    )
                    progress_bar.progress(90)
                    status_text.markdown('<div class="processing-status-container">Finalizing results...</div>', unsafe_allow_html=True)
            except Exception as e:
                os.unlink(temp_path)  # Clean up temp file
                progress_bar.progress(100)
                status_text.empty()
                progress_container.empty()
                raise ValueError(f"Error processing PDF: {str(e)}")
        else:
            # For image files, apply preprocessing if needed
            # Check if any preprocessing options with boolean values are True,
            # or if any non-boolean values are non-default
            has_preprocessing = (
                preprocessing_options.get("grayscale", False) or
                preprocessing_options.get("denoise", False) or
                preprocessing_options.get("contrast", 0) != 0 or
                preprocessing_options.get("rotation", 0) != 0 or
                preprocessing_options.get("document_type", "standard") != "standard"
            )
            # Add document type hints to custom prompt if available from document
            # type selector - with safety checks.
            # NOTE(review): 'custom_prompt' and 'selected_doc_type' are module-level
            # widget values, never locals() here, so this branch is currently dead
            # code — confirm whether globals() was intended.
            if ('custom_prompt' in locals() and custom_prompt and
                'selected_doc_type' in locals() and selected_doc_type != "Auto-detect (standard processing)" and
                "This is a" not in str(custom_prompt)):
                # Extract just the document type from the selector
                doc_type_hint = selected_doc_type.split(" or ")[0].lower()
                # Prepend to the custom prompt
                custom_prompt = f"This is a {doc_type_hint}. {custom_prompt}"
            if has_preprocessing:
                status_text.markdown('<div class="processing-status-container">Applying image preprocessing...</div>', unsafe_allow_html=True)
                progress_bar.progress(20)
                processed_bytes = preprocess_image(file_bytes, preprocessing_options)
                progress_bar.progress(25)
                # Save processed image to temp file
                with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as proc_tmp:
                    proc_tmp.write(processed_bytes)
                    # Clean up original temp file and use the processed one
                    if os.path.exists(temp_path):
                        os.unlink(temp_path)
                    temp_path = proc_tmp.name
                progress_bar.progress(30)
            else:
                progress_bar.progress(30)
            # Get file size in MB for API limits
            file_size_mb = os.path.getsize(temp_path) / (1024 * 1024)
            # Check if file exceeds API limits (50 MB)
            if file_size_mb > 50:
                os.unlink(temp_path)  # Clean up temp file
                progress_bar.progress(100)
                status_text.empty()
                progress_container.empty()
                return _size_limit_error(file_size_mb)
            # Update progress - more granular steps
            progress_bar.progress(40)
            status_text.markdown('<div class="processing-status-container">Preparing document for OCR analysis...</div>', unsafe_allow_html=True)
            # Generate a cache key based on file content, type and settings
            # (see NOTE above: this locals() check always yields 0)
            pdf_rotation_value = pdf_rotation if 'pdf_rotation' in locals() else 0
            # Hash the (possibly preprocessed) file on disk; use a context manager
            # so the handle is closed promptly (previously leaked).
            with open(temp_path, 'rb') as hash_src:
                file_hash = hashlib.md5(hash_src.read()).hexdigest()
            cache_key = f"{file_hash}_{file_type}_{use_vision}_{pdf_rotation_value}"
            progress_bar.progress(50)
            # Check if we have custom instructions
            has_custom_prompt = 'custom_prompt' in locals() and custom_prompt and len(str(custom_prompt).strip()) > 0
            if has_custom_prompt:
                status_text.markdown('<div class="processing-status-container">Processing document with custom instructions...</div>', unsafe_allow_html=True)
            else:
                status_text.markdown('<div class="processing-status-container">Processing document with OCR...</div>', unsafe_allow_html=True)
            # Process the file using cached function if possible
            try:
                result = process_file_cached(temp_path, file_type, use_vision, file_size_mb, cache_key)
                progress_bar.progress(80)
                status_text.markdown('<div class="processing-status-container">Analyzing document structure...</div>', unsafe_allow_html=True)
                progress_bar.progress(90)
                status_text.markdown('<div class="processing-status-container">Finalizing results...</div>', unsafe_allow_html=True)
            except Exception as e:
                progress_bar.progress(60)
                status_text.markdown(f'<div class="processing-status-container">Processing error: {str(e)}. Retrying...</div>', unsafe_allow_html=True)
                # If caching fails, process directly
                processor = StructuredOCR()
                result = processor.process_file(temp_path, file_type=file_type, use_vision=use_vision, file_size_mb=file_size_mb)
                progress_bar.progress(90)
                status_text.markdown('<div class="processing-status-container">Finalizing results...</div>', unsafe_allow_html=True)
        # Complete progress
        progress_bar.progress(100)
        status_text.markdown('<div class="processing-status-container">Processing complete!</div>', unsafe_allow_html=True)
        time.sleep(0.8)  # Brief pause to show completion
        status_text.empty()
        progress_container.empty()  # Remove progress indicators when done
        # Clean up the temporary file
        if os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError:
                pass  # Ignore errors when cleaning up temporary files
        return result
    except Exception as e:
        progress_bar.progress(100)
        error_message = str(e)
        # Check for specific error types and provide helpful user-facing messages
        if "rate limit" in error_message.lower() or "429" in error_message or "requests rate limit exceeded" in error_message.lower():
            friendly_message = "The AI service is currently experiencing high demand. Please try again in a few minutes."
            # Bug fix: 'logging' was not imported in this scope before, so this
            # line raised NameError and masked the original rate-limit error.
            logger = logging.getLogger("app")
            logger.error(f"Rate limit error: {error_message}")
            status_text.markdown(f'<div class="processing-status-container" style="border-left-color: #ff9800;">Rate Limit: {friendly_message}</div>', unsafe_allow_html=True)
        elif "quota" in error_message.lower() or "credit" in error_message.lower() or "subscription" in error_message.lower():
            friendly_message = "The API usage quota has been reached. Please check your API key and subscription limits."
            status_text.markdown(f'<div class="processing-status-container" style="border-left-color: #ef5350;">API Quota: {friendly_message}</div>', unsafe_allow_html=True)
        else:
            status_text.markdown(f'<div class="processing-status-container" style="border-left-color: #ef5350;">Error: {error_message}</div>', unsafe_allow_html=True)
        time.sleep(1.5)  # Show error briefly
        status_text.empty()
        progress_container.empty()
        # Display an appropriate error message based on the exception type
        if "rate limit" in error_message.lower() or "429" in error_message or "requests rate limit exceeded" in error_message.lower():
            st.warning(f"API Rate Limit: {friendly_message} This is a temporary issue and does not indicate any problem with your document.")
        elif "quota" in error_message.lower() or "credit" in error_message.lower() or "subscription" in error_message.lower():
            st.error(f"API Quota Exceeded: {friendly_message}")
        else:
            st.error(f"Error during processing: {error_message}")
        # Clean up the temporary file
        try:
            if 'temp_path' in locals() and os.path.exists(temp_path):
                os.unlink(temp_path)
        except OSError:
            pass  # Ignore errors when cleaning up temporary files
        raise
# App title and description: favicon badge rendered next to the page heading.
favicon_base64 = get_base64_from_image(os.path.join(os.path.dirname(__file__), "static/favicon.png"))
_header_html = (
    '<div style="display: flex; align-items: center; gap: 10px;">'
    f'<img src="data:image/png;base64,{favicon_base64}" width="36" height="36" alt="Scroll Icon"/> '
    '<div><h1 style="margin: 0; padding: 20px 0 0 0;">Historical Document OCR</h1></div></div>'
)
st.markdown(_header_html, unsafe_allow_html=True)
st.subheader("Made possible by Mistral AI")
# Check if pytesseract is available for fallback; only availability is recorded.
try:
    import pytesseract  # noqa: F401 - imported solely to probe availability
except ImportError:
    has_pytesseract = False
else:
    has_pytesseract = True

# Initialize session state for storing previous results if not already present
if 'previous_results' not in st.session_state:
    st.session_state.previous_results = []
# Create main layout with tabs and columns
main_tab1, main_tab2, main_tab3 = st.tabs(["Document Processing", "Previous Results", "About"])

with main_tab1:
    # Create a two-column layout for file upload and results
    left_col, right_col = st.columns([1, 1])
    # File uploader in the left column
    with left_col:
        # Simple CSS just to fix vertical text in drag and drop area
        st.markdown("""
        <style>
        /* Reset all file uploader styling */
        .uploadedFile, .uploadedFileData, .stFileUploader {
        color: inherit !important;
        }
        /* Fix vertical text orientation */
        .stFileUploader p,
        .stFileUploader span,
        .stFileUploader div p,
        .stFileUploader div span,
        .stFileUploader label p,
        .stFileUploader label span,
        .stFileUploader div[data-testid="stFileUploadDropzone"] p,
        .stFileUploader div[data-testid="stFileUploadDropzone"] span {
        writing-mode: horizontal-tb !important;
        }
        /* Simplify the drop zone appearance */
        .stFileUploader > section > div,
        .stFileUploader div[data-testid="stFileUploadDropzone"] {
        min-height: 100px !important;
        }
        </style>
        """, unsafe_allow_html=True)
        # Add heading for the file uploader (just text, no container)
        st.markdown('### Upload Document')
        # Model info with clearer instructions
        st.markdown("Using the latest `mistral-ocr-latest` model for advanced document understanding. To get started upload your own document, use an example document, or explore the 'About' tab for more info.")
        # Enhanced file uploader with better help text
        # NOTE(review): "β’" in the help string looks like a mis-encoded bullet
        # character — preserved as-is; confirm against the original source.
        uploaded_file = st.file_uploader("Drag and drop PDFs or images here", type=["pdf", "png", "jpg", "jpeg"],
                                         help="Limit 200MB per file β’ PDF, PNG, JPG, JPEG")
        # Removed seed prompt instructions from here, moving to sidebar
# Sidebar with options - moved up with equal spacing.
# NOTE(review): the widget values assigned here (use_vision, custom_prompt,
# preprocessing_options, pdf_dpi, max_pages, pdf_rotation, ...) are module-level
# globals consumed elsewhere in the app.
with st.sidebar:
    # Options title with reduced top margin
    st.markdown("<h2 style='margin-top:-25px; margin-bottom:5px; padding:0;'>Options</h2>", unsafe_allow_html=True)
    # Comprehensive CSS for optimal sidebar spacing and layout
    st.markdown("""
    <style>
    /* Core sidebar spacing fixes */
    .block-container {padding-top: 0;}
    .stSidebar .block-container {padding-top: 0 !important;}
    .stSidebar [data-testid='stSidebarNav'] {margin-bottom: 0 !important;}
    .stSidebar [data-testid='stMarkdownContainer'] {margin-bottom: 0 !important; margin-top: 0 !important;}
    .stSidebar [data-testid='stVerticalBlock'] {gap: 0 !important;}
    /* Input element optimization */
    .stSidebar .stCheckbox {margin: 0 !important; padding: 0 !important;}
    .stSidebar .stSelectbox {margin: 0 0 3px !important; padding: 0 !important;}
    .stSidebar .stSlider {margin: 0 0 5px !important; padding: 0 !important;}
    .stSidebar .stNumberInput {margin: 0 0 5px !important; padding: 0 !important;}
    .stSidebar .stTextArea {margin: 0 0 5px !important; padding: 0 !important;}
    .stSidebar .stTextInput {margin: 0 0 5px !important; padding: 0 !important;}
    /* Heading and label optimization */
    .stSidebar h1, .stSidebar h2, .stSidebar h3, .stSidebar h4, .stSidebar h5 {
    margin: 2px 0 !important;
    padding: 0 !important;
    line-height: 1.2 !important;
    }
    /* Label text optimization */
    .stSidebar label {margin: 0 !important; line-height: 1.2 !important;}
    .stSidebar .stTextArea label, .stSidebar .stSelectbox label {margin-top: 2px !important;}
    /* Help text optimization */
    .stSidebar .stTooltipIcon {margin: 0 !important; height: 1em !important;}
    /* Slider optimization */
    .stSidebar [data-baseweb="slider"] {margin: 10px 0 0 !important;}
    /* Expander optimization */
    .stSidebar .stExpander {margin: 0 0 8px !important;}
    .stSidebar .streamlit-expanderHeader {font-size: 0.9em !important;}
    .stSidebar .streamlit-expanderContent {padding-top: 5px !important;}
    /* Remove unnecessary margins in form elements */
    .stSidebar .stForm > div {margin: 0 !important;}
    </style>
    """, unsafe_allow_html=True)
    # Model options
    use_vision = st.checkbox("Use Vision Model", value=True,
                             help="Use vision model for improved analysis (may be slower)")
    # Add spacing between sections
    st.markdown("<div style='margin: 10px 0;'></div>", unsafe_allow_html=True)
    # Document Processing section
    st.markdown("##### OCR Instructions", help="Optimize text extraction")
    # Document type selector
    document_types = [
        "Auto-detect (standard processing)",
        "Newspaper or Magazine",
        "Letter or Correspondence",
        "Book or Publication",
        "Form or Legal Document",
        "Recipe",
        "Handwritten Document",
        "Map or Illustration",
        "Table or Spreadsheet",
        "Other (specify in instructions)"
    ]
    selected_doc_type = st.selectbox(
        "Document Type",
        options=document_types,
        index=0,
        help="Select document type to optimize OCR processing for specific document formats and layouts. For documents with specialized features, also provide details in the instructions field below."
    )
    # Document layout selector
    document_layouts = [
        "Standard layout",
        "Multiple columns",
        "Table/grid format",
        "Mixed layout with images"
    ]
    selected_layout = st.selectbox(
        "Document Layout",
        options=document_layouts,
        index=0,
        help="Select the document's text layout for better OCR"
    )
    # Generate dynamic prompt based on both document type and layout
    custom_prompt_text = ""
    # First add document type specific instructions (simplified)
    if selected_doc_type != "Auto-detect (standard processing)":
        if selected_doc_type == "Newspaper or Magazine":
            custom_prompt_text = "This is a newspaper/magazine. Process columns from top to bottom, capture headlines, bylines, article text and captions."
        elif selected_doc_type == "Letter or Correspondence":
            custom_prompt_text = "This is a letter/correspondence. Capture letterhead, date, greeting, body, closing and signature. Note any handwritten annotations."
        elif selected_doc_type == "Book or Publication":
            custom_prompt_text = "This is a book/publication. Extract titles, headers, footnotes, page numbers and body text. Preserve paragraph structure and any special formatting."
        elif selected_doc_type == "Form or Legal Document":
            custom_prompt_text = "This is a form/legal document. Extract all field labels and values, preserving the structure. Pay special attention to signature lines, dates, and any official markings."
        elif selected_doc_type == "Recipe":
            custom_prompt_text = "This is a recipe. Extract title, ingredients list with measurements, and preparation instructions. Maintain the distinction between ingredients and preparation steps."
        elif selected_doc_type == "Handwritten Document":
            custom_prompt_text = "This is a handwritten document. Carefully transcribe all handwritten text, preserving line breaks. Note any unclear sections or annotations."
        elif selected_doc_type == "Map or Illustration":
            custom_prompt_text = "This is a map or illustration. Transcribe all labels, legends, captions, and annotations. Note any scale indicators or directional markings."
        elif selected_doc_type == "Table or Spreadsheet":
            custom_prompt_text = "This is a table/spreadsheet. Preserve row and column structure, maintaining alignment of data. Extract headers and all cell values."
        elif selected_doc_type == "Other (specify in instructions)":
            custom_prompt_text = "Please describe the document type and any special processing requirements here."
    # Then add layout specific instructions if needed
    if selected_layout != "Standard layout" and not custom_prompt_text:
        if selected_layout == "Multiple columns":
            custom_prompt_text = "Document has multiple columns. Read each column from top to bottom, then move to the next column."
        elif selected_layout == "Table/grid format":
            custom_prompt_text = "Document contains table data. Preserve row and column structure during extraction."
        elif selected_layout == "Mixed layout with images":
            custom_prompt_text = "Document has mixed text layout with images. Extract text in proper reading order."
    # If both document type and non-standard layout are selected, add layout info
    elif selected_layout != "Standard layout" and custom_prompt_text:
        if selected_layout == "Multiple columns":
            custom_prompt_text += " Document has multiple columns."
        elif selected_layout == "Table/grid format":
            custom_prompt_text += " Contains table/grid formatting."
        elif selected_layout == "Mixed layout with images":
            custom_prompt_text += " Has mixed text layout with images."
    # Add spacing between sections
    st.markdown("<div style='margin: 10px 0;'></div>", unsafe_allow_html=True)
    # Free-text instructions, pre-filled from the selectors above
    custom_prompt = st.text_area(
        "Additional OCR Instructions",
        value=custom_prompt_text,
        placeholder="Example: Small text at bottom needs special attention",
        height=100,
        max_chars=300,
        key="custom_analysis_instructions",
        help="Specify document type and special OCR requirements. Detailed instructions activate Mistral AI's advanced document analysis."
    )
    # Custom instructions expander
    with st.expander("Custom Instruction Examples"):
        st.markdown("""
        **Document Format Instructions:**
        - "This newspaper has multiple columns - read each column from top to bottom"
        - "This letter has a formal heading, main body, and signature section at bottom"
        - "This form has fields with labels and filled-in values that should be paired"
        - "This recipe has ingredient list at top and preparation steps below"

        **Special Processing Instructions:**
        - "Pay attention to footnotes at the bottom of each page"
        - "Some text is faded - please attempt to reconstruct unclear passages"
        - "There are handwritten annotations in the margins that should be included"
        - "Document has table data that should preserve row and column alignment"
        - "Text continues across pages and should be connected into a single flow"
        - "This document uses special symbols and mathematical notation"
        """)
    # Add spacing between sections
    st.markdown("<div style='margin: 10px 0;'></div>", unsafe_allow_html=True)
    # Image preprocessing options with reduced spacing
    st.markdown("##### Image Processing", help="Options for enhancing images")
    with st.expander("Preprocessing Options", expanded=False):
        preprocessing_options = {}
        # Document type selector (consumed by preprocess_image)
        doc_type_options = ["standard", "handwritten", "typed", "printed"]
        preprocessing_options["document_type"] = st.selectbox(
            "Document Type",
            options=doc_type_options,
            index=0,
            format_func=lambda x: x.capitalize(),
            help="Select document type for optimized processing"
        )
        preprocessing_options["grayscale"] = st.checkbox("Convert to Grayscale",
                                                         help="Convert image to grayscale before OCR")
        preprocessing_options["denoise"] = st.checkbox("Denoise Image",
                                                       help="Remove noise from the image")
        preprocessing_options["contrast"] = st.slider("Adjust Contrast", -5, 5, 0,
                                                      help="Adjust image contrast (-5 to +5)")
        # Add rotation options
        # NOTE(review): "Β°" appears to be a mis-encoded degree sign — preserved
        # as-is; confirm against the original source.
        rotation_options = [0, 90, 180, 270]
        preprocessing_options["rotation"] = st.select_slider(
            "Rotate Document",
            options=rotation_options,
            value=0,
            format_func=lambda x: f"{x}Β° {'(No rotation)' if x == 0 else ''}",
            help="Rotate the document to correct orientation"
        )
    # Add spacing between sections
    st.markdown("<div style='margin: 10px 0;'></div>", unsafe_allow_html=True)
    # PDF options with consistent formatting
    st.markdown("##### PDF Settings", help="Options for PDF documents")
    with st.expander("PDF Options", expanded=False):
        pdf_dpi = st.slider("Resolution (DPI)", 72, 300, 100,
                            help="Higher DPI = better quality but slower")
        max_pages = st.number_input("Max Pages", 1, 20, 3,
                                    help="Limit number of pages to process")
        # Add PDF rotation option (rotation_options defined above is still in scope)
        pdf_rotation = st.select_slider(
            "Rotation",
            options=rotation_options,
            value=0,
            format_func=lambda x: f"{x}Β°",
            help="Rotate PDF pages"
        )
# ---- Previous Results tab ----
# Lists every result stored in st.session_state.previous_results with a
# type filter, a bulk in-memory ZIP download, and per-result View buttons.
with main_tab2:
    st.markdown('<h2>Previous Results</h2>', unsafe_allow_html=True)

    # Load custom CSS for the Previous Results tab
    from ui.layout import load_css
    load_css()

    if not st.session_state.previous_results:
        # Empty state: nothing processed yet in this session
        st.markdown("""
        <div class="previous-results-container" style="text-align: center; padding: 40px 20px; background-color: #f0f2f6; border-radius: 8px;">
            <div style="font-size: 48px; margin-bottom: 20px;">π</div>
            <h3 style="margin-bottom: 10px; font-weight: 600;">No Previous Results</h3>
            <p style="font-size: 16px;">Process a document to see your results history saved here.</p>
        </div>
        """, unsafe_allow_html=True)
    else:
        # Container for the results list
        st.markdown('<div class="previous-results-container">', unsafe_allow_html=True)
        st.markdown(f'<h3>{len(st.session_state.previous_results)} Previous Results</h3>', unsafe_allow_html=True)

        # Two columns: type filter on the left, bulk download on the right
        filter_col, download_col = st.columns([2, 1])
        with filter_col:
            # Offer only the filter choices that are actually present
            filter_options = ["All Types"]
            if any(result.get("file_name", "").lower().endswith(".pdf") for result in st.session_state.previous_results):
                filter_options.append("PDF Documents")
            if any(result.get("file_name", "").lower().endswith((".jpg", ".jpeg", ".png")) for result in st.session_state.previous_results):
                filter_options.append("Images")
            selected_filter = st.selectbox("Filter by Type:", filter_options)
        with download_col:
            # Bulk download of all results as a ZIP built entirely in memory
            if len(st.session_state.previous_results) > 0:
                try:
                    from ocr_utils import create_results_zip_in_memory

                    # Get zip data directly in memory (no temp file on disk)
                    zip_data = create_results_zip_in_memory(st.session_state.previous_results)

                    # Build a descriptive, timestamped ZIP filename from the
                    # mix of document types in the history.
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    pdf_count = sum(1 for r in st.session_state.previous_results if r.get('file_name', '').lower().endswith('.pdf'))
                    img_count = sum(1 for r in st.session_state.previous_results if r.get('file_name', '').lower().endswith(('.jpg', '.jpeg', '.png')))
                    if pdf_count > 0 and img_count > 0:
                        zip_filename = f"historical_ocr_mixed_{pdf_count}pdf_{img_count}img_{timestamp}.zip"
                    elif pdf_count > 0:
                        zip_filename = f"historical_ocr_pdf_documents_{pdf_count}_{timestamp}.zip"
                    elif img_count > 0:
                        zip_filename = f"historical_ocr_images_{img_count}_{timestamp}.zip"
                    else:
                        zip_filename = f"historical_ocr_results_{timestamp}.zip"

                    st.download_button(
                        label="Download All Results",
                        data=zip_data,
                        file_name=zip_filename,
                        mime="application/zip",
                        help="Download all previous results as a ZIP file containing HTML and JSON files"
                    )
                except Exception as e:
                    # Best-effort: ZIP creation can fail on very large histories
                    st.error(f"Error creating download: {str(e)}")
                    st.info("Try with fewer results or individual downloads")

        # Apply the selected type filter
        filtered_results = st.session_state.previous_results
        if selected_filter == "PDF Documents":
            filtered_results = [r for r in st.session_state.previous_results if r.get("file_name", "").lower().endswith(".pdf")]
        elif selected_filter == "Images":
            filtered_results = [r for r in st.session_state.previous_results if r.get("file_name", "").lower().endswith((".jpg", ".jpeg", ".png"))]

        if not filtered_results:
            st.markdown("""
            <div style="text-align: center; padding: 20px; background-color: #f9f9f9; border-radius: 5px; margin: 20px 0;">
                <p>No results match the selected filter.</p>
            </div>
            """, unsafe_allow_html=True)

        # Render each (filtered) result as a card
        for i, result in enumerate(filtered_results):
            # Choose an icon from the file extension
            file_name = result.get("file_name", f"Document {i+1}")
            file_type_lower = file_name.lower()
            if file_type_lower.endswith(".pdf"):
                icon = "π"
            elif file_type_lower.endswith((".jpg", ".jpeg", ".png", ".gif")):
                icon = "πΌοΈ"
            else:
                icon = "π"
            st.markdown(f"""
            <div class="result-card">
                <div class="result-header">
                    <div class="result-filename">{icon} {result.get('descriptive_file_name', file_name)}</div>
                    <div class="result-date">{result.get('timestamp', 'Unknown')}</div>
                </div>
                <div class="result-metadata">
                    <div class="result-tag">Languages: {', '.join(result.get('languages', ['Unknown']))}</div>
                    <div class="result-tag">Topics: {', '.join(result.get('topics', ['Unknown'])[:5])} {' + ' + str(len(result.get('topics', [])) - 5) + ' more' if len(result.get('topics', [])) > 5 else ''}</div>
                </div>
            """, unsafe_allow_html=True)

            # View button inside the card
            st.markdown('<div class="result-action-button">', unsafe_allow_html=True)
            if st.button(f"View Document", key=f"view_{i}"):
                # BUGFIX: select the result from the *filtered* list. The
                # original used st.session_state.previous_results[i], which
                # picks the wrong document whenever a type filter is active
                # (i indexes filtered_results, not the full history).
                st.session_state.selected_previous_result = result
                # Force a rerun to show the selected result
                st.rerun()
            st.markdown('</div>', unsafe_allow_html=True)

            # Close the result card
            st.markdown('</div>', unsafe_allow_html=True)

        # Close the container
        st.markdown('</div>', unsafe_allow_html=True)
# ---- Selected result: header, metadata and content tabs ----
# Rendered beneath the results list whenever a "View Document" button has
# stored a result dict in st.session_state.selected_previous_result.
if 'selected_previous_result' in st.session_state and st.session_state.selected_previous_result:
    selected_result = st.session_state.selected_previous_result
    # Styled header container for the selected document
    st.markdown(f"""
    <div class="selected-result-container">
        <div class="result-header" style="margin-bottom: 20px;">
            <div class="selected-result-title">Selected Document: {selected_result.get('file_name', 'Unknown')}</div>
            <div class="result-date">{selected_result.get('timestamp', '')}</div>
        </div>
    """, unsafe_allow_html=True)

    # Metadata in two columns: document facts | processing facts
    meta_col1, meta_col2 = st.columns(2)
    with meta_col1:
        # Languages — drop None entries before joining for display
        if 'languages' in selected_result:
            languages = [lang for lang in selected_result['languages'] if lang is not None]
            if languages:
                st.write(f"**Languages:** {', '.join(languages)}")
        if 'topics' in selected_result and selected_result['topics']:
            # Subject tags rendered as colored badges in a flex row
            st.markdown("**Subject Tags:**")
            st.markdown('<div style="display: flex; flex-wrap: wrap; gap: 5px; margin-top: 5px;">', unsafe_allow_html=True)
            for topic in selected_result['topics']:
                # Badge color chosen by keyword category; slate gray default
                badge_color = "#546e7a"  # Default color
                if any(term in topic.lower() for term in ["century", "pre-", "era", "historical"]):
                    badge_color = "#1565c0"  # Blue for time periods
                elif any(term in topic.lower() for term in ["language", "english", "french", "german", "latin"]):
                    badge_color = "#00695c"  # Teal for languages
                elif any(term in topic.lower() for term in ["letter", "newspaper", "book", "form", "document", "recipe"]):
                    badge_color = "#6a1b9a"  # Purple for document types
                elif any(term in topic.lower() for term in ["travel", "military", "science", "medicine", "education", "art", "literature"]):
                    badge_color = "#2e7d32"  # Green for subject domains
                elif any(term in topic.lower() for term in ["preprocessed", "enhanced", "grayscale", "denoised", "contrast", "rotated"]):
                    badge_color = "#e65100"  # Orange for preprocessing-related tags
                st.markdown(
                    f'<span style="background-color: {badge_color}; color: white; padding: 3px 8px; '
                    f'border-radius: 12px; font-size: 0.85em; display: inline-block; margin-bottom: 5px;">{topic}</span>',
                    unsafe_allow_html=True
                )
            # Close the badge flex container
            st.markdown('</div>', unsafe_allow_html=True)
    with meta_col2:
        # Processing metadata: page limits and timing
        if 'limited_pages' in selected_result:
            st.info(f"Processed {selected_result['limited_pages']['processed']} of {selected_result['limited_pages']['total']} pages")
        if 'processing_time' in selected_result:
            proc_time = selected_result['processing_time']
            st.write(f"**Processing Time:** {proc_time:.1f}s")

    # Content tabs — the "With Images" tab only exists when the result
    # carries image data (view_tab3 is referenced further below).
    has_images = selected_result.get('has_images', False)
    if has_images:
        view_tab1, view_tab2, view_tab3 = st.tabs(["Structured View", "Raw JSON", "With Images"])
    else:
        view_tab1, view_tab2 = st.tabs(["Structured View", "Raw JSON"])
# ---- Structured view and Raw JSON view of the selected result ----
with view_tab1:
    # Structured view: one markdown section per key of ocr_contents,
    # skipping error/raw-text bookkeeping sections.
    if 'ocr_contents' in selected_result and isinstance(selected_result['ocr_contents'], dict):
        for section, content in selected_result['ocr_contents'].items():
            if content and section not in ['error', 'raw_text', 'partial_text']:
                st.markdown(f"#### {section.replace('_', ' ').title()}")
                if isinstance(content, str):
                    st.write(content)
                elif isinstance(content, list):
                    for item in content:
                        if isinstance(item, str):
                            st.write(f"- {item}")
                        else:
                            st.write(f"- {str(item)}")
                elif isinstance(content, dict):
                    for k, v in content.items():
                        st.write(f"**{k}:** {v}")

with view_tab2:
    # Raw JSON view with a download button
    try:
        st.json(selected_result)
    except Exception as e:
        st.error(f"Error displaying JSON: {str(e)}")
        # Fall back to a plain string rendering if st.json chokes
        st.code(str(selected_result))

    # Build a descriptive download filename from the result metadata
    try:
        json_str = json.dumps(selected_result, indent=2)

        # Prefer the precomputed descriptive filename when present
        if 'descriptive_file_name' in selected_result:
            base_filename = Path(selected_result['descriptive_file_name']).stem
        else:
            # Fall back to deriving a base name from the original file name
            base_filename = selected_result.get('file_name', 'document').split('.')[0]
        # NOTE(review): suffixes below applied in both branches; the `lang`
        # duplication guard suggests they were meant to run on descriptive
        # names too — confirm against the original layout.
        if 'topics' in selected_result and selected_result['topics']:
            topic = selected_result['topics'][0].lower().replace(' ', '_')
            base_filename = f"{base_filename}_{topic}"
        if 'languages' in selected_result and selected_result['languages']:
            lang = selected_result['languages'][0].lower()
            # Only add the language if it's not already in the filename
            if lang not in base_filename.lower():
                base_filename = f"{base_filename}_{lang}"
        # For PDFs, add page information
        if 'total_pages' in selected_result and 'processed_pages' in selected_result:
            base_filename = f"{base_filename}_p{selected_result['processed_pages']}of{selected_result['total_pages']}"

        # Reformat the stored "YYYY-MM-DD HH:MM" timestamp, or fall back to now
        timestamp = ""
        if 'timestamp' in selected_result:
            try:
                dt = datetime.strptime(selected_result['timestamp'], "%Y-%m-%d %H:%M")
                timestamp = dt.strftime("%Y%m%d_%H%M%S")
            except Exception:
                # Was a bare `except:`; narrowed so Ctrl-C etc. still propagate
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        else:
            # No timestamp in the result — create a new one
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        json_filename = f"{base_filename}_{timestamp}.json"
        st.download_button(
            label="Download JSON",
            data=json_str,
            file_name=json_filename,
            mime="application/json"
        )
    except Exception as e:
        st.error(f"Error creating JSON download: {str(e)}")
        # Fallback: plain-text download with a simple timestamped name
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        st.download_button(
            label="Download as Text",
            data=str(selected_result),
            file_name=f"document_{timestamp}.txt",
            mime="text/plain"
        )
# ---- "With Images" view, HTML download, and close button ----
if has_images and 'pages_data' in selected_result:
    with view_tab3:
        # Render each page: embedded images first, then the page text
        pages_data = selected_result.get('pages_data', [])
        for page_idx, page in enumerate(pages_data):
            # Page header only for multi-page results
            if len(pages_data) > 1:
                st.markdown(f"### Page {page_idx + 1}")
            if page.get('images'):
                images = page.get('images', [])
                for img in images:
                    if 'image_base64' in img:
                        st.image(img['image_base64'], width=600)
                # Page text collapsible under the images
                text_content = page.get('markdown', '')
                if text_content:
                    with st.expander("View Page Text", expanded=True):
                        st.markdown(text_content)
            else:
                # No images on this page: just show the text
                text_content = page.get('markdown', '')
                if text_content:
                    st.markdown(text_content)
            # Separator between pages
            if page_idx < len(pages_data) - 1:
                st.markdown("---")

        # Self-contained HTML download with embedded images
        from ocr_utils import create_html_with_images
        html_content = create_html_with_images(selected_result)

        # Build a descriptive filename (same scheme as the JSON download;
        # this logic is duplicated there — candidate for a shared helper).
        if 'descriptive_file_name' in selected_result:
            base_filename = Path(selected_result['descriptive_file_name']).stem
        else:
            base_filename = selected_result.get('file_name', 'document').split('.')[0]
        # NOTE(review): suffixes applied in both branches — confirm against
        # the original layout (indentation was ambiguous in the source).
        if 'topics' in selected_result and selected_result['topics']:
            topic = selected_result['topics'][0].lower().replace(' ', '_')
            base_filename = f"{base_filename}_{topic}"
        if 'languages' in selected_result and selected_result['languages']:
            lang = selected_result['languages'][0].lower()
            # Only add the language if it's not already in the filename
            if lang not in base_filename.lower():
                base_filename = f"{base_filename}_{lang}"
        # For PDFs, add page information
        if 'total_pages' in selected_result and 'processed_pages' in selected_result:
            base_filename = f"{base_filename}_p{selected_result['processed_pages']}of{selected_result['total_pages']}"

        # Reformat the stored timestamp, or fall back to now
        timestamp = ""
        if 'timestamp' in selected_result:
            try:
                dt = datetime.strptime(selected_result['timestamp'], "%Y-%m-%d %H:%M")
                timestamp = dt.strftime("%Y%m%d_%H%M%S")
            except Exception:
                # Was a bare `except:`; narrowed so Ctrl-C etc. still propagate
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        else:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        html_filename = f"{base_filename}_{timestamp}_with_images.html"
        st.download_button(
            label="Download as HTML with Images",
            data=html_content,
            file_name=html_filename,
            mime="text/html"
        )

# Close the selected-result container opened above
st.markdown('</div>', unsafe_allow_html=True)

# Centered "close" button that clears the selection and reruns
col1, col2, col3 = st.columns([1, 1, 1])
with col2:
    st.markdown('<div class="result-action-button" style="text-align: center;">', unsafe_allow_html=True)
    if st.button("Close Selected Document", key="close_selected"):
        # Clear the selected result from session state
        del st.session_state.selected_previous_result
        # Force a rerun to update the view
        st.rerun()
    st.markdown('</div>', unsafe_allow_html=True)
# ---- About tab (main_tab3) ----
with main_tab3:
    # Mention the local Tesseract OCR fallback only if it was detected
    # earlier in the script. NOTE: at module scope locals() is the module
    # namespace, so this lookup works for a top-level flag and simply
    # evaluates False if the flag was never set.
    fallback_notice = ""
    if 'has_pytesseract' in locals() and has_pytesseract:
        fallback_notice = """
**Local OCR Fallback:**
- Local OCR fallback using Tesseract is available if API rate limits are reached
- Provides basic text extraction when cloud OCR is unavailable
"""
    # Static description of the app; fallback_notice is interpolated at the
    # end. The markdown body is kept flush-left so headings and bullets are
    # not parsed as indented code blocks.
    st.markdown(f"""
### About Historical Document OCR
This application specializes in processing historical documents using [Mistral AI's Document OCR](https://docs.mistral.ai/capabilities/document/), which is particularly effective for handling challenging textual materials.
#### Document Processing Capabilities
- **Historical Images**: Process vintage photographs, scanned historical papers, manuscripts
- **Handwritten Documents**: Extract text from letters, journals, notes, and records
- **Multi-Page PDFs**: Process historical books, articles, and longer documents
- **Mixed Content**: Handle documents with both text and imagery
#### Key Features
- **Advanced Image Preprocessing**
- Grayscale conversion optimized for historical documents
- Denoising to remove artifacts and improve clarity
- Contrast adjustment to enhance faded text
- Document rotation for proper orientation
- **Document Analysis**
- Text extraction with `mistral-ocr-latest`
- Structured data extraction: dates, names, places, topics
- Multi-language support with automatic detection
- Handling of period-specific terminology and obsolete language
- **Flexible Output Formats**
- Structured view with organized content sections
- Developer JSON for integration with other applications
- Visual representation preserving original document layout
- Downloadable results in various formats
#### Historical Context
Add period-specific context to improve analysis:
- Historical period selection
- Document purpose identification
- Custom instructions for specialized terminology
#### Data Privacy
- All document processing happens through secure AI processing
- No documents are permanently stored on the server
- Results are only saved in your current session
{fallback_notice}
""")
# ---- Document Processing tab (main_tab1): upload handling & preview ----
with main_tab1:
    # Initialize all session-state flags in one place at the beginning so
    # the rest of this section can read them unconditionally.
    if 'auto_process_sample' not in st.session_state:
        st.session_state.auto_process_sample = False
    if 'sample_just_loaded' not in st.session_state:
        st.session_state.sample_just_loaded = False
    if 'processed_document_active' not in st.session_state:
        st.session_state.processed_document_active = False
    if 'sample_document_processed' not in st.session_state:
        st.session_state.sample_document_processed = False

    # A loaded sample document takes the place of an upload.
    # NOTE(review): `uploaded_file` is otherwise expected to come from an
    # uploader widget defined outside this section — confirm upstream.
    if 'sample_document' in st.session_state and st.session_state.sample_document is not None:
        uploaded_file = st.session_state.sample_document
        # Green banner identifying the sample document
        st.markdown(
            f"""
            <div style="background-color: #D4EDDA; color: #155724; padding: 10px;
                 border-radius: 4px; border-left: 5px solid #155724; margin-bottom: 10px;">
                <div style="display: flex; justify-content: space-between; align-items: center;">
                    <span style="font-weight: bold;">Sample Document: {uploaded_file.name}</span>
                </div>
            </div>
            """,
            unsafe_allow_html=True
        )
        # Freshly loaded sample => queue auto-processing exactly once
        if st.session_state.sample_just_loaded:
            st.session_state.auto_process_sample = True
            st.session_state.sample_document_processed = True
            st.session_state.sample_just_loaded = False
        # Clear the sample after use so it doesn't shadow future uploads
        st.session_state.sample_document = None

    if uploaded_file is not None:
        # Enforce the 50MB upload cap before any processing
        file_size_mb = len(uploaded_file.getvalue()) / (1024 * 1024)
        if file_size_mb > 50:
            with left_col:
                st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size is 50MB.")
            st.stop()
        file_ext = Path(uploaded_file.name).suffix.lower()

        # Process button — flush left, mirroring the file browser padding
        with left_col:
            # Label reflects whether a processed result is already showing
            if st.session_state.processed_document_active:
                process_button = st.button("Process Document Again")
            else:
                process_button = st.button("Process Document")

            # Placeholder for progress indicators, positioned right after
            # the process button for visibility; filled during processing.
            progress_placeholder = st.empty()

            # Live preview of the preprocessed image (images only)
            if any(preprocessing_options.values()) and uploaded_file.type.startswith('image/'):
                st.markdown("**Preprocessed Preview**")
                try:
                    # Container to better control the preview layout
                    with st.container():
                        processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options)
                        # use_container_width=True keeps the preview responsive
                        st.image(io.BytesIO(processed_bytes), use_container_width=True)
                        # Caption summarizing which preprocessing steps applied
                        meta_items = []
                        if preprocessing_options.get("document_type", "standard") != "standard":
                            meta_items.append(f"Document type ({preprocessing_options['document_type']})")
                        if preprocessing_options.get("grayscale", False):
                            meta_items.append("Grayscale")
                        if preprocessing_options.get("denoise", False):
                            meta_items.append("Denoise")
                        if preprocessing_options.get("contrast", 0) != 0:
                            meta_items.append(f"Contrast ({preprocessing_options['contrast']})")
                        if preprocessing_options.get("rotation", 0) != 0:
                            meta_items.append(f"Rotation ({preprocessing_options['rotation']}Β°)")
                        # Only show "Applied:" when something was actually applied
                        if meta_items:
                            meta_text = "Applied: " + ", ".join(meta_items)
                            st.caption(meta_text)
                except Exception as e:
                    st.error(f"Error in preprocessing: {str(e)}")
                    st.info("Try using grayscale preprocessing for PNG images with transparency")

            # Filled with a success message after processing completes
            metadata_placeholder = st.empty()

        # Auto-process newly loaded samples (flag set above); skipped when a
        # processed document is already on screen.
        auto_processing = st.session_state.auto_process_sample and not st.session_state.processed_document_active
        if auto_processing:
            st.info("Automatically processing sample document...")

        # Process when the button was clicked OR auto-processing triggered
        should_process = process_button or auto_processing
if should_process: | |
# Reset auto-process flag to avoid processing on next rerun | |
if st.session_state.auto_process_sample: | |
st.session_state.auto_process_sample = False | |
# Move the progress indicator reference to just below the button | |
progress_container = progress_placeholder | |
try: | |
# Get max_pages or default if not available | |
max_pages_value = max_pages if 'max_pages' in locals() else None | |
# Apply performance mode settings | |
if 'perf_mode' in locals(): | |
if perf_mode == "Speed": | |
# Override settings for faster processing | |
if 'preprocessing_options' in locals(): | |
preprocessing_options["denoise"] = False # Skip denoising for speed | |
if 'pdf_dpi' in locals() and file_ext.lower() == '.pdf': | |
pdf_dpi = min(pdf_dpi, 100) # Lower DPI for speed | |
# Process file with or without custom prompt | |
if custom_prompt and custom_prompt.strip(): | |
# Process with custom instructions for the AI | |
with progress_placeholder.container(): | |
progress_bar = st.progress(0) | |
status_text = st.empty() | |
status_text.markdown('<div class="processing-status-container">Processing with custom instructions...</div>', unsafe_allow_html=True) | |
progress_bar.progress(30) | |
# Special handling for PDF files with custom prompts | |
if file_ext.lower() == ".pdf": | |
# For PDFs with custom prompts, we use a special two-step process | |
with progress_placeholder.container(): | |
status_text.markdown('<div class="processing-status-container">Using special PDF processing for custom instructions...</div>', unsafe_allow_html=True) | |
progress_bar.progress(40) | |
try: | |
# Process directly in one step for better performance | |
processor = StructuredOCR() | |
# First save the PDF to a temp file | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp: | |
tmp.write(uploaded_file.getvalue()) | |
temp_path = tmp.name | |
# Apply PDF rotation if specified | |
pdf_rotation_value = pdf_rotation if 'pdf_rotation' in locals() else 0 | |
# Add document type hints to custom prompt if available from document type selector | |
if custom_prompt and custom_prompt is not None and 'selected_doc_type' in locals() and selected_doc_type != "Auto-detect (standard processing)" and "This is a" not in str(custom_prompt): | |
# Extract just the document type from the selector | |
doc_type_hint = selected_doc_type.split(" or ")[0].lower() | |
# Prepend to the custom prompt | |
custom_prompt = f"This is a {doc_type_hint}. {custom_prompt}" | |
# Process in a single step with simplified custom prompt | |
if custom_prompt: | |
# Detect document type from custom prompt | |
doc_type = "general" | |
if any(keyword in custom_prompt.lower() for keyword in ["newspaper", "column", "article", "magazine"]): | |
doc_type = "newspaper" | |
elif any(keyword in custom_prompt.lower() for keyword in ["letter", "correspondence", "handwritten"]): | |
doc_type = "letter" | |
elif any(keyword in custom_prompt.lower() for keyword in ["book", "publication"]): | |
doc_type = "book" | |
elif any(keyword in custom_prompt.lower() for keyword in ["form", "certificate", "legal"]): | |
doc_type = "form" | |
elif any(keyword in custom_prompt.lower() for keyword in ["recipe", "ingredients"]): | |
doc_type = "recipe" | |
# Format the custom prompt for better Mistral processing | |
if len(custom_prompt) > 250: | |
# Truncate long custom prompts but preserve essential info | |
simplified_prompt = f"DOCUMENT TYPE: {doc_type}\nINSTRUCTIONS: {custom_prompt[:250]}..." | |
else: | |
simplified_prompt = f"DOCUMENT TYPE: {doc_type}\nINSTRUCTIONS: {custom_prompt}" | |
else: | |
simplified_prompt = custom_prompt | |
progress_bar.progress(50) | |
# Check if we have custom instructions | |
has_custom_prompt = custom_prompt is not None and len(str(custom_prompt).strip()) > 0 | |
if has_custom_prompt: | |
status_text.markdown('<div class="processing-status-container">Processing PDF with custom instructions...</div>', unsafe_allow_html=True) | |
else: | |
status_text.markdown('<div class="processing-status-container">Processing PDF with optimized settings...</div>', unsafe_allow_html=True) | |
# Process directly with optimized settings | |
result = processor.process_file( | |
file_path=temp_path, | |
file_type="pdf", | |
use_vision=use_vision, | |
custom_prompt=simplified_prompt, | |
file_size_mb=len(uploaded_file.getvalue()) / (1024 * 1024), | |
pdf_rotation=pdf_rotation_value | |
) | |
progress_bar.progress(90) | |
status_text.markdown('<div class="processing-status-container">Finalizing results...</div>', unsafe_allow_html=True) | |
# Clean up temp file | |
if os.path.exists(temp_path): | |
os.unlink(temp_path) | |
except Exception as e: | |
# If anything fails, revert to standard processing | |
st.warning(f"Special PDF processing failed. Falling back to standard method: {str(e)}") | |
result = process_file(uploaded_file, use_vision, {}, progress_container=progress_placeholder) | |
else: | |
# For non-PDF files, use normal processing with custom prompt | |
# Save the uploaded file to a temporary file with preprocessing | |
with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp: | |
# Apply preprocessing if any options are selected | |
if any(preprocessing_options.values()): | |
# Apply performance mode settings | |
if 'perf_mode' in locals() and perf_mode == "Speed": | |
# Skip denoising for speed in preprocessing | |
speed_preprocessing = preprocessing_options.copy() | |
speed_preprocessing["denoise"] = False | |
processed_bytes = preprocess_image(uploaded_file.getvalue(), speed_preprocessing) | |
else: | |
processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options) | |
tmp.write(processed_bytes) | |
else: | |
tmp.write(uploaded_file.getvalue()) | |
temp_path = tmp.name | |
# Show progress | |
with progress_placeholder.container(): | |
progress_bar.progress(50) | |
status_text.markdown('<div class="processing-status-container">Analyzing with custom instructions...</div>', unsafe_allow_html=True) | |
# Initialize OCR processor and process with custom prompt | |
processor = StructuredOCR() | |
# Detect document type from custom prompt | |
doc_type = "general" | |
if any(keyword in custom_prompt.lower() for keyword in ["newspaper", "column", "article", "magazine"]): | |
doc_type = "newspaper" | |
elif any(keyword in custom_prompt.lower() for keyword in ["letter", "correspondence", "handwritten"]): | |
doc_type = "letter" | |
elif any(keyword in custom_prompt.lower() for keyword in ["book", "publication"]): | |
doc_type = "book" | |
elif any(keyword in custom_prompt.lower() for keyword in ["form", "certificate", "legal"]): | |
doc_type = "form" | |
elif any(keyword in custom_prompt.lower() for keyword in ["recipe", "ingredients"]): | |
doc_type = "recipe" | |
# Format the custom prompt for better Mistral processing | |
formatted_prompt = f"DOCUMENT TYPE: {doc_type}\nUSER INSTRUCTIONS: {custom_prompt.strip()}\nPay special attention to these instructions and respond accordingly." | |
try: | |
result = processor.process_file( | |
file_path=temp_path, | |
file_type="image", # Always use image for non-PDFs | |
use_vision=use_vision, | |
custom_prompt=formatted_prompt, | |
file_size_mb=len(uploaded_file.getvalue()) / (1024 * 1024) | |
) | |
except Exception as e: | |
# For any error, fall back to standard processing | |
st.warning(f"Custom prompt processing failed. Falling back to standard processing: {str(e)}") | |
result = process_file(uploaded_file, use_vision, preprocessing_options, progress_container=progress_placeholder) | |
# Complete progress | |
with progress_placeholder.container(): | |
progress_bar.progress(100) | |
status_text.markdown('<div class="processing-status-container">Processing complete!</div>', unsafe_allow_html=True) | |
time.sleep(0.8) | |
progress_placeholder.empty() | |
# Clean up temporary file | |
if os.path.exists(temp_path): | |
try: | |
os.unlink(temp_path) | |
except: | |
pass | |
else: | |
# Standard processing without custom prompt | |
result = process_file(uploaded_file, use_vision, preprocessing_options, progress_container=progress_placeholder) | |
# Document results will be shown in the right column
with right_col:
    # Add Document Metadata section header
    st.subheader("Document Metadata")
    # Create metadata card with standard styling; the card is accumulated as
    # one HTML string and rendered in a single st.markdown call at the end.
    metadata_html = '<div class="metadata-card" style="padding:15px; margin-bottom:20px;">'
    # File info
    metadata_html += f'<p><strong>File Name:</strong> {result.get("file_name", uploaded_file.name)}</p>'
    # Info about limited pages (present when a large PDF was truncated)
    if 'limited_pages' in result:
        metadata_html += f'<p style="padding:8px; border-radius:4px;"><strong>Pages:</strong> {result["limited_pages"]["processed"]} of {result["limited_pages"]["total"]} processed</p>'
    # Languages (filter out None entries before joining)
    if 'languages' in result:
        languages = [lang for lang in result['languages'] if lang is not None]
        if languages:
            metadata_html += f'<p><strong>Languages:</strong> {", ".join(languages)}</p>'
    # Topics - show all subject tags with max of 8
    if 'topics' in result and result['topics']:
        topics_display = result['topics'][:8]
        topics_str = ", ".join(topics_display)
        # Add indicator if there are more tags
        if len(result['topics']) > 8:
            topics_str += f" + {len(result['topics']) - 8} more"
        metadata_html += f'<p><strong>Subject Tags:</strong> {topics_str}</p>'
    # Document type - using simplified labeling consistent with user instructions
    if 'detected_document_type' in result:
        # Get clean document type label - removing "historical" prefix if present
        doc_type = result['detected_document_type'].lower()
        if doc_type.startswith("historical "):
            doc_type = doc_type[len("historical "):]
        # Capitalize first letter of each word for display
        doc_type = ' '.join(word.capitalize() for word in doc_type.split())
        metadata_html += f'<p><strong>Document Type:</strong> {doc_type}</p>'
    # Processing time
    if 'processing_time' in result:
        proc_time = result['processing_time']
        metadata_html += f'<p><strong>Processing Time:</strong> {proc_time:.1f}s</p>'
    # Custom prompt indicator - simplified and only showing when there are actual instructions.
    # Only show when custom_prompt exists in the session AND has content, or when
    # the result explicitly states it was applied.
    has_instructions = ('custom_prompt' in locals() and custom_prompt and len(str(custom_prompt).strip()) > 0)
    if has_instructions or 'custom_prompt_applied' in result:
        # Use consistent styling with other metadata fields
        metadata_html += f'<p><strong>Advanced Analysis:</strong> Custom instructions applied</p>'
    # Close the metadata card
    metadata_html += '</div>'
    # Render the metadata HTML
    st.markdown(metadata_html, unsafe_allow_html=True)
    # Add content section heading - using standard subheader
    st.subheader("Document Content")
    # Start document content div with consistent styling class
    # (closed later by a matching '</div>' markdown call)
    st.markdown('<div class="document-content" style="margin-top:10px;">', unsafe_allow_html=True)
    if 'ocr_contents' in result:
        # Check for has_images in the result
        has_images = result.get('has_images', False)
        # Create tabs for different views; the image tab only exists when
        # the OCR result actually carries embedded images.
        if has_images:
            view_tab1, view_tab2, view_tab3 = st.tabs(["Structured View", "Raw JSON", "With Images"])
        else:
            view_tab1, view_tab2 = st.tabs(["Structured View", "Raw JSON"])
with view_tab1:
    # Display in a more user-friendly format based on the content structure.
    # NOTE: html_content is accumulated for the HTML export at the bottom,
    # while st.* calls render the same data live - the two must stay in sync.
    html_content = ""
    if isinstance(result['ocr_contents'], dict):
        for section, content in result['ocr_contents'].items():
            if content:  # Only display non-empty sections
                # Add consistent styling for each section
                section_title = f'<h4 style="font-family: Georgia, serif; font-size: 18px; margin-top: 20px; margin-bottom: 10px;">{section.replace("_", " ").title()}</h4>'
                html_content += section_title
                if isinstance(content, str):
                    # Use an expander for very long content (over 1000 chars)
                    if len(content) > 1000:
                        # Format content for long text - bold everything after "... that"
                        preview_content = content[:1000] + "..." if len(content) > 1000 else content
                        if "... that" in content:
                            # For the preview (first 1000 chars)
                            if "... that" in preview_content:
                                parts = preview_content.split("... that", 1)
                                formatted_preview = f"{parts[0]}... that<strong>{parts[1]}</strong>"
                                html_content += f"<p style=\"font-size:16px;\">{formatted_preview}</p>"
                            else:
                                html_content += f"<p style=\"font-size:16px; font-weight:normal;\">{preview_content}</p>"
                            # For the full content in expander (markdown bold, not HTML)
                            parts = content.split("... that", 1)
                            formatted_full = f"{parts[0]}... that**{parts[1]}**"
                            st.markdown(f"#### {section.replace('_', ' ').title()}")
                            with st.expander("Show full content"):
                                st.markdown(formatted_full)
                        else:
                            html_content += f"<p style=\"font-size:16px; font-weight:normal;\">{preview_content}</p>"
                            st.markdown(f"#### {section.replace('_', ' ').title()}")
                            with st.expander("Show full content"):
                                st.write(content)
                    else:
                        # Short content: format inline - bold everything after "... that"
                        if "... that" in content:
                            parts = content.split("... that", 1)
                            formatted_content = f"{parts[0]}... that<strong>{parts[1]}</strong>"
                            html_content += f"<p style=\"font-size:16px;\">{formatted_content}</p>"
                            st.markdown(f"#### {section.replace('_', ' ').title()}")
                            st.markdown(f"{parts[0]}... that**{parts[1]}**")
                        else:
                            html_content += f"<p style=\"font-size:16px; font-weight:normal;\">{content}</p>"
                            st.markdown(f"#### {section.replace('_', ' ').title()}")
                            st.write(content)
                elif isinstance(content, list):
                    html_list = "<ul>"
                    st.markdown(f"#### {section.replace('_', ' ').title()}")
                    # Limit display for very long lists (collapse behind an expander)
                    if len(content) > 20:
                        with st.expander(f"Show all {len(content)} items"):
                            for item in content:
                                if isinstance(item, str):
                                    html_list += f"<li>{item}</li>"
                                    st.write(f"- {item}")
                                elif isinstance(item, dict):
                                    try:
                                        st.json(item)
                                    except Exception as e:
                                        st.error(f"Error displaying JSON: {str(e)}")
                                        st.code(str(item))
                    else:
                        for item in content:
                            if isinstance(item, str):
                                html_list += f"<li>{item}</li>"
                                st.write(f"- {item}")
                            elif isinstance(item, dict):
                                try:
                                    st.json(item)
                                except Exception as e:
                                    st.error(f"Error displaying JSON: {str(e)}")
                                    st.code(str(item))
                    html_list += "</ul>"
                    html_content += html_list
                elif isinstance(content, dict):
                    html_dict = "<dl>"
                    st.markdown(f"#### {section.replace('_', ' ').title()}")
                    for k, v in content.items():
                        html_dict += f"<dt>{k}</dt><dd>{v}</dd>"
                        st.write(f"**{k}:** {v}")
                    html_dict += "</dl>"
                    html_content += html_dict
    # Add download button in a smaller section (export the accumulated HTML)
    with st.expander("Export Content"):
        # Get original filename without extension
        original_name = Path(result.get('file_name', uploaded_file.name)).stem
        # HTML download button
        html_bytes = html_content.encode()
        st.download_button(
            label="Download as HTML",
            data=html_bytes,
            file_name=f"{original_name}_processed.html",
            mime="text/html"
        )
with view_tab2:
    # Show the raw JSON for developers, with an expander for large results
    # (over ~5KB serialized) so the page stays readable.
    if len(json.dumps(result)) > 5000:
        with st.expander("View full JSON"):
            try:
                st.json(result)
            except Exception as e:
                st.error(f"Error displaying JSON: {str(e)}")
                # Fallback to string representation
                st.code(str(result))
    else:
        try:
            st.json(result)
        except Exception as e:
            st.error(f"Error displaying JSON: {str(e)}")
            # Fallback to string representation
            st.code(str(result))
if has_images and 'pages_data' in result:
    with view_tab3:
        # Use pages_data directly instead of raw_response
        try:
            # Use the serialized pages data
            pages_data = result.get('pages_data', [])
            if not pages_data:
                st.warning("No image data found in the document.")
                # NOTE(review): st.stop() halts the entire script run, not just
                # this tab - any code after this point will not execute.
                st.stop()
            # Construct markdown from pages_data directly
            from ocr_utils import replace_images_in_markdown
            combined_markdown = ""
            for page in pages_data:
                page_markdown = page.get('markdown', '')
                images = page.get('images', [])
                # Create image dictionary mapping image id -> base64 data URI
                image_dict = {}
                for img in images:
                    if 'id' in img and 'image_base64' in img:
                        image_dict[img['id']] = img['image_base64']
                # Replace image references in markdown with inline base64
                if page_markdown and image_dict:
                    page_markdown = replace_images_in_markdown(page_markdown, image_dict)
                combined_markdown += page_markdown + "\n\n---\n\n"
            if not combined_markdown:
                st.warning("No content with images found.")
                st.stop()
            # Add CSS for better image handling
            st.markdown("""
            <style>
            .image-container {
                margin: 20px 0;
                text-align: center;
            }
            .markdown-text-container {
                padding: 10px;
                background-color: #f9f9f9;
                border-radius: 5px;
            }
            .markdown-text-container img {
                margin: 15px auto;
                max-width: 90%;
                max-height: 500px;
                object-fit: contain;
                border: 1px solid #ddd;
                border-radius: 4px;
                display: block;
            }
            .markdown-text-container p {
                margin-bottom: 16px;
                line-height: 1.6;
                font-family: Georgia, serif;
            }
            .page-break {
                border-top: 1px solid #ddd;
                margin: 20px 0;
                padding-top: 20px;
            }
            .page-text-content {
                margin-bottom: 20px;
            }
            .text-block {
                background-color: #fff;
                padding: 15px;
                border-radius: 4px;
                border-left: 3px solid #546e7a;
                margin-bottom: 15px;
                color: #333;
            }
            .text-block p {
                margin: 8px 0;
                color: #333;
            }
            </style>
            """, unsafe_allow_html=True)
            # Process and display content with images properly
            import re
            # Process each page separately into its own HTML fragment
            pages_content = []
            # Check if this is from a PDF processed through pdf2image
            is_pdf2image = result.get('pdf_processing_method') == 'pdf2image'
            for i, page in enumerate(pages_data):
                page_markdown = page.get('markdown', '')
                images = page.get('images', [])
                if not page_markdown:
                    continue
                # Create image dictionary for this page
                image_dict = {}
                for img in images:
                    if 'id' in img and 'image_base64' in img:
                        image_dict[img['id']] = img['image_base64']
                # Create HTML content for this page (header only after page 1)
                page_html = f"<h3>Page {i+1}</h3>" if i > 0 else ""
                # Display the raw text content first to ensure it's visible
                page_html += f"<div class='page-text-content'>"
                # Special handling for PDF2image processed documents:
                # the structured OCR content is shown once, on the first page.
                if is_pdf2image and i == 0 and 'ocr_contents' in result:
                    # Display all structured content from OCR for PDFs
                    page_html += "<div class='text-block pdf-content'>"
                    # Check if custom prompt was applied
                    if result.get('custom_prompt_applied') == 'text_only':
                        page_html += "<div class='prompt-info'><i>Custom analysis applied using text-only processing</i></div>"
                    ocr_contents = result.get('ocr_contents', {})
                    # Get a sorted list of sections to ensure consistent order
                    section_keys = sorted(ocr_contents.keys())
                    # Place important sections first (moved to front of the list)
                    priority_sections = ['title', 'subtitle', 'header', 'publication', 'date', 'content', 'main_text']
                    for important in priority_sections:
                        if important in ocr_contents and important in section_keys:
                            section_keys.remove(important)
                            section_keys.insert(0, important)
                    for section in section_keys:
                        content = ocr_contents[section]
                        if section in ['raw_text', 'error', 'partial_text']:
                            continue  # Skip these internal/diagnostic fields
                        section_title = section.replace('_', ' ').title()
                        page_html += f"<h4>{section_title}</h4>"
                        if isinstance(content, str):
                            # Convert newlines to <br> tags for HTML rendering
                            content_html = content.replace('\n', '<br>')
                            page_html += f"<p>{content_html}</p>"
                        elif isinstance(content, list):
                            page_html += "<ul>"
                            for item in content:
                                if isinstance(item, str):
                                    page_html += f"<li>{item}</li>"
                                elif isinstance(item, dict):
                                    page_html += "<li>"
                                    for k, v in item.items():
                                        page_html += f"<strong>{k}:</strong> {v}<br>"
                                    page_html += "</li>"
                                else:
                                    page_html += f"<li>{str(item)}</li>"
                            page_html += "</ul>"
                        elif isinstance(content, dict):
                            for k, v in content.items():
                                if isinstance(v, str):
                                    page_html += f"<p><strong>{k}:</strong> {v}</p>"
                                elif isinstance(v, list):
                                    page_html += f"<p><strong>{k}:</strong></p><ul>"
                                    for item in v:
                                        page_html += f"<li>{item}</li>"
                                    page_html += "</ul>"
                                else:
                                    page_html += f"<p><strong>{k}:</strong> {str(v)}</p>"
                    page_html += "</div>"
                else:
                    # Standard processing for regular documents:
                    # get all text content that isn't an image line and add it first
                    text_content = []
                    for line in page_markdown.split("\n"):
                        if not re.search(r'!\[(.*?)\]\((.*?)\)', line) and line.strip():
                            text_content.append(line)
                    # Add the text content as a block
                    if text_content:
                        page_html += f"<div class='text-block'>"
                        for line in text_content:
                            page_html += f"<p>{line}</p>"
                        page_html += "</div>"
                page_html += "</div>"
                # Then add images separately, after the text block
                for line in page_markdown.split("\n"):
                    # Handle markdown image lines: ![alt](ref)
                    img_match = re.search(r'!\[(.*?)\]\((.*?)\)', line)
                    if img_match:
                        alt_text = img_match.group(1)
                        img_ref = img_match.group(2)
                        # Get the base64 data for this image ID
                        img_data = image_dict.get(img_ref, "")
                        if img_data:
                            img_html = f'<div class="image-container"><img src="{img_data}" alt="{alt_text}"></div>'
                            page_html += img_html
                # Add page separator if not the last page
                if i < len(pages_data) - 1:
                    page_html += '<div class="page-break"></div>'
                pages_content.append(page_html)
            # Combine all pages HTML
            html_content = "\n".join(pages_content)
            # Wrap the content in a div with the class for styling
            st.markdown(f"""
            <div class="markdown-text-container">
            {html_content}
            </div>
            """, unsafe_allow_html=True)
            # Create download HTML content: a standalone page with inline CSS
            download_html = f"""
            <html>
            <head>
            <style>
            body {{
                font-family: Georgia, serif;
                line-height: 1.7;
                margin: 0 auto;
                max-width: 800px;
                padding: 20px;
            }}
            img {{
                max-width: 90%;
                max-height: 500px;
                object-fit: contain;
                margin: 20px auto;
                display: block;
                border: 1px solid #ddd;
                border-radius: 4px;
            }}
            .image-container {{
                margin: 20px 0;
                text-align: center;
            }}
            .page-break {{
                border-top: 1px solid #ddd;
                margin: 40px 0;
                padding-top: 40px;
            }}
            h3 {{
                color: #333;
                border-bottom: 1px solid #eee;
                padding-bottom: 10px;
            }}
            p {{
                margin: 12px 0;
            }}
            .page-text-content {{
                margin-bottom: 20px;
            }}
            .text-block {{
                background-color: #f9f9f9;
                padding: 15px;
                border-radius: 4px;
                border-left: 3px solid #546e7a;
                margin-bottom: 15px;
                color: #333;
            }}
            .text-block p {{
                margin: 8px 0;
                color: #333;
            }}
            </style>
            </head>
            <body>
            <div class="markdown-text-container">
            {html_content}
            </div>
            </body>
            </html>
            """
            # Create a more descriptive filename
            original_name = Path(result.get('file_name', uploaded_file.name)).stem
            # Add document type if available (first topic tag)
            if 'topics' in result and result['topics']:
                topic = result['topics'][0].lower().replace(' ', '_')
                original_name = f"{original_name}_{topic}"
            # Add language if available
            if 'languages' in result and result['languages']:
                lang = result['languages'][0].lower()
                # Only add if it's not already in the filename
                if lang not in original_name.lower():
                    original_name = f"{original_name}_{lang}"
            # Get current date for uniqueness
            from datetime import datetime
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Create final filename
            download_filename = f"{original_name}_{timestamp}_with_images.html"
            # Add download button inside an expander to prevent page reset
            with st.expander("Download Document with Images"):
                st.markdown("Click the button below to download the document with embedded images")
                st.download_button(
                    label="Download as HTML",
                    data=download_html,
                    file_name=download_filename,
                    mime="text/html",
                    key="download_with_images_button"
                )
        except Exception as e:
            st.error(f"Could not display document with images: {str(e)}")
            st.info("Try refreshing or processing the document again.")
if 'ocr_contents' not in result:
    st.error("No OCR content was extracted from the document.")
else:
    # Check for minimal text content in OCR results; if the extraction looks
    # nearly empty and the user applied no preprocessing, suggest the
    # preprocessing options in the sidebar.
    has_minimal_text = False
    total_text_length = 0
    # Check if the document is an image (not a PDF), by file extension
    is_image = result.get('file_name', '').lower().endswith(('.jpg', '.jpeg', '.png', '.gif'))
    # If image file with raw_text only
    if is_image and 'ocr_contents' in result:
        ocr_contents = result['ocr_contents']
        # Check if only raw_text exists with minimal content
        has_raw_text_only = False
        if 'raw_text' in ocr_contents:
            raw_text = ocr_contents['raw_text']
            total_text_length += len(raw_text.strip())
            # Check if raw_text is the only significant field
            # (ignoring diagnostic fields and empty values)
            other_content_fields = [k for k in ocr_contents.keys()
                                    if k not in ['raw_text', 'error', 'partial_text']
                                    and isinstance(ocr_contents[k], (str, list))
                                    and ocr_contents[k]]
            if len(other_content_fields) <= 1:  # Only raw_text or one other field
                has_raw_text_only = True
        # Check if minimal text was extracted (less than 50 characters)
        if total_text_length < 50 and has_raw_text_only:
            has_minimal_text = True
    # Check if any meaningful preprocessing options were used
    preprocessing_used = False
    if preprocessing_options.get("document_type", "standard") != "standard":
        preprocessing_used = True
    if preprocessing_options.get("grayscale", False):
        preprocessing_used = True
    if preprocessing_options.get("denoise", False):
        preprocessing_used = True
    if preprocessing_options.get("contrast", 0) != 0:
        preprocessing_used = True
    if preprocessing_options.get("rotation", 0) != 0:
        preprocessing_used = True
    # If minimal text was found and preprocessing options weren't used
    if has_minimal_text and not preprocessing_used and uploaded_file.type.startswith('image/'):
        st.warning("""
        **Limited text extracted from this image.**
        Try using preprocessing options in the sidebar to improve results:
        - Convert to grayscale for clearer text
        - Use denoising for aged or degraded documents
        - Adjust contrast for faded text
        - Try different rotation if text orientation is unclear
        Click the "Preprocessing Options" section in the sidebar under "Image Processing".
        """)
# Close document content div (opened before the tabs were rendered)
st.markdown('</div>', unsafe_allow_html=True)
# Set processed_document_active to True when a new document is processed,
# so the rest of the app knows a result is currently on screen.
st.session_state.processed_document_active = True
# Add CSS for styling close buttons
st.markdown("""
<style>
.close-button {
    background-color: #e7e7e7;
    color: #666;
    border: none;
    border-radius: 4px;
    padding: 0.25rem 0.5rem;
    font-size: 0.8rem;
    cursor: pointer;
    transition: all 0.2s ease;
}
.close-button:hover {
    background-color: #d7d7d7;
    color: #333;
}
</style>
""", unsafe_allow_html=True)
# Display success message with close button for dismissing processed documents
success_cols = st.columns([5, 1])
with success_cols[0]:
    metadata_placeholder.success("**Document processed successfully**")
with success_cols[1]:
    # Use a more visually distinctive close button
    st.markdown("""
    <style>
    div[data-testid="stButton"] button {
        background-color: #f8f9fa;
        border: 1px solid #dee2e6;
        padding: 0.25rem 0.5rem;
        font-size: 0.875rem;
        border-radius: 0.2rem;
    }
    </style>
    """, unsafe_allow_html=True)
    if st.button("❌ Close Document", key="close_document_button", help="Clear current document and start over"):
        # Clear the session state
        st.session_state.processed_document_active = False
        # Reset any active document data
        if 'current_result' in st.session_state:
            del st.session_state.current_result
        # Rerun to reset the page
        st.rerun()
# Store the result in the previous results list.
# Add timestamp to result for history tracking (copy so the live result
# object is not mutated).
result_copy = result.copy()
result_copy['timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M")
# Store if this was a sample document
if 'sample_document_processed' in st.session_state and st.session_state.sample_document_processed:
    result_copy['sample_document'] = True
    # Reset the flag so the next upload is not mislabeled
    st.session_state.sample_document_processed = False
# Generate more descriptive file name for the result
original_name = Path(result.get('file_name', uploaded_file.name)).stem
# Extract subject tags from content
subject_tags = []
# First check if we already have enough topics (3+) in the result
if 'topics' in result and result['topics'] and len(result['topics']) >= 3:
    subject_tags = result['topics']
else:
    # Generate tags based on document content (best-effort; failures fall
    # through to generic fallback tags below)
    try:
        # Extract text from OCR contents
        raw_text = ""
        if 'ocr_contents' in result:
            if 'raw_text' in result['ocr_contents']:
                raw_text = result['ocr_contents']['raw_text']
            elif 'content' in result['ocr_contents']:
                raw_text = result['ocr_contents']['content']
        # Use existing topics as starting point if available
        if 'topics' in result and result['topics']:
            subject_tags = list(result['topics'])
        # Add document type if detected
        if 'detected_document_type' in result:
            doc_type = result['detected_document_type'].capitalize()
            if doc_type not in subject_tags:
                subject_tags.append(doc_type)
        # Analyze content for common themes based on keyword lists
        content_themes = {
            "Historical": ["century", "ancient", "historical", "history", "vintage", "archive", "heritage"],
            "Travel": ["travel", "journey", "expedition", "exploration", "voyage", "map", "location"],
            "Science": ["experiment", "research", "study", "analysis", "scientific", "laboratory"],
            "Literature": ["book", "novel", "poetry", "author", "literary", "chapter", "story"],
            "Art": ["painting", "illustration", "drawing", "artist", "exhibit", "gallery", "portrait"],
            "Education": ["education", "school", "university", "college", "learning", "student", "teach"],
            "Politics": ["government", "political", "policy", "administration", "election", "legislature"],
            "Business": ["business", "company", "corporation", "market", "industry", "commercial", "trade"],
            "Social": ["society", "community", "social", "culture", "tradition", "customs"],
            "Technology": ["technology", "invention", "device", "mechanical", "machine", "technical"],
            "Military": ["military", "army", "navy", "war", "battle", "soldier", "weapon"],
            "Religion": ["religion", "church", "temple", "spiritual", "sacred", "ritual"],
            "Medicine": ["medical", "medicine", "health", "hospital", "treatment", "disease", "doctor"],
            "Legal": ["legal", "law", "court", "justice", "attorney", "judicial", "statute"],
            "Correspondence": ["letter", "mail", "correspondence", "message", "communication"]
        }
        # Search for keywords in content (case-insensitive)
        if raw_text:
            raw_text_lower = raw_text.lower()
            for theme, keywords in content_themes.items():
                if any(keyword in raw_text_lower for keyword in keywords):
                    if theme not in subject_tags:
                        subject_tags.append(theme)
        # Add document period tag if date patterns are detected
        if raw_text:
            # Look for years in content (1000-1999 or 2000-2019)
            import re
            year_matches = re.findall(r'\b1[0-9]{3}\b|\b20[0-1][0-9]\b', raw_text)
            if year_matches:
                # Convert to integers
                years = [int(y) for y in year_matches]
                # Get earliest year; the period tag is based on it
                earliest = min(years)
                if earliest < 1800:
                    period_tag = "Pre-1800s"
                elif earliest < 1850:
                    period_tag = "Early 19th Century"
                elif earliest < 1900:
                    period_tag = "Late 19th Century"
                elif earliest < 1950:
                    period_tag = "Early 20th Century"
                else:
                    period_tag = "Modern Era"
                if period_tag not in subject_tags:
                    subject_tags.append(period_tag)
        # Add languages as topics if available
        if 'languages' in result and result['languages']:
            for lang in result['languages']:
                if lang and lang not in subject_tags:
                    lang_tag = f"{lang} Language"
                    subject_tags.append(lang_tag)
        # Add preprocessing information as tags if preprocessing was applied
        if uploaded_file.type.startswith('image/'):
            # Check if meaningful preprocessing options were used
            if preprocessing_options.get("document_type", "standard") != "standard":
                doc_type = preprocessing_options["document_type"].capitalize()
                preprocessing_tag = f"Enhanced ({doc_type})"
                if preprocessing_tag not in subject_tags:
                    subject_tags.append(preprocessing_tag)
            preprocessing_methods = []
            if preprocessing_options.get("grayscale", False):
                preprocessing_methods.append("Grayscale")
            if preprocessing_options.get("denoise", False):
                preprocessing_methods.append("Denoised")
            if preprocessing_options.get("contrast", 0) != 0:
                contrast_val = preprocessing_options.get("contrast", 0)
                if contrast_val > 0:
                    preprocessing_methods.append("Contrast Enhanced")
                else:
                    preprocessing_methods.append("Contrast Reduced")
            if preprocessing_options.get("rotation", 0) != 0:
                preprocessing_methods.append("Rotated")
            # Add a combined preprocessing tag if methods were applied
            if preprocessing_methods:
                prep_tag = "Preprocessed"
                if prep_tag not in subject_tags:
                    subject_tags.append(prep_tag)
                # Add the specific method as a tag if only one was used
                if len(preprocessing_methods) == 1:
                    method_tag = preprocessing_methods[0]
                    if method_tag not in subject_tags:
                        subject_tags.append(method_tag)
    except Exception as e:
        # NOTE(review): `logger` is not defined in the visible portion of this
        # file - presumably configured earlier via logging.getLogger; confirm,
        # otherwise this handler itself raises NameError.
        logger.warning(f"Error generating subject tags: {str(e)}")
        # Fallback tags if extraction fails
        if not subject_tags:
            subject_tags = ["Document", "Historical", "Text"]
# Ensure we have at least 3 tags, padding with generic ones as needed
while len(subject_tags) < 3:
    if "Document" not in subject_tags:
        subject_tags.append("Document")
    elif "Historical" not in subject_tags:
        subject_tags.append("Historical")
    elif "Text" not in subject_tags:
        subject_tags.append("Text")
    else:
        # If we still need tags, add generic ones (one per loop iteration)
        generic_tags = ["Archive", "Content", "Record"]
        for tag in generic_tags:
            if tag not in subject_tags:
                subject_tags.append(tag)
                break
# Update the result with enhanced tags
result_copy['topics'] = subject_tags
# Create a more descriptive file name: <stem><_doctype><_period><ext>
file_type = Path(result.get('file_name', uploaded_file.name)).suffix.lower()
doc_type_tag = ""
# Add document type to filename if detected
if 'detected_document_type' in result:
    doc_type = result['detected_document_type'].lower()
    doc_type_tag = f"_{doc_type}"
elif len(subject_tags) > 0:
    # Use first tag as document type if not explicitly detected
    doc_type_tag = f"_{subject_tags[0].lower().replace(' ', '_')}"
# Add period tag for historical context if available
period_tag = ""
for tag in subject_tags:
    if "century" in tag.lower() or "pre-" in tag.lower() or "era" in tag.lower():
        period_tag = f"_{tag.lower().replace(' ', '_')}"
        break
# Generate final descriptive file name
descriptive_name = f"{original_name}{doc_type_tag}{period_tag}{file_type}"
result_copy['descriptive_file_name'] = descriptive_name
# Add to session state, keeping the most recent 20 results
st.session_state.previous_results.insert(0, result_copy)
if len(st.session_state.previous_results) > 20:
    st.session_state.previous_results = st.session_state.previous_results[:20]
# Catch-all for the whole document-processing flow above: surface the error
# to the user instead of crashing the Streamlit run.
except Exception as e:
    st.error(f"Error processing document: {str(e)}")
else:
    # No file uploaded: show the Example Documents section after the uploader
    st.subheader("Example Documents")
    # Add a simplified info message about examples
    st.markdown("""
    This app can process various historical documents:
    - Historical photographs, maps, and manuscripts
    - Handwritten letters and documents
    - Printed books and articles
    - Multi-page PDFs
    """)
    # Add CSS to make the dropdown match the column width
    st.markdown("""
    <style>
    /* Make the selectbox container match the full column width */
    .main .block-container .element-container:has([data-testid="stSelectbox"]) {
        width: 100% !important;
        max-width: 100% !important;
    }
    /* Make the actual selectbox control take the full width */
    .stSelectbox > div > div {
        width: 100% !important;
        max-width: 100% !important;
    }
    </style>
    """, unsafe_allow_html=True)
    # Sample document URLs dropdown with clearer label.
    # sample_urls and sample_names are parallel lists; index 0 is the
    # "no selection" placeholder in both.
    sample_urls = [
        "Select a sample document",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/a-la-carte.pdf",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magician-or-bottle-cungerer.jpg",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/handwritten-letter.jpg",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/magellan-travels.jpg",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/milgram-flier.png",
        "https://huggingface.co/spaces/milwright/historical-ocr/resolve/main/input/baldwin-15st-north.jpg"
    ]
    sample_names = [
        "Select a sample document",
        "Restaurant Menu (PDF)",
        "The Magician (Image)",
        "Handwritten Letter (Image)",
        "Magellan Travels (Image)",
        "Milgram Flier (Image)",
        "Baldwin Street (Image)"
    ]
    # Initialize sample_document in session state if it doesn't exist
    if 'sample_document' not in st.session_state:
        st.session_state.sample_document = None
    # The selectbox stores the index; format_func maps it to a display name
    selected_sample = st.selectbox("Select a sample document from `~/input`", options=range(len(sample_urls)), format_func=lambda i: sample_names[i])
    if selected_sample > 0:
        selected_url = sample_urls[selected_sample]
        # Add process button for the sample document
        if st.button("Load Sample Document"):
            try:
                import requests
                from io import BytesIO
                with st.spinner(f"Downloading {sample_names[selected_sample]}..."):
                    response = requests.get(selected_url)
                    response.raise_for_status()
                    # Extract filename from URL
                    file_name = selected_url.split("/")[-1]
                    # Create a BytesIO object from the downloaded content
                    # NOTE(review): file_content is never used afterwards -
                    # the raw bytes are stored directly below; candidate for removal.
                    file_content = BytesIO(response.content)
# Store as a UploadedFile-like object in session state
class SampleDocument:
    """Minimal stand-in for Streamlit's UploadedFile wrapping downloaded bytes.

    Exposes the attributes and methods the processing pipeline uses:
    ``name``, ``type``, ``size``, ``getvalue()``, ``read()``, ``seek()``
    and ``tell()``.
    """

    def __init__(self, name, content, content_type):
        self.name = name            # original filename from the sample URL
        self._content = content     # full document payload as bytes
        self.type = content_type    # MIME type inferred from the extension
        self.size = len(content)
        self._position = 0          # logical cursor for seek()/tell()

    def getvalue(self):
        """Return the full document bytes (mirrors UploadedFile.getvalue)."""
        return self._content

    def read(self):
        """Return the full document bytes.

        Kept position-independent for backward compatibility with existing
        callers that expect the whole payload regardless of prior seeks.
        """
        return self._content

    def seek(self, position, whence=0):
        """Move the logical cursor and return the new position.

        Fix: the previous implementation ignored its argument and returned
        None; file-protocol consumers expect the new offset back. Supports
        the standard whence values (0=start, 1=current, 2=end).
        """
        if whence == 0:
            self._position = position
        elif whence == 1:
            self._position += position
        elif whence == 2:
            self._position = self.size + position
        return self._position

    def tell(self):
        """Return the current cursor position (was hard-coded to 0)."""
        return self._position
# Determine content type based on file extension of the downloaded sample
if file_name.lower().endswith('.pdf'):
    content_type = 'application/pdf'
elif file_name.lower().endswith(('.jpg', '.jpeg')):
    content_type = 'image/jpeg'
elif file_name.lower().endswith('.png'):
    content_type = 'image/png'
else:
    content_type = 'application/octet-stream'
# Save download info in session state for more reliable handling
# across Streamlit reruns
st.session_state.sample_document = SampleDocument(
    name=file_name,
    content=response.content,
    content_type=content_type
)
# Set a flag to indicate this is a newly loaded sample
st.session_state.sample_just_loaded = True
# Force rerun to load the document through the normal upload path
st.rerun()
# Pairs with the try around the sample download above
except Exception as e:
    st.error(f"Error downloading sample document: {str(e)}")
    st.info("Please try uploading your own document instead.")