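# NOTE: the following is page residue from the Hugging Face file viewer, not program text:
# LamiaYT's picture / Fixing / d3c0517 / raw / history blame / 11.3 kB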
import os
import gradio as gr
import requests
import json
import re
import numexpr
import pandas as pd
import math
import pdfminer
from duckduckgo_search import DDGS
from pdfminer.high_level import extract_text
from bs4 import BeautifulSoup
import html2text
from typing import Dict, Any, List, Tuple, Callable, Optional
from dotenv import load_dotenv
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
import torch
import time
import gc
import warnings
# --- Warning / tokenizer noise ---
# Every warning category is silenced for the whole process.
warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# --- Settings read from the environment (.env is loaded first) ---
load_dotenv()
# None when the variable is unset; web_search then falls back to DuckDuckGo.
SERPER_API_KEY = os.environ.get("SERPER_API_KEY")

# --- Constants (ULTRA FAST MODE) ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"
MAX_STEPS = 3  # agent iterations per question
MAX_TOKENS = 64  # new tokens generated per step
MAX_CONTEXT = 1024  # prompt is truncated to this many tokens
TIMEOUT_PER_QUESTION = 15  # seconds; checked between agent steps

# --- Process-wide flags for pip / huggingface_hub / bitsandbytes ---
os.environ.update(
    {
        "PIP_BREAK_SYSTEM_PACKAGES": "1",
        "HF_HUB_DISABLE_SYMLINKS_WARNING": "1",
        "BITSANDBYTES_NOWELCOME": "1",
    }
)
print("Loading model (ULTRA FAST mode)...")
start_time = time.time()

# Model weights: float32 on CPU, with the KV cache switched off in the config.
_MODEL_KWARGS = dict(
    trust_remote_code=True,
    torch_dtype=torch.float32,
    device_map="cpu",
    low_cpu_mem_usage=True,
    use_cache=False,
)
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **_MODEL_KWARGS)

# Fast tokenizer, left-padded.
_TOKENIZER_KWARGS = dict(
    use_fast=True,
    trust_remote_code=True,
    padding_side="left",
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, **_TOKENIZER_KWARGS)

# Reuse EOS as the padding token when the tokenizer ships without one.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Generation settings are built once and shared by every generate() call.
GENERATION_CONFIG = GenerationConfig(
    max_new_tokens=MAX_TOKENS,
    do_sample=True,
    temperature=0.3,
    repetition_penalty=1.1,
    use_cache=False,
    pad_token_id=tokenizer.pad_token_id,
    eos_token_id=tokenizer.eos_token_id,
)

load_time = time.time() - start_time
print(f"Model loaded in {load_time:.2f} seconds")
# --- Lightning Fast Tools ---
def web_search(query: str) -> str:
    """Return a one-line summary of the top web search hit for ``query``.

    The Serper API is used when ``SERPER_API_KEY`` is set (query capped at
    100 characters, one result requested); otherwise DuckDuckGo is queried
    through ``DDGS``.

    Returns:
        ``"<title>: <snippet>"`` with the snippet capped at 200 characters,
        ``"No results"`` when the backend returned nothing usable, or
        ``"Search failed"`` on any error (network, HTTP status, bad payload).
    """
    try:
        query = str(query)
        if SERPER_API_KEY:
            response = requests.post(
                "https://google.serper.dev/search",
                headers={
                    "X-API-KEY": SERPER_API_KEY,
                    "Content-Type": "application/json",
                },
                json={"q": query[:100], "num": 1},  # single result
                timeout=3,
            )
            # An error status (bad key, quota, ...) is a failure, not "No results".
            response.raise_for_status()
            organic = response.json().get("organic") or []
            if not organic:
                return "No results"
            top = organic[0]
            # A hit without a snippet is still a hit; do not let a missing
            # key turn it into "Search failed".
            return f"{top.get('title') or ''}: {(top.get('snippet') or '')[:200]}"

        with DDGS() as ddgs:
            for result in ddgs.text(query, max_results=1):
                return f"{result.get('title') or ''}: {(result.get('body') or '')[:200]}"
        return "No results"
    except Exception:
        # Best-effort tool: report failure as text so the agent loop continues.
        # ``Exception`` (not a bare except) lets Ctrl-C / SystemExit propagate.
        return "Search failed"
def calculator(expression: str) -> str:
    """Evaluate a basic arithmetic expression and return the result as text.

    Every character other than digits, ``+ - * / ( ) .`` and whitespace is
    stripped from ``expression`` before it is evaluated.

    Returns:
        ``str(float(result))`` on success, ``"Invalid expression"`` when
        nothing is left after stripping, and ``"Calc error"`` when evaluation
        fails (syntax error, division by zero, overflow, ...).
    """
    try:
        clean_expr = re.sub(r'[^\d+\-*/().\s]', '', str(expression))
        if not clean_expr.strip():
            return "Invalid expression"
        # SECURITY NOTE(review): eval() on model-produced text. The character
        # whitelist above leaves no letters, so no names or calls can be
        # formed, and the empty namespace below removes builtins as a second
        # line of defence. Expressions such as ``9**9**9`` can still burn CPU;
        # consider a bounded AST evaluator if inputs become adversarial.
        result = eval(clean_expr, {"__builtins__": {}}, {})
        return str(float(result))
    except Exception:
        # ``Exception`` (not a bare except) lets Ctrl-C / SystemExit propagate.
        return "Calc error"
def read_pdf(file_path: str) -> str:
    """Return the first 500 characters of text extracted from a PDF.

    Returns:
        The leading text, ``"No PDF text"`` when extraction yields nothing,
        or ``"PDF error"`` when ``extract_text`` raises.
    """
    try:
        text = extract_text(file_path)
    except Exception:
        # Best-effort tool: missing or corrupt files are reported as text.
        # ``Exception`` (not a bare except) lets Ctrl-C / SystemExit propagate.
        return "PDF error"
    return text[:500] if text else "No PDF text"
def read_webpage(url: str) -> str:
    """Fetch ``url`` and return the first 500 characters of its visible text.

    Returns:
        The leading page text, ``"No webpage text"`` when the page has no
        text, or ``"Webpage error"`` on any failure (network error, timeout,
        HTTP error status, parse error).
    """
    try:
        response = requests.get(url, timeout=3, headers={'User-Agent': 'Bot'})
        # Do not hand the agent the body of a 4xx/5xx error page as content.
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        text = soup.get_text(separator=' ', strip=True)
        return text[:500] if text else "No webpage text"
    except Exception:
        # ``Exception`` (not a bare except) lets Ctrl-C / SystemExit propagate.
        return "Webpage error"
# Registry of callable tools, keyed by the name the model must put in the
# "tool" field of its JSON tool call.
TOOLS: Dict[str, Callable[..., str]] = dict(
    web_search=web_search,
    calculator=calculator,
    read_pdf=read_pdf,
    read_webpage=read_webpage,
)
# --- Ultra Fast Agent ---
class FastGAIA_Agent:
    """Small tool-using agent that answers one question per call.

    Each call runs up to ``MAX_STEPS`` generate/act iterations. A step either
    yields a ``Final Answer:`` line (returned to the caller), a fenced JSON
    tool call (executed, result recorded), or free text (recorded as a
    thought).
    """

    # ``{history}`` is the only placeholder. The template also contains
    # literal JSON braces, so it must NOT be passed through ``str.format``
    # (that raises on ``{"tool": ...}``); see ``_build_prompt``.
    PROMPT_TEMPLATE = (
        "<|system|>You solve GAIA questions fast. Tools: web_search, calculator, read_pdf, read_webpage.\n"
        "Format: ```json\n{\"tool\": \"name\", \"args\": {\"key\": \"value\"}}```\n"
        "Always end with: Final Answer: [answer]<|end|>\n"
        "<|user|>{history}<|end|>\n<|assistant|>"
    )

    # Opening of a fenced tool call: the fence marker followed by an object.
    _TOOL_CALL_START = re.compile(r"```json\s*(?=\{)")

    # Only this many trailing characters of actions/thoughts are kept.
    _MAX_NOTES_CHARS = 800

    def __init__(self):
        self.tools = TOOLS
        self.prompt_template = self.PROMPT_TEMPLATE

    def __call__(self, question: str) -> str:
        """Answer ``question``.

        Returns:
            The text after ``Final Answer:`` (first line, at most 200 chars),
            ``"TIMEOUT"`` when ``TIMEOUT_PER_QUESTION`` seconds have elapsed
            at the start of a step, ``"No solution found"`` when the step
            budget runs out, or ``"Error: ..."`` on an unexpected exception.
        """
        started = time.time()
        try:
            question_line = f"Question: {question}"
            notes = ""  # accumulated "Action:" / "Thought:" lines
            for _ in range(MAX_STEPS):
                if time.time() - started > TIMEOUT_PER_QUESTION:
                    return "TIMEOUT"

                response = self._fast_generate(question_line + notes)

                if "Final Answer:" in response:
                    answer = response.split("Final Answer:")[-1].strip().split("\n")[0]
                    return answer[:200]  # limit answer length

                tool_result = self._quick_tool_use(response)
                if tool_result:
                    notes += f"\nAction: {tool_result}"
                else:
                    notes += f"\nThought: {response[:100]}"

                # Trim only the notes: trimming the whole history from the
                # left would cut away the question itself.
                notes = notes[-self._MAX_NOTES_CHARS:]
            return "No solution found"
        except Exception as e:
            return f"Error: {str(e)[:50]}"

    def _build_prompt(self, history: str) -> str:
        """Insert ``history`` into the template without interpreting other braces."""
        return self.prompt_template.replace("{history}", history)

    def _fast_generate(self, history: str) -> str:
        """Generate the assistant's next reply for ``history``.

        Returns only the newly generated text, or ``"Gen error: ..."`` when
        tokenization or generation raises.
        """
        try:
            prompt = self._build_prompt(history)
            inputs = tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=MAX_CONTEXT,
                padding=False,
            )
            with torch.no_grad():
                outputs = model.generate(
                    inputs.input_ids,
                    generation_config=GENERATION_CONFIG,
                    attention_mask=inputs.attention_mask,
                )
            # generate() returns prompt + continuation for a causal LM. Decode
            # the continuation only, otherwise the prompt's own
            # "Final Answer: [answer]" instruction is mistaken for an answer
            # (the role tags are stripped by skip_special_tokens, so splitting
            # on them cannot separate the prompt).
            prompt_len = inputs.input_ids.shape[1]
            response = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)
            response = response.split("<|assistant|>")[-1].strip()
            # Release tensors promptly.
            del inputs, outputs
            gc.collect()
            return response
        except Exception as e:
            return f"Gen error: {str(e)}"

    def _quick_tool_use(self, text: str) -> str:
        """Run the tool call embedded in ``text``, if there is one.

        A tool call is a JSON object following a ```json fence, shaped like
        ``{"tool": <name>, "args": {<kwargs>}}``. The closing fence is not
        required, so a reply cut off by the token limit still parses as long
        as the object itself is complete.

        Returns:
            ``"Used <tool>: <result>"`` (result capped at 150 chars),
            ``"Used <tool>: error (<ExceptionType>)"`` when the tool raises
            (e.g. wrong argument names), or ``""`` when ``text`` holds no
            valid call to a known tool.
        """
        fence = self._TOOL_CALL_START.search(text)
        if fence is None:
            return ""
        try:
            # raw_decode parses exactly one JSON value starting at the brace,
            # so nested objects (the "args" dict) are handled correctly.
            tool_data, _ = json.JSONDecoder().raw_decode(text, fence.end())
        except json.JSONDecodeError:
            return ""
        if not isinstance(tool_data, dict):
            return ""

        tool_name = tool_data.get("tool", "")
        args = tool_data.get("args", {})
        if not isinstance(tool_name, str) or tool_name not in self.tools:
            return ""
        if not isinstance(args, dict):
            return ""

        try:
            result = self.tools[tool_name](**args)
        except Exception as exc:
            # Surface the failure so it is recorded in the history instead of
            # being silently dropped.
            return f"Used {tool_name}: error ({type(exc).__name__})"
        return f"Used {tool_name}: {str(result)[:150]}"
# --- Lightning Fast Runner ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch all questions, answer each with the agent, and submit the answers.

    Args:
        profile: Login profile of the current user, or ``None`` when nobody
            is logged in.

    Returns:
        ``(status_text, results)`` where ``results`` is a ``pandas.DataFrame``
        with one row per answered question, or ``None`` when the run stops
        before any question is processed (not logged in, fetch failure,
        empty question list).
    """
    if not profile:
        return "❌ Please login first", None
    username = profile.username

    agent = FastGAIA_Agent()
    api_url = DEFAULT_API_URL
    space_id = os.getenv("SPACE_ID", "unknown")
    print(f"🚀 ULTRA FAST mode - User: {username}")

    # Fetch questions.
    try:
        response = requests.get(f"{api_url}/questions", timeout=10)
        response.raise_for_status()
        questions = response.json()
    except Exception as e:
        return f"❌ Failed to get questions: {e}", None
    # Guard against an empty list or a non-list payload: both would crash
    # further down (per-question average divides by len(questions)).
    if not isinstance(questions, list) or not questions:
        return "❌ Failed to get questions: empty or malformed response", None
    print(f"📝 Got {len(questions)} questions")

    # Answer every question.
    results = []
    answers = []
    start_time = time.time()
    for i, item in enumerate(questions):
        if not isinstance(item, dict):
            continue
        task_id = item.get("task_id")
        question = item.get("question") or ""
        if not task_id:
            continue
        short_id = str(task_id)[:8]
        preview = question[:60] + "..."
        print(f"⚡ [{i+1}/{len(questions)}] {short_id}...")
        try:
            answer = agent(question)
            shown = answer[:80] + "..." if len(answer) > 80 else answer
        except Exception as e:
            # Submit a placeholder rather than dropping the task.
            answer = shown = f"ERROR: {str(e)[:30]}"
        answers.append({"task_id": task_id, "submitted_answer": answer})
        results.append({"ID": short_id, "Question": preview, "Answer": shown})
        # Periodic memory cleanup.
        if i % 5 == 0:
            gc.collect()

    total_time = time.time() - start_time
    per_question = total_time / len(questions)  # non-empty, checked above
    print(f"⏱️ Completed in {total_time:.1f}s ({per_question:.1f}s per question)")

    # Submit results.
    try:
        submission = {
            "username": username,
            "agent_code": f"https://huggingface.co/spaces/{space_id}/tree/main",
            "answers": answers,
        }
        response = requests.post(f"{api_url}/submit", json=submission, timeout=30)
        # A rejected submission must be reported as a failure, not as "N/A".
        response.raise_for_status()
        result = response.json()
        status = (
            f"🎯 ULTRA FAST RESULTS\n"
            f"👤 User: {result.get('username', username)}\n"
            f"📊 Score: {result.get('score', 'N/A')}% "
            f"({result.get('correct_count', '?')}/{result.get('total_attempted', '?')})\n"
            f"⏱️ Time: {total_time:.1f}s ({per_question:.1f}s/question)\n"
            f"💬 {result.get('message', 'Completed!')}"
        )
        return status, pd.DataFrame(results)
    except Exception as e:
        error_status = f"❌ Submission failed: {str(e)}\n⏱️ Processing time: {total_time:.1f}s"
        return error_status, pd.DataFrame(results)
# --- Ultra Simple UI ---
# Single-page UI: login button, run button, status text and answers table.
with gr.Blocks(title="GAIA Agent - ULTRA FAST") as demo:
    gr.Markdown("# ⚡ GAIA Agent - ULTRA FAST MODE")
    # Derived from the constants so the banner cannot drift from the real limits.
    gr.Markdown(
        f"**Speed settings:** {MAX_STEPS} steps max • {MAX_TOKENS} tokens • "
        f"{TIMEOUT_PER_QUESTION}s timeout • Lightning tools"
    )
    gr.LoginButton()
    run_btn = gr.Button("🚀 RUN ULTRA FAST", variant="primary", size="lg")
    status = gr.Textbox(label="📊 Results", lines=6, interactive=False)
    table = gr.DataFrame(label="📋 Answers", interactive=False)
    # No explicit inputs are wired; the handler's ``profile`` argument is
    # presumably injected by Gradio from the login session (confirm).
    run_btn.click(run_and_submit_all, outputs=[status, table], show_progress=True)
# Script entry point: print the active limits, then serve the UI.
if __name__ == "__main__":
    print("⚡ ULTRA FAST GAIA Agent Starting...")
    print(f"⚙️ {MAX_STEPS} steps, {MAX_TOKENS} tokens, {TIMEOUT_PER_QUESTION}s timeout")
    demo.launch(
        share=True,  # request a public share link
        server_name="0.0.0.0",  # listen on all interfaces
        server_port=7860,
        debug=False,
        show_error=True,
    )