import os
import logging

import cv2
import numpy as np
from PIL import Image
from pdf2image import convert_from_path
from pytesseract import Output, pytesseract
from scipy.ndimage import rotate
from surya.ocr import run_ocr
from surya.model.detection.model import load_model as load_det_model, load_processor as load_det_processor
from surya.model.recognition.model import load_model as load_rec_model
from surya.model.recognition.processor import load_processor as load_rec_processor
import imutils
import gradio as gr

# Set the Tesseract path (update this path based on your system)
# pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'  # Windows
pytesseract.tesseract_cmd = r'/usr/bin/tesseract'  # Linux

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Initialize OCR models
det_processor, det_model = load_det_processor(), load_det_model()
rec_model, rec_processor = load_rec_model(), load_rec_processor()
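# NOTE: the detection/recognition models above are module-level globals, so they
# are loaded once per process and reused by every call to _extract_text below.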


class DocumentProcessor:
    def __init__(self, output_dir: str = "output"):
        self.output_dir = output_dir
        self.corrected_images_dir = os.path.join(output_dir, "corrected_images")
        self.extracted_text_dir = os.path.join(output_dir, "extracted_text")
        self.detected_text_dir = os.path.join(output_dir, "Detected_Text_Line")
        self.detected_layout_dir = os.path.join(output_dir, "Detected_layout")
        self._create_dirs()

    def _create_dirs(self):
        """Create output directories if they don't exist."""
        os.makedirs(self.corrected_images_dir, exist_ok=True)
        os.makedirs(self.extracted_text_dir, exist_ok=True)
        os.makedirs(self.detected_text_dir, exist_ok=True)
        os.makedirs(self.detected_layout_dir, exist_ok=True)

    def process_document(self, input_path: str):
        """
        Process a PDF or image to:
        1. Correct image skew and rotation.
        2. Extract text using OCR.
        3. Save corrected images, detected images, and extracted text.
        """
        try:
            if input_path.endswith(".pdf"):
                images = self._convert_pdf_to_images(input_path)
            else:
                images = [Image.open(input_path)]

            # Run Surya detection and layout
            self._run_surya_detection(input_path)

            corrected_images = []
            extracted_texts = []
            for i, image in enumerate(images):
                logging.info(f"Processing page {i + 1}")
                corrected_image = self._correct_image_rotation(image)
                extracted_text = self._extract_text(corrected_image)

                # Save results
                self._save_results(corrected_image, extracted_text, i + 1)
                corrected_images.append(corrected_image)
                extracted_texts.append(extracted_text)

            return corrected_images, extracted_texts
        except Exception as e:
            logging.error(f"Error processing document: {e}")
            raise

    def _convert_pdf_to_images(self, pdf_path: str):
        """Convert PDF to a list of images."""
        logging.info(f"Converting PDF to images: {pdf_path}")
        return convert_from_path(pdf_path)

    def _run_surya_detection(self, input_path: str):
        """Run the Surya detection and layout CLI tools.

        Assumes the surya_detect and surya_layout commands are on PATH and that
        a POSIX shell is available for the rm call.
        """
        logging.info("Running Surya detection and layout")
        # Step 1: Run surya_detect
        os.system(f"surya_detect --results_dir {self.detected_text_dir} --images {input_path}")
        # Extract the PDF name (without extension)
        pdf_name = os.path.splitext(os.path.basename(input_path))[0]
        # Step 2: Remove column files
        os.system(f"rm {self.detected_text_dir}/{pdf_name}/*column*")
        # Step 3: Run surya_layout
        os.system(f"surya_layout --results_dir {self.detected_layout_dir} --images {input_path}")

    def _correct_image_rotation(self, image: Image.Image):
        """Correct the skew and rotation of the image."""
        logging.info("Correcting image rotation")
        if isinstance(image, Image.Image):
            image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        # Correct skew
        corrected_image = self._correct_skew(image)

        # Correct rotation
        results = pytesseract.image_to_osd(
            corrected_image,
            output_type=Output.DICT,
            config='--dpi 300 --psm 0 -c min_characters_to_try=5 -c tessedit_script_lang=Arabic'
        )
        if results["orientation"] != 0:
            corrected_image = imutils.rotate_bound(corrected_image, angle=results["rotate"])

        return Image.fromarray(cv2.cvtColor(corrected_image, cv2.COLOR_BGR2RGB))

    def _correct_skew(self, image: np.ndarray, delta: float = 0.1, limit: int = 3):
        """Correct the skew of an image by finding the best angle."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        thresh = cv2.adaptiveThreshold(
            gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV, 41, 15
        )

        # Try candidate angles in [-limit, limit] degrees in steps of delta and
        # keep the one whose projection-profile score is highest.
        scores = []
        angles = np.arange(-limit, limit + delta, delta)
        for angle in angles:
            _, score = self._determine_score(thresh, angle)
            scores.append(score)
        best_angle = angles[scores.index(max(scores))]

        (h, w) = image.shape[:2]
        center = (w // 2, h // 2)
        M = cv2.getRotationMatrix2D(center, best_angle, 1.0)
        rotated = cv2.warpAffine(
            image, M, (w, h), flags=cv2.INTER_LINEAR,
            borderMode=cv2.BORDER_CONSTANT, borderValue=(255, 255, 255)
        )
        logging.info(f"Detected skew angle: {best_angle} degrees")
        return rotated

    def _determine_score(self, arr: np.ndarray, angle: float):
        """Score a candidate angle using a horizontal projection profile.

        The row sums of the rotated binary image peak sharply when text lines are
        horizontal, so the sum of squared differences between adjacent rows is
        largest at the correct deskew angle.
        """
        data = rotate(arr, angle, reshape=False, order=0)
        histogram = np.sum(data, axis=1, dtype=float)
        score = np.sum((histogram[1:] - histogram[:-1]) ** 2, dtype=float)
        return histogram, score

    def _extract_text(self, image: Image.Image):
        """Extract text from the image using OCR."""
        logging.info("Extracting text")
        extracted_text_surya = run_ocr([image], [["en"]], det_model, det_processor, rec_model, rec_processor)
        surya_text = [line.text for line in extracted_text_surya[0].text_lines]
        return "\n".join(surya_text)

    def _save_results(self, corrected_image: Image.Image, extracted_text: str, page_num: int):
        """Save corrected images and extracted text."""
        # Save corrected image
        corrected_image.save(os.path.join(self.corrected_images_dir, f"page_{page_num}_corrected.png"))
        # Save extracted text
        with open(os.path.join(self.extracted_text_dir, f"page_{page_num}_text.txt"), "w", encoding="utf-8") as f:
            f.write(extracted_text)
        logging.info(f"Saved results for page {page_num}")

# Gradio Interface
def process_document_interface(file):
    processor = DocumentProcessor(output_dir="output")
    corrected_images, extracted_texts = processor.process_document(file.name)

    # Paths to the detection/layout images written by the Surya CLI tools
    pdf_name = os.path.splitext(os.path.basename(file.name))[0]
    detected_text_images = [
        os.path.join(processor.detected_text_dir, pdf_name, f"{pdf_name}_{i}_bbox.png")
        for i in range(len(corrected_images))
    ]
    detected_layout_images = [
        os.path.join(processor.detected_layout_dir, pdf_name, f"{pdf_name}_{i}_bbox.png")
        for i in range(len(corrected_images))
    ]

    # Return one value per Gradio output component: three galleries and one textbox
    return (
        corrected_images,
        detected_text_images,
        detected_layout_images,
        "\n\n".join(extracted_texts),
    )

# Gradio App
iface = gr.Interface(
    fn=process_document_interface,
    inputs=gr.File(label="Upload PDF or Image"),
    outputs=[
        gr.Gallery(label="Corrected Images"),
        gr.Gallery(label="Detected Text Images"),
        gr.Gallery(label="Detected Layout Images"),
        gr.Textbox(label="Extracted Text")
    ],
    title="Document Processor",
    description="Upload a PDF or image to correct skew/rotation, detect text/layout, and extract text using OCR."
)

if __name__ == "__main__":
    iface.launch()