# ai_agents_final/agent/tools/file_handlers.py
# Arbnor Tefiki
# Test the agent in HF
# 5d9aa5e
"""
File handlers for processing different file types.
"""
import os
import json
import csv
import zipfile
import io
import re
from typing import Dict, Any, List, Optional, Tuple
import pandas as pd
from PIL import Image
import PyPDF2
import docx
from pptx import Presentation
class FileHandler:
    """Common base for the per-format file handlers.

    Concrete handlers override ``can_handle`` and ``extract_content``; the
    path and extension helpers defined here are shared by all of them.
    """

    def __init__(self, resource_dir: str):
        """
        Bind the handler to a resource directory.

        Args:
            resource_dir: Directory containing resource files
        """
        self.resource_dir = resource_dir

    def get_file_path(self, file_name: str) -> str:
        """
        Resolve a file name against the resource directory.

        Args:
            file_name: Name of the file

        Returns:
            ``resource_dir`` joined with ``file_name``
        """
        full_path = os.path.join(self.resource_dir, file_name)
        return full_path

    def can_handle(self, file_path: str) -> bool:
        """
        Tell whether this handler supports the given file.

        Args:
            file_path: Path to the file

        Returns:
            True if the handler can process the file, False otherwise

        Raises:
            NotImplementedError: always, in this base class
        """
        raise NotImplementedError("Subclasses must implement this method")

    def extract_content(self, file_path: str) -> Any:
        """
        Read the file and return its content.

        Args:
            file_path: Path to the file

        Returns:
            Extracted content (the type is decided by each subclass)

        Raises:
            NotImplementedError: always, in this base class
        """
        raise NotImplementedError("Subclasses must implement this method")

    def get_file_type(self, file_path: str) -> str:
        """
        Return the lower-cased extension of a path.

        Args:
            file_path: Path to the file

        Returns:
            The extension including the leading dot, or '' when there is none
        """
        extension = os.path.splitext(file_path)[1]
        return extension.lower()
class ExcelHandler(FileHandler):
    """Handler for Excel files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is an Excel file (``.xlsx`` or ``.xls``)."""
        return self.get_file_type(file_path) in ['.xlsx', '.xls']

    def extract_content(self, file_path: str) -> Dict[str, pd.DataFrame]:
        """
        Extract content from an Excel file.

        Args:
            file_path: Path to the workbook

        Returns:
            Dictionary mapping sheet names to DataFrames; an empty dict when
            the workbook cannot be read (the error is printed, not raised)
        """
        try:
            # Use the workbook as a context manager so its underlying file
            # handle is closed even if reading one of the sheets fails.
            with pd.ExcelFile(file_path) as excel_file:
                return {
                    sheet_name: pd.read_excel(excel_file, sheet_name=sheet_name)
                    for sheet_name in excel_file.sheet_names
                }
        except Exception as e:
            print(f"Error extracting content from Excel file {file_path}: {e}")
            return {}
class CSVHandler(FileHandler):
    """Handler for CSV files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a CSV file."""
        return self.get_file_type(file_path) == '.csv'

    def extract_content(self, file_path: str) -> pd.DataFrame:
        """
        Extract content from a CSV file.

        The file is first parsed with pandas' defaults. If decoding fails it
        is re-read as latin-1; if parsing fails for another reason the
        delimiter is sniffed from the first 1024 characters and the parse is
        retried with it.

        Args:
            file_path: Path to the CSV file

        Returns:
            DataFrame containing the CSV data; an empty DataFrame when every
            attempt fails (the error is printed, not raised)
        """
        try:
            try:
                return pd.read_csv(file_path)
            except UnicodeDecodeError:
                # Not valid UTF-8: latin-1 can decode any byte sequence.
                return pd.read_csv(file_path, encoding='latin-1')
            except Exception:
                # `except Exception` (not a bare except) so that
                # KeyboardInterrupt / SystemExit are not swallowed.
                with open(file_path, 'r', newline='') as csvfile:
                    dialect = csv.Sniffer().sniff(csvfile.read(1024))
                return pd.read_csv(file_path, delimiter=dialect.delimiter)
        except Exception as e:
            print(f"Error extracting content from CSV file {file_path}: {e}")
            return pd.DataFrame()
