# Source: Hugging Face Spaces file view (Space status: Running).
# File size: 8,710 bytes. Commits: 196a045, c50c16b, 196a045, 0598282.
import logging
import os
import shutil
import subprocess

import cv2
import gradio as gr
import imutils
import numpy as np
from PIL import Image
from pdf2image import convert_from_path
from pytesseract import Output, pytesseract
from scipy.ndimage import rotate
from surya.model.detection.model import load_model as load_det_model, load_processor as load_det_processor
from surya.model.recognition.model import load_model as load_rec_model
from surya.model.recognition.processor import load_processor as load_rec_processor
from surya.ocr import run_ocr
# Locate the Tesseract binary. Prefer whatever is on PATH so the script works
# on any platform; fall back to the conventional Linux location otherwise.
# (On Windows the installer typically places it under
# "C:/Program Files/Tesseract-OCR/tesseract.exe".)
pytesseract.tesseract_cmd = shutil.which("tesseract") or r'/usr/bin/tesseract'

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Initialize the Surya OCR models once at import time; they are shared by
# every DocumentProcessor instance via these module-level names.
det_processor = load_det_processor()
det_model = load_det_model()
rec_model = load_rec_model()
rec_processor = load_rec_processor()
class DocumentProcessor:
    """Deskew, re-orient, and OCR the pages of a PDF or image file.

    Everything is written beneath ``output_dir``:

    * ``corrected_images/``    - one corrected PNG per page
    * ``extracted_text/``      - one UTF-8 text file per page
    * ``Detected_Text_Line/``  - results directory passed to ``surya_detect``
    * ``Detected_layout/``     - results directory passed to ``surya_layout``
    """

    def __init__(self, output_dir: str = "output"):
        """Record the output locations and create them on disk."""
        self.output_dir = output_dir
        self.corrected_images_dir = os.path.join(output_dir, "corrected_images")
        self.extracted_text_dir = os.path.join(output_dir, "extracted_text")
        self.detected_text_dir = os.path.join(output_dir, "Detected_Text_Line")
        self.detected_layout_dir = os.path.join(output_dir, "Detected_layout")
        self._create_dirs()

    def _create_dirs(self):
        """Create output directories if they don't exist."""
        for directory in (
            self.corrected_images_dir,
            self.extracted_text_dir,
            self.detected_text_dir,
            self.detected_layout_dir,
        ):
            os.makedirs(directory, exist_ok=True)

    @staticmethod
    def _is_pdf(input_path: str) -> bool:
        """Return True when the path has a ``.pdf`` extension (any letter case)."""
        return input_path.lower().endswith(".pdf")

    def process_document(self, input_path: str):
        """Process a PDF or image file page by page.

        Steps:
            1. Run the Surya detection/layout command-line tools on the input.
            2. Correct skew and rotation of every page.
            3. Extract text with OCR.
            4. Save the corrected image and extracted text of every page.

        Args:
            input_path: Path to a PDF or to an image readable by Pillow.

        Returns:
            A ``(corrected_images, extracted_texts)`` pair of equally long
            lists: PIL images and the OCR text of each page.

        Raises:
            Exception: Any failure is logged with its traceback and re-raised.
        """
        try:
            if self._is_pdf(input_path):
                images = self._convert_pdf_to_images(input_path)
            else:
                # Image.open is lazy and keeps the file handle open; convert()
                # forces the pixel data to load and returns an independent
                # copy, so the handle can be released right away.
                with Image.open(input_path) as opened:
                    images = [opened.convert("RGB")]

            # Run Surya detection and layout
            self._run_surya_detection(input_path)

            corrected_images = []
            extracted_texts = []
            for page_num, image in enumerate(images, start=1):
                logging.info(f"Processing page {page_num}")
                corrected_image = self._correct_image_rotation(image)
                extracted_text = self._extract_text(corrected_image)

                # Save results
                self._save_results(corrected_image, extracted_text, page_num)
                corrected_images.append(corrected_image)
                extracted_texts.append(extracted_text)
            return corrected_images, extracted_texts
        except Exception as e:
            # logging.exception keeps the traceback alongside the message.
            logging.exception(f"Error processing document: {e}")
            raise

    def _convert_pdf_to_images(self, pdf_path: str):
        """Convert PDF to a list of images."""
        logging.info(f"Converting PDF to images: {pdf_path}")
        return convert_from_path(pdf_path)

    @staticmethod
    def _run_command(args) -> bool:
        """Run an external command without a shell; log instead of raising.

        Passing an argument list (not a shell string) keeps spaces and shell
        metacharacters in file names from being interpreted by a shell.

        Returns:
            True when the command ran and exited with status 0.
        """
        try:
            completed = subprocess.run(args, check=False)
        except OSError as exc:
            # e.g. the executable is not installed / not on PATH
            logging.warning("Could not run %s: %s", args[0], exc)
            return False
        if completed.returncode != 0:
            logging.warning("%s exited with status %d", args[0], completed.returncode)
            return False
        return True

    @staticmethod
    def _remove_column_files(directory: str) -> None:
        """Delete non-hidden files in ``directory`` whose name contains "column".

        Mirrors the shell pattern ``*column*``; a missing directory or an
        undeletable entry is tolerated (best effort).
        """
        try:
            names = os.listdir(directory)
        except OSError:
            return
        for name in names:
            if "column" in name and not name.startswith("."):
                try:
                    os.remove(os.path.join(directory, name))
                except OSError as exc:
                    logging.warning("Could not remove %s: %s", name, exc)

    def _run_surya_detection(self, input_path: str):
        """Run the Surya text-line detection and layout command-line tools.

        Failures of the external tools are logged and do not abort processing.
        """
        logging.info("Running Surya detection and layout")

        # Step 1: Run surya_detect
        self._run_command(
            ["surya_detect", "--results_dir", self.detected_text_dir, "--images", input_path]
        )

        # The tools write into a sub-directory named after the input file
        # (without extension).
        pdf_name = os.path.splitext(os.path.basename(input_path))[0]

        # Step 2: Remove column files
        self._remove_column_files(os.path.join(self.detected_text_dir, pdf_name))

        # Step 3: Run surya_layout
        self._run_command(
            ["surya_layout", "--results_dir", self.detected_layout_dir, "--images", input_path]
        )

    def _correct_image_rotation(self, image: "Image.Image"):
        """Correct the skew and rotation of the image.

        Args:
            image: A PIL image (any mode) or an OpenCV BGR array.

        Returns:
            The corrected page as an RGB PIL image.
        """
        logging.info("Correcting image rotation")
        if isinstance(image, Image.Image):
            # COLOR_RGB2BGR needs exactly three channels, so normalise
            # RGBA / palette / greyscale inputs first.
            if image.mode != "RGB":
                image = image.convert("RGB")
            image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        # Correct skew
        corrected_image = self._correct_skew(image)

        # Correct rotation using Tesseract's orientation/script detection
        results = pytesseract.image_to_osd(
            corrected_image,
            output_type=Output.DICT,
            config='--dpi 300 --psm 0 -c min_characters_to_try=5 -c tessedit_script_lang=Arabic'
        )
        if results["orientation"] != 0:
            corrected_image = imutils.rotate_bound(corrected_image, angle=results["rotate"])
        return Image.fromarray(cv2.cvtColor(corrected_image, cv2.COLOR_BGR2RGB))

    def _correct_skew(self, image: np.ndarray, delta: float = 0.1, limit: int = 3):
        """Correct the skew of an image by finding the best angle.

        Candidate angles from ``-limit`` to ``+limit`` degrees in steps of
        ``delta`` are scored with :meth:`_determine_score`; the image is then
        rotated about its centre by the highest-scoring angle.

        Args:
            image: BGR image array.
            delta: Step between candidate angles, in degrees.
            limit: Largest absolute candidate angle, in degrees.

        Returns:
            The rotated BGR image, same size as the input, with the exposed
            border filled white.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        thresh = cv2.adaptiveThreshold(
            gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV, 41, 15
        )

        angles = np.arange(-limit, limit + delta, delta)
        scores = [self._determine_score(thresh, angle)[1] for angle in angles]
        # argmax returns the first maximum, like list.index(max(...)).
        best_angle = float(angles[int(np.argmax(scores))])

        (h, w) = image.shape[:2]
        center = (w // 2, h // 2)
        M = cv2.getRotationMatrix2D(center, best_angle, 1.0)
        rotated = cv2.warpAffine(
            image, M, (w, h), flags=cv2.INTER_LINEAR,
            borderMode=cv2.BORDER_CONSTANT, borderValue=(255, 255, 255)
        )
        logging.info(f"Detected skew angle: {best_angle} degrees")
        return rotated

    def _determine_score(self, arr: np.ndarray, angle: float):
        """Rotate the image and score how sharply its row sums vary.

        Args:
            arr: 2-D array to evaluate.
            angle: Rotation to apply, in degrees.

        Returns:
            ``(histogram, score)`` where ``histogram`` holds the per-row sums
            of the rotated array and ``score`` is the sum of squared
            differences between consecutive rows.
        """
        data = rotate(arr, angle, reshape=False, order=0)
        histogram = np.sum(data, axis=1, dtype=float)
        score = np.sum((histogram[1:] - histogram[:-1]) ** 2, dtype=float)
        return histogram, score

    def _extract_text(self, image: "Image.Image"):
        """Extract text from the image using Surya OCR.

        Returns:
            The recognised text lines joined with newlines.
        """
        logging.info("Extracting text")
        # NOTE(review): recognition is requested for "en" only, while the
        # orientation step hints an Arabic script - confirm the intended
        # language.
        extracted_text_surya = run_ocr([image], [["en"]], det_model, det_processor, rec_model, rec_processor)
        surya_text = [line.text for line in extracted_text_surya[0].text_lines]
        return "\n".join(surya_text)

    def _save_results(self, corrected_image: "Image.Image", extracted_text: str, page_num: int):
        """Save the corrected image and extracted text of one page."""
        # Save corrected image
        corrected_image.save(os.path.join(self.corrected_images_dir, f"page_{page_num}_corrected.png"))

        # Save extracted text
        with open(os.path.join(self.extracted_text_dir, f"page_{page_num}_text.txt"), "w", encoding="utf-8") as f:
            f.write(extracted_text)
        logging.info(f"Saved results for page {page_num}")
# Gradio Interface
def process_document_interface(file):
    """Gradio callback: process an uploaded file and return one value per output.

    Args:
        file: The uploaded file as handed over by ``gr.File``. Either an
            object exposing the path as ``.name`` or a plain path string is
            accepted; ``None`` (nothing uploaded) yields empty outputs.

    Returns:
        A 4-tuple matching the four output components of the interface:
        ``(corrected_images, detected_text_images, detected_layout_images,
        extracted_text)``. The first three are lists suitable for galleries;
        the last is the text of all pages joined by blank lines.
    """
    if file is None:
        return [], [], [], ""

    # Accept both a file wrapper with a .name attribute and a bare path.
    input_path = getattr(file, "name", file)

    processor = DocumentProcessor(output_dir="output")
    corrected_images, extracted_texts = processor.process_document(input_path)

    # The detection tools write "<name>/<name>_<page>_bbox.png" below their
    # results directory, where <name> is the input file name sans extension.
    pdf_name = os.path.splitext(os.path.basename(input_path))[0]

    def _existing_bbox_images(results_dir):
        """List the per-page bbox images that are actually present on disk."""
        candidates = (
            os.path.join(results_dir, pdf_name, f"{pdf_name}_{i}_bbox.png")
            for i in range(len(corrected_images))
        )
        # A missing file (e.g. the external tool failed) must not reach the
        # gallery, so only keep paths that exist.
        return [path for path in candidates if os.path.isfile(path)]

    detected_text_images = _existing_bbox_images(processor.detected_text_dir)
    detected_layout_images = _existing_bbox_images(processor.detected_layout_dir)

    # One return value per declared output component.
    return (
        corrected_images,
        detected_text_images,
        detected_layout_images,
        "\n\n".join(extracted_texts),
    )
# Gradio App: one file input, three image galleries and one text box as
# outputs (the callback returns one value for each, in this order).
iface = gr.Interface(
    fn=process_document_interface,
    inputs=gr.File(label="Upload PDF or Image"),
    outputs=[
        gr.Gallery(label="Corrected Images"),
        gr.Gallery(label="Detected Text Images"),
        gr.Gallery(label="Detected Layout Images"),
        gr.Textbox(label="Extracted Text")
    ],
    title="Document Processor",
    description="Upload a PDF or image to correct skew/rotation, detect text/layout, and extract text using OCR."
)

# Start the web UI only when executed as a script, not when imported.
if __name__ == "__main__":
    iface.launch()