"""
File handlers for processing different file types.

Each handler recognises files by extension and extracts their content into a
plain Python / pandas structure. Extraction is best-effort: every handler
catches its own errors, prints a message and returns an empty value of its
return type instead of raising.
"""

import csv
import io
import json
import os
import re
import zipfile
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd

# The format-specific libraries below are only needed by a single handler
# each. They are imported defensively so that a missing library disables just
# that handler (reported at extraction time) instead of making the whole
# module unimportable. The module-level names are kept either way.
try:
    from PIL import ExifTags, Image
except ImportError:
    ExifTags = None
    Image = None

try:
    import PyPDF2
except ImportError:
    PyPDF2 = None

try:
    import docx
except ImportError:
    docx = None

try:
    from pptx import Presentation
except ImportError:
    Presentation = None


def _require(dependency: Any, package: str) -> Any:
    """
    Return an optional dependency, or fail with a readable message.

    Args:
        dependency: The imported object, or None when its import failed
        package: Name of the package, used in the error message

    Returns:
        The dependency itself

    Raises:
        ImportError: If the dependency is None
    """
    if dependency is None:
        raise ImportError(f"required package '{package}' is not installed")
    return dependency


class FileHandler:
    """Base class for file handlers."""

    def __init__(self, resource_dir: str):
        """
        Initialize the file handler.

        Args:
            resource_dir: Directory containing resource files
        """
        self.resource_dir = resource_dir

    def get_file_path(self, file_name: str) -> str:
        """
        Get the full path to a file.

        Args:
            file_name: Name of the file

        Returns:
            Full path to the file (resource_dir joined with file_name)
        """
        return os.path.join(self.resource_dir, file_name)

    def can_handle(self, file_path: str) -> bool:
        """
        Check if the handler can process the given file.

        Args:
            file_path: Path to the file

        Returns:
            True if the handler can process the file, False otherwise

        Raises:
            NotImplementedError: Always, in the base class
        """
        raise NotImplementedError("Subclasses must implement this method")

    def extract_content(self, file_path: str) -> Any:
        """
        Extract content from the file.

        Args:
            file_path: Path to the file

        Returns:
            Extracted content

        Raises:
            NotImplementedError: Always, in the base class
        """
        raise NotImplementedError("Subclasses must implement this method")

    def get_file_type(self, file_path: str) -> str:
        """
        Get the file type based on extension.

        Args:
            file_path: Path to the file

        Returns:
            Lower-cased extension including the leading dot ('' if none)
        """
        _, ext = os.path.splitext(file_path)
        return ext.lower()


class ExcelHandler(FileHandler):
    """Handler for Excel files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is an Excel file."""
        return self.get_file_type(file_path) in ('.xlsx', '.xls')

    def extract_content(self, file_path: str) -> Dict[str, pd.DataFrame]:
        """
        Extract content from an Excel file.

        Returns:
            Dictionary mapping sheet names to DataFrames ({} on error)
        """
        try:
            # The context manager closes the workbook's file handle once all
            # sheets have been read.
            with pd.ExcelFile(file_path) as workbook:
                return {
                    sheet_name: pd.read_excel(workbook, sheet_name=sheet_name)
                    for sheet_name in workbook.sheet_names
                }
        except Exception as e:
            print(f"Error extracting content from Excel file {file_path}: {e}")
            return {}


class CSVHandler(FileHandler):
    """Handler for CSV files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a CSV file."""
        return self.get_file_type(file_path) == '.csv'

    def extract_content(self, file_path: str) -> pd.DataFrame:
        """
        Extract content from a CSV file.

        The file is first parsed with pandas' defaults. If that fails, the
        delimiter is sniffed from the first 1024 characters and the parse is
        retried with it.

        Returns:
            DataFrame containing the CSV data (empty DataFrame on error)
        """
        try:
            try:
                return pd.read_csv(file_path)
            except Exception:
                # Default parse failed: retry with a sniffed delimiter.
                with open(file_path, 'r', newline='') as csvfile:
                    dialect = csv.Sniffer().sniff(csvfile.read(1024))
                return pd.read_csv(file_path, delimiter=dialect.delimiter)
        except Exception as e:
            print(f"Error extracting content from CSV file {file_path}: {e}")
            return pd.DataFrame()


