# NOTE(review): "Spaces: Sleeping" appears to be hosting-page status text
# captured along with the file; it is not part of the module.
"""
File handlers for processing different file types.
"""
import os | |
import json | |
import csv | |
import zipfile | |
import io | |
import re | |
from typing import Dict, Any, List, Optional, Tuple | |
import pandas as pd | |
from PIL import Image | |
import PyPDF2 | |
import docx | |
from pptx import Presentation | |
class FileHandler:
    """Common interface implemented by every file-type handler.

    Subclasses are expected to override ``can_handle`` and
    ``extract_content``; the base versions raise ``NotImplementedError``.
    """

    def __init__(self, resource_dir: str):
        """
        Remember the directory that resource files are resolved against.

        Args:
            resource_dir: Directory containing resource files
        """
        self.resource_dir = resource_dir

    def get_file_path(self, file_name: str) -> str:
        """
        Resolve a file name against the handler's resource directory.

        Args:
            file_name: Name of the file

        Returns:
            ``resource_dir`` joined with ``file_name``
        """
        full_path = os.path.join(self.resource_dir, file_name)
        return full_path

    def can_handle(self, file_path: str) -> bool:
        """
        Report whether this handler knows how to process ``file_path``.

        Args:
            file_path: Path to the file

        Returns:
            True if the handler can process the file, False otherwise

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("Subclasses must implement this method")

    def extract_content(self, file_path: str) -> Any:
        """
        Pull the content out of ``file_path``.

        Args:
            file_path: Path to the file

        Returns:
            Extracted content (shape is defined by each subclass)

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("Subclasses must implement this method")

    def get_file_type(self, file_path: str) -> str:
        """
        Return the lower-cased extension of ``file_path``.

        Args:
            file_path: Path to the file

        Returns:
            Extension including the leading dot (e.g. ``".csv"``), or an
            empty string when the path has no extension
        """
        extension = os.path.splitext(file_path)[1]
        return extension.lower()
class ExcelHandler(FileHandler):
    """Handler for Excel workbooks (``.xlsx`` / ``.xls``)."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is an Excel file (by extension, case-insensitive)."""
        return self.get_file_type(file_path) in ('.xlsx', '.xls')

    def extract_content(self, file_path: str) -> Dict[str, pd.DataFrame]:
        """
        Extract content from an Excel file.

        Args:
            file_path: Path to the workbook

        Returns:
            Dictionary mapping sheet names to DataFrames; an empty dict if
            the workbook cannot be read (the error is printed, not raised)
        """
        try:
            # The context manager closes the underlying workbook handle even
            # when reading one of the sheets fails; previously it was leaked.
            with pd.ExcelFile(file_path) as excel_file:
                return {
                    sheet_name: pd.read_excel(excel_file, sheet_name=sheet_name)
                    for sheet_name in excel_file.sheet_names
                }
        except Exception as e:
            print(f"Error extracting content from Excel file {file_path}: {e}")
            return {}
