Spaces:
Running
Running
import os | |
import hashlib | |
import tempfile | |
import streamlit as st | |
import logging | |
import time | |
from datetime import datetime | |
from pathlib import Path | |
from structured_ocr import StructuredOCR | |
from utils import generate_cache_key, timing, format_timestamp, create_descriptive_filename, extract_subject_tags | |
from preprocessing import apply_preprocessing_to_file | |
from error_handler import handle_ocr_error, check_file_size | |
# Module-level logger shared by the OCR processing helpers in this file.
logger = logging.getLogger("ocr_processing")
# Set this logger's own threshold to INFO; handlers are configured elsewhere.
logger.setLevel(logging.INFO)
def process_file_cached(file_path, file_type, use_vision, file_size_mb, cache_key, preprocessing_options_hash=None):
    """
    Run OCR on one file using a freshly constructed ``StructuredOCR``.

    The name suggests result caching, but no cache decorator or lookup is
    present in this function; ``cache_key`` and ``preprocessing_options_hash``
    are accepted and never read by the body.
    NOTE(review): presumably these two arguments exist so an external
    memoization layer can key on them -- confirm against the caller/decorator.

    Args:
        file_path: Path to the file to process
        file_type: Type of file (pdf or image)
        use_vision: Whether to use vision model
        file_size_mb: File size in MB
        cache_key: Cache key for the file (unused inside this function)
        preprocessing_options_hash: Hash of preprocessing options (unused
            inside this function)
    Returns:
        dict: OCR result as returned by ``StructuredOCR.process_file``
    """
    ocr_engine = StructuredOCR()

    # Keyword arguments forwarded verbatim to the OCR engine.
    engine_kwargs = {
        "file_type": file_type,
        "use_vision": use_vision,
        "file_size_mb": file_size_mb,
    }

    # The timing() context wraps only the OCR call itself.
    with timing(f"OCR processing of {file_type} file"):
        ocr_output = ocr_engine.process_file(file_path, **engine_kwargs)
    return ocr_output
def _build_error_result(file_name, error_message, ocr_error, partial_text):
    """Build the placeholder result dict returned when a file cannot be processed."""
    return {
        "file_name": file_name,
        "topics": ["Document"],
        "languages": ["English"],
        "error": error_message,
        "ocr_contents": {
            "error": ocr_error,
            "partial_text": partial_text,
        },
    }


