|
import os |
|
import json |
|
from typing import List, Dict, Any |
|
import pdfplumber |
|
from docx import Document |
|
from config.settings import Config |
|
|
|
class DocumentProcessor:
    """Load, validate, and chunk documents for downstream processing.

    Supported formats: ``.pdf`` (via pdfplumber), ``.docx`` (via python-docx),
    ``.txt`` and ``.json``. Configuration (size limit, chunk size/overlap)
    comes from ``config.settings.Config``, which is expected to expose
    ``MAX_FILE_SIZE_MB``, ``CHUNK_SIZE`` and ``CHUNK_OVERLAP``.
    """

    def __init__(self):
        self.config = Config()

    def validate_file_size(self, file_path: str) -> bool:
        """Return True if *file_path* is within the configured size limit (MB)."""
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        return file_size_mb <= self.config.MAX_FILE_SIZE_MB

    def load_document(self, file_path: str) -> str:
        """Load document content based on file extension.

        Raises:
            ValueError: if the file exceeds the size limit or the
                extension is not one of .pdf/.docx/.txt/.json.
        """
        if not self.validate_file_size(file_path):
            raise ValueError(f"File size exceeds {self.config.MAX_FILE_SIZE_MB}MB limit")

        file_ext = os.path.splitext(file_path)[1].lower()
        loaders = {
            '.pdf': self._load_pdf,
            '.docx': self._load_docx,
            '.txt': self._load_txt,
            '.json': self._load_json,
        }
        loader = loaders.get(file_ext)
        if loader is None:
            raise ValueError(f"Unsupported file format: {file_ext}")
        return loader(file_path)

    def _load_pdf(self, file_path: str) -> str:
        """Extract text from every page of a PDF, one page per line group."""
        # join() instead of repeated += — avoids quadratic string building.
        parts = []
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:  # extract_text() returns None for empty pages
                    parts.append(page_text + "\n")
        return "".join(parts)

    def _load_docx(self, file_path: str) -> str:
        """Extract paragraph text from a DOCX file, newline-separated."""
        doc = Document(file_path)
        return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)

    def _load_txt(self, file_path: str) -> str:
        """Read a UTF-8 text file."""
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def _load_json(self, file_path: str) -> str:
        """Load a JSON file and re-serialize it as pretty-printed text."""
        with open(file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
        return json.dumps(data, indent=2)

    def chunk_text(self, text: str) -> List[str]:
        """Split *text* into overlapping chunks for processing.

        Chunks are at most ``CHUNK_SIZE`` characters and overlap by up to
        ``CHUNK_OVERLAP``. A chunk is cut at the last sentence terminator
        (``.``, ``!`` or ``?``) inside the window, but only if that
        terminator lies past the window's midpoint (so chunks don't get
        degenerately small).
        """
        chunk_size = self.config.CHUNK_SIZE
        overlap = self.config.CHUNK_OVERLAP

        if len(text) <= chunk_size:
            return [text]

        chunks: List[str] = []
        start = 0
        while start < len(text):
            end = start + chunk_size

            if end < len(text):
                # Bug fix: consider all three terminators and take the
                # right-most one. The original cascade stopped at the first
                # '.' found, ignoring a later (qualifying) '!' or '?'.
                sentence_end = max(
                    text.rfind('.', start, end),
                    text.rfind('!', start, end),
                    text.rfind('?', start, end),
                )
                if sentence_end > start + chunk_size // 2:
                    end = sentence_end + 1

            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)

            # Bug fix: guarantee forward progress. The original
            # `start = end - overlap` could move start backwards (or loop
            # forever) whenever overlap >= the amount just consumed,
            # e.g. CHUNK_OVERLAP >= CHUNK_SIZE.
            start = max(end - overlap, start + 1)

        return chunks

    def process_documents(self, file_paths: List[str], batch_mode: bool = False) -> List[Dict[str, Any]]:
        """Process multiple documents into content + chunks records.

        NOTE: when ``batch_mode`` is False (the default), processing stops
        after the FIRST successfully loaded file; failed files before it
        are still recorded and skipped past. Pass ``batch_mode=True`` to
        process every path.

        Returns one dict per attempted file with keys ``file_path``,
        ``content``, ``chunks``, ``status`` ('success'/'error') and, on
        failure, ``error``.
        """
        results: List[Dict[str, Any]] = []

        for file_path in file_paths:
            try:
                content = self.load_document(file_path)
                results.append({
                    'file_path': file_path,
                    'content': content,
                    'chunks': self.chunk_text(content),
                    'status': 'success'
                })

                if not batch_mode:
                    # Single-file mode: stop at the first success.
                    break

            except Exception as e:
                # Best-effort: record the failure and continue with the rest.
                results.append({
                    'file_path': file_path,
                    'content': '',
                    'chunks': [],
                    'status': 'error',
                    'error': str(e)
                })

        return results
|
|