import os
import gradio as gr
import requests
import json
import re
import numexpr
import pandas as pd
from duckduckgo_search import DDGS
from pdfminer.high_level import extract_text
from bs4 import BeautifulSoup
from typing import Dict, List, Optional, Tuple
from dotenv import load_dotenv
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
import torch
import time
import gc
# --- Load Environment Variables ---
load_dotenv()
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
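# Optional: when SERPER_API_KEY is unset, web_search() below falls back to DuckDuckGo.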
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
MAX_STEPS = 4 # Reduced from 6
MAX_TOKENS = 128 # Reduced from 256
MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"
TIMEOUT_PER_QUESTION = 30 # 30 seconds max per question
# --- Configure Environment for Hugging Face Spaces ---
os.environ["PIP_BREAK_SYSTEM_PACKAGES"] = "1"
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
os.environ["BITSANDBYTES_NOWELCOME"] = "1"
print("Loading model (CPU-optimized)...")
start_time = time.time()
# Load model with aggressive optimization
model = AutoModelForCausalLM.from_pretrained(
MODEL_NAME,
trust_remote_code=True,
torch_dtype=torch.float32,
device_map="cpu",
low_cpu_mem_usage=True,
    use_cache=False,  # no KV cache: lower peak memory at the cost of generation speed
attn_implementation="eager" # Use eager attention for better CPU performance
)
tokenizer = AutoTokenizer.from_pretrained(
MODEL_NAME,
use_fast=True, # Changed to True for faster tokenization
trust_remote_code=True
)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
load_time = time.time() - start_time
print(f"Model loaded in {load_time:.2f} seconds")
# --- Optimized Tools ---
def web_search(query: str) -> str:
"""Search the web with timeout and result limiting"""
try:
if SERPER_API_KEY:
params = {'q': query, 'num': 2, 'hl': 'en', 'gl': 'us'}
headers = {'X-API-KEY': SERPER_API_KEY, 'Content-Type': 'application/json'}
response = requests.post(
'https://google.serper.dev/search',
headers=headers,
json=params,
timeout=5 # Reduced timeout
)
results = response.json()
if 'organic' in results:
                return json.dumps([f"{r.get('title', '')}: {r.get('snippet', '')[:100]}" for r in results['organic'][:2]])
return "No results found"
else:
with DDGS() as ddgs:
results = [r for r in ddgs.text(query, max_results=2)]
return json.dumps([f"{r['title']}: {r['body'][:100]}" for r in results])
except Exception as e:
return f"Search error: {str(e)}"
def calculator(expression: str) -> str:
"""Fast mathematical evaluation"""
try:
        expression = re.sub(r'[^\d+\-*/().\s]', '', expression)  # whitelist digits and arithmetic operators before eval
result = numexpr.evaluate(expression)
return str(float(result))
except Exception as e:
return f"Calculation error: {str(e)}"
def read_pdf(file_path: str) -> str:
"""Extract text from PDF with length limit"""
try:
text = extract_text(file_path)
return text[:1000] if text else "No text found in PDF" # Reduced limit
except Exception as e:
return f"PDF read error: {str(e)}"
def read_webpage(url: str) -> str:
"""Fast webpage reading with aggressive limits"""
try:
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
response = requests.get(url, timeout=5, headers=headers) # Reduced timeout
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
for script in soup(["script", "style"]):
script.decompose()
text = soup.get_text(separator=' ', strip=True)
return text[:1000] if text else "No text found on webpage" # Reduced limit
except Exception as e:
return f"Webpage read error: {str(e)}"
TOOLS = {
"web_search": web_search,
"calculator": calculator,
"read_pdf": read_pdf,
"read_webpage": read_webpage
}
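# Registry consulted by the agent; keys must match the "tool" field of the model's JSON tool calls.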
# --- Optimized GAIA Agent ---
class GAIA_Agent:
def __init__(self):
self.tools = TOOLS
self.system_prompt = (
"You are a GAIA problem solver. Tools: {web_search, calculator, read_pdf, read_webpage}.\n"
"Be concise and direct. Use tools efficiently.\n"
"Tool format: ```json\n{'tool': 'tool_name', 'args': {'arg1': value}}```\n"
"End with: Final Answer: [exact answer]"
)
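    # Bounded ReAct-style loop: generate, parse an optional tool call, append
    # the observation to history, and stop on "Final Answer" or when the
    # step/time budget is exhausted.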
def __call__(self, question: str) -> str:
start_time = time.time()
print(f"Processing: {question[:50]}...")
try:
history = [f"Question: {question}"]
for step in range(MAX_STEPS):
# Check timeout
if time.time() - start_time > TIMEOUT_PER_QUESTION:
return "TIMEOUT: Question took too long"
prompt = self._build_prompt(history)
response = self._call_model(prompt)
if "Final Answer" in response:
answer = response.split("Final Answer:")[-1].strip()
elapsed = time.time() - start_time
print(f"Completed in {elapsed:.1f}s: {answer[:30]}...")
return answer
tool_call = self._parse_tool_call(response)
if tool_call:
tool_name, args = tool_call
observation = self._use_tool(tool_name, args)
history.append(f"Action: {tool_name}")
history.append(f"Result: {observation}")
else:
history.append(f"Thought: {response}")
# Aggressive memory cleanup
gc.collect()
return "Could not solve within step limit"
except Exception as e:
print(f"Agent error: {str(e)}")
return f"Error: {str(e)}"
def _build_prompt(self, history: List[str]) -> str:
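        # Assemble a Phi-3 instruct prompt: <|system|>...<|end|><|user|>...<|end|><|assistant|>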
prompt = "<|system|>\n" + self.system_prompt + "<|end|>\n"
prompt += "<|user|>\n" + "\n".join(history) + "<|end|>\n"
prompt += "<|assistant|>"
return prompt
def _call_model(self, prompt: str) -> str:
try:
inputs = tokenizer(
prompt,
return_tensors="pt",
truncation=True,
max_length=2048, # Reduced context
padding=False
)
generation_config = GenerationConfig(
max_new_tokens=MAX_TOKENS,
temperature=0.1, # Less randomness for faster convergence
do_sample=True,
pad_token_id=tokenizer.pad_token_id,
eos_token_id=tokenizer.eos_token_id,
use_cache=False
)
with torch.no_grad():
outputs = model.generate(
inputs.input_ids,
generation_config=generation_config,
attention_mask=inputs.attention_mask
)
            # Decode only the newly generated tokens; skip_special_tokens=True
            # strips the <|assistant|> marker, so splitting the full decode on
            # it would return the prompt text as well.
            new_tokens = outputs[0][inputs.input_ids.shape[1]:]
            response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
            # Immediate cleanup
            del inputs, outputs
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            return response
except Exception as e:
return f"Generation error: {str(e)}"
def _parse_tool_call(self, text: str) -> Optional[Tuple[str, Dict]]:
try:
json_match = re.search(r'```json\s*({.*?})\s*```', text, re.DOTALL)
if json_match:
tool_call = json.loads(json_match.group(1))
if "tool" in tool_call and "args" in tool_call:
return tool_call["tool"], tool_call["args"]
        except Exception:
            pass
return None
def _use_tool(self, tool_name: str, args: Dict) -> str:
if tool_name not in self.tools:
return f"Unknown tool: {tool_name}"
try:
result = self.tools[tool_name](**args)
return str(result)[:300] # Truncate results
except Exception as e:
return f"Tool error: {str(e)}"
# --- Optimized Evaluation Runner ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
"""Fast evaluation with parallel processing where possible"""
space_id = os.getenv("SPACE_ID")
if not profile:
return "Please Login to Hugging Face with the button.", None
username = profile.username
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
submit_url = f"{api_url}/submit"
try:
agent = GAIA_Agent()
except Exception as e:
return f"Error initializing agent: {e}", None
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
# Fetch Questions
try:
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
if not questions_data:
return "No questions found.", None
print(f"Processing {len(questions_data)} questions...")
except Exception as e:
return f"Error fetching questions: {e}", None
# Process questions with progress tracking
results_log = []
answers_payload = []
total_start = time.time()
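    # Sequential processing; the per-question timeout inside the agent bounds total runtime.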
for i, item in enumerate(questions_data):
task_id = item.get("task_id")
question_text = item.get("question")
if not task_id or question_text is None:
continue
try:
print(f"[{i+1}/{len(questions_data)}] Processing {task_id}...")
submitted_answer = agent(question_text)
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
results_log.append({
"Task ID": task_id,
"Question": question_text[:80] + "..." if len(question_text) > 80 else question_text,
"Answer": submitted_answer[:100] + "..." if len(submitted_answer) > 100 else submitted_answer
})
# Memory cleanup every few questions
if i % 3 == 0:
gc.collect()
except Exception as e:
error_answer = f"ERROR: {str(e)}"
answers_payload.append({"task_id": task_id, "submitted_answer": error_answer})
results_log.append({
"Task ID": task_id,
"Question": question_text[:80] + "..." if len(question_text) > 80 else question_text,
"Answer": error_answer
})
total_time = time.time() - total_start
print(f"All questions processed in {total_time:.1f} seconds")
if not answers_payload:
return "No answers generated.", pd.DataFrame(results_log)
# Submit results
submission_data = {
"username": username.strip(),
"agent_code": agent_code,
"answers": answers_payload
}
try:
response = requests.post(submit_url, json=submission_data, timeout=60)
response.raise_for_status()
result_data = response.json()
final_status = (
f"βœ… Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Processing Time: {total_time:.1f}s\n"
f"Message: {result_data.get('message', 'No message')}"
)
results_df = pd.DataFrame(results_log)
return final_status, results_df
except Exception as e:
error_msg = f"❌ Submission Failed: {str(e)}"
results_df = pd.DataFrame(results_log)
return error_msg, results_df
# --- Gradio Interface ---
with gr.Blocks(title="GAIA Agent - Fast Mode") as demo:
gr.Markdown("# πŸš€ GAIA Agent Evaluation (Optimized)")
gr.Markdown(
"""
**Fast Mode Optimizations:**
- Reduced max steps: 4 per question
- Shorter token generation: 128 tokens max
- 30s timeout per question
- Aggressive memory management
        **Usage:** Login → Click Run → View Results
"""
)
with gr.Row():
gr.LoginButton()
with gr.Row():
run_button = gr.Button("πŸƒβ€β™‚οΈ Run Fast Evaluation", variant="primary", size="lg")
with gr.Row():
status_output = gr.Textbox(
label="πŸ“Š Status & Results",
lines=6,
interactive=False,
placeholder="Ready to run evaluation..."
)
with gr.Row():
results_table = gr.DataFrame(
label="πŸ“ Questions & Answers",
wrap=True,
interactive=False
)
run_button.click(
fn=run_and_submit_all,
outputs=[status_output, results_table],
show_progress=True
)
if __name__ == "__main__":
print("πŸš€ GAIA Agent Fast Mode Starting...")
print(f"βš™οΈ Max Steps: {MAX_STEPS}, Max Tokens: {MAX_TOKENS}")
print(f"⏱️ Timeout per question: {TIMEOUT_PER_QUESTION}s")
demo.launch(
debug=False,
share=False,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)