# Spaces: Running (hosting-page header residue, not part of the module)
import os | |
import io | |
import cv2 | |
import numpy as np | |
import tempfile | |
from PIL import Image, ImageEnhance, ImageFilter | |
from pdf2image import convert_from_bytes | |
import streamlit as st | |
import logging | |
# Module-level logger used by the helpers below; records at INFO and above
# pass this logger's own level check.
logger = logging.getLogger("preprocessing")
logger.setLevel(logging.INFO)
# NOTE(review): a 24-hour cache appears to have been intended for this
# function, but no caching decorator is present here -- confirm whether one
# should be applied.
def convert_pdf_to_images(pdf_bytes, dpi=150, rotation=0):
    """Render the pages of a PDF into a list of PIL images.

    Args:
        pdf_bytes: Raw PDF content, handed to ``convert_from_bytes``.
        dpi: Rendering resolution passed through to ``convert_from_bytes``.
        rotation: Angle in degrees applied to every page with
            ``Image.rotate(..., expand=True)``; ``0`` leaves pages untouched.

    Returns:
        The list of page images (rotated when ``rotation`` is non-zero), or
        an empty list when conversion or rotation raises. Failures are
        reported through ``st.error`` and the module logger rather than
        propagated.
    """
    try:
        pages = convert_from_bytes(pdf_bytes, dpi=dpi)

        # Nothing to rotate: hand back exactly what pdf2image produced.
        if rotation == 0 or not pages:
            return pages

        return [
            page.rotate(rotation, expand=True, resample=Image.BICUBIC)
            for page in pages
        ]
    except Exception as e:
        st.error(f"Error converting PDF: {str(e)}")
        logger.error(f"PDF conversion error: {str(e)}")
        return []
def _flatten_to_rgb_or_gray(image, logger):
    """Return *image* in 'RGB' or 'L' mode, compositing any transparency onto white."""
    source_mode = image.mode
    has_alpha = source_mode in ("RGBA", "LA") or (
        source_mode == "P" and "transparency" in image.info
    )
    if has_alpha:
        # A plain convert('RGB') would discard the alpha channel, so
        # transparent regions are composited onto a white page instead.
        rgba = image if source_mode == "RGBA" else image.convert("RGBA")
        canvas = Image.new("RGB", rgba.size, (255, 255, 255))
        canvas.paste(rgba, mask=rgba.split()[3])  # band 3 is the alpha channel
        logger.info(f"Converted {source_mode} image to RGB")
        return canvas
    if source_mode not in ("RGB", "L"):
        converted = image.convert("RGB")
        # Log the mode the image had *before* conversion.
        logger.info(f"Converted {source_mode} image to RGB")
        return converted
    return image


def _limit_longest_side(image, max_dimension=2500):
    """Downscale *image* with LANCZOS so its longest side is at most *max_dimension*."""
    width, height = image.size
    longest = max(width, height)
    if longest <= max_dimension:
        return image
    scale_factor = max_dimension / longest
    # Clamp to 1 px so an extreme aspect ratio cannot produce a zero-sized axis.
    new_size = (
        max(1, int(width * scale_factor)),
        max(1, int(height * scale_factor)),
    )
    return image.resize(new_size, Image.LANCZOS)


def _grayscale_as_rgb(img_array, document_type):
    """Convert an RGB array to grayscale (CLAHE-enhanced for handwriting), stored as 3 channels."""
    gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
    if document_type == "handwritten":
        # Adaptive histogram equalization to bring out handwriting strokes.
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        gray = clahe.apply(gray)
    # Later steps expect a 3-channel array again.
    return cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)


def _denoise(img_array, document_type, logger):
    """Apply non-local-means denoising; return the input unchanged if OpenCV raises."""
    if document_type == "handwritten":
        # Lighter settings for handwriting so pen strokes are preserved.
        color_params, gray_params = (3, 3, 5, 9), (3, 7, 21)
    else:
        color_params, gray_params = (5, 5, 7, 21), (5, 7, 21)

    is_color = img_array.ndim == 3 and img_array.shape[2] == 3
    try:
        if is_color:
            return cv2.fastNlMeansDenoisingColored(img_array, None, *color_params)
        return cv2.fastNlMeansDenoising(img_array, None, *gray_params)
    except Exception as e:
        # Best effort: denoising is optional, so continue with the noisy array.
        logger.error(f"Denoising error: {str(e)}, falling back to standard processing")
        return img_array


