# Document processing pipeline: extracts, normalizes, and chunks Arabic text
# from PDF and DOCX files for downstream retrieval.
import re
from typing import Dict, List

import fitz  # PyMuPDF
import pyarabic.araby as araby
from docx import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
class DocumentProcessor:
    """Load Arabic PDF/DOCX files, normalize their text, and split it into chunks.

    PDFs are read page by page (bold spans are marked with **...**); DOCX files
    are split into sections at Heading-styled paragraphs. All text is passed
    through Arabic normalization before chunking.
    """

    def __init__(self, chunk_size=512, chunk_overlap=64):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Arabic-aware separators: paragraph breaks first, then Arabic/Latin
        # sentence terminators (۔ . ؟ !), finally plain newlines.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "۔", ".", "؟", "!", "\n"]
        )

    def _normalize_arabic(self, text: str) -> str:
        """Strip diacritics, normalize ligatures and hamza forms, and collapse
        all runs of whitespace to single spaces."""
        text = araby.strip_diacritics(text)
        # BUG FIX: pyarabic exposes `normalize_ligature` (singular);
        # `normalize_ligatures` raises AttributeError at runtime.
        text = araby.normalize_ligature(text)
        text = araby.normalize_hamza(text)
        return re.sub(r'\s+', ' ', text).strip()

    def _process_pdf(self, file_path: str) -> List[Dict]:
        """Extract one normalized-text record per PDF page.

        Returns a list of dicts with keys ``text``, ``source`` (the file
        path), and ``page`` (1-based page number).
        """
        pages = []
        # Context manager ensures the document handle is closed (the
        # original leaked it).
        with fitz.open(file_path) as doc:
            for page_num, page in enumerate(doc):
                # Collect span texts in a list and join once — avoids
                # quadratic string concatenation on large pages.
                parts = []
                for block in page.get_text("dict")["blocks"]:
                    # Image blocks have no "lines" key; skip them.
                    for line in block.get("lines", []):
                        for span in line["spans"]:
                            if span["flags"] & 16:  # bit 4 set => bold span in PyMuPDF
                                parts.append(f"**{span['text']}**")
                            else:
                                parts.append(span["text"])
                # Joining with single spaces is equivalent to the original
                # trailing-space concatenation after whitespace collapsing.
                pages.append({
                    "text": self._normalize_arabic(" ".join(parts)),
                    "source": file_path,
                    "page": page_num + 1
                })
        return pages

    def _process_docx(self, file_path: str) -> List[Dict]:
        """Split a DOCX file into sections at Heading-styled paragraphs.

        Each record mirrors the PDF shape (``text``/``source``/``page``);
        here ``page`` is the 1-based section index, not a printed page.
        """
        doc = Document(file_path)
        sections = []
        current_section = {"text": "", "source": file_path, "page": 1}
        for para in doc.paragraphs:
            if para.style.name.startswith('Heading'):
                # A heading starts a new section; flush the previous one
                # only if it accumulated any text.
                if current_section["text"]:
                    sections.append(current_section)
                    current_section = {"text": "", "source": file_path, "page": len(sections) + 1}
                current_section["text"] += f"\n# {para.text}\n"
            else:
                current_section["text"] += para.text + "\n"
        if current_section["text"]:
            sections.append(current_section)
        return [{
            "text": self._normalize_arabic(s["text"]),
            "source": s["source"],
            "page": s["page"]
        } for s in sections]

    def process_documents(self, files: List) -> List[Dict]:
        """Process uploaded file objects (anything with a ``.name`` path
        attribute, e.g. Gradio uploads) into normalized text chunks.

        Unsupported extensions are silently skipped. Returns a flat list of
        ``{"text", "source", "page"}`` chunk dicts.
        """
        all_chunks = []
        for file_info in files:
            # Case-insensitive extension check so ``.PDF``/``.Docx`` uploads
            # are not silently dropped (backward-compatible generalization).
            lower_name = file_info.name.lower()
            if lower_name.endswith(".pdf"):
                pages = self._process_pdf(file_info.name)
            elif lower_name.endswith(".docx"):
                pages = self._process_docx(file_info.name)
            else:
                continue
            for page in pages:
                for chunk in self.text_splitter.split_text(page["text"]):
                    all_chunks.append({
                        "text": chunk,
                        "source": page["source"],
                        "page": page["page"]
                    })
        return all_chunks