class TextHandler(FileHandler):
    """Handler for plain-text (``.txt``) files."""

    def can_handle(self, file_path: str) -> bool:
        """Return True when the path has a ``.txt`` extension."""
        extension = self.get_file_type(file_path)
        return extension == '.txt'

    def extract_content(self, file_path: str) -> str:
        """
        Read a text file, preferring UTF-8 and falling back to latin-1.

        Returns:
            Text content of the file, or '' when it cannot be read
            (the error is printed, not raised)
        """
        # First attempt: UTF-8. Only a decoding failure moves on to the
        # second attempt; any other failure is reported immediately.
        try:
            with open(file_path, 'r', encoding='utf-8') as handle:
                return handle.read()
        except UnicodeDecodeError:
            pass
        except Exception as e:
            print(f"Error extracting content from text file {file_path}: {e}")
            return ""

        # Second attempt: latin-1.
        try:
            with open(file_path, 'r', encoding='latin-1') as handle:
                return handle.read()
        except Exception as e:
            print(f"Error extracting content from text file {file_path}: {e}")
            return ""
class PDFHandler(FileHandler):
    """Handler for PDF documents."""

    def can_handle(self, file_path: str) -> bool:
        """Return True when the path has a ``.pdf`` extension."""
        extension = self.get_file_type(file_path)
        return extension == '.pdf'

    def extract_content(self, file_path: str) -> Dict[int, str]:
        """
        Pull the text of every page out of a PDF.

        Returns:
            Dictionary mapping 1-based page numbers to the page text, or an
            empty dict when the file cannot be read (the error is printed)
        """
        try:
            with open(file_path, 'rb') as pdf_file:
                reader = PyPDF2.PdfReader(pdf_file)
                # Pages are numbered from 1 in the result.
                return {
                    page_number: page.extract_text()
                    for page_number, page in enumerate(reader.pages, start=1)
                }
        except Exception as e:
            print(f"Error extracting content from PDF file {file_path}: {e}")
            return {}
class ImageHandler(FileHandler):
    """Handler for image files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is an image file (jpg, jpeg, png, gif or bmp)."""
        return self.get_file_type(file_path) in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract metadata from an image file.

        Args:
            file_path: Path to the image

        Returns:
            Dictionary with 'format', 'mode', 'size', 'width' and 'height',
            plus an 'exif' dict (tag name -> value) when the image exposes
            EXIF data; an empty dict when the image cannot be opened
            (the error is printed, not raised)
        """
        # The EXIF tag-name table lives in Pillow (PIL.ExifTags), not PyPDF2.
        # Imported here because the module header only imports PIL.Image.
        from PIL import ExifTags

        try:
            with Image.open(file_path) as img:
                metadata = {
                    'format': img.format,
                    'mode': img.mode,
                    'size': img.size,
                    'width': img.width,
                    'height': img.height,
                }
                # Not every image class provides _getexif; call it only once.
                read_exif = getattr(img, '_getexif', None)
                raw_exif = read_exif() if read_exif else None
                if raw_exif:
                    # Keep only tags with a known name, keyed by that name.
                    metadata['exif'] = {
                        ExifTags.TAGS[tag_id]: value
                        for tag_id, value in raw_exif.items()
                        if tag_id in ExifTags.TAGS
                    }
                return metadata
        except Exception as e:
            print(f"Error extracting content from image file {file_path}: {e}")
            return {}
class DocxHandler(FileHandler):
    """Handler for Word (``.docx``) documents."""

    def can_handle(self, file_path: str) -> bool:
        """Return True when the path has a ``.docx`` extension."""
        extension = self.get_file_type(file_path)
        return extension == '.docx'

    def extract_content(self, file_path: str) -> str:
        """
        Collect the text of a Word document.

        Returns:
            Paragraph texts followed by table cell texts, joined with
            newlines; '' when the document cannot be read (error is printed)
        """
        try:
            document = docx.Document(file_path)
            # Body paragraphs first ...
            pieces = [paragraph.text for paragraph in document.paragraphs]
            # ... then every table cell, table by table and row by row.
            for table in document.tables:
                for row in table.rows:
                    pieces.extend(cell.text for cell in row.cells)
            return '\n'.join(pieces)
        except Exception as e:
            print(f"Error extracting content from Word document {file_path}: {e}")
            return ""
class PptxHandler(FileHandler):
    """Handler for PowerPoint (``.pptx``) presentations."""

    def can_handle(self, file_path: str) -> bool:
        """Return True when the path has a ``.pptx`` extension."""
        extension = self.get_file_type(file_path)
        return extension == '.pptx'

    def extract_content(self, file_path: str) -> Dict[int, str]:
        """
        Collect the text found on each slide of a presentation.

        Returns:
            Dictionary mapping 1-based slide numbers to the newline-joined
            text of the shapes that have a ``text`` attribute; an empty dict
            when the file cannot be read (the error is printed)
        """
        try:
            presentation = Presentation(file_path)
            return {
                slide_number: '\n'.join(
                    shape.text for shape in slide.shapes if hasattr(shape, "text")
                )
                for slide_number, slide in enumerate(presentation.slides, start=1)
            }
        except Exception as e:
            print(f"Error extracting content from PowerPoint presentation {file_path}: {e}")
            return {}
class JsonHandler(FileHandler):
    """Handler for JSON (``.json`` / ``.jsonld``) files."""

    def can_handle(self, file_path: str) -> bool:
        """Return True when the path has a ``.json`` or ``.jsonld`` extension."""
        extension = self.get_file_type(file_path)
        return extension in ['.json', '.jsonld']

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Parse a UTF-8 encoded JSON file.

        Returns:
            Whatever ``json.load`` produces for the file, or an empty dict
            when reading or parsing fails (the error is printed)
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as json_file:
                parsed = json.load(json_file)
        except Exception as e:
            print(f"Error extracting content from JSON file {file_path}: {e}")
            return {}
        else:
            return parsed
class ZipHandler(FileHandler):
    """Handler for ZIP archives."""

    def can_handle(self, file_path: str) -> bool:
        """Return True when the path has a ``.zip`` extension."""
        extension = self.get_file_type(file_path)
        return extension == '.zip'

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Describe the members of a ZIP archive.

        Returns:
            Dictionary whose 'files' key lists one record per member
            (filename, size, compressed_size, date_time). Members whose name
            ends with '.txt' are additionally stored under their own filename
            as decoded text. An empty dict is returned when the archive
            cannot be read (the error is printed).
        """
        try:
            listing = []
            summary = {'files': listing}
            with zipfile.ZipFile(file_path, 'r') as archive:
                for member in archive.infolist():
                    listing.append({
                        'filename': member.filename,
                        'size': member.file_size,
                        'compressed_size': member.compress_size,
                        'date_time': member.date_time,
                    })
                    if not member.filename.endswith('.txt'):
                        continue
                    # Text members are decoded as UTF-8, dropping invalid bytes.
                    raw_bytes = archive.read(member.filename)
                    summary[member.filename] = raw_bytes.decode('utf-8', errors='ignore')
            return summary
        except Exception as e:
            print(f"Error extracting content from ZIP archive {file_path}: {e}")
            return {}
