import os
import gradio as gr
import requests
import json
import re
import numexpr
import pandas as pd
from duckduckgo_search import DDGS
from pdfminer.high_level import extract_text
from bs4 import BeautifulSoup
from typing import Dict, List, Tuple, Optional
from dotenv import load_dotenv
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
import torch
import time
import gc

# --- Load Environment Variables ---
load_dotenv()
SERPER_API_KEY = os.getenv("SERPER_API_KEY")

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
MAX_STEPS = 4     # Reduced from 6
MAX_TOKENS = 128  # Reduced from 256
MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"
TIMEOUT_PER_QUESTION = 30  # Hard cap of 30 seconds per question

# --- Configure Environment for Hugging Face Spaces ---
os.environ["PIP_BREAK_SYSTEM_PACKAGES"] = "1"
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
os.environ["BITSANDBYTES_NOWELCOME"] = "1"

print("Loading model (CPU-optimized)...")
start_time = time.time()

# Load the model with aggressive CPU optimizations
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    trust_remote_code=True,
    torch_dtype=torch.float32,
    device_map="cpu",
    low_cpu_mem_usage=True,
    use_cache=False,
    attn_implementation="eager",  # Eager attention performs better on CPU
)

tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    use_fast=True,  # Fast (Rust-based) tokenizer for quicker encoding
    trust_remote_code=True,
)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

load_time = time.time() - start_time
print(f"Model loaded in {load_time:.2f} seconds")

# --- Optimized Tools ---
def web_search(query: str) -> str:
    """Search the web with a short timeout and a capped number of results."""
    try:
        if SERPER_API_KEY:
            params = {"q": query, "num": 2, "hl": "en", "gl": "us"}
            headers = {"X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json"}
            response = requests.post(
                "https://google.serper.dev/search",
                headers=headers,
                json=params,
                timeout=5,  # Reduced timeout
            )
            results = response.json()
            if "organic" in results:
                return json.dumps(
                    [f"{r['title']}: {r['snippet'][:100]}" for r in results["organic"][:2]]
                )
            return "No results found"
        else:
            # Fall back to DuckDuckGo when no Serper API key is configured
            with DDGS() as ddgs:
                results = list(ddgs.text(query, max_results=2))
            return json.dumps([f"{r['title']}: {r['body'][:100]}" for r in results])
    except Exception as e:
        return f"Search error: {str(e)}"


def calculator(expression: str) -> str:
    """Fast arithmetic evaluation via numexpr on a sanitized expression."""
    try:
        # Strip anything that is not a digit, operator, parenthesis, dot, or whitespace
        expression = re.sub(r"[^\d+\-*/().\s]", "", expression)
        result = numexpr.evaluate(expression)
        return str(float(result))
    except Exception as e:
        return f"Calculation error: {str(e)}"


def read_pdf(file_path: str) -> str:
    """Extract text from a PDF, truncated to keep the context small."""
    try:
        text = extract_text(file_path)
        return text[:1000] if text else "No text found in PDF"  # Reduced limit
    except Exception as e:
        return f"PDF read error: {str(e)}"


def read_webpage(url: str) -> str:
    """Fetch a webpage and strip it down to plain text, aggressively truncated."""
    try:
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        response = requests.get(url, timeout=5, headers=headers)  # Reduced timeout
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        for script in soup(["script", "style"]):
            script.decompose()
        text = soup.get_text(separator=" ", strip=True)
        return text[:1000] if text else "No text found on webpage"  # Reduced limit
    except Exception as e:
        return f"Webpage read error: {str(e)}"
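
# Quick local sanity check for the tools (a hedged sketch, not executed in the
# Space; the query and URL below are arbitrary examples, not from the task set):
#
#   print(calculator("2 * (3 + 4)"))             # -> "14.0"
#   print(web_search("GAIA benchmark"))          # JSON list of up to 2 snippets
#   print(read_webpage("https://example.com"))   # first 1000 chars of page text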

TOOLS = {
    "web_search": web_search,
    "calculator": calculator,
    "read_pdf": read_pdf,
    "read_webpage": read_webpage,
}

# --- Optimized GAIA Agent ---
class GAIA_Agent:
    def __init__(self):
        self.tools = TOOLS
        # The tool-call example must be valid JSON (double-quoted keys and values),
        # because _parse_tool_call feeds the model's output to json.loads.
        self.system_prompt = (
            "You are a GAIA problem solver. Tools: {web_search, calculator, read_pdf, read_webpage}.\n"
            "Be concise and direct. Use tools efficiently.\n"
            'Tool format: ```json\n{"tool": "tool_name", "args": {"arg1": value}}```\n'
            "End with: Final Answer: [exact answer]"
        )

    def __call__(self, question: str) -> str:
        start_time = time.time()
        print(f"Processing: {question[:50]}...")
        try:
            history = [f"Question: {question}"]
            for step in range(MAX_STEPS):
                # Enforce the per-question time budget
                if time.time() - start_time > TIMEOUT_PER_QUESTION:
                    return "TIMEOUT: Question took too long"

                prompt = self._build_prompt(history)
                response = self._call_model(prompt)

                if "Final Answer:" in response:
                    answer = response.split("Final Answer:")[-1].strip()
                    elapsed = time.time() - start_time
                    print(f"Completed in {elapsed:.1f}s: {answer[:30]}...")
                    return answer

                tool_call = self._parse_tool_call(response)
                if tool_call:
                    tool_name, args = tool_call
                    observation = self._use_tool(tool_name, args)
                    history.append(f"Action: {tool_name}")
                    history.append(f"Result: {observation}")
                else:
                    history.append(f"Thought: {response}")

                # Aggressive memory cleanup between steps
                gc.collect()

            return "Could not solve within step limit"
        except Exception as e:
            print(f"Agent error: {str(e)}")
            return f"Error: {str(e)}"

    def _build_prompt(self, history: List[str]) -> str:
        # Phi-3 chat format: <|system|> ... <|end|> <|user|> ... <|end|> <|assistant|>
        prompt = "<|system|>\n" + self.system_prompt + "<|end|>\n"
        prompt += "<|user|>\n" + "\n".join(history) + "<|end|>\n"
        prompt += "<|assistant|>"
        return prompt

    def _call_model(self, prompt: str) -> str:
        try:
            inputs = tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=2048,  # Reduced context window
                padding=False,
            )
            generation_config = GenerationConfig(
                max_new_tokens=MAX_TOKENS,
                temperature=0.1,  # Low randomness for faster convergence
                do_sample=True,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id,
                use_cache=False,
            )
            with torch.no_grad():
                outputs = model.generate(
                    inputs.input_ids,
                    generation_config=generation_config,
                    attention_mask=inputs.attention_mask,
                )

            # Decode only the newly generated tokens. Splitting the full decode on
            # "<|assistant|>" does not work here: skip_special_tokens strips that
            # marker, so the split would return the entire prompt as well.
            new_tokens = outputs[0][inputs.input_ids.shape[1]:]
            response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

            # Immediate cleanup
            del inputs, outputs
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            return response
        except Exception as e:
            return f"Generation error: {str(e)}"

    def _parse_tool_call(self, text: str) -> Optional[Tuple[str, Dict]]:
        try:
            json_match = re.search(r"```json\s*({.*?})\s*```", text, re.DOTALL)
            if json_match:
                tool_call = json.loads(json_match.group(1))
                if "tool" in tool_call and "args" in tool_call:
                    return tool_call["tool"], tool_call["args"]
        except (json.JSONDecodeError, TypeError):
            pass
        return None

    def _use_tool(self, tool_name: str, args: Dict) -> str:
        if tool_name not in self.tools:
            return f"Unknown tool: {tool_name}"
        try:
            result = self.tools[tool_name](**args)
            return str(result)[:300]  # Truncate tool output to keep the context small
        except Exception as e:
            return f"Tool error: {str(e)}"
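
# Illustration of the round trip _parse_tool_call expects (the model response
# below is hypothetical, but the fenced-JSON format matches the system prompt):
#
#   response = (
#       "I should search first.\n"
#       '```json\n{"tool": "web_search", "args": {"query": "GAIA benchmark"}}\n```'
#   )
#   GAIA_Agent()._parse_tool_call(response)
#   # -> ("web_search", {"query": "GAIA benchmark"})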

# --- Optimized Evaluation Runner ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Run the agent on every question sequentially, then submit all answers."""
    space_id = os.getenv("SPACE_ID")
    if not profile:
        return "Please Login to Hugging Face with the button.", None
    username = profile.username

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = GAIA_Agent()
    except Exception as e:
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    # Fetch Questions
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "No questions found.", None
        print(f"Processing {len(questions_data)} questions...")
    except Exception as e:
        return f"Error fetching questions: {e}", None

    # Process questions with progress tracking
    results_log = []
    answers_payload = []
    total_start = time.time()

    for i, item in enumerate(questions_data):
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            continue
        try:
            print(f"[{i+1}/{len(questions_data)}] Processing {task_id}...")
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:80] + "..." if len(question_text) > 80 else question_text,
                "Answer": submitted_answer[:100] + "..." if len(submitted_answer) > 100 else submitted_answer,
            })
            # Memory cleanup every few questions
            if i % 3 == 0:
                gc.collect()
        except Exception as e:
            error_answer = f"ERROR: {str(e)}"
            answers_payload.append({"task_id": task_id, "submitted_answer": error_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:80] + "..." if len(question_text) > 80 else question_text,
                "Answer": error_answer,
            })

    total_time = time.time() - total_start
    print(f"All questions processed in {total_time:.1f} seconds")

    if not answers_payload:
        return "No answers generated.", pd.DataFrame(results_log)

    # Submit results
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"✅ Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Processing Time: {total_time:.1f}s\n"
            f"Message: {result_data.get('message', 'No message')}"
        )
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except Exception as e:
        error_msg = f"❌ Submission Failed: {str(e)}"
        results_df = pd.DataFrame(results_log)
        return error_msg, results_df
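
# Shape of the JSON payload POSTed to f"{DEFAULT_API_URL}/submit" (field names
# taken from the code above; the values here are purely illustrative):
#
#   {
#       "username": "alice",
#       "agent_code": "https://huggingface.co/spaces/<space_id>/tree/main",
#       "answers": [
#           {"task_id": "task-001", "submitted_answer": "42"},
#           ...
#       ]
#   }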

# --- Gradio Interface ---
with gr.Blocks(title="GAIA Agent - Fast Mode") as demo:
    gr.Markdown("# 🚀 GAIA Agent Evaluation (Optimized)")
    gr.Markdown(
        """
        **Fast Mode Optimizations:**
        - Reduced max steps: 4 per question
        - Shorter token generation: 128 tokens max
        - 30s timeout per question
        - Aggressive memory management

        **Usage:** Login → Click Run → View Results
        """
    )

    with gr.Row():
        gr.LoginButton()

    with gr.Row():
        run_button = gr.Button("🏃‍♂️ Run Fast Evaluation", variant="primary", size="lg")

    with gr.Row():
        status_output = gr.Textbox(
            label="📊 Status & Results",
            lines=6,
            interactive=False,
            placeholder="Ready to run evaluation...",
        )

    with gr.Row():
        results_table = gr.DataFrame(
            label="📝 Questions & Answers",
            wrap=True,
            interactive=False,
        )

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
        show_progress=True,
    )

if __name__ == "__main__":
    print("🚀 GAIA Agent Fast Mode Starting...")
    print(f"⚙️ Max Steps: {MAX_STEPS}, Max Tokens: {MAX_TOKENS}")
    print(f"⏱️ Timeout per question: {TIMEOUT_PER_QUESTION}s")
    demo.launch(
        debug=False,
        share=False,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
    )
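
# Dependencies implied by the imports above (a sketch for requirements.txt;
# version pins are not taken from the original repo and are left to taste):
#
#   gradio[oauth]
#   requests
#   numexpr
#   pandas
#   duckduckgo_search
#   pdfminer.six
#   beautifulsoup4
#   python-dotenv
#   transformers
#   torch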