Yago Bolivar
feat: integrate SpreadsheetTool for enhanced spreadsheet parsing and analysis in BasicAgent
450ecb4
import inspect  # Keep if you plan to use it for agent introspection later
import os
from urllib.parse import urlsplit

import gradio as gr
import pandas as pd
import requests

from src.download_utils import download_file
from src.file_processing_tool import FileIdentifier
from src.speech_to_text import transcribe_audio
from src.spreadsheet_tool import SpreadsheetTool
# --- Constants ---
# Base URL of the scoring service; endpoint paths are appended to it by the caller.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Local folder that receives the files attached to tasks.
DOWNLOADED_FILES_DIR = "downloaded_task_files"

# Create the download folder as soon as the module is loaded; an already
# existing folder is not an error.
os.makedirs(name=DOWNLOADED_FILES_DIR, exist_ok=True)
# --- Basic Agent Definition --- | |
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------ | |
class BasicAgent:
    """Placeholder agent that answers a task, optionally using an attached file.

    When a task carries a ``file_url`` the file is downloaded, classified with
    ``FileIdentifier`` and routed to a matching tool (speech-to-text for audio,
    ``SpreadsheetTool`` for spreadsheets). Every path returns a plain string;
    the answers are placeholders until real reasoning (e.g. an LLM) is added.
    """

    def __init__(self):
        print("BasicAgent initialized.")
        self.file_identifier = FileIdentifier()
        self.spreadsheet_tool = SpreadsheetTool()
        self.speech_to_text_tool = transcribe_audio
        # You might initialize other tools here if needed (e.g., OCR)

    def __call__(self, question_data: dict) -> str:
        """Answer one task.

        Args:
            question_data: Mapping with the optional keys ``"question"``,
                ``"file_url"`` and ``"task_id"``.

        Returns:
            The answer text, or a human-readable error/fallback message.
        """
        question_text = question_data.get("question")
        file_url = question_data.get("file_url")
        task_id = question_data.get("task_id", "unknown_task")  # For unique file naming
        print(f"Agent received task_id: {task_id}, question: {question_text}, file_url: {file_url}")

        if file_url:
            # Every file-based path produces its own answer or error message.
            return self._answer_from_file(task_id, question_text, file_url)

        # Question-only processing (no file attached).
        if question_text:
            # Placeholder for LLM call or other reasoning for text-only questions
            default_answer = f"Received question: '{question_text}'. No specific file action taken or file not processable. (Default Response)"
            print(f"Agent returning default text-based answer: {default_answer}")
            return default_answer
        # Should not happen if GAIA questions always have text or file
        return "No question text provided and no file processed."

    @staticmethod
    def _sanitize_filename_part(text) -> str:
        """Keep only alphanumerics, '.', '_' and '-' so the result is a safe filename fragment."""
        return "".join(c for c in str(text) if c.isalnum() or c in "._-").strip()

    @classmethod
    def _build_local_filename(cls, task_id, file_url: str) -> str:
        """Derive a unique, filesystem-safe local filename from the task id and the URL."""
        try:
            # Use only the path component so a query string or fragment cannot
            # leak into the name and mangle the file extension.
            url_path = urlsplit(file_url).path
        except ValueError:
            # Malformed URL: fall back to the raw string.
            url_path = file_url
        safe_original_filename = cls._sanitize_filename_part(url_path.split("/")[-1])
        if not safe_original_filename:  # Sanitization can leave an empty string
            safe_original_filename = "downloaded_file"
        # task_id comes from the remote service as well; strip path separators
        # so the final name cannot escape the download directory.
        safe_task_id = cls._sanitize_filename_part(task_id) or "unknown_task"
        return f"{safe_task_id}_{safe_original_filename}"

    def _answer_from_file(self, task_id, question_text, file_url: str) -> str:
        """Download the attached file, identify its type and dispatch to the matching handler."""
        print(f"File URL provided: {file_url}")
        unique_filename = self._build_local_filename(task_id, file_url)
        downloaded_file_path = download_file(file_url, DOWNLOADED_FILES_DIR, filename=unique_filename)
        if not downloaded_file_path:
            print(f"Error: Failed to download the associated file for task {task_id} from {file_url}.")
            return "Error: Failed to download the associated file."
        print(f"File downloaded to: {downloaded_file_path}")

        file_info = self.file_identifier.identify_file(downloaded_file_path)
        print(f"File info for {downloaded_file_path}: {file_info}")
        if file_info.get("error"):
            return f"Error processing file: {file_info['error']}"

        # .get() keeps a partially filled result from raising KeyError; such a
        # file simply falls through to the "not yet handled" branch.
        file_type = file_info.get("determined_type")
        action = file_info.get("suggested_action")

        if file_type == "audio" and action == "speech-to-text":
            return self._answer_from_audio(downloaded_file_path)
        if file_type == "spreadsheet":
            return self._answer_from_spreadsheet(downloaded_file_path)
        # Add more conditions for other file types and actions here
        # (e.g. "image" -> OCR / vision tool).

        warning_msg = f"File type '{file_type}' (action: '{action}') not yet handled for file: {os.path.basename(downloaded_file_path)}."
        print(warning_msg)
        # Known-but-unhandled or unknown type: report it together with the question.
        return f"File received, but type '{file_type}' is not yet processed by this agent. Question: {question_text}"

    def _answer_from_audio(self, downloaded_file_path) -> str:
        """Transcribe an audio file and wrap the transcript in a placeholder answer."""
        print(f"File {downloaded_file_path} identified as audio, attempting transcription...")
        # Go through the tool stored in __init__ so a replaced tool is honoured.
        transcribed_text = self.speech_to_text_tool(downloaded_file_path)
        if "Error during transcription" in transcribed_text:  # Basic error check
            print(f"Transcription error for {downloaded_file_path}: {transcribed_text}")
            return f"Could not transcribe audio: {transcribed_text}"
        # Placeholder: in a real agent the question and the transcript would be
        # combined by an LLM or other logic.
        answer = f"The audio file says: {transcribed_text[:200]}... (This is a placeholder answer based on transcription)"
        print(f"Returning answer based on audio: {answer}")
        return answer

    def _answer_from_spreadsheet(self, downloaded_file_path) -> str:
        """Parse a spreadsheet and return a one-line summary per sheet."""
        print(f"File {downloaded_file_path} identified as spreadsheet, parsing data...")
        parsed_data = self.spreadsheet_tool.parse_spreadsheet(downloaded_file_path)
        if parsed_data.get("error"):
            return f"Error processing spreadsheet: {parsed_data['error']}"
        # This is where an LLM would be integrated for question answering;
        # for now only a basic summary of each sheet is returned.
        sheet_summaries = []
        for sheet_name, info in parsed_data["summary"].items():
            numeric_columns = info["numeric_columns"]
            # Column labels are presumably strings but may be numbers when a
            # sheet has no header row; str() keeps join() from raising.
            numeric_text = ", ".join(map(str, numeric_columns)) if numeric_columns else "none"
            sheet_summaries.append(
                f"Sheet '{sheet_name}': {info['shape'][0]} rows, {info['shape'][1]} columns. "
                f"Numeric columns: {numeric_text}"
            )
        return f"Spreadsheet analysis: {'; '.join(sheet_summaries)}"
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in. Only ``profile.username`` is read.

    Returns:
        A ``(status_message, results)`` tuple: a human-readable status string
        and either a pandas DataFrame with one row per attempted task or None
        when there are no rows to show.
    """
    space_id = os.getenv("SPACE_ID")

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "local_run_no_space_id"
    print(f"Agent code reference: {agent_code}")

    # --- Fetch the questions ---
    print(f"Fetching questions from: {questions_url}")
    response = None
    try:
        response = requests.get(questions_url, timeout=30)  # Increased timeout
        response.raise_for_status()
        questions_data = response.json()
    except requests.exceptions.JSONDecodeError as e:
        # Must come before RequestException: requests' JSONDecodeError is a
        # subclass of it, so the reverse order makes this handler unreachable.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        # Compare against None: a Response object is falsy for error statuses.
        print(f"Response text (first 500 chars): {response.text[:500] if response is not None else 'No response object'}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # Anything other than a non-empty list cannot be iterated as questions.
    if not isinstance(questions_data, list) or not questions_data:
        print("Fetched questions list is empty.")
        return "Fetched questions list is empty or invalid format.", None
    print(f"Fetched {len(questions_data)} questions.")

    # --- Run the agent ---
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        if not isinstance(item, dict):
            print(f"Skipping malformed question entry: {item!r}")
            continue
        task_id = item.get("task_id")
        question_text = item.get("question")  # Can be None if file_url is primary
        file_url = item.get("file_url")
        if not task_id:
            print(f"Skipping item with missing task_id: {item}")
            continue

        # Prepare the input for the agent's __call__ method
        agent_input_data = {"task_id": task_id, "question": question_text, "file_url": file_url}
        log_row = {
            "Task ID": task_id,
            "Question": question_text if question_text else "N/A (File-based question)",
            "File URL": file_url if file_url else "N/A",
        }
        try:
            submitted_answer = agent(agent_input_data)
        except Exception as e:
            # A failing task is logged for display but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            log_row["Submitted Answer"] = f"AGENT ERROR: {e}"
        else:
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            log_row["Submitted Answer"] = submitted_answer
        results_log.append(log_row)

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        # Still return results_log if it has entries (e.g. all agent errors)
        results_df = pd.DataFrame(results_log) if results_log else None
        return "Agent did not produce any answers to submit.", results_df

    # --- Submit the answers ---
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")

    # The same table is shown whether or not the submission succeeds.
    results_df = pd.DataFrame(results_log)
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except (requests.exceptions.JSONDecodeError, AttributeError):
            # Body is not JSON, or is JSON but not an object with .get().
            error_detail += f" Response (first 500 chars): {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.JSONDecodeError as e:
        # A 2xx reply that is not JSON is a server-side problem, not a network
        # error; it must be handled before the generic RequestException.
        status_message = f"Submission Failed: Could not decode server response - {e}"
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    print(status_message)
    return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    # Page title followed by the usage instructions.
    gr.Markdown("# GAIA Benchmark Agent Runner")
    gr.Markdown(
        """
