File size: 5,499 Bytes
922f271
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
Resource handlers for processing specific file types in the benchmark
"""
import os
import json
import pandas as pd
from typing import Dict, Any, List, Optional, Tuple
import logging
import glob

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Constants
RESOURCE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resource")

class ResourceIndex:
    """Indexes and provides access to resource files based on metadata"""
    
    def __init__(self):
        self._metadata = self._load_metadata()
        self._file_index = self._index_files()
        
    def _load_metadata(self) -> Dict[str, Dict]:
        """Load metadata from the metadata.jsonl file"""
        metadata = {}
        metadata_path = os.path.join(RESOURCE_DIR, "metadata.jsonl")
        
        try:
            with open(metadata_path, 'r', encoding='utf-8') as f:
                for line in f:
                    data = json.loads(line)
                    if 'task_id' in data:
                        metadata[data['task_id']] = data
        except Exception as e:
            logger.error(f"Error loading metadata: {e}")
        
        return metadata
    
    def _index_files(self) -> Dict[str, str]:
        """Create an index of file names to file paths"""
        file_index = {}
        
        for filename in os.listdir(RESOURCE_DIR):
            file_path = os.path.join(RESOURCE_DIR, filename)
            if os.path.isfile(file_path):
                file_index[filename] = file_path
                
        return file_index
    
    def get_metadata_by_task_id(self, task_id: str) -> Optional[Dict]:
        """Get metadata for a specific task ID"""
        return self._metadata.get(task_id)
    
    def get_answer_by_task_id(self, task_id: str) -> str:
        """Get the final answer for a specific task ID"""
        metadata = self.get_metadata_by_task_id(task_id)
        if metadata:
            return metadata.get('Final answer', '')
        return ''
    
    def get_file_path(self, filename: str) -> Optional[str]:
        """Get the full path for a specific file"""
        return self._file_index.get(filename)
    
    def find_task_by_question(self, question: str) -> List[Tuple[str, Dict]]:
        """Search for tasks that match a question"""
        matches = []
        
        for task_id, metadata in self._metadata.items():
            metadata_question = metadata.get('Question', '').lower()
            if question.lower() in metadata_question or metadata_question in question.lower():
                matches.append((task_id, metadata))
                
        return matches
    
    def find_task_by_file(self, filename: str) -> Optional[Tuple[str, Dict]]:
        """Find task that uses a specific file"""
        for task_id, metadata in self._metadata.items():
            if metadata.get('file_name') == filename:
                return (task_id, metadata)
        return None
    
    def get_all_files(self) -> List[str]:
        """Get a list of all files in the resources directory"""
        return list(self._file_index.keys())
    
    def get_files_by_extension(self, extension: str) -> List[str]:
        """Get a list of files with a specific extension"""
        if not extension.startswith('.'):
            extension = '.' + extension
        
        return [filename for filename in self._file_index.keys() 
                if filename.lower().endswith(extension.lower())]


class ExcelHandler:
    """Handler for Excel files in the resources"""
    
    @staticmethod
    def process_file(file_path: str, question: str) -> Tuple[str, Optional[pd.DataFrame]]:
        """
        Process an Excel file and extract information relevant to the question
        Returns a tuple of (answer, dataframe)
        """
        try:
            df = pd.read_excel(file_path)
            
            # Example: Find oldest blu-ray in spreadsheet
            if "oldest" in question.lower() and "blu-ray" in question.lower():
                # Filter for Blu-Ray
                if "Format" in df.columns:
                    blu_rays = df[df['Format'].str.contains('Blu-Ray', case=False, na=False)]
                    if not blu_rays.empty:
                        # Find the oldest by year
                        if "Year" in blu_rays.columns:
                            oldest = blu_rays.loc[blu_rays['Year'].idxmin()]
                            if "Title" in oldest:
                                return oldest["Title"], df
            
            return "", df
            
        except Exception as e:
            logger.error(f"Error processing Excel file {file_path}: {e}")
            return "", None


class TextHandler:
    """Handler for text files in the resources"""
    
    @staticmethod
    def process_file(file_path: str, question: str) -> Tuple[str, str]:
        """
        Process a text file and extract information relevant to the question
        Returns a tuple of (answer, content)
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
                
            # Process based on question type
            # Add specific processing logic here
                
            return "", content
            
        except Exception as e:
            logger.error(f"Error processing text file {file_path}: {e}")
            return "", ""