Spaces:
Running
Running
File size: 8,234 Bytes
7647e70 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
import os
import io
import cv2
import numpy as np
import tempfile
from PIL import Image, ImageEnhance, ImageFilter
from pdf2image import convert_from_bytes
import streamlit as st
import logging
# Module-level logger for this file, registered under the name "preprocessing".
# Its level is set to INFO so that informational messages are not filtered out
# by the logger itself.
logger = logging.getLogger("preprocessing")
logger.setLevel(logging.INFO)
@st.cache_data(ttl=24*3600, show_spinner=False)  # Results stay cached for 24 hours
def convert_pdf_to_images(pdf_bytes, dpi=150, rotation=0):
    """Render the pages of a PDF into a list of images (cached).

    Args:
        pdf_bytes: Raw bytes of the PDF document.
        dpi: Rendering resolution forwarded to ``convert_from_bytes``.
        rotation: Angle passed to each page's ``rotate`` call. A value of
            ``0`` returns the rendered pages untouched.

    Returns:
        The list of page images, rotated when ``rotation`` is non-zero.
        If anything raises during conversion or rotation, the error is
        reported through ``st.error`` and the module logger, and an empty
        list is returned instead.
    """
    try:
        pages = convert_from_bytes(pdf_bytes, dpi=dpi)

        # Nothing to rotate: either no angle was requested or no pages exist.
        if rotation == 0 or not pages:
            return pages

        # expand=True grows the canvas so rotated corners are not cropped.
        return [
            page.rotate(rotation, expand=True, resample=Image.BICUBIC)
            for page in pages
        ]
    except Exception as e:
        st.error(f"Error converting PDF: {str(e)}")
        logger.error(f"PDF conversion error: {str(e)}")
        return []
# Longest side, in pixels, an image may have before it is downscaled.
_MAX_OCR_DIMENSION = 2500


def _flatten_to_rgb_or_gray(image):
    """Return *image* in 'RGB' or 'L' mode, compositing RGBA onto white."""
    if image.mode == 'RGBA':
        background = Image.new('RGB', image.size, (255, 255, 255))
        # Band index 3 of an RGBA image is its alpha channel; use it as mask.
        background.paste(image, mask=image.split()[3])
        return background
    if image.mode not in ('RGB', 'L'):
        return image.convert('RGB')
    return image


def _downscale_for_ocr(image, max_dimension=_MAX_OCR_DIMENSION):
    """Shrink *image* so its longest side is at most *max_dimension* pixels."""
    width, height = image.size
    longest_side = max(width, height)
    if longest_side <= max_dimension:
        return image
    scale_factor = max_dimension / longest_side
    # Clamp to 1 px: truncation could otherwise yield a zero-sized dimension
    # for very elongated images, which resize() rejects.
    new_width = max(1, int(width * scale_factor))
    new_height = max(1, int(height * scale_factor))
    return image.resize((new_width, new_height), Image.LANCZOS)


def _apply_grayscale(img_array, document_type):
    """Desaturate a 3-D image array and return it as 3-channel RGB again."""
    if img_array.ndim != 3:
        # Already single-channel; nothing to convert.
        return img_array
    gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
    if document_type == "handwritten":
        # Adaptive histogram equalization (CLAHE) for handwritten documents.
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        gray = clahe.apply(gray)
    # Later steps work on RGB arrays, so expand back to three channels.
    return cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)


def _apply_denoise(img_array, document_type):
    """Run non-local-means denoising with settings chosen per document type."""
    is_color = img_array.ndim == 3 and img_array.shape[2] == 3
    if document_type == "handwritten":
        # Lighter settings than the standard branch.
        if is_color:
            return cv2.fastNlMeansDenoisingColored(img_array, None, 3, 3, 5, 9)
        return cv2.fastNlMeansDenoising(img_array, None, 3, 7, 21)
    if is_color:
        return cv2.fastNlMeansDenoisingColored(img_array, None, 5, 5, 7, 21)
    return cv2.fastNlMeansDenoising(img_array, None, 5, 7, 21)


