import os
import json
from typing import List, Dict, Any, Tuple
from pathlib import Path
import hashlib

# Document parsing imports
try:
    import fitz  # PyMuPDF
    HAS_PYMUPDF = True
except ImportError:
    HAS_PYMUPDF = False

try:
    from docx import Document
    HAS_DOCX = True
except ImportError:
    HAS_DOCX = False

# Text processing
import re
from dataclasses import dataclass

@dataclass
class DocumentChunk:
    """A single chunk of extracted text plus its provenance metadata."""
    text: str
    metadata: Dict[str, Any]
    chunk_id: str

    def to_dict(self) -> Dict[str, Any]:
        return {
            'text': self.text,
            'metadata': self.metadata,
            'chunk_id': self.chunk_id
        }

class DocumentProcessor:
    def __init__(self, chunk_size: int = 800, chunk_overlap: int = 100):
        # Guard against a non-positive stride in _create_chunks
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.supported_extensions = ['.pdf', '.docx', '.txt', '.md']

    def process_file(self, file_path: str) -> List[DocumentChunk]:
        """Process a single file and return chunks"""
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        extension = path.suffix.lower()
        if extension not in self.supported_extensions:
            raise ValueError(f"Unsupported file type: {extension}")

        # Extract text based on file type
        if extension == '.pdf':
            text = self._extract_pdf_text(file_path)
        elif extension == '.docx':
            text = self._extract_docx_text(file_path)
        elif extension in ['.txt', '.md']:
            text = self._extract_text_file(file_path)
        else:
            raise ValueError(f"Unsupported file type: {extension}")

        # Create chunks
        return self._create_chunks(text, file_path)

    def _extract_pdf_text(self, file_path: str) -> str:
        """Extract text from PDF file"""
        if not HAS_PYMUPDF:
            raise ImportError("PyMuPDF not installed. Install with: pip install PyMuPDF")
        text_parts = []
        try:
            with fitz.open(file_path) as pdf:
                for page_num in range(len(pdf)):
                    page = pdf[page_num]
                    text = page.get_text()
                    if text.strip():
                        text_parts.append(f"[Page {page_num + 1}]\n{text}")
        except Exception as e:
            raise RuntimeError(f"Error processing PDF: {e}") from e
        return "\n\n".join(text_parts)

    def _extract_docx_text(self, file_path: str) -> str:
        """Extract text from DOCX file"""
        if not HAS_DOCX:
            raise ImportError("python-docx not installed. Install with: pip install python-docx")
        text_parts = []
        try:
            doc = Document(file_path)
            for paragraph in doc.paragraphs:
                if paragraph.text.strip():
                    text_parts.append(paragraph.text)
            # Also extract text from tables, one pipe-delimited line per row
            for table in doc.tables:
                for row in table.rows:
                    row_text = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                    if row_text:
                        text_parts.append(" | ".join(row_text))
        except Exception as e:
            raise RuntimeError(f"Error processing DOCX: {e}") from e
        return "\n\n".join(text_parts)

    def _extract_text_file(self, file_path: str) -> str:
        """Extract text from plain text or markdown file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            raise RuntimeError(f"Error reading text file: {e}") from e

    def _create_chunks(self, text: str, file_path: str) -> List[DocumentChunk]:
        """Create overlapping chunks from text"""
        chunks = []

        # Collapse whitespace runs to single spaces and trim
        text = re.sub(r'\s+', ' ', text).strip()
        if not text:
            return chunks

        # Simple word-based chunking
        words = text.split()
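        # The stride between chunk starts is chunk_size - chunk_overlap. With
        # the defaults (800, 100) the stride is 700 words, so chunks begin at
        # words 0, 700, 1400, ... and each chunk repeats the final 100 words
        # of its predecessor, preserving context across chunk boundaries.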
        step = self.chunk_size - self.chunk_overlap
        for i in range(0, len(words), step):
            chunk_words = words[i:i + self.chunk_size]
            chunk_text = ' '.join(chunk_words)

            # Create a short, stable chunk ID from the source path, word
            # offset, and a prefix of the chunk text
            chunk_id = hashlib.md5(f"{file_path}_{i}_{chunk_text[:50]}".encode()).hexdigest()[:8]

            # Create metadata
            metadata = {
                'file_path': file_path,
                'file_name': Path(file_path).name,
                'chunk_index': len(chunks),
                'start_word': i,
                'word_count': len(chunk_words)
            }

            chunks.append(DocumentChunk(
                text=chunk_text,
                metadata=metadata,
                chunk_id=chunk_id
            ))

            # Stop once the tail of the text is covered; otherwise the final
            # iteration would emit a chunk consisting purely of overlap
            if i + self.chunk_size >= len(words):
                break

        return chunks

    def process_multiple_files(self, file_paths: List[str]) -> Tuple[List[DocumentChunk], Dict[str, Any]]:
        """Process multiple files and return chunks with summary"""
        all_chunks = []
        summary = {
            'total_files': 0,
            'total_chunks': 0,
            'files_processed': [],
            'errors': []
        }
        for file_path in file_paths:
            try:
                chunks = self.process_file(file_path)
                all_chunks.extend(chunks)
                summary['files_processed'].append({
                    'path': file_path,
                    'name': Path(file_path).name,
                    'chunks': len(chunks)
                })
            except Exception as e:
                summary['errors'].append({
                    'path': file_path,
                    'error': str(e)
                })
        summary['total_files'] = len(summary['files_processed'])
        summary['total_chunks'] = len(all_chunks)
        return all_chunks, summary

# Utility function for file size validation
def validate_file_size(file_path: str, max_size_mb: float = 10.0) -> bool:
    """Check if file size is within limits"""
    size_bytes = os.path.getsize(file_path)
    size_mb = size_bytes / (1024 * 1024)
    return size_mb <= max_size_mb
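
# ---------------------------------------------------------------------------
# Usage sketch: a minimal, hypothetical driver showing the intended flow,
# not part of the processor itself. The file names below are placeholders;
# point them at real documents before running.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    processor = DocumentProcessor(chunk_size=800, chunk_overlap=100)
    candidate_files = ["example.pdf", "notes.md"]  # hypothetical paths

    # Skip missing files and files over the 10 MB default limit
    usable = [p for p in candidate_files
              if Path(p).exists() and validate_file_size(p)]

    chunks, summary = processor.process_multiple_files(usable)
    print(json.dumps(summary, indent=2))
    for chunk in chunks[:3]:
        print(chunk.chunk_id, chunk.metadata['file_name'], chunk.text[:80])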