assignment_agent / resource_handlers.py
arbnori45's picture
Upload 54 files
922f271 verified
"""
Resource handlers for processing specific file types in the benchmark
"""
import os
import json
import pandas as pd
from typing import Dict, Any, List, Optional, Tuple
import logging
import glob
# Configure logging.
# NOTE: basicConfig() runs at import time and configures the *root* logger
# (INFO level, "timestamp - level - message" format). It has no effect if the
# root logger already has handlers installed.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by every handler defined in this file.
logger = logging.getLogger(__name__)
# Constants
# Absolute path of the "resource" directory that sits next to this source file.
RESOURCE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resource")
class ResourceIndex:
    """Index of the resource directory and of the task metadata stored in it.

    On construction the index reads ``metadata.jsonl`` from ``RESOURCE_DIR``
    (one JSON object per line, stored under its ``task_id`` field) and records
    every regular file found directly inside ``RESOURCE_DIR``.

    Both steps are best-effort: an unreadable file or directory is logged and
    yields an empty (or partial) index instead of raising.
    """

    def __init__(self):
        self._metadata = self._load_metadata()
        self._file_index = self._index_files()

    def _load_metadata(self) -> Dict[str, Dict]:
        """Load metadata from the metadata.jsonl file.

        Returns:
            Mapping of ``task_id`` to the decoded JSON object of that line.
            Blank lines are ignored; lines that are not valid JSON objects
            are logged and skipped so that one bad record does not discard
            the records that follow it.
        """
        metadata: Dict[str, Dict] = {}
        metadata_path = os.path.join(RESOURCE_DIR, "metadata.jsonl")
        try:
            with open(metadata_path, 'r', encoding='utf-8') as f:
                for line_number, raw_line in enumerate(f, start=1):
                    line = raw_line.strip()
                    if not line:
                        # A trailing newline or spacer line is not an error.
                        continue
                    try:
                        data = json.loads(line)
                        if isinstance(data, dict) and 'task_id' in data:
                            metadata[data['task_id']] = data
                    except (ValueError, TypeError) as e:
                        # ValueError: malformed JSON; TypeError: unhashable task_id.
                        logger.warning(
                            f"Skipping invalid metadata line {line_number}: {e}"
                        )
        except (OSError, UnicodeError) as e:
            logger.error(f"Error loading metadata: {e}")
        return metadata

    def _index_files(self) -> Dict[str, str]:
        """Create an index of file names to file paths.

        Only regular files directly inside ``RESOURCE_DIR`` are indexed;
        sub-directories are ignored. If the directory cannot be listed the
        problem is logged and an empty index is returned, mirroring how
        metadata loading treats a missing file.
        """
        try:
            entries = os.listdir(RESOURCE_DIR)
        except OSError as e:
            logger.error(f"Error indexing resource files: {e}")
            return {}

        file_index: Dict[str, str] = {}
        for filename in entries:
            file_path = os.path.join(RESOURCE_DIR, filename)
            if os.path.isfile(file_path):
                file_index[filename] = file_path
        return file_index

    def get_metadata_by_task_id(self, task_id: str) -> Optional[Dict]:
        """Get metadata for a specific task ID, or None if it is unknown."""
        return self._metadata.get(task_id)

    def get_answer_by_task_id(self, task_id: str) -> str:
        """Get the final answer for a specific task ID.

        Returns the record's ``'Final answer'`` field, or an empty string
        when the task is unknown or has no such field.
        """
        metadata = self.get_metadata_by_task_id(task_id)
        if metadata:
            return metadata.get('Final answer', '')
        return ''

    def get_file_path(self, filename: str) -> Optional[str]:
        """Get the full path for a specific file, or None if not indexed."""
        return self._file_index.get(filename)

    def find_task_by_question(self, question: str) -> List[Tuple[str, Dict]]:
        """Search for tasks that match a question.

        A task matches when, ignoring case and surrounding whitespace, the
        given question contains the task's ``'Question'`` text or vice versa.

        Returns:
            List of ``(task_id, metadata)`` pairs in metadata order. An empty
            or blank question matches nothing, and tasks without a question
            are never returned.
        """
        needle = question.strip().lower() if isinstance(question, str) else ''
        if not needle:
            # '' is a substring of every string, so it would match every task.
            return []

        matches = []
        for task_id, metadata in self._metadata.items():
            stored = metadata.get('Question')
            if not isinstance(stored, str):
                continue
            candidate = stored.strip().lower()
            if not candidate:
                # Same reason as above: an empty stored question matches all.
                continue
            if needle in candidate or candidate in needle:
                matches.append((task_id, metadata))
        return matches

    def find_task_by_file(self, filename: str) -> Optional[Tuple[str, Dict]]:
        """Find the first task that uses a specific file.

        Returns ``(task_id, metadata)`` or None. An empty file name never
        matches, so tasks whose ``'file_name'`` is empty are not returned
        by accident.
        """
        if not filename:
            return None
        for task_id, metadata in self._metadata.items():
            if metadata.get('file_name') == filename:
                return (task_id, metadata)
        return None

    def get_all_files(self) -> List[str]:
        """Get a list of all file names in the resources directory."""
        return list(self._file_index)

    def get_files_by_extension(self, extension: str) -> List[str]:
        """Get a list of files with a specific extension.

        The extension may be given with or without the leading dot and is
        compared case-insensitively.
        """
        if not extension.startswith('.'):
            extension = '.' + extension
        suffix = extension.lower()  # normalise once instead of once per file
        return [filename for filename in self._file_index
                if filename.lower().endswith(suffix)]