class CSVHandler(FileHandler):
    """Handler for CSV files."""

    # Encodings tried, in order, by the delimiter-sniffing fallback.
    # latin-1 maps every byte to a character, so it acts as the last resort.
    _FALLBACK_ENCODINGS = ('utf-8', 'latin-1')

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a CSV file."""
        return self.get_file_type(file_path) == '.csv'

    def extract_content(self, file_path: str) -> pd.DataFrame:
        """
        Extract content from a CSV file.

        A plain ``pd.read_csv`` is attempted first. If that fails, the
        delimiter is sniffed from the start of the file and the read is
        retried, trying each fallback encoding in turn.

        Args:
            file_path: Path to the CSV file

        Returns:
            DataFrame containing the CSV data; an empty DataFrame if the
            file cannot be read (the error is printed, not raised)
        """
        try:
            try:
                return pd.read_csv(file_path)
            except Exception:
                # `Exception` rather than a bare `except` so that
                # KeyboardInterrupt / SystemExit are not swallowed.
                return self._read_with_sniffed_delimiter(file_path)
        except Exception as e:
            print(f"Error extracting content from CSV file {file_path}: {e}")
            return pd.DataFrame()

    def _read_with_sniffed_delimiter(self, file_path: str) -> pd.DataFrame:
        """Sniff the delimiter and re-read the file, falling back across encodings."""
        last_error: Optional[Exception] = None
        for encoding in self._FALLBACK_ENCODINGS:
            try:
                with open(file_path, 'r', newline='', encoding=encoding) as csvfile:
                    dialect = csv.Sniffer().sniff(csvfile.read(1024))
                # Use the same encoding for the real read as for the sample,
                # otherwise a non-UTF-8 file fails again exactly as before.
                return pd.read_csv(
                    file_path, delimiter=dialect.delimiter, encoding=encoding
                )
            except UnicodeDecodeError as err:
                last_error = err
        # Only reachable if every encoding raised a decode error.
        raise last_error if last_error else ValueError("no encodings to try")
class TextHandler(FileHandler):
    """Handler for plain-text (``.txt``) files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a text file."""
        extension = self.get_file_type(file_path)
        return extension == '.txt'

    def extract_content(self, file_path: str) -> str:
        """
        Read a text file, preferring UTF-8 and retrying as latin-1.

        Returns:
            Text content of the file, or an empty string if it cannot be
            read (the error is printed, not raised)
        """

        def read_as(encoding: str) -> str:
            with open(file_path, 'r', encoding=encoding) as stream:
                return stream.read()

        try:
            return read_as('utf-8')
        except UnicodeDecodeError:
            # Not valid UTF-8: fall through to the latin-1 attempt below.
            pass
        except Exception as e:
            print(f"Error extracting content from text file {file_path}: {e}")
            return ""

        try:
            return read_as('latin-1')
        except Exception as e:
            print(f"Error extracting content from text file {file_path}: {e}")
            return ""
class PDFHandler(FileHandler):
    """Handler for PDF documents."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a PDF file."""
        extension = self.get_file_type(file_path)
        return extension == '.pdf'

    def extract_content(self, file_path: str) -> Dict[int, str]:
        """
        Extract the text of every page of a PDF.

        Returns:
            Dictionary mapping 1-based page numbers to that page's extracted
            text; an empty dict if the file cannot be read (the error is
            printed, not raised)
        """
        try:
            with open(file_path, 'rb') as stream:
                reader = PyPDF2.PdfReader(stream)
                # Text must be extracted while the stream is still open.
                return {
                    page_number: page.extract_text()
                    for page_number, page in enumerate(reader.pages, start=1)
                }
        except Exception as e:
            print(f"Error extracting content from PDF file {file_path}: {e}")
            return {}
class ImageHandler(FileHandler):
    """Handler for image files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is an image file."""
        return self.get_file_type(file_path) in ('.jpg', '.jpeg', '.png', '.gif', '.bmp')

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract metadata from an image file.

        Args:
            file_path: Path to the image

        Returns:
            Dictionary with ``format``, ``mode``, ``size``, ``width`` and
            ``height``, plus an ``exif`` dict (tag name -> value) when the
            image exposes EXIF data. An empty dict is returned if the image
            cannot be opened (the error is printed, not raised).
        """
        # The EXIF tag-name table lives in Pillow, not PyPDF2. The previous
        # lookup through PyPDF2 raised AttributeError for every image that
        # carried EXIF data, so those images produced no metadata at all.
        # Imported locally so this class does not depend on a module-level
        # import being added elsewhere.
        from PIL import ExifTags

        try:
            with Image.open(file_path) as img:
                metadata = {
                    'format': img.format,
                    'mode': img.mode,
                    'size': img.size,
                    'width': img.width,
                    'height': img.height,
                }
                # `_getexif` only exists on some image plugins; call it once
                # and reuse the result instead of decoding EXIF twice.
                exif_reader = getattr(img, '_getexif', None)
                raw_exif = exif_reader() if callable(exif_reader) else None
                if raw_exif:
                    # Keep only tags with a known name.
                    metadata['exif'] = {
                        ExifTags.TAGS[tag_id]: value
                        for tag_id, value in raw_exif.items()
                        if tag_id in ExifTags.TAGS
                    }
                return metadata
        except Exception as e:
            print(f"Error extracting content from image file {file_path}: {e}")
            return {}
class DocxHandler(FileHandler):
    """Handler for Word (``.docx``) documents."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a Word document."""
        extension = self.get_file_type(file_path)
        return extension == '.docx'

    def extract_content(self, file_path: str) -> str:
        """
        Extract the text of a Word document.

        Paragraph text comes first, followed by the text of every table
        cell, all joined with newlines.

        Returns:
            Text content of the document, or an empty string if it cannot
            be read (the error is printed, not raised)
        """
        try:
            document = docx.Document(file_path)
            # Body paragraphs, in document order.
            pieces = [paragraph.text for paragraph in document.paragraphs]
            # Then every table cell, table by table and row by row.
            pieces.extend(
                cell.text
                for table in document.tables
                for row in table.rows
                for cell in row.cells
            )
            return '\n'.join(pieces)
        except Exception as e:
            print(f"Error extracting content from Word document {file_path}: {e}")
            return ""
