Spaces:
Running
Running
import os | |
import logging | |
import cv2 | |
import numpy as np | |
from pdf2image import convert_from_path | |
from pytesseract import Output, pytesseract | |
from scipy.ndimage import rotate | |
from surya.ocr import run_ocr | |
from surya.model.detection.model import load_model as load_det_model, load_processor as load_det_processor | |
from surya.model.recognition.model import load_model as load_rec_model | |
from surya.model.recognition.processor import load_processor as load_rec_processor | |
import imutils | |
import gradio as gr | |
import subprocess | |
import glob | |
from PIL import Image, ImageDraw | |
from pytesseract import Output | |
import pytesseract | |
# Function to correct image skew | |
def correct_skew(image, delta=0.1, limit=3): | |
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
thresh = cv2.adaptiveThreshold( | |
gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, | |
cv2.THRESH_BINARY_INV, 41, 15 | |
) | |
scores = [] | |
angles = np.arange(-limit, limit + delta, delta) | |
for angle in angles: | |
_, score = determine_score(thresh, angle) | |
scores.append(score) | |
best_angle = angles[scores.index(max(scores))] | |
(h, w) = image.shape[:2] | |
center = (w // 2, h // 2) | |
M = cv2.getRotationMatrix2D(center, best_angle, 1.0) | |
rotated = cv2.warpAffine( | |
image, M, (w, h), flags=cv2.INTER_LINEAR, | |
borderMode=cv2.BORDER_CONSTANT, borderValue=(255, 255, 255) | |
) | |
print(f"[INFO] Detected skew angle: {best_angle} degrees") | |
return rotated | |
def determine_score(arr, angle): | |
data = rotate(arr, angle, reshape=False, order=0) | |
histogram = np.sum(data, axis=1, dtype=float) | |
score = np.sum((histogram[1:] - histogram[:-1]) ** 2, dtype=float) | |
return histogram, score | |
def correct_image_rotation(image): | |
if isinstance(image, Image.Image): | |
original_size = image.size | |
print('image original size is:', original_size) | |
image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) | |
image_required = image.copy() | |
h, w = image_required.shape[:2] | |
cropped_rotated = cv2.resize(image_required, (w * 4, h * 4)) | |
results = pytesseract.image_to_osd( | |
cropped_rotated, | |
output_type=Output.DICT, | |
config='--dpi 300 --psm 0 -c min_characters_to_try=5 -c tessedit_script_lang=Arabic' | |
) | |
if results["script"] not in ['Bengali', 'Latin', 'Greek', 'Katakana'] and results["orientation"] != 180: | |
print("[INFO] Detected orientation: {}".format(results["orientation"])) | |
print("[INFO] Rotate by {} degrees to correct".format(results["rotate"])) | |
print("[INFO] Detected script: {}".format(results["script"])) | |
rotated = imutils.rotate_bound(image, angle=results['rotate']) | |
if results['rotate'] in [90, 270]: | |
rotated_h, rotated_w = rotated.shape[:2] | |
original_size = (rotated_w, rotated_h) | |
print(f"Rotated dimensions: {rotated_w}x{rotated_h}") | |
if (rotated_w, rotated_h) != (h, w): | |
rotated = cv2.resize(rotated, (w, h)) | |
else: | |
print("[INFO] Major orientation is correct, proceeding to fine-tune...") | |
rotated = image | |
final_rotated = correct_skew(rotated) | |
rotated_pil = Image.fromarray(cv2.cvtColor(final_rotated, cv2.COLOR_BGR2RGB)) | |
print('resize the image to its original size: ', original_size) | |
corrected_image = rotated_pil.resize(original_size, Image.Resampling.LANCZOS) | |
return corrected_image | |
# Function to process PDF or image and detect text lines | |
def process_pdf(file_path): | |
# Define the results directories | |
detected_text_dir = "/home/Detected_Text_Line" | |
detected_layout_dir = "/home/Detected_layout" | |
ocr_dir = "/home/OCR" | |
# Ensure the results directories exist | |
os.makedirs(detected_text_dir, exist_ok=True) | |
os.makedirs(detected_layout_dir, exist_ok=True) | |
os.makedirs(ocr_dir, exist_ok=True) | |
# Extract the PDF name (without extension) | |
pdf_name = os.path.splitext(os.path.basename(file_path))[0] | |
# Step 1: Run surya_detect | |
try: | |
subprocess.run( | |
["surya_detect", "--results_dir", detected_text_dir, "--images", file_path], | |
check=True, | |
) | |
print(f"[INFO] surya_detect completed for {file_path}") | |
except subprocess.CalledProcessError as e: | |
print(f"[ERROR] surya_detect failed: {e}") | |
return None | |
# Step 2: Remove column files (if they exist) | |
column_files = glob.glob(f"{detected_text_dir}/{pdf_name}/*column*") | |
if column_files: | |
try: | |
subprocess.run(["rm"] + column_files, check=True) | |
print(f"[INFO] Removed column files for {pdf_name}") | |
except subprocess.CalledProcessError as e: | |
print(f"[ERROR] Failed to remove column files: {e}") | |
else: | |
print(f"[INFO] No column files found for {pdf_name}") | |
# Return the path to the directory containing the output images | |
output_dir = os.path.join(detected_text_dir, pdf_name) | |
return output_dir | |
# Function to handle the Gradio interface | |
def gradio_interface(file): | |
# Step 1: Correct the skew of the input file | |
corrected_images = [] | |
if file.name.lower().endswith('.pdf'): | |
images = convert_from_path(file.name) | |
for i, image in enumerate(images): | |
corrected_image = correct_image_rotation(image) | |
corrected_images.append(corrected_image) | |
else: | |
image = Image.open(file.name) | |
corrected_image = correct_image_rotation(image) | |
corrected_images.append(corrected_image) | |
# Save corrected images to a folder | |
corrected_dir = "/home/Corrected_Images" | |
os.makedirs(corrected_dir, exist_ok=True) | |
for i, corrected_image in enumerate(corrected_images): | |
corrected_image.save(os.path.join(corrected_dir, f"corrected_{i}.png")) | |
# Step 2: Detect text lines in the corrected images | |
detected_dir = process_pdf(corrected_dir) | |
if detected_dir is None: | |
# Return a placeholder image with an error message | |
error_image = Image.new("RGB", (400, 200), color="red") | |
error_draw = ImageDraw.Draw(error_image) | |
error_draw.text((10, 10), "Error detecting text lines. Check the logs for details.", fill="white") | |
return corrected_images, [error_image] | |
# Load and return the detected text line images | |
detected_images = [] | |
for image_file in sorted(os.listdir(detected_dir)): | |
if image_file.endswith((".png", ".jpg", ".jpeg")): | |
image_path = os.path.join(detected_dir, image_file) | |
detected_images.append(Image.open(image_path)) | |
if not detected_images: | |
# Return a placeholder image if no output images are found | |
placeholder_image = Image.new("RGB", (400, 200), color="gray") | |
placeholder_draw = ImageDraw.Draw(placeholder_image) | |
placeholder_draw.text((10, 10), "No detected text line images found.", fill="white") | |
return corrected_images, [placeholder_image] | |
return corrected_images, detected_images | |
# Gradio Interface | |
iface = gr.Interface( | |
fn=gradio_interface, | |
inputs=gr.File(label="Upload PDF or Image"), | |
outputs=[ | |
gr.Gallery(label="Corrected Images", columns=[2], height="auto"), | |
gr.Gallery(label="Detected Text Lines", columns=[2], height="auto"), | |
], | |
title="PDF/Image Skew Correction and Text Line Detection", | |
description="Upload a PDF or image to correct skew and detect text lines.", | |
) | |
iface.launch() |