class ExcelHandler:
    """Handler for Excel files in the resources"""

    @staticmethod
    def process_file(file_path: str, question: str) -> Tuple[str, Optional[pd.DataFrame]]:
        """Process an Excel file and extract information relevant to the question.

        Args:
            file_path: Path of the workbook; it is read with ``pd.read_excel``.
            question: Natural-language question used to pick the analysis.

        Returns:
            A tuple ``(answer, dataframe)``. ``answer`` is an empty string
            when no rule applies or no answer is found. ``dataframe`` is
            None only when the workbook itself could not be read; a failure
            during the analysis is logged and still returns the loaded data.
        """
        try:
            df = pd.read_excel(file_path)
        except Exception as e:
            # Boundary handler: missing file, missing engine, corrupt workbook...
            logger.error(f"Error processing Excel file {file_path}: {e}")
            return "", None

        try:
            answer = ExcelHandler._answer_question(df, question)
        except Exception as e:
            # Keep the successfully loaded data even if the analysis failed.
            logger.error(f"Error processing Excel file {file_path}: {e}")
            answer = ""
        return answer, df

    @staticmethod
    def _answer_question(df: pd.DataFrame, question: str) -> str:
        """Dispatch the question to the matching analysis rule ('' if none)."""
        query = question.lower()
        # Example: Find oldest blu-ray in spreadsheet
        if "oldest" in query and "blu-ray" in query:
            return ExcelHandler._oldest_blu_ray_title(df)
        return ""

    @staticmethod
    def _oldest_blu_ray_title(df: pd.DataFrame) -> str:
        """Return the title of the Blu-Ray row with the smallest year ('' if none)."""
        if not {"Format", "Year", "Title"}.issubset(df.columns):
            return ""

        # astype(str) keeps .str usable when the column was parsed as non-text.
        is_blu_ray = df["Format"].astype(str).str.contains(
            "Blu-Ray", case=False, na=False
        )
        # Coerce so blank or textual years are ignored instead of breaking idxmin.
        years = pd.to_numeric(df.loc[is_blu_ray, "Year"], errors="coerce").dropna()
        if years.empty:
            return ""

        title = df.loc[years.idxmin(), "Title"]
        if pd.isna(title):
            return ""
        return str(title)
class TextHandler:
    """Handler for plain-text resource files."""

    @staticmethod
    def process_file(file_path: str, question: str) -> Tuple[str, str]:
        """Read a UTF-8 text file and return ``(answer, content)``.

        No question-specific extraction is implemented, so ``answer`` is
        always an empty string. If the file cannot be read, the error is
        logged and ``content`` is an empty string as well.
        """
        answer = ""
        text = ""
        try:
            with open(file_path, encoding='utf-8') as stream:
                text = stream.read()
            # Question-specific processing would go here.
        except Exception as err:
            logger.error(f"Error processing text file {file_path}: {err}")
        return answer, text