import logging from io import BytesIO import pandas as pd import requests from PIL import Image from agent import BasicAgent DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" questions_url = f"{DEFAULT_API_URL}/questions" files_url = f"{DEFAULT_API_URL}/files" def fetch_questions(): """ Fetches questions from the API. Returns a list of questions or an error message. """ try: response = requests.get(questions_url, timeout=15) response.raise_for_status() questions_data = response.json() if not questions_data: logging.warning("Fetched questions list is empty.") return None logging.info(f"Fetched {len(questions_data)} questions.") for question in questions_data: content, content_type = _load_files(question) if content is not None: question["file_content"] = content question["file_type"] = content_type return questions_data except requests.exceptions.RequestException as e: logging.error(f"Error fetching questions: {e}") return None except Exception as e: logging.error(f"An unexpected error occurred fetching questions: {e}") return None return questions_data def _load_files(question_data: dict): if file_name := question_data.get("file_name"): extension = file_name.split(".")[-1] if extension not in ["xlsx", "png", "py", "mp3", "wav"]: logging.warning( f"File {file_name} has an unsupported extension. Skipping file loading." ) return None, None # Ensure a tuple is always returned if task_id := question_data.get("task_id"): try: if extension == "mp3" or extension == "wav": return f"{files_url}/{task_id}", "audio" response = requests.get(f"{files_url}/{task_id}", timeout=15) response.raise_for_status() if response.status_code == 200: # extensions: xlsx, png, py, else ignore match extension: case "xlsx": if ( response.headers.get("Content-Type") == "application/octet-stream" ): logging.info(f"Processing Excel file: {file_name}") return pd.read_excel(response.content).to_json(), "xlsx" case "png": if response.headers.get("Content-Type") == "image/png": logging.info(f"Processing image file: {file_name}") return Image.open(BytesIO(response.content)).convert("RGB"), "png" case "py": if response.headers.get("Content-Type", "").startswith( "text/x-python" ): logging.info(f"Processing Python file: {file_name}") return response.content.decode( "utf-8" ), "py" # Load Python file if needed except requests.exceptions.RequestException as e: logging.error(f"Error fetching file for task {task_id}: {e}") raise e except Exception as e: logging.error( f"An unexpected error occurred fetching file for task {task_id}: {e}" ) raise e return None, None return None, None # Always return a tuple def run_agent(agent, questions_data): results_log = [] answers_payload = [] logging.info(f"Running agent on {len(questions_data)} questions...") for item in questions_data: payload, log_item = run_agent_on_question(agent, item) if payload is not None: answers_payload.append(payload) if log_item is not None: results_log.append(log_item) if not answers_payload: logging.info("Agent did not produce any answers to submit.") return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) return answers_payload, pd.DataFrame(results_log) def run_agent_on_question(agent: BasicAgent, question): """ Runs the agent on a single question and returns the answer. """ task_id = question.get("task_id") question_text = question.get("question") content = question.get("file_content") content_type = question.get("file_type") if not task_id or question_text is None: logging.warning(f"Skipping item with missing task_id or question: {question}") return None, None try: submitted_answer = agent(question_text, content=content, content_type=content_type) return ( {"task_id": task_id, "submitted_answer": submitted_answer}, { "Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer, }, ) except Exception as e: logging.error(f"Error running agent on task {task_id}: {e}") return ( { "Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}", }, None, )