class PdbHandler(FileHandler):
    """Handler for PDB (Protein Data Bank) files."""

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a PDB file."""
        return self.get_file_type(file_path) == '.pdb'

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract basic information from a PDB file.

        Args:
            file_path: Path to the PDB file

        Returns:
            Dictionary with 'header', 'title', 'compounds', 'authors' and
            'atoms_count' (number of ATOM and HETATM lines); an empty dict
            when the file cannot be read (the error is printed, not raised)
        """
        try:
            result = {
                'header': '',
                'title': '',
                'compounds': [],
                'authors': [],
                'atoms_count': 0,
            }
            title_parts = []
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                for line in f:
                    # Everything after the first 10 characters of a record
                    # line is treated as its payload.
                    if line.startswith('HEADER'):
                        result['header'] = line[10:].strip()
                    elif line.startswith('TITLE'):
                        title_parts.append(line[10:].strip())
                    elif line.startswith('COMPND'):
                        result['compounds'].append(line[10:].strip())
                    elif line.startswith('AUTHOR'):
                        result['authors'].append(line[10:].strip())
                    elif line.startswith(('ATOM', 'HETATM')):
                        result['atoms_count'] += 1
            # A title may span several TITLE lines. Each piece is stripped,
            # so join with a space to avoid gluing the last word of one line
            # onto the first word of the next.
            result['title'] = ' '.join(part for part in title_parts if part)
            return result
        except Exception as e:
            print(f"Error extracting content from PDB file {file_path}: {e}")
            return {}
