import os
import gradio as gr
import requests
import inspect
import pandas as pd
import json
from datasets import Dataset
from huggingface_hub import HfApi
from gaia_agent import GaiaAgent

# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Check whether we are running on a Hugging Face Space (as opposed to locally)
running_on_hf = bool(os.getenv("SPACE_ID") or os.getenv("SPACE_HOST"))

# Questions the agent can reliably solve (no images, audio, or video)
SOLVABLE_INDICES = [0, 2, 4]  # Mercedes Sosa, reversed text, dinosaur Featured Article


def get_dataset_name():
    """Get the private dataset name for this Space."""
    space_id = os.getenv("SPACE_ID")
    if space_id:
        # Replace characters that are not valid in HF dataset names
        clean_name = space_id.replace('/', '_').replace('-', '_')
        return f"{clean_name}_gaia_answers"
    return "gaia_answers_cache"
def load_answers_cache():
    """Load cached answers from a local file (fallback from HF Dataset due to auth issues)."""
    try:
        cache_file = "verified_answers.json"
        if os.path.exists(cache_file):
            with open(cache_file, 'r') as f:
                cache = json.load(f)
            print(f"✅ Loaded {len(cache)} cached answers from local file")
            return cache
    except Exception as e:
        print(f"📝 No existing cache found: {e}")
    return {}


def save_answers_cache(cache, token=None):
    """Save cached answers to a local file (fallback from HF Dataset due to auth issues)."""
    if not cache:
        return False
    try:
        cache_file = "verified_answers.json"
        with open(cache_file, 'w') as f:
            json.dump(cache, f, indent=2)
        print(f"💾 Saved {len(cache)} answers to local file: {cache_file}")

        # Try to commit the cache file to the repo when running on HF Spaces
        if running_on_hf:
            try:
                import subprocess
                subprocess.run(["git", "add", cache_file], check=True)
                subprocess.run(["git", "commit", "-m", f"Cache {len(cache)} verified answers"], check=True)
                print("📝 Committed cache to repository")
            except Exception as git_error:
                print(f"⚠️ Could not commit to git: {git_error}")
        return True
    except Exception as e:
        print(f"Error saving cache: {e}")
        return False
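
# A minimal sketch of the HF Dataset path that the local file above stands in for
# (the unused `datasets` / `HfApi` imports and get_dataset_name() suggest this was
# the original intent). This helper is an assumption, is not wired into the UI,
# and assumes a write token is available, e.g. via an HF_TOKEN Space secret.
def push_cache_to_hf_dataset(cache, token=None):
    """Sketch: mirror the answers cache to a private HF Dataset (optional, unused)."""
    token = token or os.getenv("HF_TOKEN")
    if not cache or not token:
        return False
    ds = Dataset.from_dict({
        "task_id": [str(k) for k in cache.keys()],
        "submitted_answer": [str(v) for v in cache.values()],
    })
    # push_to_hub creates the dataset repo if it does not exist yet
    ds.push_to_hub(get_dataset_name(), private=True, token=token)
    return True
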
def check_answers_correctness(answers_payload, questions_data):
    """
    Submit answers to get correctness feedback and return which ones were correct.
    """
    if not running_on_hf:
        return {}
    try:
        # Prepare a minimal submission for validation
        space_id = os.getenv("SPACE_ID")
        agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
        submission_data = {
            "username": "validation_check",
            "agent_code": agent_code,
            "answers": answers_payload
        }
        api_url = DEFAULT_API_URL
        submit_url = f"{api_url}/submit"
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        print(f"📊 Validation API response: {result_data}")

        # Work out which answers were correct
        correct_answers = {}

        # The response format may vary, so try the known shapes
        if "detailed_results" in result_data:
            for result in result_data["detailed_results"]:
                if result.get("correct", False):
                    task_id = result.get("task_id")
                    for answer in answers_payload:
                        if answer["task_id"] == task_id:
                            correct_answers[task_id] = answer["submitted_answer"]
                            break
        elif "results" in result_data:
            for result in result_data["results"]:
                if result.get("correct", False):
                    task_id = result.get("task_id")
                    for answer in answers_payload:
                        if answer["task_id"] == task_id:
                            correct_answers[task_id] = answer["submitted_answer"]
                            break
        else:
            # Only the aggregate score is available, with no per-task breakdown
            correct_count = result_data.get("correct_count", 0)
            total_count = len(answers_payload)
            print(f"📈 Got {correct_count}/{total_count} correct, but no detailed breakdown")
            # Without per-task results we cannot tell which answers were right,
            # so return an empty dict rather than caching potentially wrong answers.

        print(f"✅ Found {len(correct_answers)} correct answers: {list(correct_answers.keys())}")
        return correct_answers
    except Exception as e:
        print(f"❌ Error checking answer correctness: {e}")
        return {}


def manually_cache_answer(task_id: str, answer: str):
    """
    Manually add a verified correct answer to the cache.
    """
    if not running_on_hf:
        return "Manual caching is only available on Hugging Face Spaces"
    try:
        cache = load_answers_cache()
        cache[task_id] = answer
        if save_answers_cache(cache):
            return f"✅ Manually cached answer for {task_id}: {answer}"
        else:
            return "❌ Failed to save manual cache"
    except Exception as e:
        return f"❌ Error in manual caching: {e}"


def run_and_cache_answers(profile: gr.OAuthProfile | None):
    """
    Runs the agent on the target questions, validates the answers, and caches only the correct ones.
    """
    if not running_on_hf:
        return "Caching is only available on Hugging Face Spaces", None
    username = f"{profile.username}" if profile else "unknown_user"
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"

    # 1. Instantiate the agent
    try:
        agent = GaiaAgent()
    except Exception as e:
        return f"Error initializing agent: {e}", None

    # 2. Fetch the questions
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty.", None
    except Exception as e:
        return f"Error fetching questions: {e}", None

    # 3. Load the existing cache of verified correct answers
    cache = load_answers_cache()

    # 4. Run the agent only on questions that are not cached yet
    results_log = []
    new_answers_payload = []
    for idx in SOLVABLE_INDICES:
        if idx >= len(questions_data):
            continue
        item = questions_data[idx]
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            continue

        # Skip questions that already have a verified answer cached
        if task_id in cache:
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "...",
                "Answer": cache[task_id],
                "Status": "✅ CORRECT (CACHED)"
            })
            continue
        try:
            print(f"Processing question {idx+1}: {question_text[:100]}...")
            submitted_answer = agent(question_text)
            # Add to the payload for validation
            new_answers_payload.append({
                "task_id": task_id,
                "submitted_answer": submitted_answer
            })
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "...",
                "Answer": submitted_answer,
                "Status": "🔄 VALIDATING..."
            })
        except Exception as e:
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "...",
                "Answer": f"ERROR: {e}",
                "Status": "❌ FAILED"
            })
    # 5. Validate the new answers one by one and cache only the correct ones
    if new_answers_payload:
        print(f"🔍 Validating {len(new_answers_payload)} answers one by one...")
        correct_answers = {}
        space_id = os.getenv("SPACE_ID")
        agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
        api_url = DEFAULT_API_URL
        submit_url = f"{api_url}/submit"
        for answer in new_answers_payload:
            try:
                # Submit this answer on its own to learn whether it is correct
                single_submission = {
                    "username": f"test_{answer['task_id'][:8]}",
                    "agent_code": agent_code,
                    "answers": [answer]
                }
                print(f"Testing: {answer['submitted_answer']}")
                response = requests.post(submit_url, json=single_submission, timeout=30)
                response.raise_for_status()
                result_data = response.json()
                correct_count = result_data.get("correct_count", 0)
                if correct_count > 0:
                    print(f"✅ CORRECT: {answer['submitted_answer']}")
                    correct_answers[answer['task_id']] = answer['submitted_answer']
                else:
                    print(f"❌ WRONG: {answer['submitted_answer']}")
            except Exception as e:
                print(f"⚠️ Error testing {answer['submitted_answer']}: {e}")

        # Merge only the verified-correct answers into the cache
        cache.update(correct_answers)

        # Update the results log with the validation outcome
        for log_entry in results_log:
            if log_entry["Status"] == "🔄 VALIDATING...":
                task_id = log_entry["Task ID"]
                if task_id in correct_answers:
                    log_entry["Status"] = "✅ CORRECT (NEW)"
                else:
                    log_entry["Status"] = "❌ INCORRECT"

        # Persist the updated cache
        if correct_answers:
            save_answers_cache(cache)
            status = f"🎉 Validated {len(new_answers_payload)} answers. Cached {len(correct_answers)} correct answers!"
        else:
            status = f"😔 Validated {len(new_answers_payload)} answers. None were correct this time."
    else:
        status = "All target questions already have correct answers cached!"

    return status, pd.DataFrame(results_log)


def run_and_show_answers(profile: gr.OAuthProfile | None):
    """
    Runs the agent on the target questions and shows the results without auto-validation (for manual review).
    """
    if not running_on_hf:
        return "This function is only available on Hugging Face Spaces", None
    username = f"{profile.username}" if profile else "unknown_user"
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"

    # 1. Instantiate the agent
    try:
        agent = GaiaAgent()
    except Exception as e:
        return f"Error initializing agent: {e}", None

    # 2. Fetch the questions
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty.", None
    except Exception as e:
        return f"Error fetching questions: {e}", None

    # 3. Load the existing cache
    cache = load_answers_cache()

    # 4. Run the agent on all target questions
    results_log = []
    for idx in SOLVABLE_INDICES:
        if idx >= len(questions_data):
            continue
        item = questions_data[idx]
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            continue

        # Reuse an answer if one is already cached
        if task_id in cache:
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "...",
                "Answer": cache[task_id],
                "Status": "✅ CACHED"
            })
            continue
        try:
            print(f"Processing question {idx+1}: {question_text[:100]}...")
            submitted_answer = agent(question_text)
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "...",
                "Answer": submitted_answer,
                "Status": "🔍 REVIEW NEEDED"
            })
        except Exception as e:
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "...",
                "Answer": f"ERROR: {e}",
                "Status": "❌ FAILED"
            })
    status = (
        "📋 Generated answers for manual review.\n"
        "If an answer looks correct, you can cache it manually below.\n"
        "Known correct answers:\n"
        "- Reversed-text question: should be 'right'\n"
        "- Mercedes Sosa albums: try different numbers if needed\n"
        "- Dinosaur Featured Article: check the nomination info"
    )
    return status, pd.DataFrame(results_log)


def submit_cached_answers(profile: gr.OAuthProfile | None):
    """
    Submits all cached answers.
    """
    if not running_on_hf:
        return "Submission is only available on Hugging Face Spaces", None
    if not profile:
        return "Please log in to submit answers", None

    username = f"{profile.username}"
    space_id = os.getenv("SPACE_ID")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    # Load the cache
    cache = load_answers_cache()
    if not cache:
        return "No cached answers found", None

    print(f"📤 Preparing to submit {len(cache)} cached answers:")
    for task_id, answer in cache.items():
        print(f"  {task_id[:8]}... = {answer}")
    # Prepare the submission - ensure answers are strings
    answers_payload = []
    for task_id, answer in cache.items():
        answers_payload.append({
            "task_id": str(task_id),
            "submitted_answer": str(answer)
        })
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload
    }
    print(f"📡 Submitting as user: {username}")
    print(f"🔗 Agent code: {agent_code}")

    # Submit
    api_url = DEFAULT_API_URL
    submit_url = f"{api_url}/submit"
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        print(f"📊 Response status: {response.status_code}")
        response.raise_for_status()
        result_data = response.json()
        print(f"📈 API Response: {result_data}")
        final_status = (
            f"🎉 Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Submitted {len(answers_payload)} cached answers\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        # Show the cached answers for reference
        results_log = [{"Task ID": task_id, "Cached Answer": answer, "Status": "✅ SUBMITTED"}
                       for task_id, answer in cache.items()]
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as http_err:
        error_detail = f"HTTP {http_err.response.status_code}: {http_err.response.text}"
        results_log = [{"Task ID": task_id, "Cached Answer": answer, "Status": "❌ FAILED"}
                       for task_id, answer in cache.items()]
        return f"❌ Submission Failed: {error_detail}", pd.DataFrame(results_log)
    except Exception as e:
        results_log = [{"Task ID": task_id, "Cached Answer": answer, "Status": "❌ FAILED"}
                       for task_id, answer in cache.items()]
        return f"❌ Submission Failed: {e}", pd.DataFrame(results_log)


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches the questions, runs the GaiaAgent on the solvable ones, submits the answers,
    and displays the results.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # SPACE_ID is used to build the link to the code

    if running_on_hf:
        if profile:
            username = f"{profile.username}"
            print(f"User logged in: {username}")
        else:
            print("User not logged in.")
            return "Please log in to Hugging Face with the button.", None
    else:
        username = "local_user"

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate the agent (modify this part to create your own agent)
    try:
        agent = GaiaAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # When the app runs as a Hugging Face Space, this link points to your codebase
    # (useful for others, so please keep it public)
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)
    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.JSONDecodeError as e:
        # Handle the more specific JSON error before the general RequestException,
        # otherwise this branch would never be reached.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None
    # 3. Run your agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(SOLVABLE_INDICES)} solvable questions...")
    for idx in SOLVABLE_INDICES:
        if idx >= len(questions_data):
            continue
        item = questions_data[idx]
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            print(f"Processing question {idx+1}: {question_text[:100]}...")
            submitted_answer = agent(question_text)
            print(f"Answer for question {idx+1}: {submitted_answer}")
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:150] + "..." if len(question_text) > 150 else question_text,
                "Submitted Answer": submitted_answer
            })
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:150] + "..." if len(question_text) > 150 else question_text,
                "Submitted Answer": f"AGENT ERROR: {e}"
            })

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)
    # 5. Submit
    if running_on_hf:
        print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
        try:
            response = requests.post(submit_url, json=submission_data, timeout=60)
            response.raise_for_status()
            result_data = response.json()
            final_status = (
                f"Submission Successful!\n"
                f"User: {result_data.get('username')}\n"
                f"Overall Score: {result_data.get('score', 'N/A')}% "
                f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
                f"Message: {result_data.get('message', 'No message received.')}"
            )
            print("Submission successful.")
            results_df = pd.DataFrame(results_log)
            return final_status, results_df
        except requests.exceptions.HTTPError as e:
            error_detail = f"Server responded with status {e.response.status_code}."
            try:
                error_json = e.response.json()
                error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
            except requests.exceptions.JSONDecodeError:
                error_detail += f" Response: {e.response.text[:500]}"
            status_message = f"Submission Failed: {error_detail}"
            print(status_message)
            results_df = pd.DataFrame(results_log)
            return status_message, results_df
        except requests.exceptions.Timeout:
            status_message = "Submission Failed: The request timed out."
            print(status_message)
            results_df = pd.DataFrame(results_log)
            return status_message, results_df
        except requests.exceptions.RequestException as e:
            status_message = f"Submission Failed: Network error - {e}"
            print(status_message)
            results_df = pd.DataFrame(results_log)
            return status_message, results_df
        except Exception as e:
            status_message = f"An unexpected error occurred during submission: {e}"
            print(status_message)
            results_df = pd.DataFrame(results_log)
            return status_message, results_df
    else:
        print(f"Agent finished locally on {len(answers_payload)} questions (not submitted).")
        results_df = pd.DataFrame(results_log)
        return f"Ran locally as '{username}', results below (no submission).", results_df


# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent")
    gr.Image(value="assets/AI_Programmer.png")
    gr.Markdown("An agent using smolagents to solve the GAIA Benchmark. By @ArturoNereu")

    if running_on_hf:
        gr.LoginButton()
        with gr.Row():
            review_button = gr.Button("Run & Review Answers")
            cache_button = gr.Button("Run & Auto-Cache Correct")
            submit_cache_button = gr.Button("Submit Cached Answers")
        with gr.Row():
            run_button = gr.Button("Run & Submit All (Direct)")

        # Manual caching section
        gr.Markdown("### Manual Answer Caching")
        with gr.Row():
            task_id_input = gr.Textbox(label="Task ID", placeholder="e.g., 2d83110e-a098-4ebb-9987-066c06fa42d0")
            answer_input = gr.Textbox(label="Correct Answer", placeholder="e.g., right")
            manual_cache_button = gr.Button("Cache This Answer")
    else:
        run_button = gr.Button("Run Evaluation (Local)")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    if running_on_hf:
        review_button.click(
            fn=run_and_show_answers,
            outputs=[status_output, results_table]
        )
        cache_button.click(
            fn=run_and_cache_answers,
            outputs=[status_output, results_table]
        )
        submit_cache_button.click(
            fn=submit_cached_answers,
            outputs=[status_output, results_table]
        )
        run_button.click(
            fn=run_and_submit_all,
            outputs=[status_output, results_table]
        )
        manual_cache_button.click(
            fn=lambda task_id, answer: (manually_cache_answer(task_id, answer), None),
            inputs=[task_id_input, answer_input],
            outputs=[status_output, results_table]
        )
    else:
        run_button.click(
            fn=lambda: run_and_submit_all(None),
            outputs=[status_output, results_table]
        )


if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # Check for SPACE_HOST and SPACE_ID at startup for information
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f"   Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:  # Print repo URLs if SPACE_ID is found
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-"*(60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)