Spaces:
Runtime error
Runtime error
| """ | |
| معالج ملفات PDF | |
| """ | |
| import os | |
| import io | |
| import re | |
| import PyPDF2 | |
| import fitz # PyMuPDF | |
| import pdfplumber | |
| import numpy as np | |
| from PIL import Image | |
| import pytesseract | |
| import pandas as pd | |
| import traceback | |
| from utils.helpers import create_directory_if_not_exists, extract_numbers_from_text | |
| def extract_text_from_pdf(file_path, method='pymupdf'): | |
| """ | |
| استخراج النص من ملف PDF | |
| المعلمات: | |
| file_path: مسار ملف PDF | |
| method: طريقة الاستخراج ('pymupdf', 'pypdf2', 'pdfplumber') | |
| الإرجاع: | |
| نص مستخرج من ملف PDF | |
| """ | |
| try: | |
| # التحقق من وجود الملف | |
| if not os.path.exists(file_path): | |
| raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
| # استخراج النص حسب الطريقة المطلوبة | |
| if method.lower() == 'pymupdf': | |
| return _extract_text_with_pymupdf(file_path) | |
| elif method.lower() == 'pypdf2': | |
| return _extract_text_with_pypdf2(file_path) | |
| elif method.lower() == 'pdfplumber': | |
| return _extract_text_with_pdfplumber(file_path) | |
| else: | |
| # استخدام PyMuPDF كطريقة افتراضية | |
| return _extract_text_with_pymupdf(file_path) | |
| except Exception as e: | |
| error_msg = f"خطأ في استخراج النص من ملف PDF: {str(e)}" | |
| print(error_msg) | |
| traceback.print_exc() | |
| raise Exception(error_msg) | |
| def _extract_text_with_pymupdf(file_path): | |
| """استخراج النص باستخدام PyMuPDF""" | |
| document = fitz.open(file_path) | |
| text = "" | |
| for page_number in range(len(document)): | |
| page = document.load_page(page_number) | |
| text += page.get_text("text") + "\n\n" | |
| document.close() | |
| return text | |
| def _extract_text_with_pypdf2(file_path): | |
| """استخراج النص باستخدام PyPDF2""" | |
| with open(file_path, 'rb') as file: | |
| reader = PyPDF2.PdfReader(file) | |
| text = "" | |
| for page_number in range(len(reader.pages)): | |
| page = reader.pages[page_number] | |
| text += page.extract_text() + "\n\n" | |
| return text | |
| def _extract_text_with_pdfplumber(file_path): | |
| """استخراج النص باستخدام pdfplumber""" | |
| with pdfplumber.open(file_path) as pdf: | |
| text = "" | |
| for page in pdf.pages: | |
| text += page.extract_text() + "\n\n" | |
| return text | |
| def extract_tables_from_pdf(file_path, page_numbers=None): | |
| """ | |
| استخراج الجداول من ملف PDF | |
| المعلمات: | |
| file_path: مسار ملف PDF | |
| page_numbers: قائمة بأرقام الصفحات للاستخراج منها (افتراضي: None لجميع الصفحات) | |
| الإرجاع: | |
| قائمة من DataFrames تمثل الجداول المستخرجة | |
| """ | |
| try: | |
| # التحقق من وجود الملف | |
| if not os.path.exists(file_path): | |
| raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
| # استخراج الجداول باستخدام pdfplumber | |
| tables = [] | |
| with pdfplumber.open(file_path) as pdf: | |
| # تحديد الصفحات المراد استخراج الجداول منها | |
| if page_numbers is None: | |
| pages_to_extract = range(len(pdf.pages)) | |
| else: | |
| pages_to_extract = [p-1 for p in page_numbers if 1 <= p <= len(pdf.pages)] | |
| # استخراج الجداول من كل صفحة | |
| for page_idx in pages_to_extract: | |
| page = pdf.pages[page_idx] | |
| page_tables = page.extract_tables() | |
| if page_tables: | |
| for table in page_tables: | |
| if table: # التحقق من أن الجدول ليس فارغًا | |
| # تحويل الجدول إلى DataFrame | |
| df = pd.DataFrame(table[1:], columns=table[0]) | |
| # تنظيف البيانات | |
| df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x) | |
| # إضافة إلى قائمة الجداول | |
| tables.append(df) | |
| return tables | |
| except Exception as e: | |
| error_msg = f"خطأ في استخراج الجداول من ملف PDF: {str(e)}" | |
| print(error_msg) | |
| traceback.print_exc() | |
| raise Exception(error_msg) | |
| def extract_images_from_pdf(file_path, output_dir=None, prefix='image'): | |
| """ | |
| استخراج الصور من ملف PDF | |
| المعلمات: | |
| file_path: مسار ملف PDF | |
| output_dir: دليل الإخراج (افتراضي: None للإرجاع كقائمة من الصور) | |
| prefix: بادئة أسماء ملفات الصور | |
| الإرجاع: | |
| قائمة من مسارات الصور المستخرجة إذا تم تحديد دليل الإخراج، وإلا قائمة من الصور | |
| """ | |
| try: | |
| # التحقق من وجود الملف | |
| if not os.path.exists(file_path): | |
| raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
| # إنشاء دليل الإخراج إذا تم تحديده | |
| if output_dir: | |
| create_directory_if_not_exists(output_dir) | |
| # استخراج الصور باستخدام PyMuPDF | |
| document = fitz.open(file_path) | |
| images = [] | |
| image_paths = [] | |
| for page_idx in range(len(document)): | |
| page = document.load_page(page_idx) | |
| # استخراج الصور من الصفحة | |
| image_list = page.get_images(full=True) | |
| for img_idx, img_info in enumerate(image_list): | |
| xref = img_info[0] | |
| base_image = document.extract_image(xref) | |
| image_bytes = base_image["image"] | |
| # إنشاء كائن الصورة | |
| image = Image.open(io.BytesIO(image_bytes)) | |
| if output_dir: | |
| # حفظ الصورة في الدليل المحدد | |
| image_filename = f"{prefix}_{page_idx+1}_{img_idx+1}.{base_image['ext']}" | |
| image_path = os.path.join(output_dir, image_filename) | |
| image.save(image_path) | |
| image_paths.append(image_path) | |
| else: | |
| # إضافة الصورة إلى القائمة | |
| images.append(image) | |
| document.close() | |
| return image_paths if output_dir else images | |
| except Exception as e: | |
| error_msg = f"خطأ في استخراج الصور من ملف PDF: {str(e)}" | |
| print(error_msg) | |
| traceback.print_exc() | |
| raise Exception(error_msg) | |
| def extract_text_from_image(image, lang='ara+eng'): | |
| """ | |
| استخراج النص من صورة باستخدام OCR | |
| المعلمات: | |
| image: كائن الصورة أو مسار الصورة | |
| lang: لغة النص (افتراضي: 'ara+eng' للعربية والإنجليزية) | |
| الإرجاع: | |
| النص المستخرج من الصورة | |
| """ | |
| try: | |
| # إذا كان مسار صورة، قم بفتحها | |
| if isinstance(image, str): | |
| image = Image.open(image) | |
| # استخراج النص باستخدام pytesseract | |
| text = pytesseract.image_to_string(image, lang=lang) | |
| return text | |
| except Exception as e: | |
| error_msg = f"خطأ في استخراج النص من الصورة: {str(e)}" | |
| print(error_msg) | |
| traceback.print_exc() | |
| return "" | |
| def ocr_pdf(file_path, lang='ara+eng'): | |
| """ | |
| تنفيذ OCR على ملف PDF | |
| المعلمات: | |
| file_path: مسار ملف PDF | |
| lang: لغة النص (افتراضي: 'ara+eng' للعربية والإنجليزية) | |
| الإرجاع: | |
| النص المستخرج من ملف PDF | |
| """ | |
| try: | |
| # التحقق من وجود الملف | |
| if not os.path.exists(file_path): | |
| raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
| # فتح ملف PDF | |
| document = fitz.open(file_path) | |
| text = "" | |
| for page_idx in range(len(document)): | |
| page = document.load_page(page_idx) | |
| # تحويل الصفحة إلى صورة | |
| pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72)) | |
| img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) | |
| # استخراج النص من الصورة | |
| page_text = extract_text_from_image(img, lang=lang) | |
| text += page_text + "\n\n" | |
| document.close() | |
| return text | |
| except Exception as e: | |
| error_msg = f"خطأ في تنفيذ OCR على ملف PDF: {str(e)}" | |
| print(error_msg) | |
| traceback.print_exc() | |
| raise Exception(error_msg) | |
| def search_in_pdf(file_path, search_text): | |
| """ | |
| البحث عن نص في ملف PDF | |
| المعلمات: | |
| file_path: مسار ملف PDF | |
| search_text: النص المراد البحث عنه | |
| الإرجاع: | |
| قائمة من النتائج {page_number, text_snippet, matched_text} | |
| """ | |
| try: | |
| # التحقق من وجود الملف | |
| if not os.path.exists(file_path): | |
| raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
| # استخراج النص من ملف PDF | |
| document = fitz.open(file_path) | |
| results = [] | |
| for page_idx in range(len(document)): | |
| page = document.load_page(page_idx) | |
| page_text = page.get_text("text") | |
| # البحث عن النص | |
| if search_text.lower() in page_text.lower(): | |
| # استخراج المقتطفات التي تحتوي على النص المطلوب | |
| lines = page_text.split('\n') | |
| for line in lines: | |
| if search_text.lower() in line.lower(): | |
| results.append({ | |
| 'page_number': page_idx + 1, | |
| 'text_snippet': line, | |
| 'matched_text': search_text | |
| }) | |
| document.close() | |
| return results | |
| except Exception as e: | |
| error_msg = f"خطأ في البحث في ملف PDF: {str(e)}" | |
| print(error_msg) | |
| traceback.print_exc() | |
| raise Exception(error_msg) | |
| def extract_quantities_from_pdf(file_path): | |
| """ | |
| استخراج الكميات من ملف PDF | |
| المعلمات: | |
| file_path: مسار ملف PDF | |
| الإرجاع: | |
| DataFrame يحتوي على البنود والكميات المستخرجة | |
| """ | |
| try: | |
| # استخراج النص والجداول | |
| text = extract_text_from_pdf(file_path) | |
| tables = extract_tables_from_pdf(file_path) | |
| quantities = [] | |
| # استخراج الكميات من الجداول | |
| for table in tables: | |
| # البحث عن أعمدة تحتوي على "الكمية" أو "الوحدة" أو "البند" | |
| quantity_cols = [col for col in table.columns if any(term in col.lower() for term in ['كمية', 'عدد', 'الكمية'])] | |
| unit_cols = [col for col in table.columns if any(term in col.lower() for term in ['وحدة', 'الوحدة'])] | |
| item_cols = [col for col in table.columns if any(term in col.lower() for term in ['بند', 'وصف', 'البند', 'العمل'])] | |
| if quantity_cols and (unit_cols or item_cols): | |
| quantity_col = quantity_cols[0] | |
| unit_col = unit_cols[0] if unit_cols else None | |
| item_col = item_cols[0] if item_cols else None | |
| # استخراج الكميات | |
| for _, row in table.iterrows(): | |
| if pd.notna(row[quantity_col]) and (item_col is None or pd.notna(row[item_col])): | |
| quantity_value = extract_numbers_from_text(row[quantity_col]) | |
| quantity = quantity_value[0] if quantity_value else None | |
| quantities.append({ | |
| 'البند': row[item_col] if item_col else "غير محدد", | |
| 'الوحدة': row[unit_col] if unit_col else "غير محدد", | |
| 'الكمية': quantity | |
| }) | |
| # استخراج الكميات من النص | |
| lines = text.split('\n') | |
| for line in lines: | |
| # البحث عن الخطوط التي تحتوي على أرقام ووحدات قياس | |
| if re.search(r'\d+(?:,\d+)*(?:\.\d+)?', line) and any(unit in line for unit in ['م2', 'م3', 'متر', 'طن', 'كجم', 'عدد']): | |
| numbers = extract_numbers_from_text(line) | |
| if numbers: | |
| # استخراج وحدة القياس | |
| unit_match = re.search(r'\b(م2|م3|متر مربع|متر مكعب|م\.ط|طن|كجم|عدد|قطعة)\b', line) | |
| unit = unit_match.group(1) if unit_match else "غير محدد" | |
| quantities.append({ | |
| 'البند': line, | |
| 'الوحدة': unit, | |
| 'الكمية': numbers[0] | |
| }) | |
| # إنشاء DataFrame | |
| if quantities: | |
| quantities_df = pd.DataFrame(quantities) | |
| return quantities_df | |
| else: | |
| return pd.DataFrame(columns=['البند', 'الوحدة', 'الكمية']) | |
| except Exception as e: | |
| error_msg = f"خطأ في استخراج الكميات من ملف PDF: {str(e)}" | |
| print(error_msg) | |
| traceback.print_exc() | |
| raise Exception(error_msg) | |
| def merge_pdfs(input_paths, output_path): | |
| """ | |
| دمج ملفات PDF متعددة في ملف واحد | |
| المعلمات: | |
| input_paths: قائمة من مسارات ملفات PDF المراد دمجها | |
| output_path: مسار ملف PDF الناتج | |
| الإرجاع: | |
| True في حالة النجاح | |
| """ | |
| try: | |
| # التحقق من وجود الملفات | |
| for file_path in input_paths: | |
| if not os.path.exists(file_path): | |
| raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
| # إنشاء مجلد الإخراج إذا لم يكن موجودًا | |
| output_dir = os.path.dirname(output_path) | |
| create_directory_if_not_exists(output_dir) | |
| # دمج ملفات PDF | |
| merger = PyPDF2.PdfMerger() | |
| for file_path in input_paths: | |
| merger.append(file_path) | |
| merger.write(output_path) | |
| merger.close() | |
| return True | |
| except Exception as e: | |
| error_msg = f"خطأ في دمج ملفات PDF: {str(e)}" | |
| print(error_msg) | |
| traceback.print_exc() | |
| raise Exception(error_msg) | |
| def split_pdf(input_path, output_dir, prefix='page'): | |
| """ | |
| تقسيم ملف PDF إلى ملفات منفصلة لكل صفحة | |
| المعلمات: | |
| input_path: مسار ملف PDF المراد تقسيمه | |
| output_dir: دليل الإخراج | |
| prefix: بادئة أسماء ملفات الإخراج | |
| الإرجاع: | |
| قائمة من مسارات ملفات PDF الناتجة | |
| """ | |
| try: | |
| # التحقق من وجود الملف | |
| if not os.path.exists(input_path): | |
| raise FileNotFoundError(f"الملف غير موجود: {input_path}") | |
| # إنشاء دليل الإخراج | |
| create_directory_if_not_exists(output_dir) | |
| # قراءة ملف PDF | |
| with open(input_path, 'rb') as file: | |
| reader = PyPDF2.PdfReader(file) | |
| output_files = [] | |
| # تقسيم كل صفحة إلى ملف منفصل | |
| for page_idx in range(len(reader.pages)): | |
| writer = PyPDF2.PdfWriter() | |
| writer.add_page(reader.pages[page_idx]) | |
| output_filename = f"{prefix}_{page_idx+1}.pdf" | |
| output_path = os.path.join(output_dir, output_filename) | |
| with open(output_path, 'wb') as output_file: | |
| writer.write(output_file) | |
| output_files.append(output_path) | |
| return output_files | |
| except Exception as e: | |
| error_msg = f"خطأ في تقسيم ملف PDF: {str(e)}" | |
| print(error_msg) | |
| traceback.print_exc() | |
| raise Exception(error_msg) |