Spaces:
Running
Running
import os | |
import base64 | |
import hashlib | |
import time | |
import logging | |
from datetime import datetime | |
from pathlib import Path | |
from functools import wraps | |
from constants import CONTENT_THEMES, PERIOD_TAGS, DEFAULT_TAGS, GENERIC_TAGS | |
# Shared logger for the helpers in this module: named "utils", threshold INFO
logger = logging.getLogger("utils")
logger.setLevel(logging.INFO)
def get_base64_from_image(image_path):
    """
    Read a file and return its bytes as base64 text
    Args:
        image_path: Path of the file to read (opened in binary mode)
    Returns:
        str: Base64 encoding of the file content decoded as UTF-8, or an
        empty string when reading/encoding fails (the error is logged,
        not raised)
    """
    try:
        with open(image_path, "rb") as stream:
            raw_bytes = stream.read()
        encoded = base64.b64encode(raw_bytes)
        return encoded.decode('utf-8')
    except Exception as e:
        # Best-effort helper: callers receive "" instead of an exception
        logger.error(f"Error encoding image to base64: {str(e)}")
        return ""
def timing(description):
    """
    Build a context manager that logs how long its ``with`` body took
    Args:
        description: Label placed at the start of the log message
    Returns:
        TimingContext: Object usable in a ``with`` statement; entering it
        yields the object itself, which exposes ``description`` and
        ``start_time``
    """
    class TimingContext:
        def __init__(self, label):
            self.description = label

        def __enter__(self):
            # Wall-clock start, read back in __exit__
            self.start_time = time.time()
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            execution_time = time.time() - self.start_time
            logger.info(f"{self.description} took {execution_time:.2f} seconds")
            # False => an exception raised inside the with-body propagates
            return False

    return TimingContext(description)
def format_timestamp(timestamp=None):
    """
    Render a timestamp as "YYYY-MM-DD HH:MM" for display
    Args:
        timestamp: None (use the current time), a string in
            "%Y-%m-%d %H:%M:%S" format, or an object providing strftime()
    Returns:
        str: Formatted timestamp; a string that does not match the
        expected format is replaced by the current time
    """
    if isinstance(timestamp, str):
        try:
            moment = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # Unparseable text falls back to "now" rather than raising
            moment = datetime.now()
    elif timestamp is None:
        moment = datetime.now()
    else:
        moment = timestamp
    return moment.strftime("%Y-%m-%d %H:%M")
def generate_cache_key(file_bytes, file_type, use_vision, preprocessing_options=None, pdf_rotation=0, custom_prompt=None):
    """
    Build the cache key identifying one OCR request
    The key has the form
    "<md5 of file>_<file_type>_<use_vision>_<options digest>" with an
    extra "_<md5 of prompt>" suffix when a custom prompt is supplied.
    Args:
        file_bytes: File content as bytes
        file_type: Type of file (pdf or image)
        use_vision: Whether to use vision model
        preprocessing_options: Dictionary of preprocessing options
        pdf_rotation: PDF rotation value; only contributes when non-zero
        custom_prompt: Custom prompt for OCR
    Returns:
        str: Cache key
    """
    def _md5_hex(text):
        return hashlib.md5(text.encode()).hexdigest()

    content_digest = hashlib.md5(file_bytes).hexdigest()

    rotated = pdf_rotation != 0
    if preprocessing_options:
        effective_options = preprocessing_options
        if rotated:
            # Work on a copy so the caller's dictionary is left untouched
            effective_options = preprocessing_options.copy()
            effective_options['pdf_rotation'] = pdf_rotation
        # Sorting the items makes the digest independent of insertion order
        options_digest = _md5_hex(str(sorted(effective_options.items())))
    elif rotated:
        # Rotation alone still has to distinguish the cache entry
        options_digest = _md5_hex(f"pdf_rotation_{pdf_rotation}")
    else:
        options_digest = ""

    cache_key = f"{content_digest}_{file_type}_{use_vision}_{options_digest}"
    if not custom_prompt:
        return cache_key
    return f"{cache_key}_{_md5_hex(str(custom_prompt))}"
def handle_temp_files(temp_file_paths):
    """
    Delete the given temporary files, logging each outcome
    Paths that do not exist are skipped silently. A failure on one path is
    logged as a warning and does not stop the remaining paths from being
    processed.
    Args:
        temp_file_paths: List of temporary file paths to clean up
    """
    for candidate in temp_file_paths:
        try:
            if not os.path.exists(candidate):
                continue
            os.unlink(candidate)
            logger.info(f"Removed temporary file: {candidate}")
        except Exception as e:
            # Cleanup is best-effort; never propagate to the caller
            logger.warning(f"Failed to remove temporary file {candidate}: {str(e)}")
