Spaces:
Sleeping
Sleeping
File size: 5,538 Bytes
89cdc9f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
import logging
from io import BytesIO
import pandas as pd
import requests
from PIL import Image
from agent import BasicAgent
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
questions_url = f"{DEFAULT_API_URL}/questions"
files_url = f"{DEFAULT_API_URL}/files"
def fetch_questions():
"""
Fetches questions from the API.
Returns a list of questions or an error message.
"""
try:
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
if not questions_data:
logging.warning("Fetched questions list is empty.")
return None
logging.info(f"Fetched {len(questions_data)} questions.")
for question in questions_data:
content, content_type = _load_files(question)
if content is not None:
question["file_content"] = content
question["file_type"] = content_type
return questions_data
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching questions: {e}")
return None
except Exception as e:
logging.error(f"An unexpected error occurred fetching questions: {e}")
return None
return questions_data
def _load_files(question_data: dict):
if file_name := question_data.get("file_name"):
extension = file_name.split(".")[-1]
if extension not in ["xlsx", "png", "py", "mp3", "wav"]:
logging.warning(
f"File {file_name} has an unsupported extension. Skipping file loading."
)
return None, None # Ensure a tuple is always returned
if task_id := question_data.get("task_id"):
try:
if extension == "mp3" or extension == "wav":
return f"{files_url}/{task_id}", "audio"
response = requests.get(f"{files_url}/{task_id}", timeout=15)
response.raise_for_status()
if response.status_code == 200:
# extensions: xlsx, png, py, else ignore
match extension:
case "xlsx":
if (
response.headers.get("Content-Type")
== "application/octet-stream"
):
logging.info(f"Processing Excel file: {file_name}")
return pd.read_excel(response.content).to_json(), "xlsx"
case "png":
if response.headers.get("Content-Type") == "image/png":
logging.info(f"Processing image file: {file_name}")
return Image.open(BytesIO(response.content)).convert("RGB"), "png"
case "py":
if response.headers.get("Content-Type", "").startswith(
"text/x-python"
):
logging.info(f"Processing Python file: {file_name}")
return response.content.decode(
"utf-8"
), "py" # Load Python file if needed
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching file for task {task_id}: {e}")
raise e
except Exception as e:
logging.error(
f"An unexpected error occurred fetching file for task {task_id}: {e}"
)
raise e
return None, None
return None, None # Always return a tuple
def run_agent(agent, questions_data):
results_log = []
answers_payload = []
logging.info(f"Running agent on {len(questions_data)} questions...")
for item in questions_data:
payload, log_item = run_agent_on_question(agent, item)
if payload is not None:
answers_payload.append(payload)
if log_item is not None:
results_log.append(log_item)
if not answers_payload:
logging.info("Agent did not produce any answers to submit.")
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
return answers_payload, pd.DataFrame(results_log)
def run_agent_on_question(agent: BasicAgent, question):
"""
Runs the agent on a single question and returns the answer.
"""
task_id = question.get("task_id")
question_text = question.get("question")
content = question.get("file_content")
content_type = question.get("file_type")
if not task_id or question_text is None:
logging.warning(f"Skipping item with missing task_id or question: {question}")
return None, None
try:
submitted_answer = agent(question_text, content=content, content_type=content_type)
return (
{"task_id": task_id, "submitted_answer": submitted_answer},
{
"Task ID": task_id,
"Question": question_text,
"Submitted Answer": submitted_answer,
},
)
except Exception as e:
logging.error(f"Error running agent on task {task_id}: {e}")
return (
{
"Task ID": task_id,
"Question": question_text,
"Submitted Answer": f"AGENT ERROR: {e}",
},
None,
)
|