# Hugging Face Spaces status shown for this app when it was captured: Runtime error
import os | |
import gradio as gr | |
import requests | |
import json | |
import re | |
import numexpr | |
import pandas as pd | |
import math | |
import pdfminer | |
from duckduckgo_search import DDGS | |
from pdfminer.high_level import extract_text | |
from bs4 import BeautifulSoup | |
import html2text | |
from typing import Dict, Any, List, Tuple, Callable, Optional | |
from dotenv import load_dotenv | |
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig | |
import torch | |
import time | |
import gc | |
import warnings | |
# Quiet start-up: ignore every Python warning and set TOKENIZERS_PARALLELISM
# to "false" before any tokenizer is created.
warnings.filterwarnings("ignore")
os.environ.update({"TOKENIZERS_PARALLELISM": "false"})

# --- Load Environment Variables ---
# Pull values from a local .env file (if present), then read the optional
# Serper key; web_search falls back to DuckDuckGo when it is unset.
load_dotenv()
SERPER_API_KEY = os.environ.get("SERPER_API_KEY")

# --- Constants (ULTRA FAST MODE) ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"
MAX_STEPS = 3  # agent iterations allowed per question
MAX_TOKENS = 64  # new tokens generated per step
MAX_CONTEXT = 1024  # prompts are truncated to this many tokens
TIMEOUT_PER_QUESTION = 15  # seconds before a question is abandoned

# --- Configure Environment ---
os.environ.update(
    {
        "PIP_BREAK_SYSTEM_PACKAGES": "1",
        "HF_HUB_DISABLE_SYMLINKS_WARNING": "1",
        "BITSANDBYTES_NOWELCOME": "1",
    }
)
print("Loading model (ULTRA FAST mode)...")
start_time = time.time()

# Load the causal LM on CPU in float32 with the KV cache disabled.
# NOTE(review): full-precision weights for this checkpoint may not fit in the
# host's RAM, and use_cache=False makes each generated token recompute the
# whole prefix -- confirm both are intentional.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="cpu",
    torch_dtype=torch.float32,
    low_cpu_mem_usage=True,
    trust_remote_code=True,
    use_cache=False,
)

# Fast tokenizer, padding on the left.
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    padding_side="left",
    trust_remote_code=True,
    use_fast=True,
)

# Fall back to EOS when the checkpoint ships without a pad token.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Sampling settings shared by every generation call.
GENERATION_CONFIG = GenerationConfig(
    do_sample=True,
    temperature=0.3,
    repetition_penalty=1.1,
    max_new_tokens=MAX_TOKENS,
    use_cache=False,
    eos_token_id=tokenizer.eos_token_id,
    pad_token_id=tokenizer.pad_token_id,
)

load_time = time.time() - start_time
print(f"Model loaded in {load_time:.2f} seconds")
# --- Lightning Fast Tools --- | |
def web_search(query: str) -> str:
    """Return a one-line summary of the top web result for *query*.

    The Serper API is used when ``SERPER_API_KEY`` is set (the query is cut
    to 100 characters); otherwise DuckDuckGo is queried through ``DDGS``.

    Args:
        query: Free-text search query.

    Returns:
        ``"<title>: <snippet>"`` with the snippet capped at 200 characters,
        ``"No results"`` when the backend returned nothing, or
        ``"Search failed"`` when the request or parsing raised.
    """
    try:
        if SERPER_API_KEY:
            response = requests.post(
                "https://google.serper.dev/search",
                headers={
                    "X-API-KEY": SERPER_API_KEY,
                    "Content-Type": "application/json",
                },
                json={"q": query[:100], "num": 1},  # single result
                timeout=3,
            )
            # Without this check a non-2xx reply whose body parses as JSON
            # would be reported as "No results" instead of a failure.
            response.raise_for_status()
            organic = response.json().get("organic") or []
            if not organic:
                return "No results"
            top = organic[0]
            title = top.get("title") or ""
            snippet = top.get("snippet") or ""
            return f"{title}: {snippet[:200]}"

        with DDGS() as ddgs:
            for hit in ddgs.text(query, max_results=1):
                title = hit.get("title") or ""
                body = hit.get("body") or ""
                return f"{title}: {body[:200]}"
        return "No results"
    except Exception:
        # Best-effort tool: any failure is reported as text so the agent loop
        # keeps going. KeyboardInterrupt/SystemExit are no longer swallowed.
        return "Search failed"