def create_descriptive_filename(original_filename, result, file_ext, preprocessing_options=None):
    """
    Create a descriptive filename for the result
    The name is "<original stem><_document type><_period><file_ext>", where
    each tag is the lower-cased label with spaces replaced by underscores.
    Args:
        original_filename: Original filename; only its stem is used
        result: OCR result dictionary; the keys 'detected_document_type'
            and 'topics' are read when present
        file_ext: File extension appended verbatim (include the dot)
        preprocessing_options: Dictionary of preprocessing options
            (accepted for interface compatibility; not read here)
    Returns:
        str: Descriptive filename
    """
    # Function-local import, mirroring how this module pulls in ``re`` elsewhere
    import re

    def _slug(label):
        return f"_{label.lower().replace(' ', '_')}"

    # Get base name without extension
    original_name = Path(original_filename).stem
    topics = result['topics'] if 'topics' in result and result['topics'] else []

    # Document type: explicit detection wins, otherwise the first topic
    doc_type_tag = ""
    if 'detected_document_type' in result:
        doc_type_tag = _slug(result['detected_document_type'])
    elif topics:
        doc_type_tag = _slug(topics[0])

    # Period tag: first topic that names a century, a "pre-" period or an
    # era. "era" must be a whole word; a plain substring test would also
    # match unrelated topics such as "Literature" or "General".
    period_pattern = re.compile(r"century|pre-|\bera\b")
    period_tag = ""
    for topic in topics:
        if period_pattern.search(topic.lower()):
            candidate = _slug(topic)
            # When the first topic doubled as the document type, do not
            # repeat the same slug in the filename
            if candidate != doc_type_tag:
                period_tag = candidate
            break

    return f"{original_name}{doc_type_tag}{period_tag}{file_ext}"
def extract_subject_tags(result, raw_text, preprocessing_options=None):
    """
    Extract subject tags from OCR result
    Tags are collected, in order, from: the result's existing topics, the
    detected document type, keyword themes found in the text, the period
    of the earliest year found in the text, the result's languages, and
    the preprocessing that was applied. The list is then padded from
    DEFAULT_TAGS and GENERIC_TAGS until it holds at least three tags (or
    those lists are exhausted).
    Args:
        result: OCR result dictionary; the keys 'topics',
            'detected_document_type' and 'languages' are read when present
        raw_text: Raw text from OCR
        preprocessing_options: Dictionary of preprocessing options
    Returns:
        list: Subject tags
    """
    # Function-local import, as in the original implementation
    import re

    subject_tags = []

    def add_tag(tag):
        # Append only tags that are not already present
        if tag not in subject_tags:
            subject_tags.append(tag)

    try:
        # Use existing topics as starting point if available
        if 'topics' in result and result['topics']:
            subject_tags.extend(result['topics'])

        # Add document type if detected
        if 'detected_document_type' in result:
            add_tag(result['detected_document_type'].capitalize())

        if raw_text:
            # Themes: a theme applies when any of its keywords occurs in the text
            raw_text_lower = raw_text.lower()
            for theme, keywords in CONTENT_THEMES.items():
                if any(keyword in raw_text_lower for keyword in keywords):
                    add_tag(theme)

            # Period: the pattern matches the years 1000-1999 and 2000-2019;
            # the earliest one selects the period
            year_matches = re.findall(r'\b1[0-9]{3}\b|\b20[0-1][0-9]\b', raw_text)
            if year_matches:
                earliest = min(int(y) for y in year_matches)
                for year_range, period_tag in PERIOD_TAGS.items():
                    if year_range[0] <= earliest <= year_range[1]:
                        add_tag(period_tag)
                        break

        # Add languages as topics if available
        if 'languages' in result and result['languages']:
            for lang in result['languages']:
                # Skip empty entries and languages already present as a bare
                # tag; add_tag() additionally de-duplicates on the
                # "<lang> Language" tag itself, so a repeated language no
                # longer yields a repeated tag
                if lang and lang not in subject_tags:
                    add_tag(f"{lang} Language")

        # Add preprocessing information as tags if preprocessing was applied
        if preprocessing_options:
            document_type = preprocessing_options.get("document_type", "standard")
            if document_type != "standard":
                add_tag(f"Enhanced ({document_type.capitalize()})")

            preprocessing_methods = []
            if preprocessing_options.get("grayscale", False):
                preprocessing_methods.append("Grayscale")
            if preprocessing_options.get("denoise", False):
                preprocessing_methods.append("Denoised")
            contrast_val = preprocessing_options.get("contrast", 0)
            if contrast_val != 0:
                preprocessing_methods.append(
                    "Contrast Enhanced" if contrast_val > 0 else "Contrast Reduced"
                )
            if preprocessing_options.get("rotation", 0) != 0:
                preprocessing_methods.append("Rotated")

            if preprocessing_methods:
                add_tag("Preprocessed")
                # Name the specific method only when exactly one was used
                if len(preprocessing_methods) == 1:
                    add_tag(preprocessing_methods[0])
    except Exception as e:
        logger.warning(f"Error generating subject tags: {str(e)}")
        # Fallback tags if extraction fails; tags gathered before the
        # failure are kept
        if not subject_tags:
            subject_tags.extend(DEFAULT_TAGS)

    # Ensure we have at least 3 tags: defaults first, then generic ones.
    # The constants are only consulted when padding is actually needed.
    if len(subject_tags) < 3:
        for tag in [*DEFAULT_TAGS, *GENERIC_TAGS]:
            if tag not in subject_tags:
                subject_tags.append(tag)
                if len(subject_tags) >= 3:
                    break

    return subject_tags