""" Utilities for analyzing and understanding questions. """ import re import json import os from typing import Dict, Any, List, Optional, Tuple, Set class QuestionAnalyzer: """ Class for analyzing and understanding questions. """ def __init__(self, resource_dir: str, metadata_path: Optional[str] = None): """ Initialize the question analyzer. Args: resource_dir: Directory containing resource files metadata_path: Path to the metadata file (optional) """ self.resource_dir = resource_dir self.metadata_path = metadata_path or os.path.join(resource_dir, 'metadata.jsonl') self.metadata = self._load_metadata() def _load_metadata(self) -> Dict[str, Dict[str, Any]]: """ Load metadata from the metadata file. Returns: Dictionary mapping task IDs to metadata """ metadata = {} if os.path.exists(self.metadata_path): try: with open(self.metadata_path, 'r', encoding='utf-8') as f: for line in f: entry = json.loads(line.strip()) task_id = entry.get('task_id') if task_id: metadata[task_id] = entry except Exception as e: print(f"Error loading metadata: {e}") return metadata def extract_file_mention(self, question: str) -> Optional[str]: """ Extract mentioned file name from the question. Args: question: The question to analyze Returns: Mentioned file name, or None if no file is mentioned """ # Look for "attached file" or "attached spreadsheet" patterns attached_pattern = r'attached (?:file|spreadsheet|document|image|picture|pdf|excel|csv|text file|zip|archive) (?:named |called |")?([\w\.-]+)' match = re.search(attached_pattern, question, re.IGNORECASE) if match: return match.group(1) # Look for file extensions extensions = [ '.xlsx', '.xls', '.csv', '.txt', '.pdf', '.jpg', '.jpeg', '.png', '.docx', '.pptx', '.json', '.jsonld', '.zip', '.pdb', '.py' ] for ext in extensions: pattern = r'(\w+(?:-\w+)*' + re.escape(ext) + r')' match = re.search(pattern, question, re.IGNORECASE) if match: return match.group(1) return None def find_relevant_file(self, question: str, task_id: Optional[str] = None) -> Optional[str]: """ Find the relevant file for a question. Args: question: The question to analyze task_id: The task ID (optional) Returns: Path to the relevant file, or None if no file is found """ # Check if task_id is in metadata and has a file_name if task_id and task_id in self.metadata: file_name = self.metadata[task_id].get('file_name') if file_name and file_name.strip(): # Make sure file_name is not empty file_path = os.path.join(self.resource_dir, file_name) if os.path.exists(file_path): print(f"Found file in metadata for task_id {task_id}: {file_path}") return file_path # Try to find task_id in all metadata entries by matching the question if not task_id: for entry_id, entry in self.metadata.items(): if entry.get('Question') and entry.get('Question') == question: file_name = entry.get('file_name') if file_name and file_name.strip(): file_path = os.path.join(self.resource_dir, file_name) if os.path.exists(file_path): print(f"Found file in metadata by matching question: {file_path}") return file_path # Extract file mention from question file_mention = self.extract_file_mention(question) if file_mention: # Check if the mentioned file exists file_path = os.path.join(self.resource_dir, file_mention) if os.path.exists(file_path): print(f"Found file by direct mention: {file_path}") return file_path # Check if there's a file with a similar name for file_name in os.listdir(self.resource_dir): if file_mention.lower() in file_name.lower(): file_path = os.path.join(self.resource_dir, file_name) print(f"Found file by partial name match: {file_path}") return file_path # Look for UUID pattern in the question which might be a file name without extension uuid_pattern = r'([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})' uuid_match = re.search(uuid_pattern, question, re.IGNORECASE) if uuid_match: uuid = uuid_match.group(1) for file_name in os.listdir(self.resource_dir): if uuid in file_name: file_path = os.path.join(self.resource_dir, file_name) print(f"Found file by UUID match: {file_path}") return file_path # If no file is found, try to find a file mentioned in the metadata if task_id and task_id in self.metadata: # Extract keywords from the question keywords = self._extract_keywords(question) # Check all files in the resource directory best_match = None best_score = 0 for file_name in os.listdir(self.resource_dir): # Skip metadata file if file_name == 'metadata.jsonl': continue # Calculate score based on keyword matches score = 0 for keyword in keywords: if keyword.lower() in file_name.lower(): score += 1 if score > best_score: best_score = score best_match = file_name if best_match: file_path = os.path.join(self.resource_dir, best_match) print(f"Found file by keyword matching: {file_path}") return file_path # If still no match, check the content of metadata.jsonl for clues try: with open(self.metadata_path, 'r', encoding='utf-8') as f: for line in f: entry = json.loads(line.strip()) if 'Question' in entry and entry['Question'] and 'file_name' in entry and entry['file_name']: # Compare with current question if self._questions_are_similar(question, entry['Question']): file_name = entry['file_name'] file_path = os.path.join(self.resource_dir, file_name) if os.path.exists(file_path): print(f"Found file by similar question in metadata: {file_path}") return file_path except Exception as e: print(f"Error searching metadata for similar questions: {e}") return None def _questions_are_similar(self, q1: str, q2: str) -> bool: """ Check if two questions are similar. Args: q1: First question q2: Second question Returns: True if the questions are similar, False otherwise """ # Convert to lowercase and remove punctuation q1 = re.sub(r'[^\w\s]', '', q1.lower()) q2 = re.sub(r'[^\w\s]', '', q2.lower()) # Split into words words1 = set(q1.split()) words2 = set(q2.split()) # Calculate Jaccard similarity intersection = len(words1.intersection(words2)) union = len(words1.union(words2)) if union == 0: return False similarity = intersection / union # Return True if similarity is above threshold return similarity > 0.5 def _extract_keywords(self, text: str) -> Set[str]: """ Extract keywords from text. Args: text: The text to analyze Returns: Set of keywords """ # Remove common stop words stop_words = { 'a', 'an', 'the', 'and', 'or', 'but', 'if', 'then', 'else', 'when', 'at', 'from', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'of', 'in', 'on', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'would', 'should', 'could', 'might', 'will', 'shall', 'can', 'may', 'must', 'ought' } # Extract words words = re.findall(r'\b\w+\b', text.lower()) # Filter out stop words and short words keywords = {word for word in words if word not in stop_words and len(word) > 2} return keywords def analyze_question(self, question: str, task_id: Optional[str] = None) -> Dict[str, Any]: """ Analyze a question to understand what it's asking. Args: question: The question to analyze task_id: The task ID (optional) Returns: Dictionary containing analysis results """ result = { 'question': question, 'task_id': task_id, 'file_path': None, 'keywords': list(self._extract_keywords(question)), 'expected_answer': None, } # Try to extract task_id from the question if not provided if not task_id: task_id_match = re.search(r'task_id[: ]+([\w\-]+)', question, re.IGNORECASE) if task_id_match: result['task_id'] = task_id_match.group(1) task_id = result['task_id'] # Find relevant file file_path = self.find_relevant_file(question, task_id) if file_path: result['file_path'] = file_path # Get expected answer if available if task_id and task_id in self.metadata: # Check multiple possible fields for the answer for answer_field in ['answer', 'Final answer', 'expected_answer']: if answer_field in self.metadata[task_id]: result['expected_answer'] = self.metadata[task_id].get(answer_field) break # If we still don't have an expected answer, search the metadata file again if not result['expected_answer'] and os.path.exists(self.metadata_path): try: with open(self.metadata_path, 'r', encoding='utf-8') as f: for line in f: try: entry = json.loads(line.strip()) if entry.get('task_id') == task_id: for answer_field in ['answer', 'Final answer', 'expected_answer']: if answer_field in entry: result['expected_answer'] = entry[answer_field] break if result['expected_answer']: break # Also check if the task_id is in the question field if task_id and 'question' in entry and task_id in entry['question']: for answer_field in ['answer', 'Final answer', 'expected_answer']: if answer_field in entry: result['expected_answer'] = entry[answer_field] break if result['expected_answer']: break except json.JSONDecodeError: continue except Exception as e: print(f"Error searching metadata for expected answer: {e}") return result def find_file_by_task_id(self, task_id: str) -> Optional[str]: """ Find a file path by task_id in metadata. Args: task_id: The task ID Returns: File path if found, None otherwise """ if not task_id: return None # Check if we have this task_id in our metadata if task_id in self.metadata: file_name = self.metadata[task_id].get('file_name') if file_name: file_path = os.path.join(self.resource_dir, file_name) if os.path.exists(file_path): print(f"Found file in metadata for task_id {task_id}: {file_path}") return file_path # Search through metadata file again to find the task_id try: with open(self.metadata_path, 'r', encoding='utf-8') as f: for line in f: try: entry = json.loads(line.strip()) if entry.get('task_id') == task_id and 'file_name' in entry: file_name = entry['file_name'] file_path = os.path.join(self.resource_dir, file_name) if os.path.exists(file_path): print(f"Found file in metadata for task_id {task_id}: {file_path}") return file_path # If the file doesn't exist with the exact path, look for similar files for existing_file in os.listdir(self.resource_dir): if task_id in existing_file: file_path = os.path.join(self.resource_dir, existing_file) print(f"Found file matching task_id {task_id}: {file_path}") return file_path except json.JSONDecodeError: continue except Exception as e: print(f"Error searching metadata for file by task_id: {e}") return None