@st.cache_data(ttl=24*3600, show_spinner=False, hash_funcs={dict: lambda x: str(sorted(x.items()))})
def preprocess_image(image_bytes: bytes, preprocessing_options: dict) -> bytes:
    """Preprocess an image for OCR and return it re-encoded as JPEG bytes.

    The steps run in a fixed order: decode, flatten to RGB/L, optional
    rotation, downscale when the longest side exceeds 2500 px, then the
    optional grayscale, contrast and denoise passes, and finally JPEG
    encoding at quality 92.

    Args:
        image_bytes: Encoded image data readable by ``PIL.Image.open``.
        preprocessing_options: Mapping read with ``.get``; recognised keys:
            ``"rotation"`` (angle, default 0), ``"document_type"``
            (``"handwritten"`` selects CLAHE and lighter denoising, default
            ``"standard"``), ``"grayscale"`` (bool), ``"contrast"``
            (percentage offset; factor is ``1 + value / 100``) and
            ``"denoise"`` (bool).

    Returns:
        JPEG-encoded bytes of the processed image. If encoding the processed
        image fails, the pre-denoise working image is encoded instead.

    Note:
        Errors raised while decoding ``image_bytes`` are not caught here.
        A denoising failure is logged and the image continues un-denoised.
        Results are cached for 24 hours; the options dict is hashed via the
        string form of its sorted items.
    """
    # Setup basic console logging
    logger = logging.getLogger("image_preprocessor")
    logger.setLevel(logging.INFO)
    logger.info(f"Preprocessing image with options: {preprocessing_options}")

    image = Image.open(io.BytesIO(image_bytes))
    # Remember the mode before any conversion so log messages report the
    # real source mode rather than the already-converted one.
    original_mode = image.mode
    image = _flatten_to_rgb_or_gray(image)
    if original_mode not in ('RGB', 'L'):
        logger.info(f"Converted {original_mode} image to RGB")

    rotation_degrees = preprocessing_options.get("rotation", 0)
    if rotation_degrees != 0:
        # expand=True grows the canvas so rotated corners are not cropped.
        image = image.rotate(rotation_degrees, expand=True, resample=Image.BICUBIC)

    image = _downscale_for_ocr(image)
    img_array = np.array(image)

    document_type = preprocessing_options.get("document_type", "standard")

    # Grayscale runs first; the later passes operate on its output.
    if preprocessing_options.get("grayscale", False):
        img_array = _apply_grayscale(img_array, document_type)

    contrast = preprocessing_options.get("contrast", 0)
    if contrast != 0:
        contrast_factor = 1 + (contrast / 100)
        # `image` is rebound on purpose: it is what the save fallback below
        # encodes if the fully processed image cannot be written.
        image = ImageEnhance.Contrast(Image.fromarray(img_array)).enhance(contrast_factor)
        img_array = np.array(image)

    if preprocessing_options.get("denoise", False):
        try:
            img_array = _apply_denoise(img_array, document_type)
        except Exception as e:
            # Best effort: keep the un-denoised array and carry on.
            logger.error(f"Denoising error: {str(e)}, falling back to standard processing")

    processed_image = Image.fromarray(img_array)
    byte_io = io.BytesIO()
    try:
        # JPEG can only store RGB or L data.
        if processed_image.mode not in ('RGB', 'L'):
            processed_image = processed_image.convert('RGB')
        processed_image.save(byte_io, format='JPEG', quality=92, optimize=True)
        encoded = byte_io.getvalue()
        logger.info(f"Preprocessing complete. Original image mode: {original_mode}, processed mode: {processed_image.mode}")
        logger.info(f"Original size: {len(image_bytes)/1024:.1f}KB, processed size: {len(encoded)/1024:.1f}KB")
        return encoded
    except Exception as e:
        logger.error(f"Error saving processed image: {str(e)}")
        logger.info("Using original image as fallback")
        image_io = io.BytesIO()
        image.save(image_io, format='JPEG', quality=92)
        return image_io.getvalue()
def create_temp_file(content, suffix, temp_file_paths):
    """Write *content* to a new temporary file and register its path.

    Args:
        content: Data written to the file with a single ``write`` call.
        suffix: Filename suffix given to ``NamedTemporaryFile``.
        temp_file_paths: List that the new path is appended to, so the
            caller can clean the file up later.

    Returns:
        The path of the file that was created.
    """
    # delete=False keeps the file on disk after the handle is closed.
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as handle:
        handle.write(content)

    path = handle.name
    temp_file_paths.append(path)
    logger.info(f"Created temporary file: {path}")
    return path
def apply_preprocessing_to_file(file_bytes, file_ext, preprocessing_options, temp_file_paths):
    """Store a file in a temp location, preprocessing it first if requested.

    Args:
        file_bytes: Raw content of the uploaded file.
        file_ext: Suffix used for the temporary file.
        preprocessing_options: Mapping of preprocessing settings; the keys
            ``"grayscale"``, ``"denoise"``, ``"contrast"``, ``"rotation"``
            and ``"document_type"`` decide whether preprocessing runs.
        temp_file_paths: List that receives the temp file path for cleanup.

    Returns:
        A ``(path, was_preprocessed)`` tuple: the temp file path and a bool
        telling whether ``preprocess_image`` was applied.
    """
    # Preprocessing is needed as soon as one option leaves its default value.
    needs_preprocessing = any((
        preprocessing_options.get("grayscale", False),
        preprocessing_options.get("denoise", False),
        preprocessing_options.get("contrast", 0) != 0,
        preprocessing_options.get("rotation", 0) != 0,
        preprocessing_options.get("document_type", "standard") != "standard",
    ))

    # NOTE(review): the preprocessed payload is written under `file_ext`
    # unchanged — confirm downstream consumers do not rely on the suffix
    # matching the encoded format.
    if needs_preprocessing:
        payload = preprocess_image(file_bytes, preprocessing_options)
    else:
        payload = file_bytes

    return create_temp_file(payload, file_ext, temp_file_paths), needs_preprocessing
|