Yago Bolivar
feat: enhance BasicAgent to handle file inputs and improve question processing
2e3dd06
raw
history blame
14.3 kB
import os
import gradio as gr
import requests
import inspect # Keep if you plan to use it for agent introspection later
import pandas as pd
from src.file_processing_tool import FileIdentifier
from src.speech_to_text import transcribe_audio
from src.download_utils import download_file
# --- Constants ---
# Base URL of the scoring service; run_and_submit_all appends "/questions"
# and "/submit" to it to build the endpoints it talks to.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Relative path, so it resolves against the process's current working directory.
DOWNLOADED_FILES_DIR = "downloaded_task_files" # Directory to store downloaded files
# Ensure the directory for downloaded files exists when the app starts
# (import-time side effect; exist_ok=True makes repeated starts harmless).
os.makedirs(DOWNLOADED_FILES_DIR, exist_ok=True)
# --- Basic Agent Definition ---
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
class BasicAgent:
    """Placeholder agent that answers one task, optionally using an attached file.

    The agent is called with a dict holding ``task_id``, ``question`` and
    ``file_url``. When a file URL is present the file is downloaded into
    ``DOWNLOADED_FILES_DIR``, identified with ``FileIdentifier`` and, for
    audio files, transcribed. Every other case yields a placeholder answer.
    """

    # Characters (besides alphanumerics) that may appear in a local file name.
    _SAFE_FILENAME_CHARS = "._-"

    def __init__(self):
        print("BasicAgent initialized.")
        self.file_identifier = FileIdentifier()
        # You might initialize other tools here if needed (e.g., spreadsheet parser, OCR)

    @classmethod
    def _sanitize_name(cls, raw, fallback: str) -> str:
        """Keep only filename-safe characters of ``raw``; return ``fallback`` if none remain."""
        cleaned = "".join(
            ch for ch in str(raw) if ch.isalnum() or ch in cls._SAFE_FILENAME_CHARS
        )
        return cleaned or fallback

    @classmethod
    def _local_filename(cls, task_id, file_url: str) -> str:
        """Build the ``<task_id>_<name>`` file name used to store a downloaded file."""
        # Function-scope import keeps this class self-contained.
        from urllib.parse import urlsplit

        try:
            # Use only the URL path so a query string or fragment
            # ("file.mp3?token=abc") cannot swallow the file extension.
            url_path = urlsplit(str(file_url)).path
        except ValueError:
            # Malformed URL: fall back to the raw string like the naive split did.
            url_path = str(file_url)
        original_filename = url_path.rsplit("/", 1)[-1]
        safe_original_filename = cls._sanitize_name(original_filename, "downloaded_file")
        # task_id comes from the remote API too, so it must not be able to
        # inject path separators into the local file name.
        safe_task_id = cls._sanitize_name(task_id, "unknown_task")
        return f"{safe_task_id}_{safe_original_filename}"

    def _answer_from_audio(self, downloaded_file_path) -> str:
        """Transcribe an audio file and wrap the transcript in a placeholder answer."""
        print(f"File {downloaded_file_path} identified as audio, attempting transcription...")
        transcribed_text = transcribe_audio(downloaded_file_path)
        # NOTE(review): transcribe_audio is assumed to return a string that
        # contains "Error during transcription" on failure; None is treated
        # as a failure too instead of raising TypeError on the `in` check.
        if transcribed_text is None or "Error during transcription" in transcribed_text:
            print(f"Transcription error for {downloaded_file_path}: {transcribed_text}")
            return f"Could not transcribe audio: {transcribed_text}"
        # Placeholder: Use the question and transcribed_text to form an answer
        # In a real agent, you'd use an LLM or other logic here.
        answer = f"The audio file says: {transcribed_text[:200]}... (This is a placeholder answer based on transcription)"
        print(f"Returning answer based on audio: {answer}")
        return answer

    def _answer_from_file(self, task_id, question_text, file_url) -> str:
        """Download the task's file, identify it and dispatch on its type.

        Always returns an answer or an error message; it never falls through
        to the question-only path.
        """
        print(f"File URL provided: {file_url}")
        unique_filename = self._local_filename(task_id, file_url)
        downloaded_file_path = download_file(file_url, DOWNLOADED_FILES_DIR, filename=unique_filename)
        if not downloaded_file_path:
            print(f"Error: Failed to download the associated file for task {task_id} from {file_url}.")
            return "Error: Failed to download the associated file."
        print(f"File downloaded to: {downloaded_file_path}")

        file_info = self.file_identifier.identify_file(downloaded_file_path)
        print(f"File info for {downloaded_file_path}: {file_info}")
        if file_info.get("error"):
            return f"Error processing file: {file_info['error']}"

        # .get() so a missing key is reported as an unhandled type instead of
        # surfacing as a KeyError.
        determined_type = file_info.get("determined_type")
        suggested_action = file_info.get("suggested_action")

        if determined_type == "audio" and suggested_action == "speech-to-text":
            return self._answer_from_audio(downloaded_file_path)

        # TODO: add branches for other file types here, e.g. "spreadsheet"
        # (parse the sheet, then reason over it) or "image" (OCR / vision tool).
        warning_msg = f"File type '{determined_type}' (action: '{suggested_action}') not yet handled for file: {os.path.basename(downloaded_file_path)}."
        print(warning_msg)
        # Fallback if file type is known but not handled, or if it's an unknown type
        return f"File received, but type '{determined_type}' is not yet processed by this agent. Question: {question_text}"

    def __call__(self, question_data: dict) -> str:
        """Answer a single task.

        Args:
            question_data: Mapping with optional keys ``"question"``,
                ``"file_url"`` and ``"task_id"``. ``None`` is treated as an
                empty mapping.

        Returns:
            The answer text, or a human-readable error/placeholder message.
        """
        question_data = question_data or {}
        question_text = question_data.get("question")
        file_url = question_data.get("file_url")
        task_id = question_data.get("task_id") or "unknown_task"  # For unique file naming
        print(f"Agent received task_id: {task_id}, question: {question_text}, file_url: {file_url}")

        if file_url:
            return self._answer_from_file(task_id, question_text, file_url)

        # Question-only processing (no file_url supplied).
        if question_text:
            # Placeholder for LLM call or other reasoning for text-only questions
            default_answer = f"Received question: '{question_text}'. No specific file action taken or file not processable. (Default Response)"
            print(f"Agent returning default text-based answer: {default_answer}")
            return default_answer
        # Neither question text nor a file: nothing to work with.
        return "No question text provided and no file processed."
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: The logged-in Hugging Face user, or None when nobody is
            logged in. Only ``profile.username`` is read.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a pandas
        DataFrame with one row per attempted task, or None when the run
        stopped before any task was attempted.
    """
    space_id = os.getenv("SPACE_ID")

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = str(profile.username)
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "local_run_no_space_id"
    print(f"Agent code reference: {agent_code}")

    # --- Fetch questions ---
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=30)  # Increased timeout
        response.raise_for_status()
        questions_data = response.json()
    # JSONDecodeError must be caught BEFORE RequestException: it is a subclass
    # of RequestException, so the broader handler would otherwise shadow it.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text (first 500 chars): {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # Reject anything that is not a non-empty list so the loop below cannot
    # crash on an unexpected payload shape.
    if not isinstance(questions_data, list) or not questions_data:
        print("Fetched questions list is empty.")
        return "Fetched questions list is empty or invalid format.", None
    print(f"Fetched {len(questions_data)} questions.")

    # --- Run the agent ---
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id") if isinstance(item, dict) else None
        if not task_id:
            print(f"Skipping item with missing task_id: {item}")
            continue
        question_text = item.get("question")  # Can be None if file_url is primary
        file_url = item.get("file_url")

        # Prepare the input for the agent's __call__ method
        agent_input_data = {"task_id": task_id, "question": question_text, "file_url": file_url}
        row = {
            "Task ID": task_id,
            "Question": question_text if question_text else "N/A (File-based question)",
            "File URL": file_url if file_url else "N/A",
        }
        try:
            submitted_answer = agent(agent_input_data)
        except Exception as e:
            # A failing task is logged for display but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            row["Submitted Answer"] = f"AGENT ERROR: {e}"
        else:
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            row["Submitted Answer"] = submitted_answer
        results_log.append(row)

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        # Still return results_log if it has entries (e.g. all agent errors)
        results_df = pd.DataFrame(results_log) if results_log else None
        return "Agent did not produce any answers to submit.", results_df

    # --- Submit answers ---
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")

    # Every outcome below returns the same table, so build it once.
    results_df = pd.DataFrame(results_log)
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response (first 500 chars): {e.response.text[:500]}"
        else:
            # The error body may be valid JSON without being an object.
            if isinstance(error_json, dict):
                detail = error_json.get('detail', e.response.text)
            else:
                detail = e.response.text
            error_detail += f" Detail: {detail}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    print(status_message)
    return status_message, results_df
# --- Build Gradio Interface using Blocks ---
# Components are rendered in the order they are created inside the `with`
# block, so the statement order below is the page layout.
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Benchmark Agent Runner")
    gr.Markdown(
        """