class TextHandler(FileHandler):
    """Handler for text files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a text file."""
        return self.get_file_type(file_path) == '.txt'

    def extract_content(self, file_path: str) -> str:
        """
        Extract content from a text file.

        The file is decoded as UTF-8, falling back to latin-1 when the bytes
        are not valid UTF-8.

        Returns:
            Text content of the file ("" on error)
        """
        try:
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    return f.read()
            except UnicodeDecodeError:
                # Try a different encoding
                with open(file_path, 'r', encoding='latin-1') as f:
                    return f.read()
        except Exception as e:
            print(f"Error extracting content from text file {file_path}: {e}")
            return ""


class PDFHandler(FileHandler):
    """Handler for PDF files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a PDF file."""
        return self.get_file_type(file_path) == '.pdf'

    def extract_content(self, file_path: str) -> Dict[int, str]:
        """
        Extract content from a PDF file.

        Returns:
            Dictionary mapping 1-based page numbers to text content
            ({} on error)
        """
        try:
            pdf_lib = _require(PyPDF2, 'PyPDF2')
            with open(file_path, 'rb') as f:
                pdf_reader = pdf_lib.PdfReader(f)
                return {
                    page_number: page.extract_text()
                    for page_number, page in enumerate(pdf_reader.pages, start=1)
                }
        except Exception as e:
            print(f"Error extracting content from PDF file {file_path}: {e}")
            return {}


class ImageHandler(FileHandler):
    """Handler for image files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is an image file."""
        return self.get_file_type(file_path) in (
            '.jpg', '.jpeg', '.png', '.gif', '.bmp'
        )

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract metadata from an image file.

        Returns:
            Dictionary with 'format', 'mode', 'size', 'width' and 'height',
            plus 'exif' (tag name -> value) when the image carries EXIF data
            ({} on error)
        """
        try:
            image_lib = _require(Image, 'Pillow')
            with image_lib.open(file_path) as img:
                metadata = {
                    'format': img.format,
                    'mode': img.mode,
                    'size': img.size,
                    'width': img.width,
                    'height': img.height,
                }

                # Not every image class exposes _getexif, so probe for it and
                # read the EXIF mapping only once.
                read_exif = getattr(img, '_getexif', None)
                raw_exif = read_exif() if read_exif is not None else None
                if raw_exif:
                    # Tag names live in PIL.ExifTags; unknown tag ids are
                    # dropped.
                    tag_names = _require(ExifTags, 'Pillow').TAGS
                    metadata['exif'] = {
                        tag_names[tag_id]: value
                        for tag_id, value in raw_exif.items()
                        if tag_id in tag_names
                    }

                return metadata
        except Exception as e:
            print(f"Error extracting content from image file {file_path}: {e}")
            return {}


class DocxHandler(FileHandler):
    """Handler for Word documents."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a Word document."""
        return self.get_file_type(file_path) == '.docx'

    def extract_content(self, file_path: str) -> str:
        """
        Extract text content from a Word document.

        Paragraph texts come first, followed by the text of every table cell;
        all pieces are joined with newlines.

        Returns:
            Text content of the document ("" on error)
        """
        try:
            doc = _require(docx, 'python-docx').Document(file_path)

            # Extract text from paragraphs
            full_text = [para.text for para in doc.paragraphs]

            # Extract text from tables
            full_text.extend(
                cell.text
                for table in doc.tables
                for row in table.rows
                for cell in row.cells
            )

            return '\n'.join(full_text)
        except Exception as e:
            print(f"Error extracting content from Word document {file_path}: {e}")
            return ""


