# Document ingestion for an Arabic RAG pipeline: extracts text from PDF and
# DOCX files, normalizes the Arabic, and splits it into overlapping chunks.
import re
from typing import List, Dict

import fitz  # PyMuPDF
import pyarabic.araby as araby
from docx import Document
# On recent LangChain versions this import lives in langchain_text_splitters.
from langchain.text_splitter import RecursiveCharacterTextSplitter


class DocumentProcessor:
    def __init__(self, chunk_size=512, chunk_overlap=64):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Prefer paragraph breaks, then Arabic/Latin sentence terminators,
        # before falling back to single newlines.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "۔", ".", "؟", "!", "\n"],
        )

    def _normalize_arabic(self, text: str) -> str:
        """Strip diacritics, unify ligatures and hamza forms, collapse whitespace."""
        text = araby.strip_diacritics(text)
        text = araby.normalize_ligature(text)  # pyarabic's name is singular: normalize_ligature
        text = araby.normalize_hamza(text)
        return re.sub(r'\s+', ' ', text).strip()

    def _process_pdf(self, file_path: str) -> List[Dict]:
        """Extract per-page text with PyMuPDF, marking bold spans as **bold**."""
        doc = fitz.open(file_path)
        pages = []
        for page_num, page in enumerate(doc):
            text = ""
            blocks = page.get_text("dict")["blocks"]
            for block in blocks:
                if "lines" not in block:  # skip image blocks
                    continue
                for line in block["lines"]:
                    for span in line["spans"]:
                        if span["flags"] & 16:  # bit 4 of the span flags marks bold text
                            text += f"**{span['text']}** "
                        else:
                            text += span["text"] + " "
            pages.append({
                "text": self._normalize_arabic(text),
                "source": file_path,
                "page": page_num + 1,
            })
        doc.close()
        return pages

    def _process_docx(self, file_path: str) -> List[Dict]:
        """Split a DOCX into sections at heading paragraphs.

        python-docx exposes no page boundaries, so the section index
        stands in for the page number.
        """
        doc = Document(file_path)
        sections = []
        current_section = {"text": "", "source": file_path, "page": 1}
        for para in doc.paragraphs:
            if para.style.name.startswith("Heading"):
                # A heading closes the previous section and opens a new one.
                if current_section["text"]:
                    sections.append(current_section)
                current_section = {
                    "text": "",
                    "source": file_path,
                    "page": len(sections) + 1,
                }
                current_section["text"] += f"\n# {para.text}\n"
            else:
                current_section["text"] += para.text + "\n"
        if current_section["text"]:
            sections.append(current_section)
        return [{
            "text": self._normalize_arabic(s["text"]),
            "source": s["source"],
            "page": s["page"],
        } for s in sections]

    def process_documents(self, files: List) -> List[Dict]:
        """Route each file to the right extractor, then chunk every page/section."""
        all_chunks = []
        for file_info in files:
            if file_info.name.endswith(".pdf"):
                pages = self._process_pdf(file_info.name)
            elif file_info.name.endswith(".docx"):
                pages = self._process_docx(file_info.name)
            else:
                continue  # unsupported file types are skipped silently
            for page in pages:
                for chunk in self.text_splitter.split_text(page["text"]):
                    all_chunks.append({
                        "text": chunk,
                        "source": page["source"],
                        "page": page["page"],
                    })
        return all_chunks
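
# --- Usage sketch ---
# A minimal example of running the processor over local files. The
# SimpleNamespace stand-ins mimic upload objects (e.g. from a Gradio file
# widget) that expose a `.name` attribute holding a filesystem path; the
# file names below are hypothetical placeholders, not files shipped with
# this module.
if __name__ == "__main__":
    from types import SimpleNamespace

    processor = DocumentProcessor(chunk_size=512, chunk_overlap=64)
    uploads = [
        SimpleNamespace(name="report.pdf"),   # hypothetical path
        SimpleNamespace(name="notes.docx"),   # hypothetical path
    ]
    chunks = processor.process_documents(uploads)
    # Print a preview of the first few chunks with their provenance.
    for c in chunks[:3]:
        print(c["source"], c["page"], c["text"][:80])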