import os
import gradio as gr
import requests
import json
import re
import numexpr
import pandas as pd
from duckduckgo_search import DDGS
from pdfminer.high_level import extract_text
from bs4 import BeautifulSoup
from typing import Dict, List, Tuple, Optional
from dotenv import load_dotenv
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
import torch
import time
import gc
# --- Load Environment Variables ---
load_dotenv()
SERPER_API_KEY = os.getenv("SERPER_API_KEY")

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
MAX_STEPS = 4  # Reduced from 6
MAX_TOKENS = 128  # Reduced from 256
MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"
TIMEOUT_PER_QUESTION = 30  # 30 seconds max per question
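# These caps are deliberately tight: on a CPU-only Space each generation step
# dominates runtime, so limiting steps, new tokens, and wall-clock time per
# question keeps a full evaluation run tractable.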
# --- Configure Environment for Hugging Face Spaces ---
os.environ["PIP_BREAK_SYSTEM_PACKAGES"] = "1"
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
os.environ["BITSANDBYTES_NOWELCOME"] = "1"

print("Loading model (CPU-optimized)...")
start_time = time.time()

# Load model with aggressive optimization
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    trust_remote_code=True,
    torch_dtype=torch.float32,
    device_map="cpu",
    low_cpu_mem_usage=True,
    use_cache=False,
    attn_implementation="eager"  # Use eager attention for better CPU performance
)
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    use_fast=True,  # Fast (Rust) tokenizer for quicker tokenization
    trust_remote_code=True
)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

load_time = time.time() - start_time
print(f"Model loaded in {load_time:.2f} seconds")
# --- Optimized Tools ---
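# Tool contract: each tool takes keyword arguments (supplied by the model's
# JSON tool call) and returns a plain string; errors are returned as strings
# rather than raised, so the agent can observe the failure and try another step.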
def web_search(query: str) -> str:
    """Search the web with timeout and result limiting"""
    try:
        if SERPER_API_KEY:
            params = {'q': query, 'num': 2, 'hl': 'en', 'gl': 'us'}
            headers = {'X-API-KEY': SERPER_API_KEY, 'Content-Type': 'application/json'}
            response = requests.post(
                'https://google.serper.dev/search',
                headers=headers,
                json=params,
                timeout=5  # Reduced timeout
            )
            response.raise_for_status()
            results = response.json()
            if 'organic' in results:
                return json.dumps([f"{r.get('title', '')}: {r.get('snippet', '')[:100]}" for r in results['organic'][:2]])
            return "No results found"
        else:
            with DDGS() as ddgs:
                results = list(ddgs.text(query, max_results=2))
            return json.dumps([f"{r.get('title', '')}: {r.get('body', '')[:100]}" for r in results])
    except Exception as e:
        return f"Search error: {str(e)}"
def calculator(expression: str) -> str:
    """Fast mathematical evaluation"""
    try:
        expression = re.sub(r'[^\d+\-*/().\s]', '', expression)
        result = numexpr.evaluate(expression)
        return str(float(result))
    except Exception as e:
        return f"Calculation error: {str(e)}"
def read_pdf(file_path: str) -> str:
    """Extract text from PDF with length limit"""
    try:
        text = extract_text(file_path)
        return text[:1000] if text else "No text found in PDF"  # Reduced limit
    except Exception as e:
        return f"PDF read error: {str(e)}"
def read_webpage(url: str) -> str:
    """Fast webpage reading with aggressive limits"""
    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        response = requests.get(url, timeout=5, headers=headers)  # Reduced timeout
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        for script in soup(["script", "style"]):
            script.decompose()
        text = soup.get_text(separator=' ', strip=True)
        return text[:1000] if text else "No text found on webpage"  # Reduced limit
    except Exception as e:
        return f"Webpage read error: {str(e)}"
TOOLS = {
    "web_search": web_search,
    "calculator": calculator,
    "read_pdf": read_pdf,
    "read_webpage": read_webpage
}
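# Dispatch table: GAIA_Agent._use_tool looks a tool name up here and calls it
# with **args taken from the parsed JSON tool call.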
# --- Optimized GAIA Agent ---
class GAIA_Agent:
    def __init__(self):
        self.tools = TOOLS
        self.system_prompt = (
            "You are a GAIA problem solver. Tools: {web_search, calculator, read_pdf, read_webpage}.\n"
            "Be concise and direct. Use tools efficiently.\n"
            'Tool format: ```json\n{"tool": "tool_name", "args": {"arg1": "value"}}```\n'
            "End with: Final Answer: [exact answer]"
        )
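    # ReAct-style loop: build a prompt from the running history, generate a
    # response, then either return a "Final Answer:" or run a parsed tool call
    # and append its observation to the history for the next step.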
    def __call__(self, question: str) -> str:
        start_time = time.time()
        print(f"Processing: {question[:50]}...")
        try:
            history = [f"Question: {question}"]
            for step in range(MAX_STEPS):
                # Check timeout
                if time.time() - start_time > TIMEOUT_PER_QUESTION:
                    return "TIMEOUT: Question took too long"
                prompt = self._build_prompt(history)
                response = self._call_model(prompt)
                if "Final Answer:" in response:
                    answer = response.split("Final Answer:")[-1].strip()
                    elapsed = time.time() - start_time
                    print(f"Completed in {elapsed:.1f}s: {answer[:30]}...")
                    return answer
                tool_call = self._parse_tool_call(response)
                if tool_call:
                    tool_name, args = tool_call
                    observation = self._use_tool(tool_name, args)
                    history.append(f"Action: {tool_name}")
                    history.append(f"Result: {observation}")
                else:
                    history.append(f"Thought: {response}")
                # Aggressive memory cleanup
                gc.collect()
            return "Could not solve within step limit"
        except Exception as e:
            print(f"Agent error: {str(e)}")
            return f"Error: {str(e)}"
    def _build_prompt(self, history: List[str]) -> str:
        prompt = "<|system|>\n" + self.system_prompt + "<|end|>\n"
        prompt += "<|user|>\n" + "\n".join(history) + "<|end|>\n"
        prompt += "<|assistant|>"
        return prompt
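    # The <|system|>/<|user|>/<|assistant|>/<|end|> markers above follow the
    # Phi-3 chat format, assembled by hand; tokenizer.apply_chat_template would
    # be a more robust alternative if the model is ever swapped out.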
    def _call_model(self, prompt: str) -> str:
        try:
            inputs = tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=2048,  # Reduced context
                padding=False
            )
            generation_config = GenerationConfig(
                max_new_tokens=MAX_TOKENS,
                temperature=0.1,  # Less randomness for faster convergence
                do_sample=True,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id,
                use_cache=False
            )
            with torch.no_grad():
                outputs = model.generate(
                    inputs.input_ids,
                    generation_config=generation_config,
                    attention_mask=inputs.attention_mask
                )
            # Decode only the newly generated tokens; skip_special_tokens strips
            # the <|assistant|> marker, so splitting on it would silently fail
            response = tokenizer.decode(
                outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True
            ).strip()
            # Immediate cleanup
            del inputs, outputs
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            return response
        except Exception as e:
            return f"Generation error: {str(e)}"
    def _parse_tool_call(self, text: str) -> Optional[Tuple[str, Dict]]:
        try:
            json_match = re.search(r'```json\s*({.*?})\s*```', text, re.DOTALL)
            if json_match:
                tool_call = json.loads(json_match.group(1))
                if "tool" in tool_call and "args" in tool_call:
                    return tool_call["tool"], tool_call["args"]
        except Exception:
            pass
        return None
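    # A well-formed tool call in the model's output looks like (hypothetical
    # values):
    #   ```json
    #   {"tool": "web_search", "args": {"query": "capital of France"}}
    #   ```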
    def _use_tool(self, tool_name: str, args: Dict) -> str:
        if tool_name not in self.tools:
            return f"Unknown tool: {tool_name}"
        try:
            result = self.tools[tool_name](**args)
            return str(result)[:300]  # Truncate results
        except Exception as e:
            return f"Tool error: {str(e)}"
# --- Optimized Evaluation Runner ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch all questions, run the agent on each sequentially, and submit the answers"""
    space_id = os.getenv("SPACE_ID")
    if not profile:
        return "Please Login to Hugging Face with the button.", None
    username = profile.username
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"
    try:
        agent = GAIA_Agent()
    except Exception as e:
        return f"Error initializing agent: {e}", None
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    # Fetch Questions
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "No questions found.", None
        print(f"Processing {len(questions_data)} questions...")
    except Exception as e:
        return f"Error fetching questions: {e}", None
    # Process questions with progress tracking
    results_log = []
    answers_payload = []
    total_start = time.time()
    for i, item in enumerate(questions_data):
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            continue
        try:
            print(f"[{i+1}/{len(questions_data)}] Processing {task_id}...")
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:80] + "..." if len(question_text) > 80 else question_text,
                "Answer": submitted_answer[:100] + "..." if len(submitted_answer) > 100 else submitted_answer
            })
            # Memory cleanup every few questions
            if i % 3 == 0:
                gc.collect()
        except Exception as e:
            error_answer = f"ERROR: {str(e)}"
            answers_payload.append({"task_id": task_id, "submitted_answer": error_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:80] + "..." if len(question_text) > 80 else question_text,
                "Answer": error_answer
            })
    total_time = time.time() - total_start
    print(f"All questions processed in {total_time:.1f} seconds")
    if not answers_payload:
        return "No answers generated.", pd.DataFrame(results_log)
    # Submit results
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload
    }
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"✅ Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Processing Time: {total_time:.1f}s\n"
            f"Message: {result_data.get('message', 'No message')}"
        )
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except Exception as e:
        error_msg = f"❌ Submission Failed: {str(e)}"
        results_df = pd.DataFrame(results_log)
        return error_msg, results_df
# --- Gradio Interface ---
with gr.Blocks(title="GAIA Agent - Fast Mode") as demo:
    gr.Markdown("# 🚀 GAIA Agent Evaluation (Optimized)")
    gr.Markdown(
        """
        **Fast Mode Optimizations:**
        - Reduced max steps: 4 per question
        - Shorter token generation: 128 tokens max
        - 30s timeout per question
        - Aggressive memory management

        **Usage:** Login → Click Run → View Results
        """
    )
    with gr.Row():
        gr.LoginButton()
    with gr.Row():
        run_button = gr.Button("🏃 Run Fast Evaluation", variant="primary", size="lg")
    with gr.Row():
        status_output = gr.Textbox(
            label="📊 Status & Results",
            lines=6,
            interactive=False,
            placeholder="Ready to run evaluation..."
        )
    with gr.Row():
        results_table = gr.DataFrame(
            label="📋 Questions & Answers",
            wrap=True,
            interactive=False
        )
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
        show_progress=True
    )
if __name__ == "__main__":
    print("🚀 GAIA Agent Fast Mode Starting...")
    print(f"⚙️ Max Steps: {MAX_STEPS}, Max Tokens: {MAX_TOKENS}")
    print(f"⏱️ Timeout per question: {TIMEOUT_PER_QUESTION}s")
    demo.launch(
        debug=False,
        share=False,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )