historical-ocr / utils.py
milwright's picture
Consolidate segmentation improvements and code cleanup
42dc069
import os
import base64
import hashlib
import time
import logging
from datetime import datetime
from pathlib import Path
from functools import wraps
from constants import CONTENT_THEMES, PERIOD_TAGS, DEFAULT_TAGS, GENERIC_TAGS
# Module-level logger shared by every helper in this file. It is looked up by
# the fixed name "utils" and its own level is set to INFO, so the INFO records
# written below (timing results, temp-file cleanup, extracted themes) are not
# filtered out by this logger itself.
logger = logging.getLogger("utils")
logger.setLevel(logging.INFO)
def get_base64_from_image(image_path):
    """
    Encode a file on disk as a base64 data URL.

    The MIME type is chosen from the file extension (compared in lower
    case). Extensions that are not in the table below fall back to
    'image/jpeg'.

    Args:
        image_path: Path to the file to encode
    Returns:
        Data URL of the form "data:<mime>;base64,<payload>", or an empty
        string if the file could not be read or encoded (the error is logged)
    """
    # Extension -> MIME type; anything else is treated as JPEG
    mime_by_suffix = {
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.pdf': 'application/pdf',
    }
    try:
        source = Path(image_path)
        mime_type = mime_by_suffix.get(source.suffix.lower(), 'image/jpeg')

        # Read the whole file in binary mode and base64-encode it
        with open(source, "rb") as handle:
            raw = handle.read()
        encoded = base64.b64encode(raw).decode('utf-8')
        return f"data:{mime_type};base64,{encoded}"
    except Exception as e:
        logger.error(f"Error encoding file to base64: {str(e)}")
        return ""
def get_base64_from_bytes(file_bytes, mime_type=None, file_name=None):
    """
    Encode in-memory bytes as a base64 data URL.

    An explicit mime_type always wins. Without one, the type is inferred
    from the extension of file_name; if that is missing or unrecognised,
    'application/octet-stream' is used.

    Args:
        file_bytes: Binary file data
        mime_type: MIME type of the data (optional)
        file_name: Original file name, used only to infer the MIME type (optional)
    Returns:
        Data URL of the form "data:<mime>;base64,<payload>", or an empty
        string if encoding fails (the error is logged)
    """
    try:
        if mime_type is None:
            if file_name is not None:
                # Infer from the (lower-cased) extension of the original name
                known_types = {
                    '.png': 'image/png',
                    '.gif': 'image/gif',
                    '.jpg': 'image/jpeg',
                    '.jpeg': 'image/jpeg',
                    '.pdf': 'application/pdf',
                }
                extension = Path(file_name).suffix.lower()
                mime_type = known_types.get(extension, 'application/octet-stream')
            else:
                # Nothing to go on: generic binary type
                mime_type = 'application/octet-stream'

        payload = base64.b64encode(file_bytes).decode('utf-8')
        return f"data:{mime_type};base64,{payload}"
    except Exception as e:
        logger.error(f"Error encoding bytes to base64: {str(e)}")
        return ""
def timing(description):
    """
    Build a context manager that logs how long its ``with`` body took.

    Args:
        description: Label placed at the start of the log message
    Returns:
        A context manager object; entering it records the start time (and
        yields the object itself), leaving it logs the elapsed seconds at
        INFO level
    """
    class TimingContext:
        def __init__(self, label):
            self.description = label

        def __enter__(self):
            # Wall-clock start, kept on the instance for __exit__
            self.start_time = time.time()
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            execution_time = time.time() - self.start_time
            logger.info(f"{self.description} took {execution_time:.2f} seconds")
            # Returning False lets any exception from the with-body propagate
            return False

    return TimingContext(description)
def format_timestamp(timestamp=None, for_filename=False):
    """
    Render a timestamp as text for display or for use in a filename.

    Args:
        timestamp: Datetime object, or a "YYYY-MM-DD HH:MM:SS" string.
            None, or a string that does not parse in that format, means
            "use the current time".
        for_filename: If True use the filename style ("Apr 30, 2025"),
            otherwise the display style ("2025-04-30 13:05")
    Returns:
        str: Formatted timestamp
    """
    if isinstance(timestamp, str):
        try:
            moment = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # Unparseable text silently falls back to the current time
            moment = datetime.now()
    elif timestamp is None:
        moment = datetime.now()
    else:
        moment = timestamp

    pattern = "%b %d, %Y" if for_filename else "%Y-%m-%d %H:%M"
    return moment.strftime(pattern)
def generate_cache_key(file_bytes, file_type, use_vision, preprocessing_options=None, pdf_rotation=0, custom_prompt=None):
    """
    Build the cache key identifying one OCR request.

    The key is "<md5 of file>_<file_type>_<use_vision>_<options digest>",
    with "_<md5 of prompt>" appended when a custom prompt is given. The
    options digest covers the preprocessing options and a non-zero PDF
    rotation; it is an empty string when neither applies.

    Args:
        file_bytes: File content as bytes
        file_type: Type of file (pdf or image)
        use_vision: Whether to use vision model
        preprocessing_options: Dictionary of preprocessing options (not modified)
        pdf_rotation: PDF rotation value
        custom_prompt: Custom prompt for OCR
    Returns:
        str: Cache key
    """
    def _md5_hex(text):
        return hashlib.md5(text.encode()).hexdigest()

    content_digest = hashlib.md5(file_bytes).hexdigest()

    has_rotation = pdf_rotation != 0
    if preprocessing_options:
        effective_options = preprocessing_options
        if has_rotation:
            # Work on a copy so the caller's dict is left untouched
            effective_options = preprocessing_options.copy()
            effective_options['pdf_rotation'] = pdf_rotation
        # Sorting the items makes the digest independent of insertion order
        options_digest = _md5_hex(str(sorted(effective_options.items())))
    elif has_rotation:
        # Rotation alone still has to distinguish cache entries
        options_digest = _md5_hex(f"pdf_rotation_{pdf_rotation}")
    else:
        options_digest = ""

    cache_key = f"{content_digest}_{file_type}_{use_vision}_{options_digest}"
    if custom_prompt:
        cache_key = f"{cache_key}_{_md5_hex(str(custom_prompt))}"
    return cache_key
def handle_temp_files(temp_file_paths):
    """
    Delete temporary files, ignoring ones that are already gone.

    Each path is handled independently: a failure to remove one file is
    logged as a warning and does not stop the remaining paths from being
    processed.

    Args:
        temp_file_paths: List of temporary file paths to clean up
    """
    for candidate in temp_file_paths:
        try:
            if not os.path.exists(candidate):
                # Nothing to remove for this entry
                continue
            os.unlink(candidate)
            logger.info(f"Removed temporary file: {candidate}")
        except Exception as e:
            logger.warning(f"Failed to remove temporary file {candidate}: {str(e)}")
def create_descriptive_filename(original_filename, result, file_ext, preprocessing_options=None):
    """
    Create a user-friendly descriptive filename for the result.

    The name has the shape
    "<Readable Name> (<doc type>, <period>) - <date><file_ext>", where the
    parenthesised part is omitted when no metadata is available and each
    metadata entry appears at most once.

    Args:
        original_filename: Original filename; only its stem is used
        result: OCR result dictionary; the optional keys
            'detected_document_type' and 'topics' are read
        file_ext: File extension appended verbatim (e.g. ".txt")
        preprocessing_options: Dictionary of preprocessing options
            (accepted for interface compatibility; not used here)
    Returns:
        str: Human-readable descriptive filename
    """
    import re  # function-scope import, as done elsewhere in this module

    # "my_old-letter.jpg" -> "My Old Letter"
    stem = Path(original_filename).stem
    words = stem.replace('-', ' ').replace('_', ' ').split()
    readable_name = ' '.join(word.capitalize() for word in words)

    topics = result['topics'] if 'topics' in result and result['topics'] else []

    # Document type: the detected type if present, else the first topic
    doc_type = None
    if 'detected_document_type' in result and result['detected_document_type']:
        doc_type = result['detected_document_type'].capitalize()
    elif topics:
        doc_type = topics[0]

    # Period/era: first topic that looks like a period tag. "era" must be a
    # whole word; a plain substring test also matched unrelated tags such as
    # "Literature" or "General".
    period_info = None
    for tag in topics:
        lowered = tag.lower()
        if "century" in lowered or "pre-" in lowered or re.search(r'\bera\b', lowered):
            period_info = tag
            break

    metadata = []
    if doc_type:
        metadata.append(doc_type)
    # Skip the period when it is the same tag already used as the document
    # type (happens when the first topic is itself the period tag)
    if period_info and period_info != doc_type:
        metadata.append(period_info)
    metadata_str = f" ({', '.join(metadata)})" if metadata else ""

    # Current date, added for uniqueness and sorting
    date_str = f" - {format_timestamp(for_filename=True)}"

    return f"{readable_name}{metadata_str}{date_str}{file_ext}"
