| | import os |
| | import gradio as gr |
| | import requests |
| | import inspect |
| | import pandas as pd |
| | import agents |
| | from PIL import Image |
| | from io import BytesIO |
| | import whisper |
| |
|
| | |
| | |
# Base URL of the scoring service; the question, file and submit endpoints
# used in this module are all built from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Currently selected agent. Stays ``None`` until select_agent() assigns one.
agent = None
| |
|
def select_agent(provider_name: str, model_name: str):
    """
    Create the agent for a provider/model pair and store it in the global ``agent``.

    Falls back to ``BasicAgent`` when ``agents.get_agent`` returns ``None`` or raises.

    :param provider_name: Provider name, passed to ``agents.get_agent`` as ``model_type``.
    :param model_name: Model name, passed to ``agents.get_agent`` as ``model_name``.
    :return: The selected agent instance.
    """
    global agent
    try:
        candidate = agents.get_agent(model_name=model_name, model_type=provider_name)
    except Exception as e:
        # Boundary for the UI callback: any failure degrades to the fallback agent.
        print(f"Error selecting agent: {e}")
        candidate = None
    else:
        if candidate is None:
            print(f"Agent not found for provider: {provider_name} and model: {model_name}")

    agent = candidate if candidate is not None else BasicAgent()

    # The fallback agent may not expose ``model``; use the class name instead
    # of raising AttributeError from a log statement.
    print(f"Agent selected: {getattr(agent, 'model', type(agent).__name__)}")
    # NOTE(review): assigning ``.value`` probably does not refresh an already
    # rendered component; a live update needs the value returned through an
    # event's ``outputs`` -- confirm against the Gradio version in use.
    agent_info_text.value = get_agent_info()
    return agent
| |
|
| |
|
def get_agent_info() -> str:
    """
    Describe the currently selected global ``agent`` as display text.

    :return: ``"No agent selected."`` when no agent is set; otherwise a
        three-line summary with the agent's class name, its ``model``
        attribute (``"N/A"`` when the agent has none) and its docstring.
        ``"Error getting agent info."`` is returned if building the summary fails.
    """
    if agent is None:
        return "No agent selected."
    try:
        agent_class_name = type(agent).__name__
        # Agents without a ``model`` attribute (e.g. a plain fallback agent)
        # should still be describable instead of producing the error text.
        model_name = getattr(agent, "model", "N/A")
        docstring = inspect.getdoc(agent)
        return f"Agent Class: {agent_class_name}\nModel Name: {model_name}\nDocstring: {docstring}"
    except Exception as e:
        # Boundary for the UI: never let a misbehaving agent break the info box.
        print(f"Error getting agent info: {e}")
        return "Error getting agent info."
| |
|
| |
|
| | |
| | |
class BasicAgent:
    """Placeholder agent that returns the same fixed answer for every question.

    It also provides the small agent surface the rest of this module uses on
    the global agent (``model``, ``memory.reset()`` and ``run()``), so it can
    serve as a fallback without raising ``AttributeError``.
    """

    class _NullMemory:
        """Stateless stand-in for an agent memory; ``reset`` does nothing."""

        def reset(self) -> None:
            return None

    # Label reported wherever the app prints or displays ``agent.model``.
    model = "BasicAgent (fixed answer)"

    def __init__(self):
        self.memory = self._NullMemory()
        print("BasicAgent initialized.")

    def __call__(self, question: str) -> str:
        """Log the first 50 characters of the question and return the fixed answer."""
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        fixed_answer = "This is a default answer."
        print(f"Agent returning fixed answer: {fixed_answer}")
        return fixed_answer

    def run(self, question: str, images=None, **kwargs) -> str:
        """Answer like ``__call__``; ``images`` and extra keyword arguments are ignored."""
        return self(question)
| |
|
| |
|
def get_all_questions():
    """
    Stream the status updates for fetching the full question list.

    Delegates to run_test_on_questions with the agent switched off, so the
    questions are only listed, not answered.
    """
    yield from run_test_on_questions(use_random_question=False, run_agent=False)
| |
|
def run_test_on_all_questions():
    """
    Stream the status updates for running the agent on every available question.

    Delegates to run_test_on_questions with the full question list and the
    agent switched on.
    """
    yield from run_test_on_questions(use_random_question=False, run_agent=True)
