laverdes's picture
feat: tools update and system prompt
1188e65 verified
raw
history blame
13.1 kB
import os
import gradio as gr
import requests
import inspect
import pandas as pd
import json
import copy
from basic_agent import ToolAgent
from tools import (
smart_read_file,
search_and_extract,
search_and_extract_from_wikipedia,
aggregate_information,
extract_clean_text_from_url,
youtube_search_tool,
load_youtube_transcript,
get_audio_from_youtube,
image_query_tool,
transcribe_audio,
)
# Tools handed to the agent, listed in the same order as they are imported.
TOOLS = [
    smart_read_file,
    search_and_extract,
    search_and_extract_from_wikipedia,
    aggregate_information,
    extract_clean_text_from_url,
    youtube_search_tool,
    load_youtube_transcript,
    get_audio_from_youtube,
    image_query_tool,
    transcribe_audio,
]

# Log the registered tool names at import time; objects without a ``name``
# attribute are shown through their ``str()`` form instead.
tool_names = [getattr(tool, "name", str(tool)) for tool in TOOLS]
print(json.dumps(tool_names, indent=2))

# Base URL of the scoring service (questions, files and submit endpoints).
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# System prompt handed to the agent as its ``backstory``.
# Every tool named in backticks below must exist in TOOLS: the aggregation
# step refers to `aggregate_information` (the prompt previously named a
# `summarize_search_results` tool that is not registered). Sub-bullets are
# indented so that nested rules stay attached to the rule they belong to.
TOOL_USE_SYS_PROMPT = """
You are a helpful AI assistant operating in a structured reasoning and action loop using the ReAct pattern.
Your reasoning loop consists of:
- Question: the input task you must solve
- Thought: Reflect on the task and decide what to do next.
- Action: Choose one of the following actions:
  - Solve it directly using your own knowledge
  - Break the problem into smaller steps
  - Use a tool to get more information
- Action Input: Provide input for the selected action
- Observation: Record the result of the action and/or aggregate information from previous observations (summarize, count, analyse, ...).
(Repeat Thought/Action/Action Input/Observation as needed)
Terminate your loop with:
- Thought: I now know the final answer
- Final Answer: [your best answer to the original question]
**General Execution Rules:**
- If you can answer using only your trained knowledge, do so directly without using tools.
- If the question involves image content, use the `image_query_tool`:
  - Action: image_query_tool
  - Action Input: 'image_path': [image_path], 'question': [user's question about the image]
**Tool Use Constraints:**
- Never use any tool more than **3 consecutive times** without either:
  - Aggregating the information received so far: you can call the `aggregate_information` tool and analyze the tool outputs to answer the question.
  - If you need more information, use a different tool or break the problem down further, but do not return a final answer yet.
- Do not exceed **5 total calls** to *search-type tools* per query (such as `search_and_extract`, `search_and_extract_from_wikipedia`, `extract_clean_text_from_url`).
- Do not ask the user for additional clarification or input. Work with only what is already provided.
**If you are unable to answer:**
- If neither your knowledge nor tool outputs yield useful information:
  - Use the tool outputs as best you can to answer the question, even if it's not perfect.
  - If not, say:
    > Final Answer: I could not find any useful information to answer your query.
- If the question is unanswerable due to lack of input (e.g., missing attachment) or is fundamentally outside your scope, say:
  > Final Answer: I don't have the ability to answer this query: [brief reason]
Always aim to provide the **best and most complete** answer based on your trained knowledge and the tools available.
"""
def _download_task_file(api_url, task_id, file_name):
    """Download the attachment of *task_id* and return the local path it was saved to.

    Only the final path component of *file_name* is used, so a name supplied
    by the server cannot place the file outside ``/tmp``.

    Raises:
        ValueError: *file_name* has no usable final path component.
        requests.exceptions.RequestException: the download failed or the
            server answered with an error status.
        OSError: the file could not be written.
    """
    safe_name = os.path.basename(file_name)
    if not safe_name:
        raise ValueError(f"invalid attachment name: {file_name!r}")

    file_url = f"{api_url}/files/{task_id}"  # GET /files/{task_id}
    print("file_url: ", file_url)
    file_response = requests.get(file_url, timeout=15)
    file_response.raise_for_status()
    print("file_response: ", file_response.content[0:50])

    save_path = os.path.join("/tmp", safe_name)
    print("save_path: ", save_path)
    with open(save_path, "wb") as f:
        f.write(file_response.content)
    print(f"✅ file saved to: {save_path}")
    return save_path


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the ToolAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in (in which case nothing is fetched or submitted).

    Returns:
        A 3-tuple matching the Gradio outputs wired to the run button:
        the status text, a DataFrame with one row per processed question
        (None when the run stopped before any question was processed), and a
        ``gr.update`` that enables the download button only when the
        submission succeeded and ``results_log.json`` was written.
    """
    space_id = os.getenv("SPACE_ID")

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None, gr.update(interactive=False)
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent (modify this part to create your agent)
    try:
        agent = ToolAgent(
            tools=TOOLS,
            backstory=TOOL_USE_SYS_PROMPT,
        )
        agent.initialize()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None, gr.update(interactive=False)

    # When the app runs as a Hugging Face Space, this link points to your
    # codebase (useful for others, so please keep it public).
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None, gr.update(interactive=False)
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.JSONDecodeError as e:
        # Must precede RequestException: requests' JSONDecodeError is a
        # subclass of it, so the other order makes this branch unreachable.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None, gr.update(interactive=False)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None, gr.update(interactive=False)
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None, gr.update(interactive=False)

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        question_level = item.get("Level")
        question_file_name = item.get("file_name", None)

        print("\nquestion level: ", question_level)
        print("task_id: ", task_id)
        print("question file_name: ", question_file_name)

        # Validate first: task_id is needed to build the attachment URL.
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        # Reset per question so nothing leaks over from the previous item.
        metadata = {}
        if question_file_name:
            try:
                file_path = _download_task_file(api_url, task_id, question_file_name)
            except (requests.exceptions.RequestException, OSError, ValueError) as e:
                # A broken attachment only costs this question, not the whole run.
                print(f"Error fetching file for task {task_id}: {e}")
                results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"FILE ERROR: {e}"})
                continue
            # The saved path is already known, so no filesystem search is needed.
            if file_path.lower().endswith(".png"):
                metadata = {'image_path': file_path}

        try:
            q_data = {'query': question_text, 'metadata': metadata}
            submitted_answer = agent(q_data)  # todo: send more data (files)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log), gr.update(interactive=False)

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()

        # Writing the local log is best-effort: a failure here must not turn
        # an accepted submission into a reported failure.
        log_written = True
        try:
            with open("results_log.json", "w") as results_session_file:
                json.dump(results_log + [{'result_data': result_data}], results_session_file)
        except (OSError, TypeError, ValueError) as e:
            log_written = False
            print(f"Could not write results_log.json: {e}")

        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        # Only offer the download when there is a log file to serve.
        return final_status, pd.DataFrame(results_log), gr.update(interactive=log_written)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Shared tail for every failure branch above.
    print(status_message)
    return status_message, pd.DataFrame(results_log), gr.update(interactive=False)
def download_log():
    """Return the relative path of the JSON results log offered for download."""
    log_path = "results_log.json"
    return log_path
# Gradio App
# Statement order is kept exactly as in the original, since component
# creation order may determine the page layout.
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")
    # Starts disabled; run_and_submit_all returns the update that toggles it.
    download_button = gr.Button("Download Evaluation Log", interactive=False)

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # The three return values of run_and_submit_all map onto these outputs in order.
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table, download_button])

    file_output = gr.File(label="Download Log File", visible=True)
    # download_log returns the log path, which is handed to the file component.
    download_button.click(fn=download_log, outputs=file_output)
if __name__ == "__main__":
    # Startup banner: report the Space-related environment, then launch the UI.
    banner_title = " App Starting "
    print("\n" + "-"*30 + banner_title + "-"*30)

    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        # SPACE_HOST may already end in ".hf.space"; strip it before re-adding
        # so the printed URL never ends in ".hf.space.hf.space".
        host_label = space_host_startup.removesuffix(".hf.space")
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{host_label}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    # Closing rule has the same width as the opening banner line.
    print("-"*(60 + len(banner_title)) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)