def calculator(expression: str) -> str:
    """Evaluate a basic arithmetic expression and return the result as text.

    Every character other than digits, ``+ - * / ( ) .`` and whitespace is
    removed before evaluation (so ``"2 + 2 ="`` evaluates ``"2 + 2"``).
    The remaining text is parsed with ``ast`` and only numeric literals,
    unary ``+``/``-`` and the binary operators ``+ - * / // **`` are
    evaluated. The input is never passed to ``eval``, because it originates
    from model output that may echo untrusted web content.

    Args:
        expression: Arithmetic expression; non-strings are converted with
            ``str``.

    Returns:
        The result formatted as ``str(float(result))``,
        ``"Invalid expression"`` when nothing evaluable remains after
        filtering, or ``"Calc error"`` when parsing or evaluation fails
        (syntax error, division by zero, overflow, oversized power).
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Upper bound on the estimated size of an integer power. Far beyond what
    # float() can represent, yet small enough that the power is instant.
    max_int_bits = 100_000

    def evaluate(node):
        """Recursively evaluate a whitelisted arithmetic AST node."""
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = evaluate(node.left)
            right = evaluate(node.right)
            if (
                isinstance(node.op, ast.Pow)
                and isinstance(left, int)
                and isinstance(right, int)
                and right > 0
                and abs(left) > 1
                and left.bit_length() * right > max_int_bits
            ):
                # e.g. 9**9**9**9 would otherwise pin the CPU indefinitely.
                raise OverflowError("integer power too large")
            return binary_ops[type(node.op)](left, right)
        raise ValueError("unsupported expression")

    # strip() because compile() in eval mode rejects leading indentation.
    clean_expr = re.sub(r"[^\d+\-*/().\s]", "", str(expression)).strip()
    if not clean_expr:
        return "Invalid expression"
    try:
        result = evaluate(ast.parse(clean_expr, mode="eval"))
        return str(float(result))
    except Exception:
        return "Calc error"
def read_pdf(file_path: str) -> str:
    """Return the first 500 characters of text extracted from a PDF.

    Args:
        file_path: Path handed to pdfminer's ``extract_text``.

    Returns:
        Up to 500 characters of extracted text, ``"No PDF text"`` when the
        extraction is empty, or ``"PDF error"`` when extraction raises.
    """
    try:
        text = extract_text(file_path)
    except Exception:
        # Best-effort tool: report the failure as text. A bare ``except``
        # would also trap KeyboardInterrupt/SystemExit.
        return "PDF error"
    return text[:500] if text else "No PDF text"
def read_webpage(url: str) -> str:
    """Fetch *url* and return the first 500 characters of its visible text.

    Args:
        url: Address of the page to download (3-second timeout).

    Returns:
        Up to 500 characters of whitespace-normalised page text,
        ``"No webpage text"`` when the page has no text, or
        ``"Webpage error"`` when the request fails, returns a non-2xx
        status, or cannot be parsed.
    """
    try:
        response = requests.get(url, timeout=3, headers={"User-Agent": "Bot"})
        # Treat 4xx/5xx as failures so an error page is not handed to the
        # model as if it were the requested content.
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        text = soup.get_text(separator=" ", strip=True)
    except Exception:
        # Best-effort tool: report the failure as text. A bare ``except``
        # would also trap KeyboardInterrupt/SystemExit.
        return "Webpage error"
    return text[:500] if text else "No webpage text"
# Tool registry: each callable is keyed by its own function name, which is
# also the name the prompt tells the model to emit.
TOOLS = {
    tool.__name__: tool
    for tool in (web_search, calculator, read_pdf, read_webpage)
}
# --- Ultra Fast Agent --- | |
class FastGAIA_Agent:
    """Tool-using question-answering loop around the module-level model.

    For each question the agent generates up to ``MAX_STEPS`` replies. A
    reply containing ``Final Answer:`` ends the loop; a reply containing a
    JSON tool request runs that tool and appends the outcome to the history;
    any other reply is appended as a thought.
    """

    # Prompt skeleton. "{history}" is the only placeholder; the other braces
    # are literal JSON shown to the model, so the template must never be
    # passed through str.format (it would raise on the JSON braces).
    PROMPT_TEMPLATE = (
        "<|system|>You solve GAIA questions fast. Tools: web_search, calculator, read_pdf, read_webpage.\n"
        "Format: ```json\n{\"tool\": \"name\", \"args\": {\"key\": \"value\"}}```\n"
        "Always end with: Final Answer: [answer]<|end|>\n"
        "<|user|>{history}<|end|>\n<|assistant|>"
    )
    # Target size (characters) of the history placed in the prompt.
    HISTORY_LIMIT = 800
    # Minimum number of trace characters kept even for very long questions.
    MIN_TRACE_CHARS = 300

    def __init__(self):
        self.tools = TOOLS
        self.prompt_template = self.PROMPT_TEMPLATE

    def __call__(self, question: str) -> str:
        """Answer *question*.

        Returns:
            The text after the last ``Final Answer:`` (first line, at most
            200 characters), ``"TIMEOUT"`` once ``TIMEOUT_PER_QUESTION``
            seconds have elapsed between steps, ``"No solution found"`` when
            the step budget runs out, or ``"Error: ..."`` on an unexpected
            exception.
        """
        started = time.time()
        try:
            question_line = f"Question: {question}"
            trace = ""
            for _ in range(MAX_STEPS):
                if time.time() - started > TIMEOUT_PER_QUESTION:
                    return "TIMEOUT"

                response = self._fast_generate(question_line + trace)

                if "Final Answer:" in response:
                    answer = response.split("Final Answer:")[-1].strip().split("\n")[0]
                    return answer[:200]  # limit answer length

                tool_result = self._quick_tool_use(response)
                if tool_result:
                    trace += f"\nAction: {tool_result}"
                else:
                    trace += f"\nThought: {response[:100]}"

                # Trim only the trace: cutting the tail of the whole history
                # would drop the question itself from the prompt.
                budget = max(
                    self.HISTORY_LIMIT - len(question_line),
                    self.MIN_TRACE_CHARS,
                )
                trace = trace[-budget:]
            return "No solution found"
        except Exception as e:
            return f"Error: {str(e)[:50]}"

    def _build_prompt(self, history: str) -> str:
        """Insert *history* into the prompt template, leaving other braces alone."""
        return self.prompt_template.replace("{history}", history)

    def _fast_generate(self, history: str) -> str:
        """Generate one model reply for *history*.

        Returns:
            Only the newly generated text (prompt excluded), stripped, or
            ``"Gen error: ..."`` when tokenisation or generation raises.
        """
        try:
            prompt = self._build_prompt(history)
            inputs = tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=MAX_CONTEXT,
                padding=False,
            )
            with torch.no_grad():
                outputs = model.generate(
                    inputs.input_ids,
                    generation_config=GENERATION_CONFIG,
                    attention_mask=inputs.attention_mask,
                )
            # generate() returns prompt + continuation. Decode only the
            # continuation, otherwise the instruction line
            # "Final Answer: [answer]" from the prompt is mistaken for the
            # model's answer.
            prompt_len = inputs.input_ids.shape[-1]
            response = tokenizer.decode(
                outputs[0][prompt_len:], skip_special_tokens=True
            ).strip()
            # Release tensors immediately to keep memory low.
            del inputs, outputs
            gc.collect()
            return response
        except Exception as e:
            return f"Gen error: {str(e)}"

    @staticmethod
    def _call_tool(tool, args):
        """Invoke *tool* with model-supplied *args*.

        A dict is passed as keyword arguments. If that raises ``TypeError``
        (typically an invented keyword name, since the prompt only shows a
        generic ``{"key": "value"}``) and the dict has exactly one entry, its
        value is passed positionally instead. Non-dict *args* are passed as
        the single positional argument.
        """
        if not isinstance(args, dict):
            return tool(args)
        try:
            return tool(**args)
        except TypeError:
            if len(args) != 1:
                raise
            return tool(next(iter(args.values())))

    def _quick_tool_use(self, text: str) -> str:
        """Run the tool requested in *text*, if any.

        Returns:
            ``"Used <tool>: <result up to 150 chars>"`` when a known tool was
            requested (a tool exception is reported as
            ``"Used <tool>: Tool error: ..."``), otherwise ``""``.
        """
        # Non-greedy up to the brace that precedes the closing fence, so the
        # nested "args" object is captured in full.
        match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
        if match is None:
            # Replies are short and may be cut off before the closing fence;
            # fall back to the outermost brace pair.
            match = re.search(r"(\{.*\})", text, re.DOTALL)
        if match is None:
            return ""

        try:
            tool_data = json.loads(match.group(1))
        except ValueError:  # includes json.JSONDecodeError
            return ""
        if not isinstance(tool_data, dict):
            return ""

        tool_name = tool_data.get("tool", "")
        if not isinstance(tool_name, str) or tool_name not in self.tools:
            return ""

        try:
            result = self._call_tool(self.tools[tool_name], tool_data.get("args", {}))
        except Exception as exc:
            # Surface the failure so the next step can react to it.
            return f"Used {tool_name}: Tool error: {str(exc)[:100]}"
        return f"Used {tool_name}: {str(result)[:150]}"
# --- Lightning Fast Runner --- | |
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch all questions, answer them with the agent, and submit the answers.

    Args:
        profile: OAuth profile of the logged-in user, or ``None`` when nobody
            is logged in.

    Returns:
        A ``(status_text, table)`` pair. ``table`` is a ``pandas.DataFrame``
        with one row per answered question, or ``None`` when the run stopped
        before any question was processed (no login, or the question list
        could not be fetched).
    """
    if not profile:
        return "β Please login first", None
    username = profile.username

    # Quick setup
    agent = FastGAIA_Agent()
    api_url = DEFAULT_API_URL
    space_id = os.getenv("SPACE_ID", "unknown")
    print(f"π ULTRA FAST mode - User: {username}")

    # Fetch questions. An HTTP error, a non-list payload, or an empty list is
    # reported here instead of crashing later (the per-question average below
    # divides by the number of questions).
    try:
        response = requests.get(f"{api_url}/questions", timeout=10)
        response.raise_for_status()
        questions = response.json()
        if not isinstance(questions, list) or not questions:
            raise ValueError("question list is empty or malformed")
        print(f"π Got {len(questions)} questions")
    except Exception as e:
        return f"β Failed to get questions: {e}", None

    results = []
    answers = []
    start_time = time.time()
    for i, item in enumerate(questions):
        if not isinstance(item, dict):
            continue
        task_id = item.get("task_id")
        question = item.get("question") or ""
        if not task_id:
            continue
        # task_id is submitted unchanged; only the display form is coerced,
        # so a non-string id cannot break the slicing.
        short_id = str(task_id)[:8]
        print(f"β‘ [{i+1}/{len(questions)}] {short_id}...")
        try:
            answer = agent(question)
            answers.append({"task_id": task_id, "submitted_answer": answer})
            results.append({
                "ID": short_id,
                "Question": question[:60] + "...",
                "Answer": answer[:80] + "..." if len(answer) > 80 else answer,
            })
        except Exception as e:
            error_ans = f"ERROR: {str(e)[:30]}"
            answers.append({"task_id": task_id, "submitted_answer": error_ans})
            results.append({
                "ID": short_id,
                "Question": question[:60] + "...",
                "Answer": error_ans,
            })
        # Periodic memory cleanup
        if i % 5 == 0:
            gc.collect()

    total_time = time.time() - start_time
    per_question = total_time / len(questions)  # questions is non-empty here
    print(f"β±οΈ Completed in {total_time:.1f}s ({per_question:.1f}s per question)")

    # Submit results
    try:
        submission = {
            "username": username,
            "agent_code": f"https://huggingface.co/spaces/{space_id}/tree/main",
            "answers": answers,
        }
        response = requests.post(f"{api_url}/submit", json=submission, timeout=30)
        # A rejected submission must show up as a failure rather than as a
        # "Completed!" status with an N/A score.
        response.raise_for_status()
        result = response.json()
        status = (
            f"π― ULTRA FAST RESULTS\n"
            f"π€ User: {result.get('username', username)}\n"
            f"π Score: {result.get('score', 'N/A')}% "
            f"({result.get('correct_count', '?')}/{result.get('total_attempted', '?')})\n"
            f"β±οΈ Time: {total_time:.1f}s ({per_question:.1f}s/question)\n"
            f"π¬ {result.get('message', 'Completed!')}"
        )
        return status, pd.DataFrame(results)
    except Exception as e:
        error_status = f"β Submission failed: {str(e)}\nβ±οΈ Processing time: {total_time:.1f}s"
        return error_status, pd.DataFrame(results)
