"""Fetch course questions, run the agent on them, and submit the answers.

The script is meant to be used in stages (see the commented-out toggles at
the bottom of the file): questions and answers are cached in local JSON files
so that fetching/answering and submitting can happen in separate runs.
"""

import base64
import json

import requests
from huggingface_hub import login

import agent

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
SPACE_ID = "https://huggingface.co/spaces/IngoTB303/Final_Assignment_Template/tree/main"
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
attachments_url = f"{api_url}/files/"
submit_url = f"{api_url}/submit"

# Proxy settings used by fetch_questions() when ``proxy=True``.
_PROXIES = {'http': 'http://localhost:3128', 'https': 'http://localhost:3128'}

# NOTE: this rebinds the name ``agent`` from the imported module to an agent
# instance; the module itself is no longer reachable under that name.
agent = agent.BasicAgent()


def _http_get(url, timeout, proxy):
    """Issue a GET request, optionally through the local proxy.

    NOTE(review): the proxied path disables TLS certificate verification
    (``verify=False``), exactly as the original code did. Confirm this is
    acceptable for the environment the script runs in.
    """
    if proxy:
        return requests.get(url, timeout=timeout, proxies=_PROXIES, verify=False)
    return requests.get(url, timeout=timeout)


def fetch_questions(proxy: bool = True):
    """Fetch questions (and their attachments) from the API endpoint.

    For every question that has both a ``file_name`` and a ``task_id``, the
    attachment is downloaded and stored base64-encoded under the
    ``attachment_b64`` key (``None`` if that download fails). Whatever was
    fetched is written to ``questions.json`` before the function returns.

    Args:
        proxy: When true, requests go through the proxy at localhost:3128.

    Returns:
        The list of question dicts, or ``None`` if the response was empty or
        an error occurred.
    """
    print(f"Fetching questions from: {questions_url}")
    # Bound before the ``try`` so the ``finally`` clause can safely inspect it
    # even when the very first request fails.
    questions_data = None
    try:
        response = _http_get(questions_url, timeout=30, proxy=proxy)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return None

        # Fetch attachments for questions with file_name
        for question in questions_data:
            file_name = question.get("file_name", "")
            task_id = question.get("task_id")
            if not (file_name and task_id):
                continue
            try:
                att_response = _http_get(
                    f"{attachments_url}{task_id}", timeout=15, proxy=proxy
                )
                att_response.raise_for_status()
                # Encode binary content to base64 string
                question["attachment_b64"] = base64.b64encode(
                    att_response.content
                ).decode("utf-8")
            except Exception as e:
                # Best effort: one failed attachment must not abort the run.
                print(f"Error fetching attachment for task {task_id}: {e}")
                question["attachment_b64"] = None
        return questions_data
    except Exception as e:
        print(f"Error fetching questions: {e}")
        return None
    finally:
        # Cache whatever was fetched (possibly partially enriched) locally.
        if questions_data:
            with open("questions.json", "w", encoding="utf-8") as f:
                json.dump(questions_data, f, ensure_ascii=False, indent=2)


def run_agent(questions_data):
    """Run the module-level agent on every question and collect its answers.

    Items without a ``task_id`` or with a ``None`` question are skipped. If
    an item carries a non-empty ``attachment_b64``, it is appended to the
    question text before the agent is called. Errors raised by the agent are
    printed and the affected item is left out of the result.

    Args:
        questions_data: Iterable of question dicts; ``None`` is treated as
            an empty list.

    Returns:
        A list of ``{"task_id": ..., "submitted_answer": ...}`` dicts.
    """
    answers_payload = []
    # load_questions() returns None on failure; do not crash on len(None).
    questions_data = questions_data or []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question", "")
        attachment_b64 = item.get("attachment_b64", "")

        # Validate BEFORE concatenating: otherwise a None question with an
        # attachment would be turned into the literal text "None..." and
        # slip past this check.
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        # Concatenate question and attachment_b64 if present
        if attachment_b64:
            question_text = f"{question_text}\n\n[ATTACHMENT:]\n{attachment_b64}"

        try:
            submitted_answer = agent.forward(question=question_text)
            answers_payload.append(
                {"task_id": task_id, "submitted_answer": submitted_answer}
            )
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
    return answers_payload


def _load_json(filename, label):
    """Load JSON from *filename*; print an error and return None on failure.

    *label* only appears in the error message (e.g. "questions").
    """
    try:
        with open(filename, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        print(f"Error loading {label} from {filename}: {e}")
        return None


def load_questions(filename):
    """Load questions from a local JSON file and return as questions_data.

    Returns ``None`` (after printing the error) if the file cannot be read
    or parsed.
    """
    return _load_json(filename, "questions")


def load_answers(filename):
    """Load answers from a local JSON file.

    Returns ``None`` (after printing the error) if the file cannot be read
    or parsed.
    """
    return _load_json(filename, "answers")


def submit_answers_to_hf(username, agent_code, answers_payload):
    """Submit the answers to the scoring endpoint and return a status text.

    Args:
        username: Account name; surrounding whitespace is stripped before it
            is sent.
        agent_code: Value sent as ``agent_code`` in the submission.
        answers_payload: List of answer dicts as produced by ``run_agent``.

    Returns:
        A human-readable status string: a result summary on success, or an
        error description on failure. The function does not raise for
        request errors or for a missing payload.
    """
    # load_answers() returns None on failure; report it in the same style as
    # other submission errors instead of crashing on len(None).
    if answers_payload is None:
        status_message = "No answers available to submit."
        print(status_message)
        return status_message

    # Prepare Submission
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        return status_message


# # for use without proxy
# questions = fetch_questions()

# for use with proxy, after questions have been fetched
questions = load_questions("questions.json")

# # test print the questions to verify, if attachments were loaded
# for question in questions:
#     print(question["question"],"\n")

# # for use with proxy
# answers = run_agent(questions)

# # save answers to publish them later without use of proxy
# if answers:
#     with open("answers.json", "w", encoding="utf-8") as f:
#         json.dump(answers, f, ensure_ascii=False, indent=2)

# submit results to Huggingface
answers = load_answers("answers.json")
assignment_results = submit_answers_to_hf("IngoTB303", SPACE_ID, answers)
print(assignment_results)