historical-ocr / utils.py
milwright's picture
Reconcile main with preview-improvements branch, implementing modular structure, raw text editing, and enhanced framing
7647e70
raw
history blame
10.2 kB
import os
import base64
import hashlib
import time
import logging
from datetime import datetime
from pathlib import Path
from functools import wraps
from constants import CONTENT_THEMES, PERIOD_TAGS, DEFAULT_TAGS, GENERIC_TAGS
# Configure logging
# Module-wide logger registered under the fixed name "utils". Its own level is
# set to INFO so the info/warning/error calls made in this module are not
# filtered out at the logger itself. No handler is attached here.
logger = logging.getLogger("utils")
logger.setLevel(logging.INFO)
def get_base64_from_image(image_path):
    """
    Read a file in binary mode and return its contents base64-encoded.

    Args:
        image_path: Path of the file to read

    Returns:
        str: Base64 text (decoded as UTF-8), or "" if reading or encoding
        fails for any reason; the failure is logged as an error
    """
    try:
        with open(image_path, "rb") as source:
            raw_bytes = source.read()
        encoded = base64.b64encode(raw_bytes)
        return encoded.decode('utf-8')
    except Exception as e:
        # Best-effort helper: report the problem and hand back an empty string.
        logger.error(f"Error encoding image to base64: {str(e)}")
        return ""
class _TimingContext:
    """Context manager that logs how long its ``with`` body took to run."""

    def __init__(self, description):
        self.description = description

    def __enter__(self):
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed (or turn negative) when the system clock is adjusted.
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        execution_time = time.perf_counter() - self.start_time
        logger.info("%s took %.2f seconds", self.description, execution_time)
        # Returning False lets any exception raised in the body propagate.
        return False


def timing(description):
    """
    Context manager for timing code execution

    Args:
        description: Label placed in front of the logged duration

    Returns:
        A context manager. Entering it records the start time and yields the
        manager itself; leaving it logs "<description> took N.NN seconds" at
        INFO level. Exceptions from the body are not suppressed.
    """
    return _TimingContext(description)
def format_timestamp(timestamp=None):
    """
    Format timestamp for display

    Args:
        timestamp: None (use the current local time), a datetime, or a
            string in "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM" form

    Returns:
        str: Timestamp rendered as "YYYY-MM-DD HH:MM". A string that matches
        neither accepted form falls back to the current time.
    """
    if timestamp is None:
        timestamp = datetime.now()
    elif isinstance(timestamp, str):
        # The second pattern is this function's own output format, so an
        # already formatted value round-trips instead of becoming "now".
        for pattern in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
            try:
                timestamp = datetime.strptime(timestamp, pattern)
                break
            except ValueError:
                continue
        else:
            # No pattern matched: keep the original best-effort fallback.
            timestamp = datetime.now()
    return timestamp.strftime("%Y-%m-%d %H:%M")
def generate_cache_key(file_bytes, file_type, use_vision, preprocessing_options=None, pdf_rotation=0, custom_prompt=None):
    """
    Build the cache key identifying one OCR request

    The key has the form
    "<md5 of file>_<file_type>_<use_vision>_<options digest>", optionally
    followed by "_<md5 of custom prompt>".

    Args:
        file_bytes: File content as bytes
        file_type: Type of file (pdf or image)
        use_vision: Whether to use vision model
        preprocessing_options: Dictionary of preprocessing options
        pdf_rotation: PDF rotation value
        custom_prompt: Custom prompt for OCR

    Returns:
        str: Cache key
    """
    def md5_hex(text):
        return hashlib.md5(text.encode()).hexdigest()

    content_digest = hashlib.md5(file_bytes).hexdigest()

    rotated = pdf_rotation != 0
    if preprocessing_options:
        effective_options = preprocessing_options
        if rotated:
            # Fold the rotation into a copy so the caller's dict is untouched
            # and the rotation still influences the key.
            effective_options = preprocessing_options.copy()
            effective_options['pdf_rotation'] = pdf_rotation
        # Sorting the items makes the digest independent of insertion order.
        options_digest = md5_hex(str(sorted(effective_options.items())))
    elif rotated:
        # Rotation alone, without any other preprocessing option.
        options_digest = md5_hex(f"pdf_rotation_{pdf_rotation}")
    else:
        options_digest = ""

    key = "_".join((content_digest, f"{file_type}", f"{use_vision}", options_digest))

    if custom_prompt:
        key = "_".join((key, md5_hex(str(custom_prompt))))
    return key
def handle_temp_files(temp_file_paths):
    """
    Delete each of the given temporary files, best effort

    Paths that do not exist are skipped silently. A failure on one path is
    logged as a warning and does not stop the remaining paths from being
    processed.

    Args:
        temp_file_paths: List of temporary file paths to clean up
    """
    for temp_path in temp_file_paths:
        try:
            if not os.path.exists(temp_path):
                continue
            os.unlink(temp_path)
            logger.info(f"Removed temporary file: {temp_path}")
        except Exception as e:
            logger.warning(f"Failed to remove temporary file {temp_path}: {str(e)}")