| |
|
def run_test_on_random_question():
    """
    Stream the status updates for running the agent on one random question.

    Delegates to run_test_on_questions with the random-question endpoint and
    the agent switched on.
    """
    yield from run_test_on_questions(use_random_question=True, run_agent=True)
| |
|
| |
|
# Whisper models are expensive to load, so each one is loaded once and reused.
_WHISPER_MODELS = {}


def _transcribe_audio(audio_bytes: bytes, suffix: str) -> str:
    """Transcribe raw audio bytes with the Whisper "base" model and return the text."""
    import tempfile  # local import: only needed when a question has audio

    if "base" not in _WHISPER_MODELS:
        _WHISPER_MODELS["base"] = whisper.load_model("base")

    # transcribe() is called with a file path, so write the bytes to a private
    # temporary file and always remove it afterwards.
    handle = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
    try:
        with handle:
            handle.write(audio_bytes)
        return _WHISPER_MODELS["base"].transcribe(handle.name)["text"]
    finally:
        os.remove(handle.name)


def _load_task_attachment(api_url: str, task_id):
    """
    Download the file attached to a task and convert it for the agent.

    :return: ``(images, transcription)``. ``images`` holds one RGB image for
        PNG/JPEG files, otherwise it is empty. ``transcription`` is the
        transcribed text for MPEG/WAV audio, otherwise ``None``.
    :raises requests.exceptions.RequestException: if the download fails.
    """
    response = requests.get(f"{api_url}/files/{task_id}", timeout=30)
    response.raise_for_status()
    # Drop parameters such as "; charset=..." before comparing media types.
    content_type = response.headers.get("Content-Type", "").split(";")[0].strip().lower()

    if content_type in ("image/png", "image/jpeg"):
        return [Image.open(BytesIO(response.content)).convert("RGB")], None

    audio_suffix = {"audio/mpeg": ".mp3", "audio/wav": ".wav"}.get(content_type)
    if audio_suffix is not None:
        return [], _transcribe_audio(response.content, audio_suffix)
    return [], None


def run_test_on_questions(use_random_question: bool, run_agent: bool):
    """
    Fetch questions, optionally run the selected agent on them, and stream progress.

    This is a generator: every item is a ``(status_text, results)`` pair where
    ``results`` is ``None`` for progress updates and a ``pandas.DataFrame`` of
    the collected answers for the final update. Nothing is submitted.

    :param use_random_question: Fetch a single question from ``/random-question``
        instead of the full list from ``/questions``.
    :param run_agent: Run the global ``agent`` on each question; when False the
        answer column is filled with a placeholder.
    """
    # Without this guard the first question would fail with AttributeError on None.
    if run_agent and agent is None:
        yield "No agent selected. Please select a provider and model first.", None
        return

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/random-question" if use_random_question else f"{api_url}/questions"

    info = "# started request"
    yield info, None

    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_dataset_raw = response.json()
    # JSONDecodeError must come first: in current requests releases it is a
    # RequestException subclass, so the broader handler would shadow it.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        yield f"Error decoding server response for questions: {e}", None
        return
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        yield f"Error fetching questions: {e}", None
        return
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        yield f"An unexpected error occurred fetching questions: {e}", None
        return

    # The random endpoint returns one question; normalise it to a list.
    questions_dataset = [questions_dataset_raw] if use_random_question else questions_dataset_raw
    if not questions_dataset:
        print("Fetched questions list is empty.")
        yield info + "\n\nFetched questions list is empty or invalid format.", None
        return
    total = len(questions_dataset)
    print(f"Fetched {total} questions.")

    results_log = []
    answers_payload = []

    for position, questions_data in enumerate(questions_dataset, start=1):
        # Validate before touching the fields so malformed data is reported
        # instead of raising while the prompt is being built.
        if (
            not isinstance(questions_data, dict)
            or not questions_data.get("task_id")
            or questions_data.get("question") is None
        ):
            yield info + f"\nError in question data: {questions_data}", None
            return

        task_id = questions_data["task_id"]
        question_text = questions_data["question"]
        file_name = questions_data.get("file_name")
        images = []

        if file_name:
            question_text = question_text + f"\n\nYou can download the correspondig file using the download tool with the task id: {task_id}."
            try:
                images, transcription = _load_task_attachment(api_url, task_id)
            except Exception as e:
                # Boundary: a broken attachment must not abort the whole run.
                print(f"Error preparing attachment for task {task_id}: {e}")
                info += f"\n\nCould not prepare attachment for task {task_id}: {e}"
            else:
                if transcription is not None:
                    question_text = question_text + f"\n\nTranscription: {transcription}"

        info += f"\n\nRunning agent on question {position}/{total}:\n - task_id: {task_id}\n - question: {question_text}"
        yield info, None

        try:
            if run_agent:
                # Start every question from a clean memory when the agent has one.
                memory = getattr(agent, "memory", None)
                if memory is not None:
                    memory.reset()
                submitted_answer = agent.run(question_text, images=images)
            else:
                submitted_answer = "-- no agent interaction --"
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}", "FileInfo": file_name})
        else:
            info += f"\n - got answer {submitted_answer}"
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer, "FileInfo": file_name})

    if not answers_payload:
        print("Agent did not produce any answers.")
        yield info + "\nAgent did not produce any answers.", pd.DataFrame(results_log)
        return

    yield info + "\nGot an answer from agent", pd.DataFrame(results_log)
