""" agent.py - Minimal Claude implementation for GAIA challenge ----------------------------------------------------------- A simplified implementation with direct litellm access to Anthropic's Claude """ import base64 import mimetypes import os import re import tempfile import time import random from typing import List, Dict, Any, Optional import requests from urllib.parse import urlparse from smolagents import CodeAgent, DuckDuckGoSearchTool, PythonInterpreterTool, tool # --------------------------------------------------------------------------- # # Constants & helpers # --------------------------------------------------------------------------- # DEFAULT_API_URL = os.getenv( "GAIA_API_URL", "https://agents-course-unit4-scoring.hf.space" ) FILE_TAG = re.compile(r"]+)>") # def _download_file(file_id: str) -> bytes: """Download the attachment for a GAIA task.""" url = f"{DEFAULT_API_URL}/files/{file_id}" resp = requests.get(url, timeout=30) resp.raise_for_status() return resp.content # --------------------------------------------------------------------------- # # Direct Claude model implementation with litellm # --------------------------------------------------------------------------- # class DirectClaudeModel: """ Direct interface to Claude via litellm that works with smolagents This avoids the message format issues by keeping things very simple """ def __init__( self, api_key: Optional[str] = None, temperature: float = 0.1 ): """Initialize the Claude model""" self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY") if not self.api_key: raise ValueError("No Anthropic API key provided") self.temperature = temperature self.model_name = "anthropic/claude-3-5-sonnet-20240620" print(f"Initialized DirectClaudeModel with {self.model_name}") # Sleep random amount to avoid race conditions with many queries time.sleep(random.uniform(1, 3)) def __call__(self, prompt: str, **kwargs) -> str: """ Simple call method that works with smolagents Args: prompt: The user prompt **kwargs: Additional parameters (ignored) Returns: Claude's response as a string """ # Import here to avoid any circular imports from litellm import completion # Use a simple format: system message + user message messages = [ { "role": "system", "content": """You are a concise, highly accurate assistant specialized in solving challenges. Your answers should be precise, direct, and exactly match the expected format. All answers are graded by exact string match, so format carefully!""" }, { "role": "user", "content": prompt } ] # Add delay to avoid rate limits time.sleep(random.uniform(0.5, 2.0)) try: # Make API call with simple format response = completion( model=self.model_name, messages=messages, temperature=self.temperature, max_tokens=1024, api_key=self.api_key ) # Extract and return the text content only return response.choices[0].message.content except Exception as e: # If it's a rate limit error, wait and retry if "rate_limit" in str(e).lower(): print(f"Rate limit hit, waiting 30 seconds: {e}") time.sleep(30) return self.__call__(prompt, **kwargs) else: print(f"Error: {str(e)}") raise # --------------------------------------------------------------------------- # # Custom tool: fetch GAIA attachments # --------------------------------------------------------------------------- # @tool def gaia_file_reader(file_id: str) -> str: """ Download a GAIA attachment and return its contents """ try: raw = _download_file(file_id) mime = mimetypes.guess_type(file_id)[0] or "application/octet-stream" if mime.startswith("text") or mime in ("application/json",): return raw.decode(errors="ignore") return base64.b64encode(raw).decode() except Exception as exc: return f"ERROR downloading {file_id}: {exc}" # --------------------------------------------------------------------------- # # Additional tools # --------------------------------------------------------------------------- # @tool def save_and_read_file(content: str, filename: Optional[str] = None) -> str: """Save content to a file and return the path""" temp_dir = tempfile.gettempdir() if filename is None: temp_file = tempfile.NamedTemporaryFile(delete=False) filepath = temp_file.name else: filepath = os.path.join(temp_dir, filename) with open(filepath, 'w') as f: f.write(content) return f"File saved to {filepath}." @tool def analyze_csv_file(file_path: str, query: str) -> str: """Analyze a CSV file with pandas""" try: import pandas as pd df = pd.read_csv(file_path) result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n" result += f"Columns: {', '.join(df.columns)}\n\n" result += "Summary statistics:\n" result += str(df.describe()) return result except ImportError: return "Error: pandas is not installed." except Exception as e: return f"Error analyzing CSV file: {str(e)}" @tool def analyze_excel_file(file_path: str, query: str) -> str: """Analyze an Excel file with pandas""" try: import pandas as pd df = pd.read_excel(file_path) result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n" result += f"Columns: {', '.join(df.columns)}\n\n" result += "Summary statistics:\n" result += str(df.describe()) return result except ImportError: return "Error: pandas and openpyxl are not installed." except Exception as e: return f"Error analyzing Excel file: {str(e)}" # --------------------------------------------------------------------------- # # ClaudeAgent - Main class for GAIA challenge # --------------------------------------------------------------------------- # class ClaudeAgent: """A simplified Claude agent for the GAIA challenge""" def __init__(self): """Initialize the agent with Claude""" try: # Get API key api_key = os.getenv("ANTHROPIC_API_KEY") if not api_key: raise ValueError("ANTHROPIC_API_KEY environment variable not found") print("✅ Initializing ClaudeAgent") # Create the model with direct implementation model = DirectClaudeModel(api_key=api_key, temperature=0.1) # Set up tools tools = [ DuckDuckGoSearchTool(), PythonInterpreterTool(), save_and_read_file, analyze_csv_file, analyze_excel_file, gaia_file_reader ] # Create the CodeAgent self.agent = CodeAgent( tools=tools, model=model, additional_authorized_imports=["pandas", "numpy", "json", "re", "math"], executor_type="local", verbosity_level=2 ) print("Agent initialized successfully") except Exception as e: print(f"Error initializing ClaudeAgent: {e}") raise def __call__(self, question: str) -> str: """Process a question and return the answer""" try: print(f"Processing question: {question[:100]}..." if len(question) > 100 else question) # Add a small delay between questions time.sleep(random.uniform(1.0, 3.0)) # Handle file references file_match = re.search(r"]+)>", question) if file_match: file_id = file_match.group(1) print(f"Detected file: {file_id}") # Download file try: file_content = _download_file(file_id) temp_dir = tempfile.gettempdir() file_path = os.path.join(temp_dir, file_id) with open(file_path, 'wb') as f: f.write(file_content) # Remove file tag from question clean_question = re.sub(r"]+>", "", question).strip() # Build prompt with file context prompt = f""" Question: {clean_question} There is a file available at path: {file_path} Use appropriate tools to analyze this file if needed. Answer the question directly and precisely. """ except Exception as e: print(f"Error downloading file: {e}") prompt = question else: # Handle reversed text separately if question.startswith(".") or ".rewsna eht sa" in question: prompt = f""" This question is in reversed text. Here's the normal version: {question[::-1]} Answer the question directly and precisely. """ else: prompt = question # Execute agent with prompt answer = self.agent.run(prompt) # Clean up response answer = self._clean_answer(answer) print(f"Generated answer: {answer}") return answer except Exception as e: print(f"Error: {str(e)}") return f"Error processing question: {str(e)}" def _clean_answer(self, answer: any) -> str: """Clean up the answer for exact matching""" if not isinstance(answer, str): return str(answer) # Normalize spacing answer = answer.strip() # Remove common prefixes prefixes = [ "The answer is ", "Answer: ", "Final answer: ", "The result is ", "Based on the information provided, " ] for prefix in prefixes: if answer.startswith(prefix): answer = answer[len(prefix):].strip() # Remove quotes if (answer.startswith('"') and answer.endswith('"')) or ( answer.startswith("'") and answer.endswith("'") ): answer = answer[1:-1].strip() return answer