def create_descriptive_filename(original_filename, result, file_ext, preprocessing_options=None):
    """
    Create a descriptive filename for the result

    The name is "<original stem><_doc type><_period><file_ext>", where both
    tags are lower-cased with spaces replaced by underscores and either may
    be absent.

    Args:
        original_filename: Original filename; only its stem (no directory,
            no final extension) is used
        result: OCR result dictionary; the optional keys
            'detected_document_type' and 'topics' are read
        file_ext: File extension appended verbatim (include the leading dot)
        preprocessing_options: Dictionary of preprocessing options (accepted
            for interface compatibility; not used)

    Returns:
        str: Descriptive filename
    """
    # Local import: `re` is not among this module's top-level imports.
    import re

    def as_suffix(tag):
        return f"_{tag.lower().replace(' ', '_')}"

    original_name = Path(original_filename).stem
    topics = result.get('topics') or []

    # Document type: explicit detection wins; an empty or missing value falls
    # back to the first topic instead of producing a dangling "_".
    doc_type_tag = ""
    doc_type = result.get('detected_document_type')
    if doc_type:
        doc_type_tag = as_suffix(doc_type)
    elif topics:
        doc_type_tag = as_suffix(topics[0])

    # Period: first topic that looks like a historical period. "era" must be
    # a whole word so that tags such as "Literature" or "General" do not match.
    period_tag = ""
    for tag in topics:
        lowered = tag.lower()
        if "century" in lowered or "pre-" in lowered or re.search(r"\bera\b", lowered):
            candidate = as_suffix(tag)
            # Skip it when it would merely repeat the document-type tag.
            if candidate != doc_type_tag:
                period_tag = candidate
            break

    return f"{original_name}{doc_type_tag}{period_tag}{file_ext}"
def _append_unique(tags, tag):
    """Append tag to tags in place unless an equal entry is already present."""
    if tag not in tags:
        tags.append(tag)


def _add_period_tag(tags, raw_text):
    """Add the PERIOD_TAGS entry covering the earliest year found in raw_text."""
    # Local import: `re` is not among this module's top-level imports.
    import re

    # Matches standalone four-digit numbers from 1000 to 2019.
    year_matches = re.findall(r'\b1[0-9]{3}\b|\b20[0-1][0-9]\b', raw_text)
    if not year_matches:
        return
    earliest = min(int(year) for year in year_matches)
    for year_range, period_tag in PERIOD_TAGS.items():
        if year_range[0] <= earliest <= year_range[1]:
            _append_unique(tags, period_tag)
            break


def _add_preprocessing_tags(tags, preprocessing_options):
    """Add tags describing which preprocessing steps were applied."""
    if preprocessing_options.get("document_type", "standard") != "standard":
        doc_type = preprocessing_options["document_type"].capitalize()
        _append_unique(tags, f"Enhanced ({doc_type})")

    methods = []
    if preprocessing_options.get("grayscale", False):
        methods.append("Grayscale")
    if preprocessing_options.get("denoise", False):
        methods.append("Denoised")
    contrast_val = preprocessing_options.get("contrast", 0)
    if contrast_val != 0:
        methods.append("Contrast Enhanced" if contrast_val > 0 else "Contrast Reduced")
    if preprocessing_options.get("rotation", 0) != 0:
        methods.append("Rotated")

    if methods:
        _append_unique(tags, "Preprocessed")
        # The specific method is only named when exactly one was used.
        if len(methods) == 1:
            _append_unique(tags, methods[0])


def extract_subject_tags(result, raw_text, preprocessing_options=None):
    """
    Extract subject tags from OCR result

    Tags are collected, without duplicates, in this order: existing topics,
    capitalized detected document type, CONTENT_THEMES whose keywords occur
    in the text, a PERIOD_TAGS entry for the earliest year in the text,
    "<language> Language" tags, and preprocessing tags. The list is then
    padded from DEFAULT_TAGS and GENERIC_TAGS until it holds at least three
    tags, or those pools are exhausted.

    Args:
        result: OCR result dictionary; the optional keys 'topics',
            'detected_document_type' and 'languages' are read
        raw_text: Raw text from OCR (may be empty or None)
        preprocessing_options: Dictionary of preprocessing options

    Returns:
        list: Subject tags (a new list; result['topics'] is not modified)
    """
    subject_tags = []
    try:
        # Use existing topics as starting point if available
        topics = result.get('topics')
        if topics:
            subject_tags = list(topics)

        # Skip an empty/None document type: it would otherwise add an empty
        # tag or raise and abort every later step.
        doc_type = result.get('detected_document_type')
        if doc_type:
            _append_unique(subject_tags, doc_type.capitalize())

        if raw_text:
            raw_text_lower = raw_text.lower()
            for theme, keywords in CONTENT_THEMES.items():
                if any(keyword in raw_text_lower for keyword in keywords):
                    _append_unique(subject_tags, theme)
            _add_period_tag(subject_tags, raw_text)

        for lang in result.get('languages') or []:
            lang_tag = f"{lang} Language"
            # Check the tag that is actually appended so it cannot be
            # duplicated; a bare language name already present still
            # suppresses it, as before.
            if lang and lang not in subject_tags and lang_tag not in subject_tags:
                subject_tags.append(lang_tag)

        if preprocessing_options:
            _add_preprocessing_tags(subject_tags, preprocessing_options)
    except Exception as e:
        # Best effort: keep whatever was collected before the failure.
        logger.warning(f"Error generating subject tags: {str(e)}")

    # Fallback tags if extraction fails
    if not subject_tags:
        subject_tags = DEFAULT_TAGS.copy()

    # Ensure we have at least 3 tags: defaults first, then generic ones.
    if len(subject_tags) < 3:
        for fallback in (*DEFAULT_TAGS, *GENERIC_TAGS):
            if len(subject_tags) >= 3:
                break
            _append_unique(subject_tags, fallback)

    return subject_tags