| |
|
| |
|
| |
|
# Submission is intentionally switched off for now. Set to True to enable the
# fetch / run / submit flow in run_and_submit_all.
_SUBMISSION_ENABLED = False


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch all questions, answer them with ``BasicAgent`` and submit the answers.

    While ``_SUBMISSION_ENABLED`` is False the function returns a placeholder
    message immediately and does nothing else.

    :param profile: Hugging Face OAuth profile of the logged-in user, or
        ``None`` when nobody is logged in.
    :return: ``(status_text, results)`` where ``results`` is a
        ``pandas.DataFrame`` of questions and answers, or ``None`` when the
        run stopped before any question was processed.
    """
    if not _SUBMISSION_ENABLED:
        return "We are not there yet", None

    space_id = os.getenv("SPACE_ID")

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # Note: this flow always uses BasicAgent, not the agent chosen in the UI.
    try:
        submission_agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # Link to this Space's code, sent along with the answers.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
    # JSONDecodeError must come first: in current requests releases it is a
    # RequestException subclass, so the broader handler would shadow it.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    if not questions_data:
        print("Fetched questions list is empty.")
        return "Fetched questions list is empty or invalid format.", None
    print(f"Fetched {len(questions_data)} questions.")

    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = submission_agent(question_text)
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
        else:
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})

    results_df = pd.DataFrame(results_log)
    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", results_df

    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        # Prefer the structured "detail" field; fall back to the raw body.
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
    else:
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, results_df

    # Every failure path reports its message together with the partial results.
    print(status_message)
    return status_message, results_df
| |
|
| |
|
def fetch_ollama_models() -> list:
    """
    Fetch the names of the models offered by the local Ollama server.

    :return: List of model names, or ``["None"]`` when the server cannot be
        reached, answers with an unexpected payload, or offers no models.
    """
    try:
        # A timeout keeps the UI callback from hanging on an unresponsive server.
        response = requests.get("http://localhost:11434/api/tags", timeout=5)
        response.raise_for_status()
        names = [model["name"] for model in response.json()["models"]]
    except requests.exceptions.RequestException as e:
        print(f"Error fetching Ollama models: {e}")
        return ["None"]
    except (KeyError, TypeError, ValueError) as e:
        # The payload did not have the expected {"models": [{"name": ...}]} shape.
        print(f"Unexpected response from Ollama server: {e}")
        return ["None"]
    # An empty list would leave the dropdown without any choice.
    return names or ["None"]
def fetch_lmstudio_models() -> list:
    """
    Fetch the ids of the models offered by the local LM Studio server.

    :return: List of model ids, or ``["None"]`` when the server cannot be
        reached, answers with an unexpected payload, or offers no models.
    """
    try:
        # A timeout keeps the UI callback from hanging on an unresponsive server.
        response = requests.get("http://localhost:1234/v1/models", timeout=5)
        response.raise_for_status()
        model_ids = [model["id"] for model in response.json()["data"]]
    except requests.exceptions.RequestException as e:
        print(f"Error fetching LM Studio models: {e}")
        return ["None"]
    except (KeyError, TypeError, ValueError) as e:
        # The payload did not have the expected {"data": [{"id": ...}]} shape.
        print(f"Unexpected response from LM Studio server: {e}")
        return ["None"]
    # An empty list would leave the dropdown without any choice.
    return model_ids or ["None"]
| |
|
| |
|
# Model choices offered by the model dropdown; replaced whenever a provider is picked.
available_models = ["None"]


def update_available_models(provider: str):
    """
    Refresh the global model list for the chosen provider.

    Ollama and LM Studio are queried through their fetch helpers; the other
    known providers use fixed lists; anything else offers only "None".

    :param provider: The selected provider name.
    :return: A ``gr.Dropdown`` carrying the new choices.
    """
    global available_models
    print(f"Selected provider: {provider}")

    if provider == "hugging face":
        available_models = ["None", "Qwen/Qwen2.5-Coder-32B-Instruct", "Qwen/Qwen2.5-Omni-7B"]
    elif provider == "Ollama":
        available_models = fetch_ollama_models()
    elif provider == "LMStudio":
        available_models = fetch_lmstudio_models()
    elif provider == "Gemini":
        available_models = ["None", "Gemini-2.0-flash-exp", "Gemini-2.0-flash-lite"]
    elif provider == "Anthropic":
        available_models = ["None", "claude-3"]
    elif provider == "OpenAI":
        available_models = ["None", "gpt-4o", "gpt-3.5-turbo"]
    else:
        # "Basic Agent" and any unknown provider have no models to choose from.
        available_models = ["None"]

    print(f"Available models for {provider}: {available_models}")

    return gr.Dropdown(choices=available_models)
| |
|
| |
|
| |
|
| | |
# Build the Gradio interface: agent selection on top, run buttons and results below.
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")

    agent_info_text = gr.Text(label="Agent Name", value=get_agent_info(), interactive=False, visible=True)

    gr.Markdown(
        """
        **Instructions:**

        Select a provider and then model to generate the agent.
        """
    )

    provider_select = gr.Dropdown(
        label="Select Provider",
        choices=["Basic Agent", "LMStudio", "Ollama", "hugging face", "Gemini", "Anthropic", "OpenAI"],
        interactive=True,
        visible=True,
        multiselect=False,
    )

    model_select = gr.Dropdown(
        label="Select Model",
        choices=available_models,
        interactive=True,
        visible=True,
        multiselect=False,
    )

    def _select_agent_and_describe(provider_name, model_name):
        """Create the agent for the chosen provider/model and return its description."""
        select_agent(provider_name, model_name)
        return get_agent_info()

    # Picking a provider refreshes the list of models to choose from.
    provider_select.input(fn=update_available_models, inputs=provider_select, outputs=[model_select])

    # Picking a model creates the agent. The description is returned through
    # ``outputs`` so the info box is actually refreshed in the browser.
    model_select.change(
        fn=_select_agent_and_describe,
        inputs=[provider_select, model_select],
        outputs=[agent_info_text],
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")
    run_test_button = gr.Button("Run Test on Random Question")
    run_multiple_tests_button = gr.Button("Run tests on all questions")
    run_get_questions_button = gr.Button("Get Questions")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # Every button reports into the same status box and results table.
    run_test_button.click(
        fn=run_test_on_random_question,
        outputs=[status_output, results_table],
    )

    run_multiple_tests_button.click(
        fn=run_test_on_all_questions,
        outputs=[status_output, results_table],
    )

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
    )

    run_get_questions_button.click(
        fn=get_all_questions,
        outputs=[status_output, results_table],
    )
| |
|
if __name__ == "__main__":
    # Startup banner followed by a short report on the Space environment.
    _rule = "-" * 30
    print("\n" + _rule + " App Starting " + _rule)

    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if not space_host_startup:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    else:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")

    if not space_id_startup:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    else:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")

    # Closing rule, as wide as the opening banner.
    print("-" * (60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)