# --- Ultra Simple UI --- | |
# Single-page UI: header, login button, run button, and the two outputs that
# run_and_submit_all fills in (status text and answers table).
with gr.Blocks(title="GAIA Agent - ULTRA FAST") as demo:
    gr.Markdown("# β‘ GAIA Agent - ULTRA FAST MODE")
    gr.Markdown("**Speed settings:** 3 steps max β’ 64 tokens β’ 15s timeout β’ Lightning tools")
    gr.LoginButton()

    run_btn = gr.Button(value="π RUN ULTRA FAST", size="lg", variant="primary")
    status = gr.Textbox(interactive=False, lines=6, label="π Results")
    table = gr.DataFrame(interactive=False, label="π Answers")

    # No explicit inputs: the callback's only parameter is the OAuth profile.
    run_btn.click(
        fn=run_and_submit_all,
        outputs=[status, table],
        show_progress=True,
    )
if __name__ == "__main__":
    # Announce the active speed settings, then start the Gradio server.
    print("β‘ ULTRA FAST GAIA Agent Starting...")
    print(f"βοΈ {MAX_STEPS} steps, {MAX_TOKENS} tokens, {TIMEOUT_PER_QUESTION}s timeout")

    launch_options = {
        "share": True,  # public link
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "debug": False,
        "show_error": True,
    }
    demo.launch(**launch_options)