class PptxHandler(FileHandler):
    """Handler for PowerPoint (``.pptx``) presentations."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a PowerPoint presentation."""
        extension = self.get_file_type(file_path)
        return extension == '.pptx'

    def extract_content(self, file_path: str) -> Dict[int, str]:
        """
        Extract the text of every slide in a presentation.

        Returns:
            Dictionary mapping 1-based slide numbers to the newline-joined
            text of the shapes on that slide; an empty dict if the file
            cannot be read (the error is printed, not raised)
        """
        try:
            deck = Presentation(file_path)
            text_by_slide = {}
            for slide_number, slide in enumerate(deck.slides, start=1):
                # Only shapes that expose a `text` attribute contribute.
                shape_texts = [
                    shape.text for shape in slide.shapes if hasattr(shape, "text")
                ]
                text_by_slide[slide_number] = '\n'.join(shape_texts)
            return text_by_slide
        except Exception as e:
            print(f"Error extracting content from PowerPoint presentation {file_path}: {e}")
            return {}
class JsonHandler(FileHandler):
    """Handler for JSON (``.json`` / ``.jsonld``) files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a JSON file."""
        extension = self.get_file_type(file_path)
        return extension in ('.json', '.jsonld')

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Parse a JSON file.

        Returns:
            Parsed JSON content, or an empty dict if the file cannot be
            read or parsed (the error is printed, not raised)
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as stream:
                raw_text = stream.read()
            return json.loads(raw_text)
        except Exception as e:
            print(f"Error extracting content from JSON file {file_path}: {e}")
            return {}
class ZipHandler(FileHandler):
    """Handler for ZIP archives."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a ZIP archive."""
        extension = self.get_file_type(file_path)
        return extension == '.zip'

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Describe the members of a ZIP archive.

        Returns:
            Dictionary with a ``files`` list (filename, size, compressed
            size and timestamp of every member) plus one extra key per
            member whose name ends in ``.txt``, holding that member's
            decoded text. An empty dict is returned if the archive cannot
            be read (the error is printed, not raised).
        """
        try:
            with zipfile.ZipFile(file_path, 'r') as archive:
                members = archive.infolist()

                # Listing of every member in the archive.
                summary = {
                    'files': [
                        {
                            'filename': member.filename,
                            'size': member.file_size,
                            'compressed_size': member.compress_size,
                            'date_time': member.date_time,
                        }
                        for member in members
                    ]
                }

                # Inline the text of .txt members, keyed by member name.
                for member in members:
                    if not member.filename.endswith('.txt'):
                        continue
                    with archive.open(member.filename) as handle:
                        summary[member.filename] = handle.read().decode(
                            'utf-8', errors='ignore'
                        )
            return summary
        except Exception as e:
            print(f"Error extracting content from ZIP archive {file_path}: {e}")
            return {}
class PdbHandler(FileHandler):
    """Handler for PDB (Protein Data Bank) files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a PDB file."""
        return self.get_file_type(file_path) == '.pdb'

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract basic information from a PDB file.

        Each line is classified by its leading record name; the payload is
        whatever follows the first ten characters of the line.

        Args:
            file_path: Path to the PDB file

        Returns:
            Dictionary with ``header`` (payload of the last HEADER line),
            ``title`` (all TITLE payloads joined with single spaces),
            ``compounds`` and ``authors`` (one entry per COMPND / AUTHOR
            line) and ``atoms_count`` (number of ATOM and HETATM lines).
            An empty dict is returned if the file cannot be read (the error
            is printed, not raised).
        """
        try:
            result = {
                'header': '',
                'title': '',
                'compounds': [],
                'authors': [],
                'atoms_count': 0,
            }
            # A title may span several TITLE lines. Collect the pieces and
            # join them with a space: plain concatenation of the stripped
            # payloads glued the last word of one line to the first word of
            # the next.
            title_parts = []
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                for line in f:
                    if line.startswith('HEADER'):
                        result['header'] = line[10:].strip()
                    elif line.startswith('TITLE'):
                        part = line[10:].strip()
                        if part:
                            title_parts.append(part)
                    elif line.startswith('COMPND'):
                        result['compounds'].append(line[10:].strip())
                    elif line.startswith('AUTHOR'):
                        result['authors'].append(line[10:].strip())
                    elif line.startswith(('ATOM', 'HETATM')):
                        result['atoms_count'] += 1
            result['title'] = ' '.join(title_parts)
            return result
        except Exception as e:
            print(f"Error extracting content from PDB file {file_path}: {e}")
            return {}
