Spaces:
Sleeping
Sleeping
import os | |
import json | |
import logging | |
import cv2 | |
import numpy as np | |
from PIL import Image | |
from pdf2image import convert_from_path | |
from pytesseract import Output, pytesseract | |
from scipy.ndimage import rotate | |
from surya.ocr import run_ocr | |
from surya.model.detection.model import load_model as load_det_model, load_processor as load_det_processor | |
from surya.model.recognition.model import load_model as load_rec_model | |
from surya.model.recognition.processor import load_processor as load_rec_processor | |
import imutils | |
import shutil | |
import gradio as gr | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
# Initialize OCR models | |
det_processor, det_model = load_det_processor(), load_det_model() | |
rec_model, rec_processor = load_rec_model(), load_rec_processor() | |
class DocumentProcessor: | |
def __init__(self, output_dir: str = "output"): | |
self.output_dir = output_dir | |
self.corrected_images_dir = os.path.join(output_dir, "corrected_images") | |
self.extracted_text_dir = os.path.join(output_dir, "extracted_text") | |
self._create_dirs() | |
def _create_dirs(self): | |
"""Create output directories if they don't exist.""" | |
os.makedirs(self.corrected_images_dir, exist_ok=True) | |
os.makedirs(self.extracted_text_dir, exist_ok=True) | |
def process_document(self, input_path: str): | |
""" | |
Process a PDF or image to: | |
1. Correct image skew and rotation. | |
2. Extract text using OCR. | |
3. Save corrected images and extracted text. | |
""" | |
try: | |
if input_path.endswith(".pdf"): | |
images = self._convert_pdf_to_images(input_path) | |
else: | |
images = [Image.open(input_path)] | |
# Run Surya detection and layout | |
self._run_surya_detection(input_path) | |
for i, image in enumerate(images): | |
logging.info(f"Processing page {i + 1}") | |
corrected_image = self._correct_image_rotation(image) | |
extracted_text = self._extract_text(corrected_image) | |
# Save results | |
self._save_results(corrected_image, extracted_text, i + 1) | |
except Exception as e: | |
logging.error(f"Error processing document: {e}") | |
raise | |
def _convert_pdf_to_images(self, pdf_path: str): | |
"""Convert PDF to a list of images.""" | |
logging.info(f"Converting PDF to images: {pdf_path}") | |
return convert_from_path(pdf_path) | |
def _run_surya_detection(self, input_path: str): | |
"""Run Surya detection and layout commands.""" | |
logging.info("Running Surya detection and layout") | |
detected_text_dir = "/home/output/Detected_Text_Line" | |
detected_layout_dir = "/home/output/Detected_layout" | |
ocr_dir = "/home/output/OCR" | |
# Ensure the results directories exist | |
os.makedirs(detected_text_dir, exist_ok=True) | |
os.makedirs(detected_layout_dir, exist_ok=True) | |
os.makedirs(ocr_dir, exist_ok=True) | |
# Step 1: Run surya_detect | |
os.system(f"surya_detect --results_dir {detected_text_dir} --images {input_path}") | |
# Extract the PDF name (without extension) | |
pdf_name = os.path.splitext(os.path.basename(input_path))[0] | |
# Step 2: Remove column files | |
os.system(f"rm {detected_text_dir}/{pdf_name}/*column*") | |
# Step 3: Run surya_layout | |
os.system(f"surya_layout --results_dir {detected_layout_dir} --images {input_path}") | |
# Step 4: Run surya_ocr | |
os.system(f"surya_ocr --results_dir {ocr_dir} --images {input_path}") | |
def _correct_image_rotation(self, image: Image.Image): | |
"""Correct the skew and rotation of the image.""" | |
logging.info("Correcting image rotation") | |
if isinstance(image, Image.Image): | |
image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) | |
# Correct skew | |
corrected_image = self._correct_skew(image) | |
# Correct rotation | |
results = pytesseract.image_to_osd( | |
corrected_image, | |
output_type=Output.DICT, | |
config='--dpi 300 --psm 0 -c min_characters_to_try=5 -c tessedit_script_lang=Arabic' | |
) | |
if results["orientation"] != 0: | |
corrected_image = imutils.rotate_bound(corrected_image, angle=results["rotate"]) | |
return Image.fromarray(cv2.cvtColor(corrected_image, cv2.COLOR_BGR2RGB)) | |
def _correct_skew(self, image: np.ndarray, delta: float = 0.1, limit: int = 3): | |
"""Correct the skew of an image by finding the best angle.""" | |
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
thresh = cv2.adaptiveThreshold( | |
gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, | |
cv2.THRESH_BINARY_INV, 41, 15 | |
) | |
scores = [] | |
angles = np.arange(-limit, limit + delta, delta) | |
for angle in angles: | |
_, score = self._determine_score(thresh, angle) | |
scores.append(score) | |
best_angle = angles[scores.index(max(scores))] | |
(h, w) = image.shape[:2] | |
center = (w // 2, h // 2) | |
M = cv2.getRotationMatrix2D(center, best_angle, 1.0) | |
rotated = cv2.warpAffine( | |
image, M, (w, h), flags=cv2.INTER_LINEAR, | |
borderMode=cv2.BORDER_CONSTANT, borderValue=(255, 255, 255) | |
) | |
logging.info(f"Detected skew angle: {best_angle} degrees") | |
return rotated | |
def _determine_score(self, arr: np.ndarray, angle: float): | |
"""Rotate the image and calculate the score based on pixel intensity.""" | |
data = rotate(arr, angle, reshape=False, order=0) | |
histogram = np.sum(data, axis=1, dtype=float) | |
score = np.sum((histogram[1:] - histogram[:-1]) ** 2, dtype=float) | |
return histogram, score | |
def _extract_text(self, image: Image.Image): | |
"""Extract text from the image using OCR.""" | |
logging.info("Extracting text") | |
extracted_text_surya = run_ocr([image], [["en"]], det_model, det_processor, rec_model, rec_processor) | |
surya_text = [line.text for line in extracted_text_surya[0].text_lines] | |
return "\n".join(surya_text) | |
def _save_results(self, corrected_image: Image.Image, extracted_text: str, page_num: int): | |
"""Save corrected images and extracted text.""" | |
# Save corrected image | |
corrected_image.save(os.path.join(self.corrected_images_dir, f"page_{page_num}_corrected.png")) | |
# Save extracted text | |
with open(os.path.join(self.extracted_text_dir, f"page_{page_num}_text.txt"), "w", encoding="utf-8") as f: | |
f.write(extracted_text) | |
logging.info(f"Saved results for page {page_num}") | |
# Gradio Interface | |
def process_document_interface(file): | |
processor = DocumentProcessor(output_dir="/home/output") | |
processor.process_document(file.name) | |
corrected_image_path = os.path.join("/home/output/corrected_images", "page_1_corrected.png") | |
extracted_text_path = os.path.join("/home/output/extracted_text", "page_1_text.txt") | |
with open(extracted_text_path, "r", encoding="utf-8") as f: | |
extracted_text = f.read() | |
return corrected_image_path, extracted_text | |
iface = gr.Interface( | |
fn=process_document_interface, | |
inputs=gr.File(label="Upload PDF or Image"), | |
outputs=[gr.Image(label="Corrected Image"), gr.Textbox(label="Extracted Text")], | |
title="Document Processor", | |
description="Upload a PDF or image to correct skew/rotation and extract text using OCR." | |
) | |
if __name__ == "__main__": | |
iface.launch() |