def preprocess_image(image_bytes, preprocessing_options):
    """Preprocess an image for OCR and return it as JPEG bytes.

    Steps, in order: flatten transparency / normalise the mode to RGB or L,
    optional rotation, downscale so the longest side is at most 2500 px,
    optional grayscale, optional contrast adjustment, optional denoising,
    then JPEG encoding at quality 92.

    Args:
        image_bytes: Encoded image data readable by ``PIL.Image.open``.
        preprocessing_options: Mapping read with ``.get``. Recognised keys:
            ``"rotation"`` (degrees, default 0), ``"grayscale"`` (bool),
            ``"contrast"`` (percent offset, default 0; the enhancement
            factor is ``1 + contrast / 100``), ``"denoise"`` (bool) and
            ``"document_type"`` (``"handwritten"`` selects CLAHE and lighter
            denoising; default ``"standard"``).

    Returns:
        JPEG-encoded bytes of the processed image. If encoding the processed
        image fails, the rotated/resized image without any filters is
        encoded instead.

    Raises:
        Whatever ``PIL.Image.open`` raises for unreadable input; decoding
        errors are not caught here.
    """
    # This function logs under its own logger name.
    logger = logging.getLogger("image_preprocessor")
    logger.setLevel(logging.INFO)
    logger.info(f"Preprocessing image with options: {preprocessing_options}")

    source = Image.open(io.BytesIO(image_bytes))
    original_mode = source.mode  # remembered for the summary log line below
    image = _flatten_to_rgb_or_gray(source, logger)

    rotation_degrees = preprocessing_options.get("rotation", 0)
    if rotation_degrees != 0:
        image = image.rotate(rotation_degrees, expand=True, resample=Image.BICUBIC)

    # Shrink only very large images, to preserve document detail.
    image = _limit_longest_side(image)
    img_array = np.array(image)

    document_type = preprocessing_options.get("document_type", "standard")

    # Grayscale first: it is the foundation for the later steps. Arrays that
    # are already 2-D (mode 'L') are left as they are.
    if preprocessing_options.get("grayscale", False) and img_array.ndim == 3:
        img_array = _grayscale_as_rgb(img_array, document_type)

    contrast = preprocessing_options.get("contrast", 0)
    if contrast != 0:
        enhancer = ImageEnhance.Contrast(Image.fromarray(img_array))
        img_array = np.array(enhancer.enhance(1 + (contrast / 100)))

    if preprocessing_options.get("denoise", False):
        img_array = _denoise(img_array, document_type, logger)

    processed_image = Image.fromarray(img_array)

    try:
        # JPEG cannot store every PIL mode; normalise before saving.
        if processed_image.mode not in ('RGB', 'L'):
            processed_image = processed_image.convert('RGB')

        byte_io = io.BytesIO()
        processed_image.save(byte_io, format='JPEG', quality=92, optimize=True)
        encoded = byte_io.getvalue()

        logger.info(f"Preprocessing complete. Original image mode: {original_mode}, processed mode: {processed_image.mode}")
        logger.info(f"Original size: {len(image_bytes)/1024:.1f}KB, processed size: {len(encoded)/1024:.1f}KB")
        return encoded
    except Exception as e:
        logger.error(f"Error saving processed image: {str(e)}")
        # Fall back to the image as it was before any filter was applied.
        logger.info("Using original image as fallback")
        image_io = io.BytesIO()
        image.save(image_io, format='JPEG', quality=92)
        return image_io.getvalue()
def create_temp_file(content, suffix, temp_file_paths):
    """Write *content* to a new temporary file and register it for cleanup.

    Args:
        content: Bytes to write (the file is opened in binary mode).
        suffix: File-name suffix for the temporary file, e.g. ``".jpg"``.
        temp_file_paths: Mutable list that collects the paths of temporary
            files; the new path is appended to it.

    Returns:
        The path of the temporary file. The file is created with
        ``delete=False``, so it persists until something removes it.

    Raises:
        Whatever the write raises (e.g. ``TypeError`` for non-bytes content).
        The path is still present in ``temp_file_paths`` in that case, so the
        partially written file can be cleaned up.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        temp_path = tmp.name
        # Register before writing: with delete=False a failed write would
        # otherwise leave an untracked file behind on disk.
        temp_file_paths.append(temp_path)
        tmp.write(content)

    logger.info(f"Created temporary file: {temp_path}")
    return temp_path
def apply_preprocessing_to_file(file_bytes, file_ext, preprocessing_options, temp_file_paths):
    """Write a file to a tracked temp file, preprocessing it first if requested.

    Preprocessing runs when any of these holds: ``"grayscale"`` or
    ``"denoise"`` is truthy, ``"contrast"`` or ``"rotation"`` is non-zero,
    or ``"document_type"`` differs from ``"standard"``.

    Args:
        file_bytes: Raw content of the input file.
        file_ext: Suffix used for the temporary file.
        preprocessing_options: Mapping of preprocessing settings (read with
            ``.get``) that is also forwarded to ``preprocess_image``.
        temp_file_paths: List that tracks temporary files for cleanup.

    Returns:
        ``(temp_path, applied)`` where ``applied`` is ``True`` when the
        content was run through ``preprocess_image`` and ``False`` when the
        original bytes were written unchanged.
    """
    option = preprocessing_options.get
    needs_preprocessing = any((
        option("grayscale", False),
        option("denoise", False),
        option("contrast", 0) != 0,
        option("rotation", 0) != 0,
        option("document_type", "standard") != "standard",
    ))

    # NOTE(review): preprocess_image always emits JPEG data, yet the temp
    # file keeps `file_ext` as its suffix -- confirm that consumers do not
    # infer the format from the extension.
    payload = (
        preprocess_image(file_bytes, preprocessing_options)
        if needs_preprocessing
        else file_bytes
    )
    temp_path = create_temp_file(payload, file_ext, temp_file_paths)
    return temp_path, needs_preprocessing