class PythonHandler(FileHandler):
    """Handler for Python source files.

    Structure is recovered with regular expressions rather than a real
    parser, so it is best-effort: text inside strings or comments that looks
    like a definition is reported too, and parameter lists containing a
    closing parenthesis (e.g. a tuple default) are not matched.
    """

    # `class Name` with an optional parenthesised base list.
    _CLASS_RE = re.compile(r'\bclass\s+(\w+)\s*(?:\(([^)]*)\))?\s*:')

    # `def name(params)` optionally followed by a `-> annotation` before the
    # colon. Without the optional annotation part, every function declaring a
    # return type was silently skipped.
    _FUNC_RE = re.compile(r'\bdef\s+(\w+)\s*\(([^)]*)\)\s*(?:->\s*[^:\n]+)?:')

    # `import x [as y]` or `from pkg import x [as y]`. The module part also
    # accepts leading dots so relative imports (`from . import x`,
    # `from ..pkg import y`) are no longer dropped.
    _IMPORT_RE = re.compile(
        r'(?:from\s+(\.+\w*(?:\.\w+)*|\w+(?:\.\w+)*)\s+)?'
        r'import\s+(.+?)(?:\s+as\s+(\w+))?$'
    )

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a Python file."""
        return self.get_file_type(file_path) == '.py'

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract content and structure from a Python file.

        Args:
            file_path: Path to the Python source file

        Returns:
            Dictionary with:
              * ``content``: the full source text
              * ``classes``: ``{'name', 'parent'}`` per class; ``parent`` is
                the raw text between the parentheses, or None
              * ``functions``: ``{'name', 'params'}`` per function;
                ``params`` is the raw, stripped parameter text
              * ``imports``: ``{'from', 'import', 'as'}`` per import line
            An empty dict is returned if the file cannot be read (the error
            is printed, not raised).
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            result = {
                'content': content,
                'classes': [],
                'functions': [],
                'imports': [],
            }

            # Class definitions; an empty base list is reported as None.
            for match in self._CLASS_RE.finditer(content):
                result['classes'].append({
                    'name': match.group(1),
                    'parent': match.group(2) or None,
                })

            # Function and method definitions.
            for match in self._FUNC_RE.finditer(content):
                result['functions'].append({
                    'name': match.group(1),
                    'params': match.group(2).strip(),
                })

            # Imports are matched line by line, so only single-line import
            # statements are recognised.
            for line in content.split('\n'):
                line = line.strip()
                if not line.startswith(('import ', 'from ')):
                    continue
                match = self._IMPORT_RE.match(line)
                if match:
                    result['imports'].append({
                        'from': match.group(1),
                        'import': match.group(2),
                        'as': match.group(3),
                    })
            return result
        except Exception as e:
            print(f"Error extracting content from Python file {file_path}: {e}")
            return {}
class JsonlHandler(FileHandler):
    """Handler for JSONL (JSON Lines) files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a JSONL file."""
        extension = self.get_file_type(file_path)
        return extension == '.jsonl'

    def extract_content(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Parse a JSONL file, one JSON value per non-blank line.

        Returns:
            List of parsed JSON objects in file order; an empty list if the
            file cannot be read or any line fails to parse (the error is
            printed, not raised)
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as stream:
                stripped_lines = (raw_line.strip() for raw_line in stream)
                # Blank lines are skipped rather than parsed.
                return [json.loads(entry) for entry in stripped_lines if entry]
        except Exception as e:
            print(f"Error extracting content from JSONL file {file_path}: {e}")
            return []
def get_all_handlers(resource_dir: str) -> List[FileHandler]:
    """
    Build one instance of every known file handler.

    Args:
        resource_dir: Directory containing resource files

    Returns:
        List of file handlers, in the order they are consulted
    """
    handler_types = (
        ExcelHandler,
        CSVHandler,
        TextHandler,
        PDFHandler,
        ImageHandler,
        DocxHandler,
        PptxHandler,
        JsonHandler,
        ZipHandler,
        PdbHandler,
        PythonHandler,
        JsonlHandler,
    )
    return [handler_type(resource_dir) for handler_type in handler_types]
def get_handler_for_file(file_path: str, resource_dir: str) -> Optional[FileHandler]:
    """
    Pick the first handler that accepts the given file.

    Args:
        file_path: Path to the file
        resource_dir: Directory containing resource files

    Returns:
        Appropriate file handler, or None if no handler can process the file
    """
    candidates = get_all_handlers(resource_dir)
    matching = (candidate for candidate in candidates if candidate.can_handle(file_path))
    # The first match wins; None when nothing claims the file.
    return next(matching, None)
def extract_file_content(file_path: str, resource_dir: str) -> Tuple[Any, Optional[FileHandler]]:
    """
    Extract a file's content with whichever handler accepts it.

    Args:
        file_path: Path to the file
        resource_dir: Directory containing resource files

    Returns:
        Tuple of (extracted content, handler used); ``(None, None)`` when no
        handler accepts the file
    """
    handler = get_handler_for_file(file_path, resource_dir)
    if not handler:
        return None, None
    return handler.extract_content(file_path), handler