**Instructions:**
1. Clone this space, then modify `src/` files (especially `BasicAgent` in `app.py`, and tool implementations) to define your agent's logic.
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
---
**Notes:**
- The agent's processing can take time, especially with file downloads and model inferences.
- This is a basic framework. For more complex agents, consider asynchronous operations, caching, and more robust error handling.
"""
    )
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    # Read-only outputs: first the status text, then the per-task results table.
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True, interactive=False) # Set interactive=False for display
    run_button.click(
        fn=run_and_submit_all,
        # No component is wired as input. NOTE(review): run_and_submit_all still
        # expects a `profile` argument; presumably Gradio injects it from the
        # login session based on the `gr.OAuthProfile | None` annotation.
        # Confirm against the Gradio OAuth documentation.
        inputs=None,
        # The function's (status, DataFrame) return tuple maps onto these in order.
        outputs=[status_output, results_table]
    )
if __name__ == "__main__":
    # Startup banner: title framed by two 30-character rules.
    banner_title = " App Starting "
    banner_rule = "-" * 30
    print("\n" + banner_rule + banner_title + banner_rule)

    # Both variables are set by the Hugging Face Spaces runtime; they are
    # absent when the app is started locally.
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if not space_host_startup:
        print("ℹ️ SPACE_HOST environment variable not found (likely running locally).")
    else:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")

    if not space_id_startup:
        print("ℹ️ SPACE_ID environment variable not found (likely running locally). Repo URL cannot be determined.")
    else:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")

    # Closing rule spans the full width of the opening banner line.
    print("-" * (60 + len(banner_title)) + "\n")

    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    # For local development you can pass server_name / server_port, e.g.
    # demo.launch(server_name="0.0.0.0", server_port=7860).
    # On Hugging Face Spaces sharing is often handled by the platform;
    # debug=True is useful during development.
    demo.launch(debug=True)