def process_file(uploaded_file, use_vision=True, preprocessing_options=None, progress_reporter=None,
                 pdf_dpi=150, max_pages=3, pdf_rotation=0, custom_prompt=None, perf_mode="Quality"):
    """
    Process the uploaded file and return the OCR results.

    The upload is size-checked, written to a temporary file (PDF) or passed
    through image preprocessing (anything else), and then run through
    ``process_file_cached``. If that call raises, the file is re-processed
    directly with a new ``StructuredOCR`` instance. Temporary files recorded
    in the internal list are removed before returning, on success or failure.

    Args:
        uploaded_file: The uploaded file to process; must provide ``.name``
            and ``.getvalue()``.
        use_vision: Whether to use vision model
        preprocessing_options: Dictionary of preprocessing options
            (defaults to an empty dict)
        progress_reporter: ProgressReporter instance for UI updates; one is
            created from ``ui_components.ProgressReporter`` when omitted
        pdf_dpi: DPI for PDF conversion (not forwarded to the processor by
            this function; see NOTE in the body)
        max_pages: Maximum number of pages to process (accepted for interface
            compatibility; not read by this function)
        pdf_rotation: PDF rotation value (part of the cache key; passed to
            the processor only on the direct-processing fallback)
        custom_prompt: Custom prompt for OCR (part of the cache key; passed
            to the processor only on the direct-processing fallback)
        perf_mode: Performance mode (Quality or Speed)
    Returns:
        dict: OCR result enriched by ``process_result``, or a placeholder
        dict with an ``"error"`` key when the file is too large or
        processing raised.
    """
    if preprocessing_options is None:
        preprocessing_options = {}

    # Create a container for progress indicators if not provided
    if progress_reporter is None:
        from ui_components import ProgressReporter
        progress_reporter = ProgressReporter(st.empty()).setup()

    # Every temporary file created along the way is recorded here so the
    # ``finally`` clause can remove it.
    temp_file_paths = []

    try:
        # Read the upload once and reuse the bytes for the size check,
        # the temp file and the cache key.
        file_bytes = uploaded_file.getvalue()

        # Check if file size exceeds maximum allowed size
        is_valid, file_size_mb, error_message = check_file_size(file_bytes)
        if not is_valid:
            progress_reporter.complete(success=False)
            st.error(error_message)
            return _build_error_result(
                uploaded_file.name,
                error_message,
                error_message,
                "Document could not be processed due to size limitations.",
            )

        progress_reporter.update(10, "Initializing OCR processor...")

        # Determine file type from extension
        file_ext = Path(uploaded_file.name).suffix.lower()
        file_type = "pdf" if file_ext == ".pdf" else "image"

        if file_type == "pdf":
            progress_reporter.update(20, "Converting PDF to images...")
            progress_reporter.update(30, "Processing PDF with OCR...")

            # Write the PDF through the handle NamedTemporaryFile returns and
            # close it before anyone reopens the path (reopening an open
            # temp file by name fails on Windows). The path is registered
            # for cleanup *before* writing so a failed write cannot leak it.
            with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp:
                temp_path = tmp.name
                temp_file_paths.append(temp_path)
                tmp.write(file_bytes)

            cache_key = generate_cache_key(
                file_bytes,
                file_type,
                use_vision,
                preprocessing_options,
                pdf_rotation,
                custom_prompt
            )

            # NOTE(review): process_file_cached does not receive
            # custom_prompt or pdf_rotation, so they only reach the
            # processor on the fallback path below -- confirm intended.
            try:
                result = process_file_cached(
                    temp_path, file_type, use_vision, file_size_mb,
                    cache_key, str(preprocessing_options)
                )
                progress_reporter.update(90, "Finalizing results...")
            except Exception as e:
                logger.warning("Cached processing failed: %s. Retrying with direct processing.", e)
                progress_reporter.update(60, f"Processing error: {e}. Retrying...")

                # If caching fails, process directly
                processor = StructuredOCR()

                if perf_mode == "Speed":
                    # NOTE(review): the lowered DPI is computed but not
                    # passed to processor.process_file below -- confirm
                    # whether it should be forwarded.
                    pdf_dpi = min(pdf_dpi, 100)

                result = processor.process_file(
                    file_path=temp_path,
                    file_type="pdf",
                    use_vision=use_vision,
                    custom_prompt=custom_prompt,
                    file_size_mb=file_size_mb,
                    pdf_rotation=pdf_rotation
                )
                progress_reporter.update(90, "Finalizing results...")
        else:
            # For image files
            progress_reporter.update(20, "Preparing image for processing...")

            # apply_preprocessing_to_file receives temp_file_paths so any
            # file it creates can be cleaned up in ``finally``.
            temp_path, preprocessing_applied = apply_preprocessing_to_file(
                file_bytes,
                file_ext,
                preprocessing_options,
                temp_file_paths
            )
            if preprocessing_applied:
                progress_reporter.update(30, "Applied image preprocessing...")

            # Key the cache on the bytes that will actually be OCR'd;
            # read_bytes() closes the file handle for us.
            cache_key = generate_cache_key(
                Path(temp_path).read_bytes(),
                file_type,
                use_vision,
                preprocessing_options,
                0,  # No rotation for images (handled in preprocessing)
                custom_prompt
            )

            progress_reporter.update(50, "Processing document with OCR...")
            try:
                result = process_file_cached(
                    temp_path, file_type, use_vision, file_size_mb,
                    cache_key, str(preprocessing_options)
                )
                progress_reporter.update(80, "Analyzing document structure...")
                progress_reporter.update(90, "Finalizing results...")
            except Exception as e:
                logger.warning("Cached processing failed: %s. Retrying with direct processing.", e)
                progress_reporter.update(60, f"Processing error: {e}. Retrying...")

                # If caching fails, process directly. No perf_mode-specific
                # handling is applied for images in this function.
                processor = StructuredOCR()
                result = processor.process_file(
                    file_path=temp_path,
                    file_type=file_type,
                    use_vision=use_vision,
                    custom_prompt=custom_prompt,
                    file_size_mb=file_size_mb
                )
                progress_reporter.update(90, "Finalizing results...")

        # Add additional metadata to result
        result = process_result(result, uploaded_file, preprocessing_options)

        progress_reporter.complete()
        return result
    except Exception as e:
        # Top-level boundary: convert any failure into an error result
        # instead of propagating it to the UI.
        error_message = handle_ocr_error(e, progress_reporter)
        return _build_error_result(
            uploaded_file.name,
            error_message,
            f"Failed to process file: {error_message}",
            "Document could not be processed due to an error.",
        )
    finally:
        # Best-effort cleanup: a file that cannot be removed is logged,
        # never raised, so it cannot mask the real result or error.
        for path in temp_file_paths:
            try:
                if os.path.exists(path):
                    os.unlink(path)
                    logger.info("Removed temporary file: %s", path)
            except Exception as e:
                logger.warning("Failed to remove temporary file %s: %s", path, e)
def process_result(result, uploaded_file, preprocessing_options=None):
    """
    Enrich an OCR result in place with a timestamp, a default processing
    time, a descriptive file name and (when missing or empty) topic tags.

    Args:
        result: OCR result dictionary (mutated and returned)
        uploaded_file: The uploaded file; only its ``.name`` is read here
        preprocessing_options: Dictionary of preprocessing options, passed
            through to the filename and tag helpers
    Returns:
        dict: The same ``result`` object with the extra fields set
    """
    # Stamp the result, and make sure a processing time is always present.
    result['timestamp'] = format_timestamp()
    result.setdefault('processing_time', 0.0)

    # Descriptive filename derived from the original name and the result.
    original_name = uploaded_file.name
    extension = Path(original_name).suffix.lower()
    result['descriptive_file_name'] = create_descriptive_filename(
        original_name,
        result,
        extension,
        preprocessing_options
    )

    # Pull the text out of the OCR contents: 'raw_text' wins over 'content'.
    extracted_text = ""
    if 'ocr_contents' in result:
        contents = result['ocr_contents']
        for text_key in ('raw_text', 'content'):
            if text_key in contents:
                extracted_text = contents[text_key]
                break

    # Topics are generated only when none are present (missing or empty).
    has_topics = 'topics' in result and result['topics']
    if not has_topics:
        result['topics'] = extract_subject_tags(result, extracted_text, preprocessing_options)

    return result