"""Gradio app for the Hugging Face Agents course (unit 4) scoring API.

Lets the user pick a provider and model, build an agent via the local `agents`
module, run it on questions fetched from the scoring API (including attached
image and audio files), and review the answers.
"""

import inspect
import os
from io import BytesIO

import gradio as gr
import pandas as pd
import requests
import whisper
from PIL import Image

import agents

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
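# Endpoints used on this API (as wired below): GET /questions, GET /random-question,
# GET /files/{task_id} for attached files, and POST /submit for scoring.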

# Module-level state: the currently selected agent instance; set by select_agent().
agent = None


def select_agent(provider_name: str, model_name: str):
    """
    Creates the agent for the given provider and model and stores it globally.

    :param provider_name: Name of the provider (e.g. "Ollama", "OpenAI").
    :param model_name: Name of the model to use.
    :return: Info string describing the selected agent (shown in the UI).
    """
    global agent
    try:
        agent = agents.get_agent(model_name=model_name, model_type=provider_name)
        if agent is None:
            print(f"Agent not found for provider: {provider_name} and model: {model_name}")
            agent = BasicAgent()
    except Exception as e:
        print(f"Error selecting agent: {e}")
        agent = BasicAgent()

    print(f"Agent selected: {getattr(agent, 'model', agent.__class__.__name__)}")
    # Return the info string so the model_select.change handler can update agent_info_text;
    # assigning to agent_info_text.value directly does not refresh the UI.
    return get_agent_info()


def get_agent_info() -> str:
    """Returns a short description of the currently selected agent."""
    global agent
    if agent is None:
        return "No agent selected."
    try:
        agent_class_name = agent.__class__.__name__
        model_name = getattr(agent, "model", "n/a")
        docstring = inspect.getdoc(agent)
        return f"Agent Class: {agent_class_name}\nModel Name: {model_name}\nDocstring: {docstring}"
    except Exception as e:
        print(f"Error getting agent info: {e}")
        return "Error getting agent info."


class BasicAgent:
    """Fallback agent that always returns a fixed answer."""

    def __init__(self):
        print("BasicAgent initialized.")

    def __call__(self, question: str) -> str:
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        fixed_answer = "This is a default answer."
        print(f"Agent returning fixed answer: {fixed_answer}")
        return fixed_answer


def get_all_questions():
    """Fetches all available questions from the API without running the agent."""
    yield from run_test_on_questions(False, False)


def run_test_on_all_questions():
    """Runs the selected agent on all available questions by forwarding yields from run_test_on_questions."""
    yield from run_test_on_questions(False, True)


def run_test_on_random_question():
    """Runs the selected agent on a single random question by forwarding yields from run_test_on_questions."""
    yield from run_test_on_questions(True, True)


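# run_test_on_questions is a generator: Gradio streams each yielded
# (status_text, dataframe) pair into the status box and results table as it arrives.
# When run_agent is True it expects a smolagents-style agent exposing
# `memory.reset()` and `run(question, images=...)` (an assumption about the objects
# returned by agents.get_agent); BasicAgent only implements __call__, so the loop
# falls back to calling it directly.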
def run_test_on_questions(use_random_question: bool, run_agent: bool):
    """
    Fetches questions (all of them, or a single random one), optionally runs the
    selected agent on each, and streams status text plus a results table to the UI.
    Nothing is submitted to the scoring API here.
    """
    global agent
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/random-question" if use_random_question else f"{api_url}/questions"

    if run_agent and agent is None:
        yield "No agent selected. Choose a provider and model first.", None
        return

    info = "# started request"
    yield info, None

print(f"Fetching questions from: {questions_url}") |
|
try: |
|
response = requests.get(questions_url, timeout=15) |
|
response.raise_for_status() |
|
questions_dataset_raw = response.json() |
|
questions_dataset = [questions_dataset_raw] if use_random_question else questions_dataset_raw |
|
yield info, None |
|
if not questions_dataset: |
|
print("Fetched questions list is empty.") |
|
yield info +"\n\nFetched questions list is empty or invalid format.", None |
|
return |
|
print(f"Fetched {len(questions_dataset)} questions.") |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error fetching questions: {e}") |
|
yield f"Error fetching questions: {e}", None |
|
return |
|
except requests.exceptions.JSONDecodeError as e: |
|
print(f"Error decoding JSON response from questions endpoint: {e}") |
|
print(f"Response text: {response.text[:500]}") |
|
yield f"Error decoding server response for questions: {e}", None |
|
return |
|
except Exception as e: |
|
print(f"An unexpected error occurred fetching questions: {e}") |
|
yield f"An unexpected error occurred fetching questions: {e}", None |
|
return |
|
|
|
|
|
    results_log = []
    answers_payload = []

    for i, questions_data in enumerate(questions_dataset):
        # Clear the agent's conversation memory between questions, when it has one.
        if hasattr(agent, "memory"):
            agent.memory.reset()
        images = []
        task_id = questions_data.get("task_id")
        question_text = questions_data.get("question")
        file_name = questions_data.get("file_name")
        if not task_id or question_text is None:
            yield info + f"\nError in question data: {questions_data}", None
            return

        if file_name:
            question_text = question_text + f"\n\nYou can download the corresponding file using the download tool with the task id: {task_id}."
            file_data = requests.get(f"{DEFAULT_API_URL}/files/{task_id}", timeout=15)

            if file_data.headers["Content-Type"] in ["image/png", "image/jpeg"]:
                # Images are passed to the agent directly.
                image = Image.open(BytesIO(file_data.content)).convert("RGB")
                images = [image]
            if file_data.headers["Content-Type"] in ["audio/mpeg", "audio/wav"]:
                # Audio is transcribed with Whisper and appended to the question text.
                model = whisper.load_model("base")
                with open("temp_audio.mp3", "wb") as f:
                    f.write(file_data.content)
                audio_content = model.transcribe("temp_audio.mp3")
                question_text = question_text + f"\n\nTranscription: {audio_content['text']}"

        info += f"\n\nRunning agent on question {i+1}/{len(questions_dataset)}:\n - task_id: {task_id}\n - question: {question_text}"
        yield info, None
        try:
            if run_agent:
                # Agents without a run() method (e.g. BasicAgent) are called directly.
                submitted_answer = agent.run(question_text, images=images) if hasattr(agent, "run") else agent(question_text)
            else:
                submitted_answer = "-- no agent interaction --"
            info += f"\n - got answer {submitted_answer}"
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer, "FileInfo": file_name})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}", "FileInfo": file_name})

    if not answers_payload:
        print("Agent did not produce any answers.")
        yield info + "\nAgent did not produce any answers.", pd.DataFrame(results_log)
        return

    try:
        results_df = pd.DataFrame(results_log)
        yield info + "\nGot an answer from agent", results_df
    except Exception as e:
        status_message = f"An unexpected error occurred while building the results table: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        yield status_message, results_df
        return


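# run_and_submit_all is the full evaluation-and-submission flow: it fetches all
# questions, runs the agent, and POSTs the answers to /submit for scoring.
# It is currently short-circuited by the early return at the top.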
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.
    """
    # Submission is intentionally disabled for now; remove this early return to enable it.
    return "We are not there yet", None

    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df


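# --- Local model discovery ---
# Both helpers assume the providers' default local ports (Ollama on 11434,
# LM Studio's OpenAI-compatible server on 1234) and fall back to ["None"]
# when the server is unreachable.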
def fetch_ollama_models() -> list:
    """
    Fetches available models from the local Ollama server (/api/tags).

    :return: List of available model names.
    """
    try:
        response = requests.get("http://localhost:11434/api/tags", timeout=15)
        response.raise_for_status()
        data = response.json()
        return [model["name"] for model in data["models"]]
    except requests.exceptions.RequestException as e:
        print(f"Error fetching Ollama models: {e}")
        return ["None"]


def fetch_lmstudio_models() -> list:
    """
    Fetches available models from the local LM Studio server (/v1/models).

    :return: List of available model IDs.
    """
    try:
        response = requests.get("http://localhost:1234/v1/models", timeout=15)
        response.raise_for_status()
        data = response.json()
        return [model["id"] for model in data["data"]]
    except requests.exceptions.RequestException as e:
        print(f"Error fetching LM Studio models: {e}")
        return ["None"]


available_models = ["None"] |
|
|
|
def update_available_models(provider:str): |
|
""" |
|
Fetches available models based on the selected provider. |
|
:param provider: The selected provider name. |
|
:return: Update object for the model dropdown. |
|
""" |
|
global available_models |
|
print(f"Selected provider: {provider}") |
|
|
|
match provider: |
|
case "hugging face": |
|
available_models = ["None", "Qwen/Qwen2.5-Coder-32B-Instruct", "Qwen/Qwen2.5-Omni-7B"] |
|
case "Ollama": |
|
available_models = fetch_ollama_models() |
|
case "LMStudio": |
|
available_models = fetch_lmstudio_models() |
|
case "Gemini": |
|
available_models = ["None", "Gemini-2.0-flash-exp", "Gemini-2.0-flash-lite"] |
|
case "Anthropic": |
|
available_models = ["None", "claude-3"] |
|
case "OpenAI": |
|
available_models = ["None", "gpt-4o", "gpt-3.5-turbo"] |
|
case "Basic Agent": |
|
available_models = ["None"] |
|
case _: |
|
available_models = ["None"] |
|
|
|
print(f"Available models for {provider}: {available_models}") |
|
|
|
|
|
return gr.Dropdown(choices=available_models) |
|
|
|
|
|
|
|
|
|
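# --- Gradio UI ---
# Provider/model selection drives agent creation; the test buttons stream
# generator output into the status box and results table.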
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")

    agent_info_text = gr.Text(label="Agent Info", value=get_agent_info(), interactive=False, visible=True)

    gr.Markdown(
        """
        **Instructions:**

        Select a provider and then a model to generate the agent.
        """
    )

    provider_select = gr.Dropdown(
        label="Select Provider",
        choices=["Basic Agent", "LMStudio", "Ollama", "hugging face", "Gemini", "Anthropic", "OpenAI"],
        interactive=True,
        visible=True,
        multiselect=False)

    model_select = gr.Dropdown(
        label="Select Model",
        choices=available_models,
        interactive=True,
        visible=True,
        multiselect=False)

    provider_select.input(fn=update_available_models, inputs=provider_select, outputs=[model_select])

    # select_agent returns an info string, so route it to the agent info textbox.
    model_select.change(fn=select_agent, inputs=[provider_select, model_select], outputs=[agent_info_text])

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")
    run_test_button = gr.Button("Run Test on Random Question")
    run_multiple_tests_button = gr.Button("Run tests on all questions")
    run_get_questions_button = gr.Button("Get Questions")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_test_button.click(
        fn=run_test_on_random_question,
        outputs=[status_output, results_table]
    )

    run_multiple_tests_button.click(
        fn=run_test_on_all_questions,
        outputs=[status_output, results_table]
    )

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )

    run_get_questions_button.click(
        fn=get_all_questions,
        outputs=[status_output, results_table]
    )

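# Startup diagnostics: when running on a Hugging Face Space, SPACE_HOST and
# SPACE_ID identify the runtime URL and repo; locally they are simply absent.
# To run locally: `python app.py` (Gradio serves on http://127.0.0.1:7860 by default).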
if __name__ == "__main__": |
|
print("\n" + "-"*30 + " App Starting " + "-"*30) |
|
|
|
space_host_startup = os.getenv("SPACE_HOST") |
|
space_id_startup = os.getenv("SPACE_ID") |
|
|
|
if space_host_startup: |
|
print(f"✅ SPACE_HOST found: {space_host_startup}") |
|
print(f" Runtime URL should be: https://{space_host_startup}.hf.space") |
|
else: |
|
print("ℹ️ SPACE_HOST environment variable not found (running locally?).") |
|
|
|
if space_id_startup: |
|
print(f"✅ SPACE_ID found: {space_id_startup}") |
|
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") |
|
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") |
|
else: |
|
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") |
|
|
|
print("-"*(60 + len(" App Starting ")) + "\n") |
|
|
|
print("Launching Gradio Interface for Basic Agent Evaluation...") |
|
demo.launch(debug=True, share=False) |