class PythonHandler(FileHandler):
    """Handler for Python files."""

    # Definitions are only recognised at the start of a (possibly indented)
    # line, so prose such as "this class is:" inside a comment or docstring
    # is not reported as a class.
    _CLASS_RE = re.compile(
        r'^[ \t]*class\s+(\w+)\s*(?:\(([^)]*)\))?\s*:',
        re.MULTILINE,
    )
    # Accepts an optional `async` prefix and an optional `-> annotation`
    # between the closing parenthesis and the colon.
    _FUNC_RE = re.compile(
        r'^[ \t]*(?:async[ \t]+)?def\s+(\w+)\s*\(([^)]*)\)\s*(?:->[^:\n]+)?:',
        re.MULTILINE,
    )
    # The module part may be absolute (`a.b`) or relative (`.`, `..pkg.mod`).
    _IMPORT_RE = re.compile(
        r'(?:from\s+(\.+\w*(?:\.\w+)*|\w+(?:\.\w+)*)\s+)?import\s+(.+?)(?:\s+as\s+(\w+))?$'
    )

    def can_handle(self, file_path: str) -> bool:
        """Check if the file is a Python file."""
        return self.get_file_type(file_path) == '.py'

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract content and structure from a Python file.

        The structure is found with regular expressions, not a real parser,
        so it is a best-effort outline (for example, a parameter list that
        itself contains ')' is not matched).

        Args:
            file_path: Path to the Python file

        Returns:
            Dictionary with:
              'content'   - the full source text
              'classes'   - list of {'name', 'parent'}; 'parent' is the text
                            between the parentheses or None
              'functions' - list of {'name', 'params'}
              'imports'   - list of {'from', 'import', 'as'}
            An empty dict when the file cannot be read (the error is printed)
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            result = {
                'content': content,
                'classes': [],
                'functions': [],
                'imports': [],
            }

            # Extract class definitions
            for match in self._CLASS_RE.finditer(content):
                result['classes'].append({
                    'name': match.group(1),
                    # Missing or empty parentheses both mean "no parent".
                    'parent': match.group(2) if match.group(2) else None,
                })

            # Extract function definitions
            for match in self._FUNC_RE.finditer(content):
                result['functions'].append({
                    'name': match.group(1),
                    'params': match.group(2).strip(),
                })

            # Extract imports, one source line at a time
            for line in content.split('\n'):
                line = line.strip()
                if not line.startswith(('import ', 'from ')):
                    continue
                match = self._IMPORT_RE.match(line)
                if match:
                    result['imports'].append({
                        'from': match.group(1),
                        'import': match.group(2),
                        'as': match.group(3),
                    })

            return result
        except Exception as e:
            print(f"Error extracting content from Python file {file_path}: {e}")
            return {}
class JsonlHandler(FileHandler):
    """Handler for JSON Lines (``.jsonl``) files."""

    def can_handle(self, file_path: str) -> bool:
        """Return True when the path has a ``.jsonl`` extension."""
        extension = self.get_file_type(file_path)
        return extension == '.jsonl'

    def extract_content(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Parse every non-blank line of a JSONL file.

        Returns:
            List with one parsed value per non-blank line, or an empty list
            when reading or parsing fails (the error is printed)
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as jsonl_file:
                stripped_lines = (raw_line.strip() for raw_line in jsonl_file)
                # Blank lines are skipped rather than parsed.
                return [json.loads(entry) for entry in stripped_lines if entry]
        except Exception as e:
            print(f"Error extracting content from JSONL file {file_path}: {e}")
            return []
def get_all_handlers(resource_dir: str) -> List[FileHandler]:
    """
    Build one instance of every handler class.

    Args:
        resource_dir: Directory containing resource files

    Returns:
        List of file handlers, in lookup order
    """
    handler_classes = (
        ExcelHandler,
        CSVHandler,
        TextHandler,
        PDFHandler,
        ImageHandler,
        DocxHandler,
        PptxHandler,
        JsonHandler,
        ZipHandler,
        PdbHandler,
        PythonHandler,
        JsonlHandler,
    )
    return [handler_class(resource_dir) for handler_class in handler_classes]
def get_handler_for_file(file_path: str, resource_dir: str) -> Optional[FileHandler]:
    """
    Pick the first handler that accepts the file.

    Args:
        file_path: Path to the file
        resource_dir: Directory containing resource files

    Returns:
        Appropriate file handler, or None if no handler can process the file
    """
    candidates = get_all_handlers(resource_dir)
    return next(
        (candidate for candidate in candidates if candidate.can_handle(file_path)),
        None,
    )
def extract_file_content(file_path: str, resource_dir: str) -> Tuple[Any, Optional[FileHandler]]:
    """
    Look up the handler for a file and run its extraction.

    Args:
        file_path: Path to the file
        resource_dir: Directory containing resource files

    Returns:
        Tuple of (extracted content, handler used), or (None, None) when no
        handler accepts the file
    """
    chosen = get_handler_for_file(file_path, resource_dir)
    if not chosen:
        return None, None
    return chosen.extract_content(file_path), chosen