import os
import json
from typing import List, Dict, Any, Tuple
from pathlib import Path
import hashlib

# Document parsing imports
try:
    import fitz  # PyMuPDF
    HAS_PYMUPDF = True
except ImportError:
    HAS_PYMUPDF = False

try:
    from docx import Document
    HAS_DOCX = True
except ImportError:
    HAS_DOCX = False

# Text processing
import re
from dataclasses import dataclass


@dataclass
class DocumentChunk:
    text: str
    metadata: Dict[str, Any]
    chunk_id: str

    def to_dict(self):
        return {
            'text': self.text,
            'metadata': self.metadata,
            'chunk_id': self.chunk_id
        }


class DocumentProcessor:
    def __init__(self, chunk_size: int = 800, chunk_overlap: int = 100):
        # The chunking loop steps by (chunk_size - chunk_overlap); a
        # non-positive step would raise ValueError inside range(), so
        # validate the configuration up front.
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.supported_extensions = ['.pdf', '.docx', '.txt', '.md']

    def process_file(self, file_path: str) -> List[DocumentChunk]:
        """Process a single file and return chunks"""
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        extension = path.suffix.lower()
        if extension not in self.supported_extensions:
            raise ValueError(f"Unsupported file type: {extension}")

        # Extract text based on file type
        if extension == '.pdf':
            text = self._extract_pdf_text(file_path)
        elif extension == '.docx':
            text = self._extract_docx_text(file_path)
        elif extension in ['.txt', '.md']:
            text = self._extract_text_file(file_path)
        else:
            raise ValueError(f"Unsupported file type: {extension}")

        # Create chunks
        return self._create_chunks(text, file_path)

    def _extract_pdf_text(self, file_path: str) -> str:
        """Extract text from PDF file"""
        if not HAS_PYMUPDF:
            raise ImportError("PyMuPDF not installed. Install with: pip install PyMuPDF")

        text_parts = []
        try:
            with fitz.open(file_path) as pdf:
                for page_num in range(len(pdf)):
                    page = pdf[page_num]
                    text = page.get_text()
                    if text.strip():
                        # Prefix each page so chunk text retains page provenance
                        text_parts.append(f"[Page {page_num + 1}]\n{text}")
        except Exception as e:
            raise RuntimeError(f"Error processing PDF: {e}") from e

        return "\n\n".join(text_parts)

    def _extract_docx_text(self, file_path: str) -> str:
        """Extract text from DOCX file"""
        if not HAS_DOCX:
            raise ImportError("python-docx not installed. Install with: pip install python-docx")

        text_parts = []
        try:
            doc = Document(file_path)
            for paragraph in doc.paragraphs:
                if paragraph.text.strip():
                    text_parts.append(paragraph.text)

            # Also extract text from tables, one pipe-delimited line per row
            for table in doc.tables:
                for row in table.rows:
                    row_text = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                    if row_text:
                        text_parts.append(" | ".join(row_text))
        except Exception as e:
            raise RuntimeError(f"Error processing DOCX: {e}") from e

        return "\n\n".join(text_parts)

    def _extract_text_file(self, file_path: str) -> str:
        """Extract text from plain text or markdown file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            raise RuntimeError(f"Error reading text file: {e}") from e

    def _create_chunks(self, text: str, file_path: str) -> List[DocumentChunk]:
        """Create overlapping chunks from text"""
        chunks = []

        # Clean and normalize whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        if not text:
            return chunks

        # Simple word-based chunking; consecutive chunks share
        # chunk_overlap words so context is not cut at hard boundaries.
        words = text.split()
        step = self.chunk_size - self.chunk_overlap
        for i in range(0, len(words), step):
            chunk_words = words[i:i + self.chunk_size]
            chunk_text = ' '.join(chunk_words)

            # MD5 used only as a stable short identifier, not for security
            chunk_id = hashlib.md5(
                f"{file_path}_{i}_{chunk_text[:50]}".encode()
            ).hexdigest()[:8]

            metadata = {
                'file_path': file_path,
                'file_name': Path(file_path).name,
                'chunk_index': len(chunks),
                'start_word': i,
                'word_count': len(chunk_words)
            }

            chunks.append(DocumentChunk(
                text=chunk_text,
                metadata=metadata,
                chunk_id=chunk_id
            ))

        return chunks

    def process_multiple_files(self, file_paths: List[str]) -> Tuple[List[DocumentChunk], Dict[str, Any]]:
        """Process multiple files and return chunks with summary"""
        all_chunks = []
        summary = {
            'total_files': 0,
            'total_chunks': 0,
            'files_processed': [],
            'errors': []
        }

        for file_path in file_paths:
            try:
                chunks = self.process_file(file_path)
                all_chunks.extend(chunks)
                summary['files_processed'].append({
                    'path': file_path,
                    'name': Path(file_path).name,
                    'chunks': len(chunks)
                })
            except Exception as e:
                # A failing file is recorded, not fatal for the batch
                summary['errors'].append({
                    'path': file_path,
                    'error': str(e)
                })

        summary['total_files'] = len(summary['files_processed'])
        summary['total_chunks'] = len(all_chunks)
        return all_chunks, summary


# Utility function for file size validation
def validate_file_size(file_path: str, max_size_mb: float = 10.0) -> bool:
    """Check if file size is within limits"""
    size_bytes = os.path.getsize(file_path)
    size_mb = size_bytes / (1024 * 1024)
    return size_mb <= max_size_mb
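

# Usage sketch. The file paths below ("report.pdf", "notes.md", "chunks.json")
# are placeholders for illustration and do not ship with this module. It shows
# the intended flow: size-gate candidate files with validate_file_size(), hand
# the survivors to DocumentProcessor.process_multiple_files(), then persist
# the chunks via DocumentChunk.to_dict() for downstream indexing.
if __name__ == "__main__":
    processor = DocumentProcessor(chunk_size=800, chunk_overlap=100)

    candidate_files = ["report.pdf", "notes.md"]  # placeholder paths
    accepted = [
        p for p in candidate_files
        if Path(p).exists() and validate_file_size(p, max_size_mb=10.0)
    ]

    chunks, summary = processor.process_multiple_files(accepted)
    print(json.dumps(summary, indent=2))

    # Chunks serialize cleanly because to_dict() returns plain dicts
    with open("chunks.json", "w", encoding="utf-8") as f:
        json.dump([c.to_dict() for c in chunks], f, ensure_ascii=False, indent=2)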