""" | |
Document Processor Module | |
This module is responsible for processing various document formats including | |
PDF, DOCX, CSV, PPTX, and Excel files with complete functionality. | |
Technologies: PyMuPDF, python-docx, pandas, python-pptx, pdfplumber | |
""" | |

import os
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional

# Import document processing libraries
try:
    import fitz  # PyMuPDF
    import docx
    import pandas as pd
    import pptx
    import pdfplumber
    from openpyxl import load_workbook  # ensures the pandas .xlsx engine is available
except ImportError as e:
    logging.warning(f"Some document processing libraries are not installed: {e}")

from utils.error_handler import DocumentProcessingError
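
# The third-party dependencies above are typically installed with (assumed
# distribution names; verify against your environment):
#
#   pip install pymupdf python-docx pandas python-pptx pdfplumber openpyxl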


class DocumentProcessor:
    """
    Processes various document formats and extracts text content.

    Supported formats:
    - PDF (using PyMuPDF, with pdfplumber as a fallback)
    - DOCX (using python-docx)
    - CSV/Excel (using pandas)
    - PPTX (using python-pptx)
    - TXT/Markdown (plain-text reading with encoding fallbacks)
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the DocumentProcessor with configuration.

        Args:
            config: Configuration dictionary with processing parameters
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)

        # Configuration settings
        self.max_file_size_mb = self.config.get("max_file_size_mb", 50)
        self.supported_formats = self.config.get(
            "supported_formats",
            [".pdf", ".docx", ".csv", ".xlsx", ".xls", ".pptx", ".txt", ".md"],
        )
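
    # Example configuration (illustrative values; any key not supplied falls
    # back to the defaults above):
    #
    #     processor = DocumentProcessor({"max_file_size_mb": 100})
    #     processor = DocumentProcessor()  # all defaults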

    def process_document(self, file_path: str) -> Dict[str, Any]:
        """
        Process a document and extract its text content with metadata.

        Args:
            file_path: Path to the document file

        Returns:
            Dictionary containing extracted text and metadata
        """
        if not os.path.exists(file_path):
            raise DocumentProcessingError(f"Document not found: {file_path}", file_path)

        # Validate file size
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        if file_size_mb > self.max_file_size_mb:
            raise DocumentProcessingError(
                f"File too large: {file_size_mb:.1f}MB (max: {self.max_file_size_mb}MB)",
                file_path,
            )

        file_extension = os.path.splitext(file_path)[1].lower()

        # Validate file format
        if file_extension not in self.supported_formats:
            raise DocumentProcessingError(
                f"Unsupported file format: {file_extension}", file_path
            )

        self.logger.info(f"Processing document: {file_path} ({file_size_mb:.1f}MB)")

        try:
            if file_extension == ".pdf":
                return self._process_pdf(file_path)
            elif file_extension == ".docx":
                return self._process_docx(file_path)
            elif file_extension in [".csv", ".xlsx", ".xls"]:
                return self._process_spreadsheet(file_path)
            elif file_extension == ".pptx":
                return self._process_pptx(file_path)
            elif file_extension in [".txt", ".md"]:
                return self._process_text_file(file_path)
            else:
                # The supported-format list can be overridden via config, so
                # guard against entries that have no handler here instead of
                # silently returning None.
                raise DocumentProcessingError(
                    f"No handler implemented for format: {file_extension}", file_path
                )
        except DocumentProcessingError:
            # Already descriptive; re-raise without double-wrapping
            raise
        except Exception as e:
            raise DocumentProcessingError(
                f"Error processing document: {str(e)}", file_path
            ) from e
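
    # Illustrative shape of the returned dictionary (keys taken from the
    # format-specific handlers below; "pages" appears for PDFs, "slides" for
    # PPTX, and so on):
    #
    #     {
    #         "content": "...full extracted text...",
    #         "metadata": {"filename": "report.pdf", "page_count": 3, ...},
    #         "source": "/path/to/report.pdf",
    #         "document_type": "pdf",
    #         "pages": [{"page": 1, "content": "..."}, ...],
    #     }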

    def process_batch(self, file_paths: List[str]) -> List[Dict[str, Any]]:
        """
        Process multiple documents in batch.

        Args:
            file_paths: List of file paths to process

        Returns:
            List of processed document results
        """
        results = []
        self.logger.info(f"Processing batch of {len(file_paths)} documents")

        for i, file_path in enumerate(file_paths):
            try:
                result = self.process_document(file_path)
                results.append(result)
                self.logger.info(f"Processed {i + 1}/{len(file_paths)}: {file_path}")
            except Exception as e:
                self.logger.error(f"❌ Failed to process {file_path}: {str(e)}")
                # Continue with the remaining files
                continue

        return results

    def _extract_metadata(self, file_path: str) -> Dict[str, Any]:
        """
        Extract common metadata from a file.

        Args:
            file_path: Path to the file

        Returns:
            Dictionary containing file metadata
        """
        file_stat = os.stat(file_path)
        file_path_obj = Path(file_path)

        return {
            "filename": file_path_obj.name,
            "file_extension": file_path_obj.suffix.lower(),
            "file_size_bytes": file_stat.st_size,
            "file_size_mb": round(file_stat.st_size / (1024 * 1024), 2),
            # Note: st_ctime is creation time on Windows but metadata-change
            # time on most Unix systems.
            "created_time": datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
            "modified_time": datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
            "processed_time": datetime.now().isoformat(),
        }

    def _process_pdf(self, file_path: str) -> Dict[str, Any]:
        """
        📄 Extract text from a PDF document using PyMuPDF, falling back to
        pdfplumber if PyMuPDF fails.

        Args:
            file_path: Path to the PDF file

        Returns:
            Dictionary with extracted text and metadata
        """
        self.logger.info(f"Processing PDF: {file_path}")
        text_content = []
        metadata = self._extract_metadata(file_path)

        try:
            # Primary method: PyMuPDF (faster); the context manager ensures
            # the document is closed even if extraction fails mid-way
            with fitz.open(file_path) as doc:
                metadata.update(
                    {
                        "page_count": doc.page_count,
                        "title": doc.metadata.get("title", ""),
                        "author": doc.metadata.get("author", ""),
                        "subject": doc.metadata.get("subject", ""),
                        "creator": doc.metadata.get("creator", ""),
                    }
                )
                for page_num in range(doc.page_count):
                    page = doc[page_num]
                    text = page.get_text()
                    if text.strip():  # Only add non-empty pages
                        text_content.append(
                            {"page": page_num + 1, "content": text.strip()}
                        )
        except Exception as e:
            self.logger.warning(f"PyMuPDF failed, trying pdfplumber: {str(e)}")
            # Discard any partial results so the fallback doesn't duplicate pages
            text_content = []

            # Fallback method: pdfplumber (more robust for complex PDFs)
            try:
                with pdfplumber.open(file_path) as pdf:
                    metadata["page_count"] = len(pdf.pages)
                    for page_num, page in enumerate(pdf.pages):
                        text = page.extract_text()
                        if text and text.strip():
                            text_content.append(
                                {"page": page_num + 1, "content": text.strip()}
                            )
            except Exception as fallback_error:
                raise DocumentProcessingError(
                    f"Both PDF extraction methods failed: {str(fallback_error)}",
                    file_path,
                ) from fallback_error

        # Final content processing
        full_text = "\n\n".join([item["content"] for item in text_content])
        metadata["total_characters"] = len(full_text)
        metadata["total_words"] = len(full_text.split())

        return {
            "content": full_text,
            "pages": text_content,
            "metadata": metadata,
            "source": file_path,
            "document_type": "pdf",
        }

    def _process_docx(self, file_path: str) -> Dict[str, Any]:
        """
        Extract text from a DOCX document using python-docx.

        Args:
            file_path: Path to the DOCX file

        Returns:
            Dictionary with extracted text and metadata
        """
        self.logger.info(f"Processing DOCX: {file_path}")

        try:
            doc = docx.Document(file_path)
            metadata = self._extract_metadata(file_path)

            # Extract document properties
            core_props = doc.core_properties
            metadata.update(
                {
                    "title": core_props.title or "",
                    "author": core_props.author or "",
                    "subject": core_props.subject or "",
                    "created": (
                        core_props.created.isoformat() if core_props.created else ""
                    ),
                    "modified": (
                        core_props.modified.isoformat() if core_props.modified else ""
                    ),
                    "paragraph_count": len(doc.paragraphs),
                }
            )

            # Extract text content
            paragraphs = []
            full_text_parts = []
            for i, paragraph in enumerate(doc.paragraphs):
                text = paragraph.text.strip()
                if text:  # Only include non-empty paragraphs
                    paragraphs.append({"paragraph": i + 1, "content": text})
                    full_text_parts.append(text)

            # Extract tables if present
            tables_content = []
            for table_idx, table in enumerate(doc.tables):
                table_data = []
                for row in table.rows:
                    row_data = [cell.text.strip() for cell in row.cells]
                    if any(row_data):  # Only include non-empty rows
                        table_data.append(row_data)
                if table_data:
                    tables_content.append({"table": table_idx + 1, "data": table_data})
                    # Add table content to full text
                    table_text = "\n".join([" | ".join(row) for row in table_data])
                    full_text_parts.append(f"\n[Table {table_idx + 1}]\n{table_text}")

            full_text = "\n\n".join(full_text_parts)
            metadata.update(
                {
                    "total_characters": len(full_text),
                    "total_words": len(full_text.split()),
                    "table_count": len(tables_content),
                }
            )

            return {
                "content": full_text,
                "paragraphs": paragraphs,
                "tables": tables_content,
                "metadata": metadata,
                "source": file_path,
                "document_type": "docx",
            }
        except Exception as e:
            raise DocumentProcessingError(
                f"Error processing DOCX: {str(e)}", file_path
            ) from e

    def _process_spreadsheet(self, file_path: str) -> Dict[str, Any]:
        """
        Extract text from a CSV or Excel file using pandas.

        Args:
            file_path: Path to the spreadsheet file

        Returns:
            Dictionary with extracted text and metadata
        """
        file_extension = os.path.splitext(file_path)[1].lower()
        self.logger.info(f"Processing spreadsheet: {file_path}")

        try:
            metadata = self._extract_metadata(file_path)
            sheets_data = []

            if file_extension == ".csv":
                # 📄 Process CSV file (treated as a single sheet)
                df = pd.read_csv(file_path, encoding="utf-8")
                sheet_content = self._process_dataframe(df, "Sheet1")
                sheets_data.append(sheet_content)
                metadata["sheet_count"] = 1
            else:
                # Process Excel file (one entry per sheet)
                excel_file = pd.ExcelFile(file_path)
                metadata["sheet_count"] = len(excel_file.sheet_names)
                for sheet_name in excel_file.sheet_names:
                    df = pd.read_excel(file_path, sheet_name=sheet_name)
                    sheet_content = self._process_dataframe(df, sheet_name)
                    sheets_data.append(sheet_content)

            # 🔗 Combine all sheets' content
            full_text_parts = []
            for sheet in sheets_data:
                full_text_parts.append(f"[{sheet['sheet_name']}]\n{sheet['content']}")
            full_text = "\n\n".join(full_text_parts)

            metadata.update(
                {
                    "total_characters": len(full_text),
                    "total_words": len(full_text.split()),
                    "total_rows": sum(sheet["row_count"] for sheet in sheets_data),
                    "total_columns": (
                        max(sheet["column_count"] for sheet in sheets_data)
                        if sheets_data
                        else 0
                    ),
                }
            )

            return {
                "content": full_text,
                "sheets": sheets_data,
                "metadata": metadata,
                "source": file_path,
                "document_type": "spreadsheet",
            }
        except Exception as e:
            raise DocumentProcessingError(
                f"Error processing spreadsheet: {str(e)}", file_path
            ) from e

    def _process_dataframe(self, df: pd.DataFrame, sheet_name: str) -> Dict[str, Any]:
        """
        Process a pandas DataFrame into text content.

        Args:
            df: Pandas DataFrame
            sheet_name: Name of the sheet

        Returns:
            Dictionary with processed sheet data
        """
        # Clean the dataframe
        df = df.dropna(how="all")  # Remove completely empty rows
        df = df.fillna("")  # Fill NaN with empty strings

        # Create text representation
        content_parts = []

        # Add headers
        headers = df.columns.tolist()
        content_parts.append(" | ".join(str(h) for h in headers))
        content_parts.append("-" * 50)  # Separator

        # Add data rows
        for _, row in df.iterrows():
            row_text = " | ".join(str(cell) for cell in row.values)
            content_parts.append(row_text)

        content = "\n".join(content_parts)

        return {
            "sheet_name": sheet_name,
            "content": content,
            "headers": headers,
            "row_count": len(df),
            "column_count": len(df.columns),
            "data": df.to_dict("records"),  # For structured access
        }
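
    # Illustrative example of the text representation this produces for a
    # two-column sheet (hypothetical data):
    #
    #     name | score
    #     --------------------------------------------------
    #     alice | 10
    #     bob | 12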

    def _process_pptx(self, file_path: str) -> Dict[str, Any]:
        """
        🎯 Extract text from a PowerPoint presentation using python-pptx.

        Args:
            file_path: Path to the PPTX file

        Returns:
            Dictionary with extracted text and metadata
        """
        self.logger.info(f"Processing PPTX: {file_path}")

        try:
            presentation = pptx.Presentation(file_path)
            metadata = self._extract_metadata(file_path)

            # Extract presentation metadata
            core_props = presentation.core_properties
            metadata.update(
                {
                    "title": core_props.title or "",
                    "author": core_props.author or "",
                    "subject": core_props.subject or "",
                    "created": (
                        core_props.created.isoformat() if core_props.created else ""
                    ),
                    "modified": (
                        core_props.modified.isoformat() if core_props.modified else ""
                    ),
                    "slide_count": len(presentation.slides),
                }
            )

            # 🎯 Extract content from slides
            slides_content = []
            full_text_parts = []
            for slide_idx, slide in enumerate(presentation.slides):
                slide_text_parts = []
                # Extract text from all shapes in the slide
                for shape in slide.shapes:
                    if hasattr(shape, "text") and shape.text.strip():
                        slide_text_parts.append(shape.text.strip())
                if slide_text_parts:
                    slide_content = "\n".join(slide_text_parts)
                    slides_content.append(
                        {"slide": slide_idx + 1, "content": slide_content}
                    )
                    full_text_parts.append(f"[Slide {slide_idx + 1}]\n{slide_content}")

            full_text = "\n\n".join(full_text_parts)
            metadata.update(
                {
                    "total_characters": len(full_text),
                    "total_words": len(full_text.split()),
                    "slides_with_content": len(slides_content),
                }
            )

            return {
                "content": full_text,
                "slides": slides_content,
                "metadata": metadata,
                "source": file_path,
                "document_type": "pptx",
            }
        except Exception as e:
            raise DocumentProcessingError(
                f"Error processing PPTX: {str(e)}", file_path
            ) from e

    def _process_text_file(self, file_path: str) -> Dict[str, Any]:
        """
        📝 Extract text from plain text files (.txt, .md).

        Args:
            file_path: Path to the text file

        Returns:
            Dictionary with extracted text and metadata
        """
        file_extension = os.path.splitext(file_path)[1].lower()
        self.logger.info(f"Processing text file: {file_path}")

        try:
            metadata = self._extract_metadata(file_path)

            # Try different encodings for robust text reading
            encodings = ["utf-8", "utf-8-sig", "latin-1", "cp1252"]
            content = None
            used_encoding = None
            for encoding in encodings:
                try:
                    with open(file_path, "r", encoding=encoding) as file:
                        content = file.read()
                    used_encoding = encoding
                    self.logger.info(f"Successfully read file with {encoding} encoding")
                    break
                except UnicodeDecodeError:
                    continue
                except Exception as e:
                    self.logger.warning(f"Failed to read with {encoding}: {str(e)}")
                    continue

            if content is None:
                raise DocumentProcessingError(
                    "Could not read file with any supported encoding", file_path
                )

            # Clean and process content
            content = content.strip()
            if not content:
                raise DocumentProcessingError(
                    "File is empty or contains no readable text", file_path
                )

            # Split content into logical sections for better processing
            if file_extension == ".md":
                # 📋 For Markdown files, split by headers
                sections = self._split_markdown_content(content)
            else:
                # 📄 For plain text, split by paragraphs
                sections = self._split_text_content(content)

            # Update metadata with text-specific information
            lines = content.split("\n")
            metadata.update(
                {
                    "file_type": (
                        "markdown" if file_extension == ".md" else "plain_text"
                    ),
                    "line_count": len(lines),
                    "paragraph_count": len(
                        [p for p in content.split("\n\n") if p.strip()]
                    ),
                    "total_characters": len(content),
                    "total_words": len(content.split()),
                    # Record the encoding that actually succeeded rather than
                    # inspecting locals()
                    "encoding_used": used_encoding,
                    "sections_count": len(sections),
                }
            )

            return {
                "content": content,
                "sections": sections,
                "metadata": metadata,
                "source": file_path,
                "document_type": "markdown" if file_extension == ".md" else "text",
            }
        except DocumentProcessingError:
            raise
        except Exception as e:
            raise DocumentProcessingError(
                f"Error processing text file: {str(e)}", file_path
            ) from e

    def _split_markdown_content(self, content: str) -> List[Dict[str, Any]]:
        """
        Split Markdown content by headers for better organization.

        Args:
            content: Markdown content

        Returns:
            List of sections with headers and content
        """
        sections = []
        lines = content.split("\n")
        current_section = {"header": "", "content": [], "level": 0}

        def save_current_section():
            # Append the accumulated section if it carries a header or any text
            if current_section["content"] or current_section["header"]:
                section_content = "\n".join(current_section["content"]).strip()
                if section_content or current_section["header"]:
                    sections.append(
                        {
                            "header": current_section["header"],
                            "content": section_content,
                            "level": current_section["level"],
                            "section_index": len(sections),
                        }
                    )

        for line in lines:
            stripped = line.strip()
            # Check for markdown headers (strip whitespace first so indented
            # headers get the right level and text)
            if stripped.startswith("#"):
                # Save the previous section
                save_current_section()
                # Start a new section
                header_level = len(stripped) - len(stripped.lstrip("#"))
                header_text = stripped.lstrip("#").strip()
                current_section = {
                    "header": header_text,
                    "content": [],
                    "level": header_level,
                }
            else:
                current_section["content"].append(line)

        # Add the last section
        save_current_section()

        # If no headers were found, treat the entire content as one section
        if not sections:
            sections.append(
                {
                    "header": "Document Content",
                    "content": content.strip(),
                    "level": 1,
                    "section_index": 0,
                }
            )
        return sections
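
    # Illustrative example (hypothetical input): the markdown
    #
    #     # Title
    #     intro text
    #     ## Details
    #     body text
    #
    # would yield two sections:
    #
    #     [{"header": "Title", "content": "intro text", "level": 1, "section_index": 0},
    #      {"header": "Details", "content": "body text", "level": 2, "section_index": 1}]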

    def _split_text_content(self, content: str) -> List[Dict[str, Any]]:
        """
        Split plain text content by paragraphs.

        Args:
            content: Plain text content

        Returns:
            List of paragraph sections
        """
        sections = []
        paragraphs = [p.strip() for p in content.split("\n\n") if p.strip()]

        for i, paragraph in enumerate(paragraphs):
            sections.append(
                {
                    "header": f"Paragraph {i + 1}",
                    "content": paragraph,
                    "level": 1,
                    "section_index": i,
                }
            )

        # If there are no clear paragraphs, treat the content as a single section
        if not sections:
            sections.append(
                {
                    "header": "Document Content",
                    "content": content.strip(),
                    "level": 1,
                    "section_index": 0,
                }
            )
        return sections
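

# Minimal usage sketch (assumes utils.error_handler is importable and the
# libraries above are installed; the file paths below are hypothetical).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    processor = DocumentProcessor({"max_file_size_mb": 25})

    # Single document
    try:
        result = processor.process_document("example.pdf")  # hypothetical path
        print(result["metadata"]["total_words"], "words extracted")
    except DocumentProcessingError as err:
        print(f"Processing failed: {err}")

    # Batch processing skips files that fail instead of aborting
    batch_results = processor.process_batch(["notes.md", "data.csv"])  # hypothetical paths
    print(f"Successfully processed {len(batch_results)} documents")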