class PptxHandler(FileHandler):
    """Handler for PowerPoint presentations."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a PowerPoint presentation."""
        return self.get_file_type(file_path) == '.pptx'

    def extract_content(self, file_path: str) -> Dict[int, str]:
        """
        Extract text content from a PowerPoint presentation.

        Returns:
            Dictionary mapping 1-based slide numbers to the newline-joined
            text of the slide's shapes ({} on error)
        """
        try:
            prs = _require(Presentation, 'python-pptx')(file_path)
            slides_text = {}

            for slide_number, slide in enumerate(prs.slides, start=1):
                # Only shapes exposing a "text" attribute contribute.
                texts = [
                    shape.text for shape in slide.shapes
                    if hasattr(shape, "text")
                ]
                slides_text[slide_number] = '\n'.join(texts)

            return slides_text
        except Exception as e:
            print(f"Error extracting content from PowerPoint presentation {file_path}: {e}")
            return {}


class JsonHandler(FileHandler):
    """Handler for JSON files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a JSON file."""
        return self.get_file_type(file_path) in ('.json', '.jsonld')

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract content from a JSON file.

        Returns:
            Parsed JSON content, exactly as json.load returns it
            ({} on error)
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"Error extracting content from JSON file {file_path}: {e}")
            return {}


class ZipHandler(FileHandler):
    """Handler for ZIP archives."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a ZIP archive."""
        return self.get_file_type(file_path) == '.zip'

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract information about the contents of a ZIP archive.

        Returns:
            Dictionary with a 'files' list (filename, size, compressed_size,
            date_time per member) plus one entry per '.txt' member mapping
            its filename to its decoded text ({} on error)
        """
        try:
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                members = zip_ref.infolist()

                # Get information about each file in the archive
                result: Dict[str, Any] = {
                    'files': [
                        {
                            'filename': member.filename,
                            'size': member.file_size,
                            'compressed_size': member.compress_size,
                            'date_time': member.date_time,
                        }
                        for member in members
                    ]
                }

                # Inline the text of '.txt' members; undecodable bytes are
                # dropped rather than failing the whole archive.
                for member in members:
                    if member.filename.endswith('.txt'):
                        raw = zip_ref.read(member.filename)
                        result[member.filename] = raw.decode(
                            'utf-8', errors='ignore'
                        )

            return result
        except Exception as e:
            print(f"Error extracting content from ZIP archive {file_path}: {e}")
            return {}


class PdbHandler(FileHandler):
    """Handler for PDB (Protein Data Bank) files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a PDB file."""
        return self.get_file_type(file_path) == '.pdb'

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract basic information from a PDB file.

        For HEADER/TITLE/COMPND/AUTHOR records the text after the first ten
        characters of the line is used. Multi-line TITLE records are joined
        with a single space so that words from consecutive lines do not fuse.

        Returns:
            Dictionary with 'header', 'title', 'compounds', 'authors' and
            'atoms_count' (number of ATOM and HETATM lines) ({} on error)
        """
        try:
            result = {
                'header': '',
                'title': '',
                'compounds': [],
                'authors': [],
                'atoms_count': 0,
            }
            title_parts = []

            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                for line in f:
                    if line.startswith('HEADER'):
                        result['header'] = line[10:].strip()
                    elif line.startswith('TITLE'):
                        title_parts.append(line[10:].strip())
                    elif line.startswith('COMPND'):
                        result['compounds'].append(line[10:].strip())
                    elif line.startswith('AUTHOR'):
                        result['authors'].append(line[10:].strip())
                    elif line.startswith(('ATOM', 'HETATM')):
                        result['atoms_count'] += 1

            # Skip empty fragments so blank TITLE lines add no extra spaces.
            result['title'] = ' '.join(part for part in title_parts if part)
            return result
        except Exception as e:
            print(f"Error extracting content from PDB file {file_path}: {e}")
            return {}


