File size: 4,369 Bytes
e86199a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
import os
import json
from typing import List, Dict, Any
import pdfplumber
from docx import Document
from config.settings import Config
class DocumentProcessor:
    """Load PDF/DOCX/TXT/JSON documents, validate their size, and split
    their text into overlapping chunks for downstream processing."""

    def __init__(self):
        # Config supplies MAX_FILE_SIZE_MB, CHUNK_SIZE and CHUNK_OVERLAP.
        self.config = Config()

    def validate_file_size(self, file_path: str) -> bool:
        """Return True if the file is within the configured size limit (in MB)."""
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        return file_size_mb <= self.config.MAX_FILE_SIZE_MB

    def load_document(self, file_path: str) -> str:
        """Load a document's text content, dispatching on file extension.

        Supported extensions: .pdf, .docx, .txt, .json (case-insensitive).

        Raises:
            ValueError: if the file exceeds the size limit or has an
                unsupported extension.
        """
        if not self.validate_file_size(file_path):
            raise ValueError(f"File size exceeds {self.config.MAX_FILE_SIZE_MB}MB limit")
        file_ext = os.path.splitext(file_path)[1].lower()
        # Dispatch table instead of an if/elif chain.
        loaders = {
            '.pdf': self._load_pdf,
            '.docx': self._load_docx,
            '.txt': self._load_txt,
            '.json': self._load_json,
        }
        try:
            loader = loaders[file_ext]
        except KeyError:
            raise ValueError(f"Unsupported file format: {file_ext}") from None
        return loader(file_path)

    def _load_pdf(self, file_path: str) -> str:
        """Extract text from every page of a PDF, one newline per page."""
        with pdfplumber.open(file_path) as pdf:
            # join instead of += — avoids quadratic string concatenation;
            # pages with no extractable text are skipped, as before.
            return "".join(
                page_text + "\n"
                for page_text in (page.extract_text() for page in pdf.pages)
                if page_text
            )

    def _load_docx(self, file_path: str) -> str:
        """Extract paragraph text from a DOCX file, one newline per paragraph."""
        doc = Document(file_path)
        return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)

    def _load_txt(self, file_path: str) -> str:
        """Read a UTF-8 text file verbatim."""
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def _load_json(self, file_path: str) -> str:
        """Load JSON content and re-serialize it as pretty-printed text."""
        with open(file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
        return json.dumps(data, indent=2)

    def chunk_text(self, text: str) -> List[str]:
        """Split *text* into overlapping chunks of at most CHUNK_SIZE chars.

        Windows are cut back to the latest sentence-ending punctuation
        ('.', '!' or '?') when one appears past the window's midpoint;
        consecutive chunks overlap by CHUNK_OVERLAP characters.
        Chunks are stripped; all-whitespace chunks are dropped.
        """
        if len(text) <= self.config.CHUNK_SIZE:
            return [text]
        chunks: List[str] = []
        start = 0
        while start < len(text):
            end = min(start + self.config.CHUNK_SIZE, len(text))
            if end < len(text):
                # Bug fix: take the latest of '.', '!', '?' — the original
                # preferred an earlier '.' over a later '!' or '?' because it
                # only fell back when '.' was absent from the whole window.
                boundary = max(text.rfind(c, start, end) for c in '.!?')
                # rfind returns -1 when absent; the midpoint guard also
                # rejects -1, matching the original's explicit check.
                if boundary > start + self.config.CHUNK_SIZE // 2:
                    end = boundary + 1
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            if end >= len(text):
                # Bug fix: the original stepped back by CHUNK_OVERLAP even
                # after consuming the final characters, emitting a duplicate
                # tail chunk of the last CHUNK_OVERLAP characters.
                break
            # Bug fix: guarantee forward progress — with a large
            # CHUNK_OVERLAP (>= CHUNK_SIZE // 2), a sentence-boundary cut
            # could make end - CHUNK_OVERLAP <= start and loop forever.
            start = max(end - self.config.CHUNK_OVERLAP, start + 1)
        return chunks

    def process_documents(self, file_paths: List[str], batch_mode: bool = False) -> List[Dict[str, Any]]:
        """Load and chunk each file, collecting a result record per file.

        Each record has 'file_path', 'content', 'chunks' and 'status'
        ('success', or 'error' with an additional 'error' message).
        When batch_mode is False, processing stops after the first
        successful file; failures do not stop iteration.
        """
        results: List[Dict[str, Any]] = []
        for file_path in file_paths:
            try:
                # Keep the try body minimal: only the calls that can raise.
                content = self.load_document(file_path)
                chunks = self.chunk_text(content)
            except Exception as e:
                # Best-effort batch: record the failure and keep going.
                results.append({
                    'file_path': file_path,
                    'content': '',
                    'chunks': [],
                    'status': 'error',
                    'error': str(e),
                })
                continue
            results.append({
                'file_path': file_path,
                'content': content,
                'chunks': chunks,
                'status': 'success',
            })
            if not batch_mode:
                break  # Process only one file if not in batch mode
        return results
|