import os | |
import streamlit as st | |
import json | |
import sys | |
import time | |
import base64 | |
# Updated import section | |
from pathlib import Path | |
import tempfile | |
import io | |
from pdf2image import convert_from_bytes | |
from PIL import Image, ImageEnhance, ImageFilter | |
import cv2 | |
import numpy as np | |
from datetime import datetime | |
# Import the StructuredOCR class and config from the local files | |
from structured_ocr import StructuredOCR | |
from config import MISTRAL_API_KEY | |
# Import utilities for handling previous results | |
from ocr_utils import create_results_zip | |
def get_base64_from_image(image_path): | |
"""Get base64 string from image file""" | |
with open(image_path, "rb") as img_file: | |
return base64.b64encode(img_file.read()).decode('utf-8') | |
# Set favicon path | |
favicon_path = os.path.join(os.path.dirname(__file__), "static/favicon.png") | |
# Set page configuration | |
st.set_page_config( | |
page_title="Historical OCR", | |
page_icon=favicon_path if os.path.exists(favicon_path) else "📜",
layout="wide", | |
initial_sidebar_state="expanded" | |
) | |
# Enable caching for expensive operations with longer TTL for better performance | |
# Cache for 24 hours instead of 1 hour | |
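# A sketch of the caching described above, assuming Streamlit's st.cache_data API (1.18+); the original
# decorator is not visible in this listing, so the exact arguments are an assumption.
@st.cache_data(ttl=24*3600, show_spinner=False)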
def convert_pdf_to_images(pdf_bytes, dpi=150, rotation=0): | |
"""Convert PDF bytes to a list of images with caching""" | |
try: | |
images = convert_from_bytes(pdf_bytes, dpi=dpi) | |
# Apply rotation if specified | |
if rotation != 0 and images: | |
rotated_images = [] | |
for img in images: | |
rotated_img = img.rotate(rotation, expand=True, resample=Image.BICUBIC) | |
rotated_images.append(rotated_img) | |
return rotated_images | |
return images | |
except Exception as e: | |
st.error(f"Error converting PDF: {str(e)}") | |
return [] | |
# Cache preprocessed images for better performance | |
# Cache for 24 hours | |
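# As above, a hedged sketch of the 24-hour cache implied by the comment; st.cache_data hashes the image
# bytes and the options dict to decide when a preprocessed image can be reused.
@st.cache_data(ttl=24*3600, show_spinner=False)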
def preprocess_image(image_bytes, preprocessing_options): | |
"""Preprocess image with selected options optimized for historical document OCR quality""" | |
# Setup basic console logging | |
import logging | |
logger = logging.getLogger("image_preprocessor") | |
logger.setLevel(logging.INFO) | |
# Log which preprocessing options are being applied | |
logger.info(f"Preprocessing image with options: {preprocessing_options}") | |
# Convert bytes to PIL Image | |
image = Image.open(io.BytesIO(image_bytes)) | |
# Check for alpha channel (RGBA) and convert to RGB if needed | |
if image.mode == 'RGBA': | |
# Convert RGBA to RGB by compositing the image onto a white background | |
background = Image.new('RGB', image.size, (255, 255, 255)) | |
background.paste(image, mask=image.split()[3]) # 3 is the alpha channel | |
image = background | |
logger.info("Converted RGBA image to RGB") | |
elif image.mode not in ('RGB', 'L'): | |
# Convert other modes to RGB as well | |
image = image.convert('RGB') | |
logger.info(f"Converted {image.mode} image to RGB") | |
# Apply rotation if specified | |
if preprocessing_options.get("rotation", 0) != 0: | |
rotation_degrees = preprocessing_options.get("rotation") | |
image = image.rotate(rotation_degrees, expand=True, resample=Image.BICUBIC) | |
# Resize large images while preserving details important for OCR | |
width, height = image.size | |
max_dimension = max(width, height) | |
# Less aggressive resizing to preserve document details | |
if max_dimension > 2500: | |
scale_factor = 2500 / max_dimension | |
new_width = int(width * scale_factor) | |
new_height = int(height * scale_factor) | |
# Use LANCZOS for better quality preservation | |
image = image.resize((new_width, new_height), Image.LANCZOS) | |
img_array = np.array(image) | |
# Apply preprocessing based on selected options with settings optimized for historical documents | |
document_type = preprocessing_options.get("document_type", "standard") | |
# Process grayscale option first as it's a common foundation | |
if preprocessing_options.get("grayscale", False): | |
if len(img_array.shape) == 3: # Only convert if it's not already grayscale | |
if document_type == "handwritten": | |
# Enhanced grayscale processing for handwritten documents | |
img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) | |
# Apply adaptive histogram equalization to enhance handwriting | |
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) | |
img_array = clahe.apply(img_array) | |
else: | |
# Standard grayscale for printed documents | |
img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) | |
# Convert back to RGB for further processing | |
img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB) | |
if preprocessing_options.get("contrast", 0) != 0: | |
contrast_factor = 1 + (preprocessing_options.get("contrast", 0) / 10) | |
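# The -5..+5 slider value maps to a PIL ImageEnhance.Contrast factor of 0.5..1.5 (1.0 leaves contrast unchanged)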
image = Image.fromarray(img_array) | |
enhancer = ImageEnhance.Contrast(image) | |
image = enhancer.enhance(contrast_factor) | |
img_array = np.array(image) | |
if preprocessing_options.get("denoise", False): | |
try: | |
# Apply appropriate denoising based on document type | |
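# For reference: the positional args to cv2.fastNlMeansDenoisingColored below are (h, hColor,
# templateWindowSize, searchWindowSize) and to cv2.fastNlMeansDenoising they are (h, templateWindowSize,
# searchWindowSize); lower h values denoise more gently and preserve fine strokes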
if document_type == "handwritten": | |
# Very light denoising for handwritten documents to preserve pen strokes | |
if len(img_array.shape) == 3 and img_array.shape[2] == 3: # Color image | |
img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 3, 3, 5, 9) | |
else: # Grayscale image | |
img_array = cv2.fastNlMeansDenoising(img_array, None, 3, 7, 21) | |
else: | |
# Standard denoising for printed documents | |
if len(img_array.shape) == 3 and img_array.shape[2] == 3: # Color image | |
img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 5, 5, 7, 21) | |
else: # Grayscale image | |
img_array = cv2.fastNlMeansDenoising(img_array, None, 5, 7, 21) | |
except Exception as e: | |
print(f"Denoising error: {str(e)}, falling back to standard processing") | |
# Convert back to PIL Image | |
processed_image = Image.fromarray(img_array) | |
# Higher quality for OCR processing | |
byte_io = io.BytesIO() | |
try: | |
# Make sure the image is in RGB mode before saving as JPEG | |
if processed_image.mode not in ('RGB', 'L'): | |
processed_image = processed_image.convert('RGB') | |
processed_image.save(byte_io, format='JPEG', quality=92, optimize=True) | |
byte_io.seek(0) | |
logger.info(f"Preprocessing complete. Original image mode: {image.mode}, processed mode: {processed_image.mode}") | |
logger.info(f"Original size: {len(image_bytes)/1024:.1f}KB, processed size: {len(byte_io.getvalue())/1024:.1f}KB") | |
return byte_io.getvalue() | |
except Exception as e: | |
logger.error(f"Error saving processed image: {str(e)}") | |
# Fallback to original image | |
logger.info("Using original image as fallback") | |
image_io = io.BytesIO() | |
image.save(image_io, format='JPEG', quality=92) | |
image_io.seek(0) | |
return image_io.getvalue() | |
# Cache OCR results in memory to speed up repeated processing | |
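# Sketch of the in-memory caching implied above (assuming st.cache_data): the cache_key argument exists
# only so the cache keys on file content and settings -- the function body never reads it.
@st.cache_data(ttl=24*3600, show_spinner=False)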
def process_file_cached(file_path, file_type, use_vision, file_size_mb, cache_key): | |
"""Cached version of OCR processing to reuse results""" | |
# Initialize OCR processor | |
processor = StructuredOCR() | |
# Process the file | |
result = processor.process_file( | |
file_path, | |
file_type=file_type, | |
use_vision=use_vision, | |
file_size_mb=file_size_mb | |
) | |
return result | |
# Define functions | |
def process_file(uploaded_file, use_vision=True, preprocessing_options=None, progress_container=None): | |
"""Process the uploaded file and return the OCR results | |
Args: | |
uploaded_file: The uploaded file to process | |
use_vision: Whether to use vision model | |
preprocessing_options: Dictionary of preprocessing options | |
progress_container: Optional container for progress indicators | |
""" | |
if preprocessing_options is None: | |
preprocessing_options = {} | |
# Create a container for progress indicators if not provided | |
if progress_container is None: | |
progress_container = st.empty() | |
with progress_container.container(): | |
progress_bar = st.progress(0) | |
status_text = st.empty() | |
status_text.markdown('<div class="processing-status-container">Preparing file for processing...</div>', unsafe_allow_html=True) | |
try: | |
# Check if API key is available | |
if not MISTRAL_API_KEY: | |
# Return dummy data if no API key | |
progress_bar.progress(100) | |
status_text.empty() | |
return { | |
"file_name": uploaded_file.name, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"ocr_contents": { | |
"title": "API Key Required", | |
"content": "Please set the MISTRAL_API_KEY environment variable to process documents." | |
} | |
} | |
# Update progress - more granular steps | |
progress_bar.progress(10) | |
status_text.markdown('<div class="processing-status-container">Initializing OCR processor...</div>', unsafe_allow_html=True) | |
# Determine file type from extension | |
file_ext = Path(uploaded_file.name).suffix.lower() | |
file_type = "pdf" if file_ext == ".pdf" else "image" | |
file_bytes = uploaded_file.getvalue() | |
# Create a temporary file for processing | |
with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp: | |
tmp.write(file_bytes) | |
temp_path = tmp.name | |
# Get PDF rotation value if available and file is a PDF | |
pdf_rotation_value = pdf_rotation if 'pdf_rotation' in globals() and file_type == "pdf" else 0  # pdf_rotation is set at module scope in the sidebar, so locals() would never find it inside this function
progress_bar.progress(15) | |
# For PDFs, we need to handle differently | |
if file_type == "pdf": | |
status_text.markdown('<div class="processing-status-container">Converting PDF to images...</div>', unsafe_allow_html=True) | |
progress_bar.progress(20) | |
# Convert PDF to images | |
try: | |
# Use the PDF processing pipeline directly from the StructuredOCR class | |
processor = StructuredOCR() | |
# Process the file with direct PDF handling | |
progress_bar.progress(30) | |
status_text.markdown('<div class="processing-status-container">Processing PDF with OCR...</div>', unsafe_allow_html=True) | |
# Get file size in MB for API limits | |
file_size_mb = os.path.getsize(temp_path) / (1024 * 1024) | |
# Check if file exceeds API limits (50 MB) | |
if file_size_mb > 50: | |
os.unlink(temp_path) # Clean up temp file | |
progress_bar.progress(100) | |
status_text.empty() | |
progress_container.empty() | |
return { | |
"file_name": uploaded_file.name, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"error": f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB", | |
"ocr_contents": { | |
"error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB", | |
"partial_text": "Document could not be processed due to size limitations." | |
} | |
} | |
# Generate cache key | |
import hashlib | |
file_hash = hashlib.md5(file_bytes).hexdigest() | |
cache_key = f"{file_hash}_{file_type}_{use_vision}_{pdf_rotation_value}" | |
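# Keying the cache on the content hash plus file type, vision setting and rotation lets identical uploads reuse results while any settings change forces reprocessing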
# Process with cached function if possible | |
try: | |
result = process_file_cached(temp_path, file_type, use_vision, file_size_mb, cache_key) | |
progress_bar.progress(90) | |
status_text.markdown('<div class="processing-status-container">Finalizing results...</div>', unsafe_allow_html=True) | |
except Exception as e: | |
status_text.markdown(f'<div class="processing-status-container">Processing error: {str(e)}. Retrying...</div>', unsafe_allow_html=True) | |
progress_bar.progress(60) | |
# If caching fails, process directly | |
result = processor.process_file( | |
temp_path, | |
file_type=file_type, | |
use_vision=use_vision, | |
file_size_mb=file_size_mb, | |
) | |
progress_bar.progress(90) | |
status_text.markdown('<div class="processing-status-container">Finalizing results...</div>', unsafe_allow_html=True) | |
except Exception as e: | |
os.unlink(temp_path) # Clean up temp file | |
progress_bar.progress(100) | |
status_text.empty() | |
progress_container.empty() | |
raise ValueError(f"Error processing PDF: {str(e)}") | |
else: | |
# For image files, apply preprocessing if needed | |
# Check if any preprocessing options with boolean values are True, or if any non-boolean values are non-default | |
has_preprocessing = ( | |
preprocessing_options.get("grayscale", False) or | |
preprocessing_options.get("denoise", False) or | |
preprocessing_options.get("contrast", 0) != 0 or | |
preprocessing_options.get("rotation", 0) != 0 or | |
preprocessing_options.get("document_type", "standard") != "standard" | |
) | |
if has_preprocessing: | |
status_text.markdown('<div class="processing-status-container">Applying image preprocessing...</div>', unsafe_allow_html=True) | |
progress_bar.progress(20) | |
processed_bytes = preprocess_image(file_bytes, preprocessing_options) | |
progress_bar.progress(25) | |
# Save processed image to temp file | |
with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as proc_tmp: | |
proc_tmp.write(processed_bytes) | |
# Clean up original temp file and use the processed one | |
if os.path.exists(temp_path): | |
os.unlink(temp_path) | |
temp_path = proc_tmp.name | |
progress_bar.progress(30) | |
else: | |
progress_bar.progress(30) | |
# Get file size in MB for API limits | |
file_size_mb = os.path.getsize(temp_path) / (1024 * 1024) | |
# Check if file exceeds API limits (50 MB) | |
if file_size_mb > 50: | |
os.unlink(temp_path) # Clean up temp file | |
progress_bar.progress(100) | |
status_text.empty() | |
progress_container.empty() | |
return { | |
"file_name": uploaded_file.name, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"error": f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB", | |
"ocr_contents": { | |
"error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB", | |
"partial_text": "Document could not be processed due to size limitations." | |
} | |
} | |
# Update progress - more granular steps | |
progress_bar.progress(40) | |
status_text.markdown('<div class="processing-status-container">Preparing document for OCR analysis...</div>', unsafe_allow_html=True) | |
# Generate a cache key based on file content, type and settings | |
import hashlib | |
# Add pdf_rotation to cache key if present | |
pdf_rotation_value = pdf_rotation if 'pdf_rotation' in globals() else 0  # module-scope sidebar variable, so check globals() rather than locals()
file_hash = hashlib.md5(open(temp_path, 'rb').read()).hexdigest() | |
cache_key = f"{file_hash}_{file_type}_{use_vision}_{pdf_rotation_value}" | |
progress_bar.progress(50) | |
status_text.markdown('<div class="processing-status-container">Processing document with OCR...</div>', unsafe_allow_html=True) | |
# Process the file using cached function if possible | |
try: | |
result = process_file_cached(temp_path, file_type, use_vision, file_size_mb, cache_key) | |
progress_bar.progress(80) | |
status_text.markdown('<div class="processing-status-container">Analyzing document structure...</div>', unsafe_allow_html=True) | |
progress_bar.progress(90) | |
status_text.markdown('<div class="processing-status-container">Finalizing results...</div>', unsafe_allow_html=True) | |
except Exception as e: | |
progress_bar.progress(60) | |
status_text.markdown(f'<div class="processing-status-container">Processing error: {str(e)}. Retrying...</div>', unsafe_allow_html=True) | |
# If caching fails, process directly | |
processor = StructuredOCR() | |
result = processor.process_file(temp_path, file_type=file_type, use_vision=use_vision, file_size_mb=file_size_mb) | |
progress_bar.progress(90) | |
status_text.markdown('<div class="processing-status-container">Finalizing results...</div>', unsafe_allow_html=True) | |
# Complete progress | |
progress_bar.progress(100) | |
status_text.markdown('<div class="processing-status-container">Processing complete!</div>', unsafe_allow_html=True) | |
time.sleep(0.8) # Brief pause to show completion | |
status_text.empty() | |
progress_container.empty() # Remove progress indicators when done | |
# Clean up the temporary file | |
if os.path.exists(temp_path): | |
try: | |
os.unlink(temp_path) | |
except: | |
pass # Ignore errors when cleaning up temporary files | |
return result | |
except Exception as e: | |
progress_bar.progress(100) | |
error_message = str(e) | |
# Check for specific error types and provide helpful user-facing messages | |
if "rate limit" in error_message.lower() or "429" in error_message or "requests rate limit exceeded" in error_message.lower(): | |
friendly_message = "The AI service is currently experiencing high demand. Please try again in a few minutes." | |
import logging  # logging is otherwise only imported inside preprocess_image, so import it here before use
logger = logging.getLogger("app")
logger.error(f"Rate limit error: {error_message}") | |
status_text.markdown(f'<div class="processing-status-container" style="border-left-color: #ff9800;">Rate Limit: {friendly_message}</div>', unsafe_allow_html=True) | |
elif "quota" in error_message.lower() or "credit" in error_message.lower() or "subscription" in error_message.lower(): | |
friendly_message = "The API usage quota has been reached. Please check your API key and subscription limits." | |
status_text.markdown(f'<div class="processing-status-container" style="border-left-color: #ef5350;">API Quota: {friendly_message}</div>', unsafe_allow_html=True) | |
else: | |
status_text.markdown(f'<div class="processing-status-container" style="border-left-color: #ef5350;">Error: {error_message}</div>', unsafe_allow_html=True) | |
time.sleep(1.5) # Show error briefly | |
status_text.empty() | |
progress_container.empty() | |
# Display an appropriate error message based on the exception type | |
if "rate limit" in error_message.lower() or "429" in error_message or "requests rate limit exceeded" in error_message.lower(): | |
st.warning(f"API Rate Limit: {friendly_message} This is a temporary issue and does not indicate any problem with your document.") | |
elif "quota" in error_message.lower() or "credit" in error_message.lower() or "subscription" in error_message.lower(): | |
st.error(f"API Quota Exceeded: {friendly_message}") | |
else: | |
st.error(f"Error during processing: {error_message}") | |
# Clean up the temporary file | |
try: | |
if 'temp_path' in locals() and os.path.exists(temp_path): | |
os.unlink(temp_path) | |
except: | |
pass # Ignore errors when cleaning up temporary files | |
raise | |
# App title and description | |
favicon_base64 = get_base64_from_image(os.path.join(os.path.dirname(__file__), "static/favicon.png")) | |
st.markdown(f'<div style="display: flex; align-items: center; gap: 10px;"><img src="data:image/png;base64,{favicon_base64}" width="36" height="36" alt="Scroll Icon"/> <div><h1 style="margin: 0; padding: 20px 0 0 0;">Historical Document OCR</h1></div></div>', unsafe_allow_html=True) | |
st.subheader("Made possible by Mistral AI") | |
# Check if pytesseract is available for fallback | |
try: | |
import pytesseract | |
has_pytesseract = True | |
except ImportError: | |
has_pytesseract = False | |
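# Hypothetical sketch of the local Tesseract fallback mentioned in the About tab; it is not wired into the
# processing pipeline in this listing and only illustrates basic text extraction for when the cloud OCR API
# is unavailable (assumes the PIL/io imports above and the optional pytesseract import).
def local_ocr_fallback(image_bytes):
    """Return plain text extracted from image bytes using Tesseract, or an empty string if unavailable."""
    if not has_pytesseract:
        return ""
    image = Image.open(io.BytesIO(image_bytes))
    return pytesseract.image_to_string(image)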
# Initialize session state for storing previous results if not already present | |
if 'previous_results' not in st.session_state: | |
st.session_state.previous_results = [] | |
# Create main layout with tabs and columns | |
main_tab1, main_tab2, main_tab3 = st.tabs(["Document Processing", "Previous Results", "About"]) | |
with main_tab1: | |
# Create a two-column layout for file upload and results | |
left_col, right_col = st.columns([1, 1]) | |
# File uploader in the left column | |
with left_col: | |
# Simple CSS just to fix vertical text in drag and drop area | |
st.markdown(""" | |
<style> | |
/* Reset all file uploader styling */ | |
.uploadedFile, .uploadedFileData, .stFileUploader { | |
color: inherit !important; | |
} | |
/* Fix vertical text orientation */ | |
.stFileUploader p, | |
.stFileUploader span, | |
.stFileUploader div p, | |
.stFileUploader div span, | |
.stFileUploader label p, | |
.stFileUploader label span, | |
.stFileUploader div[data-testid="stFileUploadDropzone"] p, | |
.stFileUploader div[data-testid="stFileUploadDropzone"] span { | |
writing-mode: horizontal-tb !important; | |
} | |
/* Simplify the drop zone appearance */ | |
.stFileUploader > section > div, | |
.stFileUploader div[data-testid="stFileUploadDropzone"] { | |
min-height: 100px !important; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
# Add heading for the file uploader (just text, no container) | |
st.markdown('### Upload Document') | |
# Model info below the heading | |
st.markdown("Using the latest `mistral-ocr-latest` model for advanced document understanding.") | |
# Enhanced file uploader with better help text | |
uploaded_file = st.file_uploader("Drag and drop PDFs or images here", type=["pdf", "png", "jpg", "jpeg"], | |
help="Supports PDFs, JPGs, PNGs and other image formats") | |
# Removed seed prompt instructions from here, moving to sidebar | |
# Sidebar with options - moved up with equal spacing | |
with st.sidebar: | |
# Options title with reduced top margin | |
st.markdown("<h2 style='margin-top:-25px; margin-bottom:5px; padding:0;'>Options</h2>", unsafe_allow_html=True) | |
# Reduce spacing between sidebar sections | |
st.markdown(""" | |
<style> | |
/* Reduce all spacing in sidebar */ | |
.block-container {padding-top: 0;} | |
.stSidebar .block-container {padding-top: 0 !important;} | |
.stSidebar [data-testid='stSidebarNav'] {margin-bottom: 0 !important;} | |
.stSidebar [data-testid='stMarkdownContainer'] {margin-bottom: 0 !important; margin-top: 0 !important;} | |
.stSidebar [data-testid='stVerticalBlock'] {gap: 0 !important;} | |
/* Make checkbox rows more compact */ | |
.stCheckbox {margin-bottom: 0 !important; padding-bottom: 0 !important; padding-top: 0 !important;} | |
.stExpander {margin-top: 0 !important; margin-bottom: 10px !important;} | |
/* Reduce space between section headings and content */ | |
.stSidebar h1, .stSidebar h2, .stSidebar h3, .stSidebar h4, .stSidebar h5 { | |
margin-top: 0 !important; | |
margin-bottom: 0 !important; | |
padding-top: 0 !important; | |
padding-bottom: 0 !important; | |
line-height: 1.2 !important; | |
} | |
/* Make selectbox and other inputs more compact */ | |
.stSidebar .stSelectbox, .stSidebar .stSlider, .stSidebar .stNumberInput { | |
margin-bottom: 5px !important; | |
padding-bottom: 0 !important; | |
padding-top: 0 !important; | |
} | |
/* Reduce all form element margins */ | |
.stForm > div {margin-bottom: 5px !important;} | |
.stSidebar label {margin-bottom: 0 !important; line-height: 1.2 !important;} | |
</style> | |
""", unsafe_allow_html=True) | |
# Model options - more compact | |
st.markdown("##### Model Settings", help="Configure model options") | |
use_vision = st.checkbox("Use Vision Model", value=True, | |
help="For image files, use the vision model for improved analysis (may be slower)") | |
# Historical Context section with minimal spacing | |
st.markdown("##### Historical Context", help="Add historical context information") | |
# Historical period selector | |
historical_periods = [ | |
"Select period (if known)", | |
"Pre-1700s", | |
"18th Century (1700s)", | |
"19th Century (1800s)", | |
"Early 20th Century (1900-1950)", | |
"Modern (Post 1950)" | |
] | |
selected_period = st.selectbox( | |
"Historical Period", | |
options=historical_periods, | |
index=0, | |
help="Select the time period of the document for better OCR processing" | |
) | |
# Document purpose selector | |
document_purposes = [ | |
"Select purpose (if known)", | |
"Personal Letter/Correspondence", | |
"Official/Government Document", | |
"Business/Financial Record", | |
"Literary/Academic Work", | |
"News/Journalism", | |
"Religious Text", | |
"Legal Document" | |
] | |
selected_purpose = st.selectbox( | |
"Document Purpose", | |
options=document_purposes, | |
index=0, | |
help="Select the purpose or type of the document for better OCR processing" | |
) | |
# Custom prompt field | |
custom_prompt_text = "" | |
if selected_period != "Select period (if known)": | |
custom_prompt_text += f"This is a {selected_period} document. " | |
if selected_purpose != "Select purpose (if known)": | |
custom_prompt_text += f"It appears to be a {selected_purpose}. " | |
custom_prompt = st.text_area( | |
"Additional Context", | |
value=custom_prompt_text, | |
placeholder="Example: This document has unusual handwriting with cursive script. Please identify any mentioned locations and dates.", | |
height=150, | |
max_chars=500, | |
key="custom_analysis_instructions", | |
help="Powerful instructions field that impacts how the AI processes your document. Can request translations, format images correctly, extract specific information, or handle challenging documents. See the 'Additional Context Instructions & Examples' section below for more details." | |
) | |
# Enhanced instructions for Additional Context with more capabilities | |
with st.expander("Prompting Instructions"): | |
st.markdown(""" | |
### How Additional Context Affects Processing | |
The "Additional Context" field provides instructions directly to the AI to influence how it processes your document. Use it to: | |
#### Document Understanding | |
- **Specify handwriting styles**: "This document uses old-fashioned cursive with numerous flourishes and abbreviations" | |
- **Identify language features**: "The text contains archaic spellings common in 18th century documents" | |
- **Highlight focus areas**: "Look for mentions of financial transactions or dates of travel" | |
#### Output Formatting & Languages | |
- **Request translations**: "After extracting the text, translate the content into Spanish" | |
- **Format image orientation**: "Ensure images are displayed in the same orientation as they appear in the document" | |
- **Format tables**: "Convert any tables in the document to structured format with clear columns" | |
#### Special Processing | |
- **Handle challenges**: "Some portions may be faded; the page edges contain handwritten notes" | |
- **Technical terms**: "This is a medical document with specialized terminology about surgical procedures" | |
- **Organization**: "Separate the letter content from the address blocks and signature" | |
#### Example Combinations | |
``` | |
This is a handwritten letter from the 1850s. The writer uses archaic spellings and formal language. | |
Please preserve paragraph structure, identify any place names mentioned, and note any references | |
to historical events. Format any lists as bullet points. | |
``` | |
""") | |
# Image preprocessing options with reduced spacing | |
st.markdown("##### Image Preprocessing", help="Options for enhancing images before OCR") | |
with st.expander("Preprocessing Options", expanded=False): | |
preprocessing_options = {} | |
# Document type selector - important for optimized processing | |
doc_type_options = ["standard", "handwritten", "typed", "printed"] | |
preprocessing_options["document_type"] = st.selectbox( | |
"Document Type", | |
options=doc_type_options, | |
index=0, # Default to standard | |
format_func=lambda x: x.capitalize(), | |
help="Select document type for optimized processing - choose 'Handwritten' for letters and manuscripts" | |
) | |
preprocessing_options["grayscale"] = st.checkbox("Convert to Grayscale", | |
help="Convert image to grayscale before OCR") | |
preprocessing_options["denoise"] = st.checkbox("Denoise Image", | |
help="Remove noise from the image") | |
preprocessing_options["contrast"] = st.slider("Adjust Contrast", -5, 5, 0, | |
help="Adjust image contrast (-5 to +5)") | |
# Add rotation options | |
rotation_options = [0, 90, 180, 270] | |
preprocessing_options["rotation"] = st.select_slider( | |
"Rotate Document", | |
options=rotation_options, | |
value=0, | |
format_func=lambda x: f"{x}Β° {'(No rotation)' if x == 0 else ''}", | |
help="Rotate the document to correct orientation" | |
) | |
# PDF options with consistent formatting | |
st.markdown("##### PDF Options", help="Settings for PDF documents") | |
with st.expander("PDF Settings", expanded=False): | |
pdf_dpi = st.slider("PDF Resolution (DPI)", 72, 300, 100, | |
help="Higher DPI gives better quality but slower processing. Try 100 for faster processing.") | |
max_pages = st.number_input("Maximum Pages to Process", 1, 20, 3, | |
help="Limit number of pages to process") | |
# Add PDF rotation option | |
rotation_options = [0, 90, 180, 270] | |
pdf_rotation = st.select_slider( | |
"Rotate PDF", | |
options=rotation_options, | |
value=0, | |
format_func=lambda x: f"{x}Β° {'(No rotation)' if x == 0 else ''}", | |
help="Rotate the PDF pages to correct orientation" | |
) | |
# Store PDF rotation separately instead of in preprocessing_options | |
# This prevents conflict with image preprocessing | |
# Previous Results tab content | |
with main_tab2: | |
st.markdown('<h2>Previous Results</h2>', unsafe_allow_html=True) | |
# Load custom CSS for Previous Results tab | |
from ui.layout import load_css | |
load_css() | |
# Display previous results if available | |
if not st.session_state.previous_results: | |
st.markdown(""" | |
<div class="previous-results-container" style="text-align: center; padding: 40px 20px; background-color: #f0f2f6; border-radius: 8px;"> | |
<div style="font-size: 48px; margin-bottom: 20px;">π</div> | |
<h3 style="margin-bottom: 10px; font-weight: 600;">No Previous Results</h3> | |
<p style="font-size: 16px;">Process a document to see your results history saved here.</p> | |
</div> | |
""", unsafe_allow_html=True) | |
else: | |
# Create a container for the results list | |
st.markdown('<div class="previous-results-container">', unsafe_allow_html=True) | |
st.markdown(f'<h3>{len(st.session_state.previous_results)} Previous Results</h3>', unsafe_allow_html=True) | |
# Create two columns for filters and download buttons | |
filter_col, download_col = st.columns([2, 1]) | |
with filter_col: | |
# Add filter options | |
filter_options = ["All Types"] | |
if any(result.get("file_name", "").lower().endswith(".pdf") for result in st.session_state.previous_results): | |
filter_options.append("PDF Documents") | |
if any(result.get("file_name", "").lower().endswith((".jpg", ".jpeg", ".png")) for result in st.session_state.previous_results): | |
filter_options.append("Images") | |
selected_filter = st.selectbox("Filter by Type:", filter_options) | |
with download_col: | |
# Add download all button for results | |
if len(st.session_state.previous_results) > 0: | |
try: | |
# Create buffer in memory instead of file on disk | |
import io | |
from ocr_utils import create_results_zip_in_memory | |
# Get zip data directly in memory | |
zip_data = create_results_zip_in_memory(st.session_state.previous_results) | |
st.download_button( | |
label="Download All Results", | |
data=zip_data, | |
file_name="all_ocr_results.zip", | |
mime="application/zip", | |
help="Download all previous results as a ZIP file containing HTML and JSON files" | |
) | |
except Exception as e: | |
st.error(f"Error creating download: {str(e)}") | |
st.info("Try with fewer results or individual downloads") | |
# Filter results based on selection | |
filtered_results = st.session_state.previous_results | |
if selected_filter == "PDF Documents": | |
filtered_results = [r for r in st.session_state.previous_results if r.get("file_name", "").lower().endswith(".pdf")] | |
elif selected_filter == "Images": | |
filtered_results = [r for r in st.session_state.previous_results if r.get("file_name", "").lower().endswith((".jpg", ".jpeg", ".png"))] | |
# Show a message if no results match the filter | |
if not filtered_results: | |
st.markdown(""" | |
<div style="text-align: center; padding: 20px; background-color: #f9f9f9; border-radius: 5px; margin: 20px 0;"> | |
<p>No results match the selected filter.</p> | |
</div> | |
""", unsafe_allow_html=True) | |
# Display each result as a card | |
for i, result in enumerate(filtered_results): | |
# Determine file type icon | |
file_name = result.get("file_name", f"Document {i+1}") | |
file_type_lower = file_name.lower() | |
if file_type_lower.endswith(".pdf"):
icon = "📄"
elif file_type_lower.endswith((".jpg", ".jpeg", ".png", ".gif")):
icon = "🖼️"
else:
icon = "📝"
# Create a card for each result | |
st.markdown(f""" | |
<div class="result-card"> | |
<div class="result-header"> | |
<div class="result-filename">{icon} {file_name}</div> | |
<div class="result-date">{result.get('timestamp', 'Unknown')}</div> | |
</div> | |
<div class="result-metadata"> | |
<div class="result-tag">Languages: {', '.join(result.get('languages', ['Unknown']))}</div> | |
<div class="result-tag">Topics: {', '.join(result.get('topics', ['Unknown']))}</div> | |
</div> | |
""", unsafe_allow_html=True) | |
# Add view button inside the card with proper styling | |
st.markdown('<div class="result-action-button">', unsafe_allow_html=True) | |
if st.button(f"View Document", key=f"view_{i}"): | |
# Set the selected result in the session state | |
st.session_state.selected_previous_result = st.session_state.previous_results[i] | |
# Force a rerun to show the selected result | |
st.rerun() | |
st.markdown('</div>', unsafe_allow_html=True) | |
# Close the result card | |
st.markdown('</div>', unsafe_allow_html=True) | |
# Close the container | |
st.markdown('</div>', unsafe_allow_html=True) | |
# Display the selected result if available | |
if 'selected_previous_result' in st.session_state and st.session_state.selected_previous_result: | |
selected_result = st.session_state.selected_previous_result | |
# Create a styled container for the selected result | |
st.markdown(f""" | |
<div class="selected-result-container"> | |
<div class="result-header" style="margin-bottom: 20px;"> | |
<div class="selected-result-title">Selected Document: {selected_result.get('file_name', 'Unknown')}</div> | |
<div class="result-date">{selected_result.get('timestamp', '')}</div> | |
</div> | |
""", unsafe_allow_html=True) | |
# Display metadata in a styled way | |
meta_col1, meta_col2 = st.columns(2) | |
with meta_col1: | |
# Display document metadata | |
if 'languages' in selected_result: | |
languages = [lang for lang in selected_result['languages'] if lang is not None] | |
if languages: | |
st.write(f"**Languages:** {', '.join(languages)}") | |
if 'topics' in selected_result and selected_result['topics']: | |
st.write(f"**Topics:** {', '.join(selected_result['topics'])}") | |
with meta_col2: | |
# Display processing metadata | |
if 'limited_pages' in selected_result: | |
st.info(f"Processed {selected_result['limited_pages']['processed']} of {selected_result['limited_pages']['total']} pages") | |
if 'processing_time' in selected_result: | |
proc_time = selected_result['processing_time'] | |
st.write(f"**Processing Time:** {proc_time:.1f}s") | |
# Create tabs for content display | |
has_images = selected_result.get('has_images', False) | |
if has_images: | |
view_tab1, view_tab2, view_tab3 = st.tabs(["Structured View", "Raw JSON", "With Images"]) | |
else: | |
view_tab1, view_tab2 = st.tabs(["Structured View", "Raw JSON"]) | |
with view_tab1: | |
# Display structured content | |
if 'ocr_contents' in selected_result and isinstance(selected_result['ocr_contents'], dict): | |
for section, content in selected_result['ocr_contents'].items(): | |
if content and section not in ['error', 'raw_text', 'partial_text']: # Skip error and raw text sections | |
st.markdown(f"#### {section.replace('_', ' ').title()}") | |
if isinstance(content, str): | |
st.write(content) | |
elif isinstance(content, list): | |
for item in content: | |
if isinstance(item, str): | |
st.write(f"- {item}") | |
else: | |
st.write(f"- {str(item)}") | |
elif isinstance(content, dict): | |
for k, v in content.items(): | |
st.write(f"**{k}:** {v}") | |
with view_tab2: | |
# Show the raw JSON with an option to download it | |
try: | |
st.json(selected_result) | |
except Exception as e: | |
st.error(f"Error displaying JSON: {str(e)}") | |
# Try a safer approach with string representation | |
st.code(str(selected_result)) | |
# Add JSON download button | |
# Compute the base filename first so the fallback download in the except branch can use it even if JSON serialization fails
filename = selected_result.get('file_name', 'document').split('.')[0]
try:
json_str = json.dumps(selected_result, indent=2)
st.download_button( | |
label="Download JSON", | |
data=json_str, | |
file_name=f"{filename}_data.json", | |
mime="application/json" | |
) | |
except Exception as e: | |
st.error(f"Error creating JSON download: {str(e)}") | |
# Fallback to string representation for download | |
st.download_button( | |
label="Download as Text", | |
data=str(selected_result), | |
file_name=f"{filename}_data.txt", | |
mime="text/plain" | |
) | |
if has_images and 'pages_data' in selected_result: | |
with view_tab3: | |
# Display content with images in a nicely formatted way | |
pages_data = selected_result.get('pages_data', []) | |
# Process and display each page | |
for page_idx, page in enumerate(pages_data): | |
# Add a page header if multi-page | |
if len(pages_data) > 1: | |
st.markdown(f"### Page {page_idx + 1}") | |
# Create columns for better layout | |
if page.get('images'): | |
# Extract images for this page | |
images = page.get('images', []) | |
for img in images: | |
if 'image_base64' in img: | |
st.image(img['image_base64'], width=600) | |
# Display text content if available | |
text_content = page.get('markdown', '') | |
if text_content: | |
with st.expander("View Page Text", expanded=True): | |
st.markdown(text_content) | |
else: | |
# Just display text if no images | |
text_content = page.get('markdown', '') | |
if text_content: | |
st.markdown(text_content) | |
# Add page separator | |
if page_idx < len(pages_data) - 1: | |
st.markdown("---") | |
# Add HTML download button if images are available | |
from ocr_utils import create_html_with_images | |
html_content = create_html_with_images(selected_result) | |
filename = selected_result.get('file_name', 'document').split('.')[0] | |
st.download_button( | |
label="Download as HTML with Images", | |
data=html_content, | |
file_name=f"{filename}_with_images.html", | |
mime="text/html" | |
) | |
# Close the container | |
st.markdown('</div>', unsafe_allow_html=True) | |
# Add clear button outside the container with proper styling | |
col1, col2, col3 = st.columns([1, 1, 1]) | |
with col2: | |
st.markdown('<div class="result-action-button" style="text-align: center;">', unsafe_allow_html=True) | |
if st.button("Close Selected Document", key="close_selected"): | |
# Clear the selected result from session state | |
del st.session_state.selected_previous_result | |
# Force a rerun to update the view | |
st.rerun() | |
st.markdown('</div>', unsafe_allow_html=True) | |
# About tab content | |
with main_tab3: | |
# Add a notice about local OCR fallback if available | |
fallback_notice = "" | |
if 'has_pytesseract' in locals() and has_pytesseract: | |
fallback_notice = """ | |
**Local OCR Fallback:** | |
- Local OCR fallback using Tesseract is available if API rate limits are reached | |
- Provides basic text extraction when cloud OCR is unavailable | |
""" | |
st.markdown(f""" | |
### About Historical Document OCR | |
This application specializes in processing historical documents using [Mistral AI's Document OCR](https://docs.mistral.ai/capabilities/document/), which is particularly effective for handling challenging textual materials. | |
#### Document Processing Capabilities | |
- **Historical Images**: Process vintage photographs, scanned historical papers, manuscripts | |
- **Handwritten Documents**: Extract text from letters, journals, notes, and records | |
- **Multi-Page PDFs**: Process historical books, articles, and longer documents | |
- **Mixed Content**: Handle documents with both text and imagery | |
#### Key Features | |
- **Advanced Image Preprocessing** | |
- Grayscale conversion optimized for historical documents | |
- Denoising to remove artifacts and improve clarity | |
- Contrast adjustment to enhance faded text | |
- Document rotation for proper orientation | |
- **Document Analysis** | |
- Text extraction with `mistral-ocr-latest` | |
- Structured data extraction: dates, names, places, topics | |
- Multi-language support with automatic detection | |
- Handling of period-specific terminology and obsolete language | |
- **Flexible Output Formats** | |
- Structured view with organized content sections | |
- Developer JSON for integration with other applications | |
- Visual representation preserving original document layout | |
- Downloadable results in various formats | |
#### Historical Context | |
Add period-specific context to improve analysis: | |
- Historical period selection | |
- Document purpose identification | |
- Custom instructions for specialized terminology | |
#### Data Privacy | |
- All document processing happens through secure AI processing | |
- No documents are permanently stored on the server | |
- Results are only saved in your current session | |
{fallback_notice} | |
""") | |
with main_tab1: | |
if uploaded_file is not None: | |
# Check file size (cap at 50MB) | |
file_size_mb = len(uploaded_file.getvalue()) / (1024 * 1024) | |
if file_size_mb > 50: | |
with left_col: | |
st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size is 50MB.") | |
st.stop() | |
file_ext = Path(uploaded_file.name).suffix.lower() | |
# Process button - flush left with similar padding as file browser | |
with left_col: | |
process_button = st.button("Process Document") | |
# Empty container for progress indicators - will be filled during processing | |
# Positioned right after the process button for better visibility | |
progress_placeholder = st.empty() | |
# Image preprocessing preview - automatically show only the preprocessed version | |
if any(preprocessing_options.values()) and uploaded_file.type.startswith('image/'): | |
st.markdown("**Preprocessed Preview**") | |
try: | |
# Create a container for the preview to better control layout | |
with st.container(): | |
processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options) | |
# Use use_column_width=True for responsive design | |
st.image(io.BytesIO(processed_bytes), use_column_width=True) | |
# Show preprocessing metadata in a well-formatted caption | |
meta_items = [] | |
if preprocessing_options.get("document_type", "standard") != "standard": | |
meta_items.append(f"Document type ({preprocessing_options['document_type']})") | |
if preprocessing_options.get("grayscale", False): | |
meta_items.append("Grayscale") | |
if preprocessing_options.get("denoise", False): | |
meta_items.append("Denoise") | |
if preprocessing_options.get("contrast", 0) != 0: | |
meta_items.append(f"Contrast ({preprocessing_options['contrast']})") | |
if preprocessing_options.get("rotation", 0) != 0: | |
meta_items.append(f"Rotation ({preprocessing_options['rotation']}Β°)") | |
# Only show "Applied:" if there are actual preprocessing steps | |
if meta_items: | |
meta_text = "Applied: " + ", ".join(meta_items) | |
st.caption(meta_text) | |
except Exception as e: | |
st.error(f"Error in preprocessing: {str(e)}") | |
st.info("Try using grayscale preprocessing for PNG images with transparency") | |
# Container for success message (will be filled after processing) | |
# No extra spacing needed as it will be managed programmatically | |
metadata_placeholder = st.empty() | |
# Results section | |
if process_button: | |
# Move the progress indicator reference to just below the button | |
progress_container = progress_placeholder | |
try: | |
# Get max_pages or default if not available | |
max_pages_value = max_pages if 'max_pages' in locals() else None | |
# Apply performance mode settings | |
if 'perf_mode' in locals(): | |
if perf_mode == "Speed": | |
# Override settings for faster processing | |
if 'preprocessing_options' in locals(): | |
preprocessing_options["denoise"] = False # Skip denoising for speed | |
if 'pdf_dpi' in locals() and file_ext.lower() == '.pdf': | |
pdf_dpi = min(pdf_dpi, 100) # Lower DPI for speed | |
# Process file with or without custom prompt | |
if custom_prompt and custom_prompt.strip(): | |
# Process with custom instructions for the AI | |
with progress_placeholder.container(): | |
progress_bar = st.progress(0) | |
status_text = st.empty() | |
status_text.markdown('<div class="processing-status-container">Processing with custom instructions...</div>', unsafe_allow_html=True) | |
progress_bar.progress(30) | |
# Special handling for PDF files with custom prompts | |
if file_ext.lower() == ".pdf": | |
# For PDFs with custom prompts, we use a special two-step process | |
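# In outline, the two-step flow implemented below is:
#   1. Run the standard OCR pipeline on the PDF with no custom prompt to get reliable text and page images.
#   2. Apply the user's instructions to the extracted text with a text-only analysis pass, then merge that
#      enhanced analysis back into the base result while preserving the original page/image data.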
with progress_placeholder.container(): | |
status_text.markdown('<div class="processing-status-container">Using special PDF processing for custom instructions...</div>', unsafe_allow_html=True) | |
progress_bar.progress(40) | |
try: | |
# Step 1: Process without custom prompt to get OCR text | |
processor = StructuredOCR() | |
# First save the PDF to a temp file | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp: | |
tmp.write(uploaded_file.getvalue()) | |
temp_path = tmp.name | |
# Process with NO custom prompt first | |
# Apply PDF rotation if specified | |
pdf_rotation_value = pdf_rotation if 'pdf_rotation' in locals() else 0 | |
base_result = processor.process_file( | |
file_path=temp_path, | |
file_type="pdf", | |
use_vision=use_vision, | |
custom_prompt=None, # No custom prompt in first step | |
file_size_mb=len(uploaded_file.getvalue()) / (1024 * 1024), | |
pdf_rotation=pdf_rotation_value # Pass rotation value to processor | |
) | |
progress_bar.progress(70) | |
status_text.markdown('<div class="processing-status-container">Applying custom analysis to extracted text...</div>', unsafe_allow_html=True) | |
# Step 2: Apply custom prompt to the extracted text using text-only LLM | |
if 'ocr_contents' in base_result and isinstance(base_result['ocr_contents'], dict): | |
# Get text from OCR result | |
ocr_text = "" | |
for section, content in base_result['ocr_contents'].items(): | |
if isinstance(content, str): | |
ocr_text += content + "\n\n" | |
elif isinstance(content, list): | |
for item in content: | |
if isinstance(item, str): | |
ocr_text += item + "\n" | |
ocr_text += "\n" | |
# Format the custom prompt for text-only processing | |
formatted_prompt = f"USER INSTRUCTIONS: {custom_prompt.strip()}\nPay special attention to these instructions and respond accordingly." | |
# Apply custom prompt to extracted text | |
enhanced_result = processor._extract_structured_data_text_only(ocr_text, uploaded_file.name, formatted_prompt) | |
# Merge results, keeping images from base_result | |
result = base_result.copy() | |
result['custom_prompt_applied'] = 'text_only' | |
# Update with enhanced analysis results, preserving image data | |
for key, value in enhanced_result.items(): | |
if key not in ['raw_response_data', 'pages_data', 'has_images']: | |
result[key] = value | |
else: | |
# If no OCR content, just use the base result | |
result = base_result | |
result['custom_prompt_applied'] = 'failed' | |
# Clean up temp file | |
if os.path.exists(temp_path): | |
os.unlink(temp_path) | |
except Exception as e: | |
# If anything fails, revert to standard processing | |
st.warning(f"Special PDF processing failed. Falling back to standard method: {str(e)}") | |
result = process_file(uploaded_file, use_vision, {}, progress_container=progress_placeholder) | |
else: | |
# For non-PDF files, use normal processing with custom prompt | |
# Save the uploaded file to a temporary file with preprocessing | |
with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp: | |
# Apply preprocessing if any options are selected | |
if any(preprocessing_options.values()): | |
# Apply performance mode settings | |
if 'perf_mode' in locals() and perf_mode == "Speed": | |
# Skip denoising for speed in preprocessing | |
speed_preprocessing = preprocessing_options.copy() | |
speed_preprocessing["denoise"] = False | |
processed_bytes = preprocess_image(uploaded_file.getvalue(), speed_preprocessing) | |
else: | |
processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options) | |
tmp.write(processed_bytes) | |
else: | |
tmp.write(uploaded_file.getvalue()) | |
temp_path = tmp.name | |
# Show progress | |
with progress_placeholder.container(): | |
progress_bar.progress(50) | |
status_text.markdown('<div class="processing-status-container">Analyzing with custom instructions...</div>', unsafe_allow_html=True) | |
# Initialize OCR processor and process with custom prompt | |
processor = StructuredOCR() | |
# Format the custom prompt to ensure it has an impact | |
formatted_prompt = f"USER INSTRUCTIONS: {custom_prompt.strip()}\nPay special attention to these instructions and respond accordingly." | |
try: | |
result = processor.process_file( | |
file_path=temp_path, | |
file_type="image", # Always use image for non-PDFs | |
use_vision=use_vision, | |
custom_prompt=formatted_prompt, | |
file_size_mb=len(uploaded_file.getvalue()) / (1024 * 1024) | |
) | |
except Exception as e: | |
# For any error, fall back to standard processing | |
st.warning(f"Custom prompt processing failed. Falling back to standard processing: {str(e)}") | |
result = process_file(uploaded_file, use_vision, preprocessing_options, progress_container=progress_placeholder) | |
# Complete progress | |
with progress_placeholder.container(): | |
progress_bar.progress(100) | |
status_text.markdown('<div class="processing-status-container">Processing complete!</div>', unsafe_allow_html=True) | |
time.sleep(0.8) | |
progress_placeholder.empty() | |
# Clean up temporary file | |
if os.path.exists(temp_path): | |
try: | |
os.unlink(temp_path) | |
except: | |
pass | |
else: | |
# Standard processing without custom prompt | |
result = process_file(uploaded_file, use_vision, preprocessing_options, progress_container=progress_placeholder) | |
# Document results will be shown in the right column | |
with right_col: | |
# Add Document Metadata section header | |
st.subheader("Document Metadata") | |
# Create metadata card with standard styling | |
metadata_html = '<div class="metadata-card" style="padding:15px; margin-bottom:20px;">' | |
# File info | |
metadata_html += f'<p><strong>File Name:</strong> {result.get("file_name", uploaded_file.name)}</p>' | |
# Info about limited pages | |
if 'limited_pages' in result: | |
metadata_html += f'<p style="padding:8px; border-radius:4px;"><strong>Pages:</strong> {result["limited_pages"]["processed"]} of {result["limited_pages"]["total"]} processed</p>' | |
# Languages | |
if 'languages' in result: | |
languages = [lang for lang in result['languages'] if lang is not None] | |
if languages: | |
metadata_html += f'<p><strong>Languages:</strong> {", ".join(languages)}</p>' | |
# Topics | |
if 'topics' in result and result['topics']: | |
metadata_html += f'<p><strong>Topics:</strong> {", ".join(result["topics"])}</p>' | |
# Processing time | |
if 'processing_time' in result: | |
proc_time = result['processing_time'] | |
metadata_html += f'<p><strong>Processing Time:</strong> {proc_time:.1f}s</p>' | |
# Close the metadata card | |
metadata_html += '</div>' | |
# Render the metadata HTML | |
st.markdown(metadata_html, unsafe_allow_html=True) | |
# Add content section heading - using standard subheader | |
st.subheader("Document Content") | |
# Start document content div with consistent styling class | |
st.markdown('<div class="document-content" style="margin-top:10px;">', unsafe_allow_html=True) | |
if 'ocr_contents' in result: | |
# Check for has_images in the result | |
has_images = result.get('has_images', False) | |
# Create tabs for different views | |
if has_images: | |
view_tab1, view_tab2, view_tab3 = st.tabs(["Structured View", "Raw JSON", "With Images"]) | |
else: | |
view_tab1, view_tab2 = st.tabs(["Structured View", "Raw JSON"]) | |
with view_tab1: | |
# Display in a more user-friendly format based on the content structure | |
html_content = "" | |
if isinstance(result['ocr_contents'], dict): | |
for section, content in result['ocr_contents'].items(): | |
if content: # Only display non-empty sections | |
# Add consistent styling for each section | |
section_title = f'<h4 style="font-family: Georgia, serif; font-size: 18px; margin-top: 20px; margin-bottom: 10px;">{section.replace("_", " ").title()}</h4>' | |
html_content += section_title | |
if isinstance(content, str): | |
# Optimize by using an expander for very long content
if len(content) > 1000: | |
# Format content for long text - bold everything after "... that" | |
preview_content = content[:1000] + "..." if len(content) > 1000 else content | |
if "... that" in content: | |
# For the preview (first 1000 chars) | |
if "... that" in preview_content: | |
parts = preview_content.split("... that", 1) | |
formatted_preview = f"{parts[0]}... that<strong>{parts[1]}</strong>" | |
html_content += f"<p style=\"font-size:16px;\">{formatted_preview}</p>" | |
else: | |
html_content += f"<p style=\"font-size:16px; font-weight:normal;\">{preview_content}</p>" | |
# For the full content in expander | |
parts = content.split("... that", 1) | |
formatted_full = f"{parts[0]}... that**{parts[1]}**" | |
st.markdown(f"#### {section.replace('_', ' ').title()}") | |
with st.expander("Show full content"): | |
st.markdown(formatted_full) | |
else: | |
html_content += f"<p style=\"font-size:16px; font-weight:normal;\">{preview_content}</p>" | |
st.markdown(f"#### {section.replace('_', ' ').title()}") | |
with st.expander("Show full content"): | |
st.write(content) | |
else: | |
# Format content - bold everything after "... that" | |
if "... that" in content: | |
parts = content.split("... that", 1) | |
formatted_content = f"{parts[0]}... that<strong>{parts[1]}</strong>" | |
html_content += f"<p style=\"font-size:16px;\">{formatted_content}</p>" | |
st.markdown(f"#### {section.replace('_', ' ').title()}") | |
st.markdown(f"{parts[0]}... that**{parts[1]}**") | |
else: | |
html_content += f"<p style=\"font-size:16px; font-weight:normal;\">{content}</p>" | |
st.markdown(f"#### {section.replace('_', ' ').title()}") | |
st.write(content) | |
elif isinstance(content, list): | |
html_list = "<ul>" | |
st.markdown(f"#### {section.replace('_', ' ').title()}") | |
# Limit display for very long lists | |
if len(content) > 20: | |
with st.expander(f"Show all {len(content)} items"): | |
for item in content: | |
if isinstance(item, str): | |
html_list += f"<li>{item}</li>" | |
st.write(f"- {item}") | |
elif isinstance(item, dict): | |
try: | |
st.json(item) | |
except Exception as e: | |
st.error(f"Error displaying JSON: {str(e)}") | |
st.code(str(item)) | |
else: | |
for item in content: | |
if isinstance(item, str): | |
html_list += f"<li>{item}</li>" | |
st.write(f"- {item}") | |
elif isinstance(item, dict): | |
try: | |
st.json(item) | |
except Exception as e: | |
st.error(f"Error displaying JSON: {str(e)}") | |
st.code(str(item)) | |
html_list += "</ul>" | |
html_content += html_list | |
elif isinstance(content, dict): | |
html_dict = "<dl>" | |
st.markdown(f"#### {section.replace('_', ' ').title()}") | |
for k, v in content.items(): | |
html_dict += f"<dt>{k}</dt><dd>{v}</dd>" | |
st.write(f"**{k}:** {v}") | |
html_dict += "</dl>" | |
html_content += html_dict | |
# Add download button in a smaller section | |
with st.expander("Export Content"): | |
# Get original filename without extension | |
original_name = Path(result.get('file_name', uploaded_file.name)).stem | |
# HTML download button | |
html_bytes = html_content.encode() | |
st.download_button( | |
label="Download as HTML", | |
data=html_bytes, | |
file_name=f"{original_name}_processed.html", | |
mime="text/html" | |
) | |
with view_tab2: | |
# Show the raw JSON for developers, with an expander for large results | |
if len(json.dumps(result)) > 5000: | |
with st.expander("View full JSON"): | |
try: | |
st.json(result) | |
except Exception as e: | |
st.error(f"Error displaying JSON: {str(e)}") | |
# Fallback to string representation | |
st.code(str(result)) | |
else: | |
try: | |
st.json(result) | |
except Exception as e: | |
st.error(f"Error displaying JSON: {str(e)}") | |
# Fallback to string representation | |
st.code(str(result)) | |
if has_images and 'pages_data' in result: | |
with view_tab3: | |
# Use pages_data directly instead of raw_response | |
try: | |
# Use the serialized pages data | |
pages_data = result.get('pages_data', []) | |
if not pages_data: | |
st.warning("No image data found in the document.") | |
st.stop() | |
# Construct markdown from pages_data directly | |
from ocr_utils import replace_images_in_markdown | |
combined_markdown = "" | |
for page in pages_data: | |
page_markdown = page.get('markdown', '') | |
images = page.get('images', []) | |
# Create image dictionary | |
image_dict = {} | |
for img in images: | |
if 'id' in img and 'image_base64' in img: | |
image_dict[img['id']] = img['image_base64'] | |
# Replace image references in markdown | |
if page_markdown and image_dict: | |
page_markdown = replace_images_in_markdown(page_markdown, image_dict) | |
combined_markdown += page_markdown + "\n\n---\n\n" | |
if not combined_markdown: | |
st.warning("No content with images found.") | |
st.stop() | |
# Add CSS for better image handling | |
st.markdown(""" | |
<style> | |
.image-container { | |
margin: 20px 0; | |
text-align: center; | |
} | |
.markdown-text-container { | |
padding: 10px; | |
background-color: #f9f9f9; | |
border-radius: 5px; | |
} | |
.markdown-text-container img { | |
margin: 15px auto; | |
max-width: 90%; | |
max-height: 500px; | |
object-fit: contain; | |
border: 1px solid #ddd; | |
border-radius: 4px; | |
display: block; | |
} | |
.markdown-text-container p { | |
margin-bottom: 16px; | |
line-height: 1.6; | |
font-family: Georgia, serif; | |
} | |
.page-break { | |
border-top: 1px solid #ddd; | |
margin: 20px 0; | |
padding-top: 20px; | |
} | |
.page-text-content { | |
margin-bottom: 20px; | |
} | |
.text-block { | |
background-color: #fff; | |
padding: 15px; | |
border-radius: 4px; | |
border-left: 3px solid #546e7a; | |
margin-bottom: 15px; | |
color: #333; | |
} | |
.text-block p { | |
margin: 8px 0; | |
color: #333; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
# Process and display content with images properly | |
import re | |
# Process each page separately | |
pages_content = [] | |
# Check if this is from a PDF processed through pdf2image | |
is_pdf2image = result.get('pdf_processing_method') == 'pdf2image' | |
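# 'pdf_processing_method' is assumed to be set by the upstream processing code
# when the PDF was rasterised with pdf2image before OCR.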
for i, page in enumerate(pages_data): | |
page_markdown = page.get('markdown', '') | |
images = page.get('images', []) | |
if not page_markdown: | |
continue | |
# Create image dictionary | |
image_dict = {} | |
for img in images: | |
if 'id' in img and 'image_base64' in img: | |
image_dict[img['id']] = img['image_base64'] | |
# Create HTML content for this page | |
page_html = f"<h3>Page {i+1}</h3>" if i > 0 else "" | |
# Display the raw text content first to ensure it's visible | |
page_html += f"<div class='page-text-content'>" | |
# Special handling for PDF2image processed documents | |
if is_pdf2image and i == 0 and 'ocr_contents' in result: | |
# Display all structured content from OCR for PDFs | |
page_html += "<div class='text-block pdf-content'>" | |
# Check if custom prompt was applied | |
if result.get('custom_prompt_applied') == 'text_only': | |
page_html += "<div class='prompt-info'><i>Custom analysis applied using text-only processing</i></div>" | |
ocr_contents = result.get('ocr_contents', {}) | |
# Get a sorted list of sections to ensure consistent order | |
section_keys = sorted(ocr_contents.keys()) | |
# Place important sections first (iterate in reverse so repeated insert(0, ...) leaves them in priority order)
priority_sections = ['title', 'subtitle', 'header', 'publication', 'date', 'content', 'main_text']
for important in reversed(priority_sections):
if important in section_keys:
section_keys.remove(important)
section_keys.insert(0, important)
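# e.g. if ocr_contents has keys {'date', 'title', 'body'} (hypothetical), section_keys becomes
# ['title', 'date', 'body']: priority sections first, the rest in sorted order.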
for section in section_keys: | |
content = ocr_contents[section] | |
if section in ['raw_text', 'error', 'partial_text']: | |
continue # Skip these fields | |
section_title = section.replace('_', ' ').title() | |
page_html += f"<h4>{section_title}</h4>" | |
if isinstance(content, str): | |
# Convert newlines to <br> tags | |
content_html = content.replace('\n', '<br>') | |
page_html += f"<p>{content_html}</p>" | |
elif isinstance(content, list): | |
page_html += "<ul>" | |
for item in content: | |
if isinstance(item, str): | |
page_html += f"<li>{item}</li>" | |
elif isinstance(item, dict): | |
page_html += "<li>" | |
for k, v in item.items(): | |
page_html += f"<strong>{k}:</strong> {v}<br>" | |
page_html += "</li>" | |
else: | |
page_html += f"<li>{str(item)}</li>" | |
page_html += "</ul>" | |
elif isinstance(content, dict): | |
for k, v in content.items(): | |
if isinstance(v, str): | |
page_html += f"<p><strong>{k}:</strong> {v}</p>" | |
elif isinstance(v, list): | |
page_html += f"<p><strong>{k}:</strong></p><ul>" | |
for item in v: | |
page_html += f"<li>{item}</li>" | |
page_html += "</ul>" | |
else: | |
page_html += f"<p><strong>{k}:</strong> {str(v)}</p>" | |
page_html += "</div>" | |
else: | |
# Standard processing for regular documents | |
# Get all text content that isn't an image and add it first | |
text_content = [] | |
for line in page_markdown.split("\n"): | |
if not re.search(r'!\[(.*?)\]\((.*?)\)', line) and line.strip(): | |
text_content.append(line) | |
# Add the text content as a block | |
if text_content: | |
page_html += f"<div class='text-block'>" | |
for line in text_content: | |
page_html += f"<p>{line}</p>" | |
page_html += "</div>" | |
page_html += "</div>" | |
# Then add images separately | |
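# Markdown image syntax is ![alt](ref); the regex captures the alt text and the image id,
# and the id is then looked up in image_dict to get the embedded base64 data.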
for line in page_markdown.split("\n"): | |
# Handle image lines | |
img_match = re.search(r'!\[(.*?)\]\((.*?)\)', line) | |
if img_match: | |
alt_text = img_match.group(1) | |
img_ref = img_match.group(2) | |
# Get the base64 data for this image ID | |
img_data = image_dict.get(img_ref, "") | |
if img_data: | |
img_html = f'<div class="image-container"><img src="{img_data}" alt="{alt_text}"></div>' | |
page_html += img_html | |
# Add page separator if not the last page | |
if i < len(pages_data) - 1: | |
page_html += '<div class="page-break"></div>' | |
pages_content.append(page_html) | |
# Combine all pages HTML | |
html_content = "\n".join(pages_content) | |
# Wrap the content in a div with the class for styling | |
st.markdown(f""" | |
<div class="markdown-text-container"> | |
{html_content} | |
</div> | |
""", unsafe_allow_html=True) | |
# Create download HTML content | |
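# The exported file has to be self-contained, so the styling is repeated here inline
# rather than relying on the CSS injected into the Streamlit page above.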
download_html = f""" | |
<html> | |
<head> | |
<style> | |
body {{ | |
font-family: Georgia, serif; | |
line-height: 1.7; | |
margin: 0 auto; | |
max-width: 800px; | |
padding: 20px; | |
}} | |
img {{ | |
max-width: 90%; | |
max-height: 500px; | |
object-fit: contain; | |
margin: 20px auto; | |
display: block; | |
border: 1px solid #ddd; | |
border-radius: 4px; | |
}} | |
.image-container {{ | |
margin: 20px 0; | |
text-align: center; | |
}} | |
.page-break {{ | |
border-top: 1px solid #ddd; | |
margin: 40px 0; | |
padding-top: 40px; | |
}} | |
h3 {{ | |
color: #333; | |
border-bottom: 1px solid #eee; | |
padding-bottom: 10px; | |
}} | |
p {{ | |
margin: 12px 0; | |
}} | |
.page-text-content {{ | |
margin-bottom: 20px; | |
}} | |
.text-block {{ | |
background-color: #f9f9f9; | |
padding: 15px; | |
border-radius: 4px; | |
border-left: 3px solid #546e7a; | |
margin-bottom: 15px; | |
color: #333; | |
}} | |
.text-block p {{ | |
margin: 8px 0; | |
color: #333; | |
}} | |
</style> | |
</head> | |
<body> | |
<div class="markdown-text-container"> | |
{html_content} | |
</div> | |
</body> | |
</html> | |
""" | |
# Get original filename without extension | |
original_name = Path(result.get('file_name', uploaded_file.name)).stem | |
# Add download button as an expander to prevent page reset | |
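# st.download_button typically triggers a script rerun when clicked; wrapping it in an
# expander appears to be the intent behind "prevent page reset" noted above.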
with st.expander("Download Document with Images"): | |
st.markdown("Click the button below to download the document with embedded images") | |
st.download_button( | |
label="Download as HTML", | |
data=download_html, | |
file_name=f"{original_name}_with_images.html", | |
mime="text/html", | |
key="download_with_images_button" | |
) | |
except Exception as e: | |
st.error(f"Could not display document with images: {str(e)}") | |
st.info("Try refreshing or processing the document again.") | |
if 'ocr_contents' not in result: | |
st.error("No OCR content was extracted from the document.") | |
# Close document content div | |
st.markdown('</div>', unsafe_allow_html=True) | |
# Show a compact success message without extra container space | |
metadata_placeholder.success("**Document processed successfully**") | |
# Store the result in the previous results list | |
# Add timestamp to result for history tracking | |
result_copy = result.copy() | |
result_copy['timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M") | |
# Add to session state, keeping the most recent 20 results | |
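# Assumes st.session_state.previous_results was initialised (e.g. to []) earlier in the script.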
st.session_state.previous_results.insert(0, result_copy) | |
if len(st.session_state.previous_results) > 20: | |
st.session_state.previous_results = st.session_state.previous_results[:20] | |
except Exception as e: | |
st.error(f"Error processing document: {str(e)}") | |
else: | |
# Nothing uploaded yet; the upload instruction now lives in the file_uploader itself
# Show example document guidance in a simple layout
st.subheader("Example Documents") | |
# Add a simplified info message about examples | |
st.markdown(""" | |
This app can process various historical documents: | |
- Historical photographs, maps, and manuscripts | |
- Handwritten letters and documents | |
- Printed books and articles | |
- Multi-page PDFs | |
Upload your own document to get started or explore the 'About' tab for more information. | |
""") | |
# Display a direct message about sample documents | |
st.info("Sample documents are available in the input directory. Upload a document to begin analysis.")# Minor update | |