**Instructions:**
1. Clone this space, then modify `src/` files (especially `BasicAgent` in `app.py`, and tool implementations) to define your agent's logic.
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
---
**Notes:**
- The agent's processing can take time, especially with file downloads and model inferences.
- This is a basic framework. For more complex agents, consider asynchronous operations, caching, and more robust error handling.
"""
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    # Both result widgets are display-only.
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=5,
        interactive=False,
    )
    results_table = gr.DataFrame(
        label="Questions and Agent Answers",
        wrap=True,
        interactive=False,
    )

    run_button.click(
        fn=run_and_submit_all,
        # No explicit inputs are wired up. run_and_submit_all still takes a
        # `profile` argument (presumably injected by Gradio from its
        # gr.OAuthProfile annotation - confirm against the Gradio OAuth docs)
        # and handles the logged-out case itself.
        inputs=None,
        outputs=[status_output, results_table],
    )
if __name__ == "__main__":
    # Startup banner: report which Hugging Face Space environment variables
    # are present, then launch the Gradio app.
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        # SPACE_HOST is documented as the full Space domain (it already ends
        # in ".hf.space"); only append the suffix when it is missing so the
        # printed URL is not "<host>.hf.space.hf.space".
        if space_host_startup.endswith(".hf.space"):
            runtime_host = space_host_startup
        else:
            runtime_host = f"{space_host_startup}.hf.space"
        print(f" Runtime URL should be: https://{runtime_host}")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (likely running locally).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (likely running locally). Repo URL cannot be determined.")

    # Closing rule with the same width as the opening banner line.
    print("-"*(60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    # Set server_name and server_port for local development if needed, e.g. demo.launch(server_name="0.0.0.0", server_port=7860)
    # For Hugging Face Spaces, share=True is often handled by the platform.
    # debug=True is useful for development.
    demo.launch(debug=True)