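"""Document ingestion utilities: extract text from PDF, DOCX, plain-text, and
Markdown files, then split it into overlapping word-based chunks with
provenance metadata for downstream indexing."""
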
import os
import json
from typing import List, Dict, Any, Tuple
from pathlib import Path
import hashlib
# Document parsing imports
try:
    import fitz  # PyMuPDF
    HAS_PYMUPDF = True
except ImportError:
    HAS_PYMUPDF = False

try:
    from docx import Document
    HAS_DOCX = True
except ImportError:
    HAS_DOCX = False

# Text processing
import re
from dataclasses import dataclass

@dataclass
class DocumentChunk:
    """A chunk of document text plus its provenance metadata."""
    text: str
    metadata: Dict[str, Any]
    chunk_id: str

    def to_dict(self) -> Dict[str, Any]:
        return {
            'text': self.text,
            'metadata': self.metadata,
            'chunk_id': self.chunk_id
        }

class DocumentProcessor:
    """Splits supported document types into overlapping word-based chunks."""

    def __init__(self, chunk_size: int = 800, chunk_overlap: int = 100):
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.supported_extensions = ['.pdf', '.docx', '.txt', '.md']

    def process_file(self, file_path: str) -> List[DocumentChunk]:
        """Process a single file and return its chunks."""
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        extension = path.suffix.lower()
        if extension not in self.supported_extensions:
            raise ValueError(f"Unsupported file type: {extension}")

        # Extract text based on file type (extension already validated above)
        if extension == '.pdf':
            text = self._extract_pdf_text(file_path)
        elif extension == '.docx':
            text = self._extract_docx_text(file_path)
        else:  # '.txt' or '.md'
            text = self._extract_text_file(file_path)

        # Create chunks
        return self._create_chunks(text, file_path)

    def _extract_pdf_text(self, file_path: str) -> str:
        """Extract text from a PDF file, prefixing each page with its number."""
        if not HAS_PYMUPDF:
            raise ImportError("PyMuPDF not installed. Install with: pip install PyMuPDF")

        text_parts = []
        try:
            with fitz.open(file_path) as pdf:
                for page_num in range(len(pdf)):
                    text = pdf[page_num].get_text()
                    if text.strip():
                        text_parts.append(f"[Page {page_num + 1}]\n{text}")
        except Exception as e:
            raise RuntimeError(f"Error processing PDF: {e}") from e
        return "\n\n".join(text_parts)

    def _extract_docx_text(self, file_path: str) -> str:
        """Extract text from a DOCX file, including table contents."""
        if not HAS_DOCX:
            raise ImportError("python-docx not installed. Install with: pip install python-docx")

        text_parts = []
        try:
            doc = Document(file_path)
            for paragraph in doc.paragraphs:
                if paragraph.text.strip():
                    text_parts.append(paragraph.text)
            # Also extract text from tables: one line per row, cells joined with " | "
            for table in doc.tables:
                for row in table.rows:
                    row_text = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                    if row_text:
                        text_parts.append(" | ".join(row_text))
        except Exception as e:
            raise RuntimeError(f"Error processing DOCX: {e}") from e
        return "\n\n".join(text_parts)

    def _extract_text_file(self, file_path: str) -> str:
        """Read a plain-text or Markdown file as UTF-8."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            raise RuntimeError(f"Error reading text file: {e}") from e

    def _create_chunks(self, text: str, file_path: str) -> List[DocumentChunk]:
        """Split text into overlapping word-based chunks."""
        chunks: List[DocumentChunk] = []

        # Collapse all whitespace runs into single spaces
        text = re.sub(r'\s+', ' ', text).strip()
        if not text:
            return chunks

        # Simple word-based chunking: advance by (chunk_size - chunk_overlap)
        # words per step so consecutive chunks share chunk_overlap words
        words = text.split()
        step = self.chunk_size - self.chunk_overlap
        for i in range(0, len(words), step):
            chunk_words = words[i:i + self.chunk_size]
            chunk_text = ' '.join(chunk_words)

            # Stable short ID derived from the source path, offset, and a text prefix
            chunk_id = hashlib.md5(
                f"{file_path}_{i}_{chunk_text[:50]}".encode()
            ).hexdigest()[:8]

            metadata = {
                'file_path': file_path,
                'file_name': Path(file_path).name,
                'chunk_index': len(chunks),
                'start_word': i,
                'word_count': len(chunk_words)
            }
            chunks.append(DocumentChunk(text=chunk_text, metadata=metadata, chunk_id=chunk_id))
        return chunks
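
    # Worked example of the stride above: with the defaults chunk_size=800 and
    # chunk_overlap=100 the step is 700 words, so a 1,500-word document yields
    # chunks starting at words 0, 700, and 1400, each sharing 100 words with
    # its neighbor.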

    def process_multiple_files(self, file_paths: List[str]) -> Tuple[List[DocumentChunk], Dict[str, Any]]:
        """Process multiple files and return all chunks plus a per-file summary."""
        all_chunks: List[DocumentChunk] = []
        summary: Dict[str, Any] = {
            'total_files': 0,
            'total_chunks': 0,
            'files_processed': [],
            'errors': []
        }
        for file_path in file_paths:
            try:
                chunks = self.process_file(file_path)
                all_chunks.extend(chunks)
                summary['files_processed'].append({
                    'path': file_path,
                    'name': Path(file_path).name,
                    'chunks': len(chunks)
                })
            except Exception as e:
                # Record the failure and continue with the remaining files
                summary['errors'].append({
                    'path': file_path,
                    'error': str(e)
                })
        summary['total_files'] = len(summary['files_processed'])
        summary['total_chunks'] = len(all_chunks)
        return all_chunks, summary

# Utility function for file-size validation
def validate_file_size(file_path: str, max_size_mb: float = 10.0) -> bool:
    """Check whether the file size is within the given limit (in megabytes)."""
    size_mb = os.path.getsize(file_path) / (1024 * 1024)
    return size_mb <= max_size_mb
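
# Minimal usage sketch, assuming a document exists at the hypothetical path
# below (replace it with a real file before running):
if __name__ == "__main__":
    processor = DocumentProcessor(chunk_size=800, chunk_overlap=100)
    sample_path = "sample.txt"  # hypothetical path, for illustration only
    if Path(sample_path).exists() and validate_file_size(sample_path):
        chunks, summary = processor.process_multiple_files([sample_path])
        print(json.dumps(summary, indent=2))
        for chunk in chunks[:3]:  # preview the first few chunks
            print(chunk.to_dict())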