def extract_subject_tags(result, raw_text, preprocessing_options=None):
    """
    Extract subject tags from OCR result.

    Tags are collected, in this order and without duplicates, from:
    the result's existing topics, the detected document type, keyword
    themes found in the text (CONTENT_THEMES), a period tag for the
    earliest year found in the text (PERIOD_TAGS), the result's languages,
    and the preprocessing that was applied. If anything raises while
    collecting, a warning is logged and the tags gathered so far are kept.
    Finally the list is topped up from DEFAULT_TAGS and then GENERIC_TAGS
    until it holds at least three tags, where enough distinct tags exist.

    Args:
        result: OCR result dictionary; the optional keys 'topics',
            'detected_document_type' and 'languages' are read
        raw_text: Raw text from OCR
        preprocessing_options: Dictionary of preprocessing options
    Returns:
        list: Subject tags
    """
    import re  # function-scope import; hoisted out of the keyword loop

    # Theme names that denote historical periods; they are listed ahead of
    # the other matched themes
    period_theme_names = (
        "Prehistoric", "Ancient World", "Medieval", "Renaissance",
        "Early Modern", "18th Century", "19th Century", "20th Century", "Contemporary"
    )

    subject_tags = []
    try:
        # Use existing topics as starting point if available
        if 'topics' in result and result['topics']:
            subject_tags = list(result['topics'])

        # Add document type if detected. The value is checked as well as the
        # key: a present-but-None type used to raise here and abort all of
        # the tagging steps below.
        if 'detected_document_type' in result and result['detected_document_type']:
            doc_type = result['detected_document_type'].capitalize()
            if doc_type not in subject_tags:
                subject_tags.append(doc_type)

        if raw_text:
            raw_text_lower = raw_text.lower()

            # Collect the keywords of each theme that occur in the text
            theme_matches = {}
            for theme, keywords in CONTENT_THEMES.items():
                matches = []
                for keyword in keywords:
                    if " " in keyword:
                        # Multi-word keywords: exact phrase match
                        found = keyword in raw_text_lower
                    else:
                        # Single words: whole-word match to avoid partial hits
                        pattern = r'\b' + re.escape(keyword) + r'\b'
                        found = re.search(pattern, raw_text_lower) is not None
                    if found:
                        matches.append(keyword)
                if matches:
                    theme_matches[theme] = {
                        "matches": matches,
                        "count": len(matches)
                    }

            # Most keyword hits first; keep at most five themes
            sorted_themes = sorted(theme_matches.keys(),
                                   key=lambda t: theme_matches[t]["count"],
                                   reverse=True)
            top_themes = sorted_themes[:5]

            # Historical period themes go first, then the remaining themes
            period_themes = [t for t in top_themes if t in period_theme_names]
            for theme in period_themes:
                if theme not in subject_tags:
                    subject_tags.append(theme)
            for theme in top_themes:
                if theme not in period_themes and theme not in subject_tags:
                    subject_tags.append(theme)

            if theme_matches:
                logger.info(f"Extracted themes: {', '.join(top_themes)}")
                logger.info(f"Theme match details: {theme_matches}")

            # Period tag for the earliest year mentioned (1000-2019)
            year_matches = re.findall(r'\b1[0-9]{3}\b|\b20[0-1][0-9]\b', raw_text)
            if year_matches:
                earliest = min(int(y) for y in year_matches)
                for year_range, period_tag in PERIOD_TAGS.items():
                    if year_range[0] <= earliest <= year_range[1]:
                        if period_tag not in subject_tags:
                            subject_tags.append(period_tag)
                        break

        # Add languages as topics if available
        if 'languages' in result and result['languages']:
            for lang in result['languages']:
                if not lang or lang in subject_tags:
                    continue
                lang_tag = f"{lang} Language"
                # Check the tag that is actually appended, so an existing
                # "<lang> Language" entry is not duplicated
                if lang_tag not in subject_tags:
                    subject_tags.append(lang_tag)

        # Add preprocessing information as tags if preprocessing was applied
        if preprocessing_options:
            if preprocessing_options.get("document_type", "standard") != "standard":
                doc_type = preprocessing_options["document_type"].capitalize()
                preprocessing_tag = f"Enhanced ({doc_type})"
                if preprocessing_tag not in subject_tags:
                    subject_tags.append(preprocessing_tag)

            preprocessing_methods = []
            if preprocessing_options.get("grayscale", False):
                preprocessing_methods.append("Grayscale")
            if preprocessing_options.get("denoise", False):
                preprocessing_methods.append("Denoised")
            contrast_val = preprocessing_options.get("contrast", 0)
            if contrast_val != 0:
                preprocessing_methods.append(
                    "Contrast Enhanced" if contrast_val > 0 else "Contrast Reduced"
                )
            if preprocessing_options.get("rotation", 0) != 0:
                preprocessing_methods.append("Rotated")

            if preprocessing_methods:
                # One combined tag whenever any method was applied
                prep_tag = "Preprocessed"
                if prep_tag not in subject_tags:
                    subject_tags.append(prep_tag)
                # The specific method is only named when it was the sole one
                if len(preprocessing_methods) == 1:
                    method_tag = preprocessing_methods[0]
                    if method_tag not in subject_tags:
                        subject_tags.append(method_tag)
    except Exception as e:
        # Best effort: keep whatever was collected before the failure
        logger.warning(f"Error generating subject tags: {str(e)}")

    # Fallback tags if extraction produced nothing
    if not subject_tags:
        subject_tags = DEFAULT_TAGS.copy()

    # Ensure at least 3 tags where possible: default tags first, then generic
    if len(subject_tags) < 3:
        for pool in (DEFAULT_TAGS, GENERIC_TAGS):
            for tag in pool:
                if len(subject_tags) >= 3:
                    break
                if tag not in subject_tags:
                    subject_tags.append(tag)

    return subject_tags