import os import gradio as gr import requests import pandas as pd import re import time import json import base64 from typing import Dict, Any, List, Optional, Tuple from io import StringIO, BytesIO import openpyxl from PIL import Image import PyPDF2 import ast import math import statistics from datetime import datetime, timedelta DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" class FileProcessor: """Handle various file types that GAIA questions might reference""" @staticmethod def process_excel_file(file_path: str) -> Dict[str, Any]: """Process Excel files and extract data""" try: # Try multiple sheet reading approaches excel_data = {} workbook = openpyxl.load_workbook(file_path, data_only=True) for sheet_name in workbook.sheetnames: sheet = workbook[sheet_name] data = [] for row in sheet.iter_rows(values_only=True): if any(cell is not None for cell in row): data.append(row) excel_data[sheet_name] = data return excel_data except Exception as e: print(f"Excel processing error: {e}") return {} @staticmethod def process_python_code(code_content: str) -> str: """Execute Python code safely and return output""" try: # Create a safe execution environment safe_globals = { '__builtins__': { 'print': print, 'len': len, 'range': range, 'sum': sum, 'max': max, 'min': min, 'abs': abs, 'round': round, 'int': int, 'float': float, 'str': str, 'list': list, 'dict': dict, 'set': set, 'tuple': tuple }, 'math': math, 'statistics': statistics } # Capture output import io import sys old_stdout = sys.stdout sys.stdout = captured_output = io.StringIO() try: exec(code_content, safe_globals) output = captured_output.getvalue() finally: sys.stdout = old_stdout return output.strip() except Exception as e: return f"Code execution error: {e}" @staticmethod def process_pdf_file(file_path: str) -> str: """Extract text from PDF files""" try: with open(file_path, 'rb') as file: pdf_reader = PyPDF2.PdfReader(file) text = "" for page in pdf_reader.pages: text += page.extract_text() + "\n" return text.strip() except Exception as e: return f"PDF processing error: {e}" class AdvancedWebSearchEngine: """Enhanced web search with multiple strategies""" def __init__(self): self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) self.serper_api_key = os.getenv("SERPER_API_KEY") self.search_cache = {} def search_with_serper(self, query: str, search_type: str = "search") -> Dict[str, Any]: """Enhanced Serper API search with different types""" if not self.serper_api_key: return {} # Check cache first cache_key = f"{query}_{search_type}" if cache_key in self.search_cache: return self.search_cache[cache_key] try: url = f"https://google.serper.dev/{search_type}" payload = { "q": query, "num": 15, # Get more results "gl": "us", # US results "hl": "en" # English language } headers = { "X-API-KEY": self.serper_api_key, "Content-Type": "application/json" } response = self.session.post(url, json=payload, headers=headers, timeout=20) result = response.json() if response.status_code == 200 else {} # Cache the result self.search_cache[cache_key] = result return result except Exception as e: print(f"Serper API error: {e}") return {} def multi_strategy_search(self, query: str) -> Dict[str, Any]: """Try multiple search strategies for better results""" results = {} # Primary search primary = self.search_with_serper(query) if primary: results['primary'] = primary # Try variations if primary doesn't yield good results variations = [ f'"{query}"', # Exact phrase f"{query} site:wikipedia.org", # Wikipedia specific f"{query} facts information", # More specific ] for i, variation in enumerate(variations): if len(results) < 2: # Don't overdo it var_result = self.search_with_serper(variation) if var_result and var_result != primary: results[f'variation_{i}'] = var_result return results def extract_answer_from_results(self, results: Dict[str, Any], question: str) -> str: """Advanced answer extraction from search results""" all_content = [] for result_type, data in results.items(): # Extract answer box if "answerBox" in data: answer_box = data["answerBox"] if "answer" in answer_box: return answer_box["answer"] elif "snippet" in answer_box: return answer_box["snippet"] # Extract knowledge graph if "knowledgeGraph" in data: kg = data["knowledgeGraph"] if "description" in kg: all_content.append(kg["description"]) # Extract organic results for organic in data.get("organic", []): title = organic.get("title", "") snippet = organic.get("snippet", "") if title and snippet: all_content.append(f"{title}: {snippet}") # Combine all content combined_content = "\n".join(all_content) # Apply question-specific extraction return self.extract_specific_answer(combined_content, question) def extract_specific_answer(self, content: str, question: str) -> str: """Extract specific answers based on question type""" q_lower = question.lower() # Numbers and quantities if any(word in q_lower for word in ['how many', 'how much', 'number of', 'count']): numbers = re.findall(r'\b\d{1,10}\b', content) if numbers: # Return the most likely number (often the first one found) return numbers[0] # Names and people if any(word in q_lower for word in ['who', 'whom', 'name', 'person']): # Look for proper names (capitalized words) names = re.findall(r'\b[A-Z][a-z]+ [A-Z][a-z]+(?:\s[A-Z][a-z]+)*\b', content) if names: if 'first name' in q_lower: return names[0].split()[0] elif 'last name' in q_lower or 'surname' in q_lower: return names[0].split()[-1] else: return names[0] # Dates and years if any(word in q_lower for word in ['when', 'year', 'date']): years = re.findall(r'\b(19|20)\d{2}\b', content) if years: return years[0] dates = re.findall(r'\b\w+ \d{1,2}, \d{4}\b', content) if dates: return dates[0] # Places and locations if any(word in q_lower for word in ['where', 'location', 'place', 'country']): # Look for place names places = re.findall(r'\b[A-Z][a-z]+(?:\s[A-Z][a-z]+)*(?:\s(?:City|State|Country|Province|Region))?\b', content) if places: return places[0] # Country codes if 'country code' in q_lower: codes = re.findall(r'\b[A-Z]{2,3}\b', content) if codes: return codes[0] # Default: return first meaningful sentence sentences = [s.strip() for s in content.split('.') if len(s.strip()) > 20] return sentences[0] if sentences else "Answer not found in search results" class EnhancedQuestionSolver: """Advanced question solver with multiple reasoning strategies""" def __init__(self): self.search_engine = AdvancedWebSearchEngine() self.file_processor = FileProcessor() def solve_question(self, question: str, files: List[str] = None) -> str: """Main question solving method with multiple strategies""" print(f"šŸ¤” Analyzing: {question[:100]}...") # Handle file-based questions first if files: file_answer = self.handle_file_based_question(question, files) if file_answer and file_answer != "File processing failed": return file_answer # Detect file references in question text if self.has_file_references(question): return self.handle_file_reference_question(question) # Handle mathematical calculations if self.is_math_question(question): return self.handle_math_question(question) # Handle multi-step reasoning questions if self.needs_multi_step_reasoning(question): return self.handle_multi_step_question(question) # Handle specific structured questions return self.handle_structured_question(question) def has_file_references(self, question: str) -> bool: """Check if question references files""" file_indicators = [ "attached", "excel file", "python code", "pdf", "image", "spreadsheet", "document", "file contains", "in the file" ] return any(indicator in question.lower() for indicator in file_indicators) def handle_file_reference_question(self, question: str) -> str: """Handle questions that reference files but files aren't provided""" # Try to search for the specific content mentioned if "excel file" in question.lower() and "sales" in question.lower(): return "Unable to access attached Excel file. Please ensure file is properly uploaded." elif "python code" in question.lower(): return "Unable to access attached Python code. Please ensure file is properly uploaded." else: return "File referenced but not accessible. Please provide the file." def handle_file_based_question(self, question: str, files: List[str]) -> str: """Handle questions that involve file processing""" try: for file_path in files: if file_path.endswith('.xlsx') or file_path.endswith('.xls'): excel_data = self.file_processor.process_excel_file(file_path) return self.analyze_excel_data(excel_data, question) elif file_path.endswith('.py'): with open(file_path, 'r') as f: code_content = f.read() return self.file_processor.process_python_code(code_content) elif file_path.endswith('.pdf'): pdf_text = self.file_processor.process_pdf_file(file_path) return self.analyze_text_content(pdf_text, question) except Exception as e: return f"File processing failed: {e}" return "File processing failed" def analyze_excel_data(self, excel_data: Dict, question: str) -> str: """Analyze Excel data to answer questions""" if not excel_data: return "No data found in Excel file" # Convert to DataFrame for analysis try: for sheet_name, data in excel_data.items(): if data: df = pd.DataFrame(data[1:], columns=data[0]) # First row as header # Handle sales analysis questions if "sales" in question.lower(): if "total" in question.lower(): numeric_cols = df.select_dtypes(include=[int, float]).columns if len(numeric_cols) > 0: return str(df[numeric_cols[0]].sum()) elif "average" in question.lower(): numeric_cols = df.select_dtypes(include=[int, float]).columns if len(numeric_cols) > 0: return str(df[numeric_cols[0]].mean()) return "Could not analyze Excel data for this question" except Exception as e: return f"Excel analysis error: {e}" def analyze_text_content(self, text: str, question: str) -> str: """Analyze text content to find answers""" # Look for specific patterns based on question if "surname" in question.lower() or "last name" in question.lower(): names = re.findall(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', text) if names: return names[0].split()[-1] # Use search to find more specific information search_query = f"{question} {text[:100]}" results = self.search_engine.multi_strategy_search(search_query) return self.search_engine.extract_answer_from_results(results, question) def is_math_question(self, question: str) -> bool: """Detect mathematical questions""" math_indicators = [ 'calculate', 'compute', 'sum', 'average', 'mean', 'total', 'how many', 'how much', 'solve', 'equation' ] return any(indicator in question.lower() for indicator in math_indicators) def handle_math_question(self, question: str) -> str: """Handle mathematical questions""" # Try to extract and solve mathematical expressions expressions = re.findall(r'\b\d+\s*[\+\-\*\/]\s*\d+\b', question) for expr in expressions: try: result = eval(expr) return str(result) except: continue # For word problems, search for the answer results = self.search_engine.multi_strategy_search(question) return self.search_engine.extract_answer_from_results(results, question) def needs_multi_step_reasoning(self, question: str) -> bool: """Check if question needs multi-step reasoning""" multi_step_indicators = [ "who played", "actor who", "person who", "after", "before", "then", "subsequently", "following" ] return any(indicator in question.lower() for indicator in multi_step_indicators) def handle_multi_step_question(self, question: str) -> str: """Handle questions requiring multiple steps""" # Break down complex questions if "actor who played" in question.lower(): return self.handle_actor_chain_question(question) elif "before and after" in question.lower(): return self.handle_sequence_question(question) else: return self.handle_structured_question(question) def handle_actor_chain_question(self, question: str) -> str: """Handle questions about actors playing different roles""" # Step 1: Find the initial actor/role parts = question.split(" in ") if len(parts) >= 2: first_search = f"actor who played {parts[0].split('actor who played')[1]} in {parts[1].split(' play in')[0]}" results1 = self.search_engine.multi_strategy_search(first_search) actor_name = self.search_engine.extract_answer_from_results(results1, f"who is the actor") if actor_name and actor_name != "Answer not found in search results": # Step 2: Find what this actor played in the target show/movie target = parts[1].split(" play in ")[1] if " play in " in parts[1] else parts[1] second_search = f"{actor_name} role in {target}" results2 = self.search_engine.multi_strategy_search(second_search) return self.search_engine.extract_answer_from_results(results2, f"what role did {actor_name} play") # Fallback to single search results = self.search_engine.multi_strategy_search(question) return self.search_engine.extract_answer_from_results(results, question) def handle_sequence_question(self, question: str) -> str: """Handle questions about sequences (before/after)""" results = self.search_engine.multi_strategy_search(question) return self.search_engine.extract_answer_from_results(results, question) def handle_structured_question(self, question: str) -> str: """Handle general structured questions with enhanced search""" results = self.search_engine.multi_strategy_search(question) answer = self.search_engine.extract_answer_from_results(results, question) # If no good answer found, try rephrasing the question if answer == "Answer not found in search results": rephrased_questions = self.rephrase_question(question) for rq in rephrased_questions: results = self.search_engine.multi_strategy_search(rq) answer = self.search_engine.extract_answer_from_results(results, question) if answer != "Answer not found in search results": break return answer def rephrase_question(self, question: str) -> List[str]: """Generate alternative phrasings of the question""" rephrased = [] # Add question marks if missing if not question.endswith('?'): rephrased.append(question + '?') # Remove question words for factual search words_to_remove = ['what is', 'who is', 'where is', 'when is', 'how many', 'how much'] for word in words_to_remove: if word in question.lower(): rephrased.append(question.lower().replace(word, '').strip()) # Add context words context_words = ['information about', 'facts about', 'details about'] for context in context_words: rephrased.append(f"{context} {question}") return rephrased[:3] # Limit to 3 rephrasings def get_enhanced_api_status(): """Check API status with more details""" status = [] if os.getenv("SERPER_API_KEY"): status.append("āœ… Serper API: Configured") else: status.append("āŒ Serper API: Missing - Get key at serper.dev") # Check if we can access file processing libraries try: import openpyxl status.append("āœ… Excel Processing: Available") except ImportError: status.append("āŒ Excel Processing: openpyxl not available") try: import PyPDF2 status.append("āœ… PDF Processing: Available") except ImportError: status.append("āŒ PDF Processing: PyPDF2 not available") return "\n".join(status) def run_enhanced_gaia_evaluation(profile: gr.OAuthProfile | None): """Run GAIA evaluation with enhanced solving capabilities""" if not profile: return "Please log in to Hugging Face first.", None # Check API status api_status = get_enhanced_api_status() if "āŒ Serper API" in api_status: return f"āš ļø Serper API not configured!\n\n{api_status}", None username = profile.username questions_url = f"{DEFAULT_API_URL}/questions" submit_url = f"{DEFAULT_API_URL}/submit" try: solver = EnhancedQuestionSolver() print("āœ… Enhanced question solver initialized") except Exception as e: return f"āŒ Initialization failed: {e}", None try: print("šŸ“„ Fetching questions...") r = requests.get(questions_url, timeout=30) r.raise_for_status() questions = r.json() print(f"āœ… Got {len(questions)} questions") except Exception as e: return f"āŒ Failed to fetch questions: {e}", None answers = [] logs = [] for i, item in enumerate(questions): task_id = item.get("task_id") question = item.get("question") files = item.get("files", []) # Get attached files if any if not task_id or not question: continue print(f"\nšŸ”„ Processing {i+1}/{len(questions)}: {task_id}") print(f"šŸ“ Question: {question[:100]}{'...' if len(question) > 100 else ''}") if files: print(f"šŸ“Ž Files: {files}") try: start_time = time.time() answer = solver.solve_question(question, files) processing_time = time.time() - start_time answers.append({"task_id": task_id, "submitted_answer": answer}) logs.append({ "Task ID": task_id, "Question": question[:150] + "..." if len(question) > 150 else question, "Answer": answer[:100] + "..." if len(answer) > 100 else answer, "Files": len(files) if files else 0, "Time (s)": f"{processing_time:.2f}" }) print(f"āœ… Answer: {answer[:80]}{'...' if len(answer) > 80 else ''}") time.sleep(0.5) # Rate limiting for API except Exception as e: error_msg = f"Error: {str(e)}" answers.append({"task_id": task_id, "submitted_answer": error_msg}) logs.append({ "Task ID": task_id, "Question": question[:150] + "..." if len(question) > 150 else question, "Answer": error_msg, "Files": len(files) if files else 0, "Time (s)": "Error" }) print(f"āŒ Error: {e}") # Submit answers print(f"\nšŸ“¤ Submitting {len(answers)} answers...") payload = { "username": username, "agent_code": f"https://huggingface.co/spaces/{os.getenv('SPACE_ID', '')}/tree/main", "answers": answers } try: resp = requests.post(submit_url, json=payload, timeout=300) # Increased timeout resp.raise_for_status() data = resp.json() score = data.get('score', 'N/A') correct = data.get('correct_count', '?') total = data.get('total_attempted', '?') result_message = f"""šŸŽÆ ENHANCED GAIA EVALUATION RESULTS šŸ“Š Final Score: {score}% ({correct}/{total} correct) šŸ”§ System Status: {api_status} šŸš€ Enhanced Features: • Multi-strategy web search with result caching • Advanced file processing (Excel, PDF, Python) • Multi-step reasoning for complex questions • Context-aware answer extraction • Question rephrasing for better results • Specialized handlers for different question types šŸ“ˆ Performance Improvements: • Better search result processing • Enhanced name/number extraction • Improved mathematical computation • File-based question handling • Actor chain and sequence reasoning""" return result_message, pd.DataFrame(logs) except Exception as e: return f"āŒ Submission failed: {str(e)}", pd.DataFrame(logs) # Enhanced Gradio Interface with gr.Blocks(title="Enhanced GAIA Agent", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🧠 Enhanced GAIA Benchmark Agent v2.0 **šŸ”§ Required Setup:** - `SERPER_API_KEY` environment variable - Get 2500 free searches/month at [serper.dev](https://serper.dev) **⚔ Advanced Capabilities:** - šŸ” Multi-strategy web search with intelligent caching - šŸ“Š Excel/CSV file processing and analysis - šŸ Python code execution for computational questions - šŸ“„ PDF document text extraction and analysis - 🧮 Advanced mathematical problem solving - šŸŽ­ Multi-step reasoning for complex actor/person chains - šŸŽÆ Context-aware answer extraction with multiple fallbacks - šŸ“ Question rephrasing for better search results **šŸ“ˆ Expected Performance:** - Significantly improved accuracy on GAIA benchmark - Better handling of file-based questions - Enhanced name/number/date extraction - Robust error handling and fallback strategies """) gr.LoginButton() with gr.Row(): with gr.Column(): api_status_display = gr.Textbox( label="šŸ”§ System Status", value=get_enhanced_api_status(), lines=4, interactive=False ) run_button = gr.Button( "šŸš€ Run Enhanced GAIA Evaluation", variant="primary", size="lg" ) with gr.Row(): results_display = gr.Textbox( label="šŸ“Š Evaluation Results", lines=15, interactive=False ) with gr.Row(): detailed_results = gr.DataFrame( label="šŸ“‹ Detailed Question Analysis", wrap=True, interactive=False ) # Refresh status button refresh_status = gr.Button("šŸ”„ Refresh Status", size="sm") refresh_status.click( lambda: get_enhanced_api_status(), outputs=[api_status_display] ) run_button.click( run_enhanced_gaia_evaluation, outputs=[results_display, detailed_results] ) if __name__ == "__main__": demo.launch(share=True, debug=True)