class PythonHandler(FileHandler):
    """Handler for Python files."""

    # Compiled once; the patterns are line-agnostic text matches, not a parser.
    _CLASS_RE = re.compile(r'class\s+(\w+)(?:\(([^)]*)\))?:')
    # The optional "-> annotation" part lets annotated functions match too.
    _FUNC_RE = re.compile(r'def\s+(\w+)\s*\(([^)]*)\)\s*(?:->\s*[^:\n]+?\s*)?:')
    _IMPORT_RE = re.compile(
        r'(?:from\s+(\w+(?:\.\w+)*)\s+)?import\s+(.+?)(?:\s+as\s+(\w+))?$'
    )

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a Python file."""
        return self.get_file_type(file_path) == '.py'

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract content and structure from a Python file.

        Structure is found with regular expressions over the raw text, so
        matches inside strings or comments are reported as well.

        Returns:
            Dictionary with 'content' (the full text), 'classes'
            (name/parent), 'functions' (name/params) and 'imports'
            (from/import/as) ({} on error)
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Extract class definitions; an empty or missing base list is
            # reported as None.
            classes = [
                {'name': match.group(1), 'parent': match.group(2) or None}
                for match in self._CLASS_RE.finditer(content)
            ]

            # Extract function definitions
            functions = [
                {'name': match.group(1), 'params': match.group(2).strip()}
                for match in self._FUNC_RE.finditer(content)
            ]

            # Extract imports, one stripped line at a time
            imports = []
            for line in content.split('\n'):
                line = line.strip()
                if line.startswith(('import ', 'from ')):
                    match = self._IMPORT_RE.match(line)
                    if match:
                        imports.append({
                            'from': match.group(1),
                            'import': match.group(2),
                            'as': match.group(3),
                        })

            return {
                'content': content,
                'classes': classes,
                'functions': functions,
                'imports': imports,
            }
        except Exception as e:
            print(f"Error extracting content from Python file {file_path}: {e}")
            return {}


class JsonlHandler(FileHandler):
    """Handler for JSONL (JSON Lines) files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a JSONL file."""
        return self.get_file_type(file_path) == '.jsonl'

    def extract_content(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Extract content from a JSONL file.

        Returns:
            List with one parsed JSON value per non-blank line
            ([] on error, including when any single line fails to parse)
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                stripped_lines = (line.strip() for line in f)
                # Skip empty lines
                return [json.loads(line) for line in stripped_lines if line]
        except Exception as e:
            print(f"Error extracting content from JSONL file {file_path}: {e}")
            return []


def get_all_handlers(resource_dir: str) -> List[FileHandler]:
    """
    Get a list of all file handlers.

    Args:
        resource_dir: Directory containing resource files

    Returns:
        List with one fresh instance of every handler class, in lookup order
    """
    handler_classes = (
        ExcelHandler,
        CSVHandler,
        TextHandler,
        PDFHandler,
        ImageHandler,
        DocxHandler,
        PptxHandler,
        JsonHandler,
        ZipHandler,
        PdbHandler,
        PythonHandler,
        JsonlHandler,
    )
    return [handler_class(resource_dir) for handler_class in handler_classes]


def get_handler_for_file(file_path: str, resource_dir: str) -> Optional[FileHandler]:
    """
    Get the appropriate handler for a file.

    Args:
        file_path: Path to the file
        resource_dir: Directory containing resource files

    Returns:
        First handler whose can_handle accepts the file, or None if no
        handler can process the file
    """
    return next(
        (
            handler for handler in get_all_handlers(resource_dir)
            if handler.can_handle(file_path)
        ),
        None,
    )


def extract_file_content(file_path: str, resource_dir: str) -> Tuple[Any, Optional[FileHandler]]:
    """
    Extract content from a file using the appropriate handler.

    Args:
        file_path: Path to the file
        resource_dir: Directory containing resource files

    Returns:
        Tuple of (extracted content, handler used), or (None, None) when no
        handler accepts the file
    """
    handler = get_handler_for_file(file_path, resource_dir)
    if handler is None:
        return None, None
    return handler.extract_content(file_path), handler