""" Улучшенный GAIA Agent с поддержкой кэширования ответов и исправленным полем agent_code """ import os import json import time import torch import requests import gradio as gr import pandas as pd from huggingface_hub import login from typing import List, Dict, Any, Optional, Union, Callable, Tuple from transformers import AutoTokenizer, AutoModelForSeq2SeqLM # Константы CACHE_FILE = "gaia_answers_cache.json" DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" MAX_RETRIES = 3 # Максимальное количество попыток отправки RETRY_DELAY = 5 # Секунды ожидания между попытками class EnhancedGAIAAgent: """ Улучшенный агент для Hugging Face GAIA с поддержкой кэширования ответов """ def __init__(self, model_name="google/flan-t5-small", use_cache=True): """ Инициализация агента с моделью и кэшем Args: model_name: Название модели для загрузки use_cache: Использовать ли кэширование ответов """ print(f"Initializing EnhancedGAIAAgent with model: {model_name}") self.model_name = model_name self.use_cache = use_cache self.cache = self._load_cache() if use_cache else {} # Загружаем модель и токенизатор print("Loading tokenizer...") self.tokenizer = AutoTokenizer.from_pretrained(model_name) print("Loading model...") self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name) print("Model and tokenizer loaded successfully") def _load_cache(self) -> Dict[str, str]: """ Загружает кэш ответов из файла Returns: Dict[str, str]: Словарь с кэшированными ответами """ if os.path.exists(CACHE_FILE): try: with open(CACHE_FILE, 'r', encoding='utf-8') as f: print(f"Loading cache from {CACHE_FILE}") return json.load(f) except Exception as e: print(f"Error loading cache: {e}") return {} else: print(f"Cache file {CACHE_FILE} not found, creating new cache") return {} def _save_cache(self) -> None: """ Сохраняет кэш ответов в файл """ try: with open(CACHE_FILE, 'w', encoding='utf-8') as f: json.dump(self.cache, f, ensure_ascii=False, indent=2) print(f"Cache saved to {CACHE_FILE}") except Exception as e: print(f"Error saving cache: {e}") def _classify_question(self, question: str) -> str: """ Классифицирует вопрос по типу для лучшего форматирования ответа Args: question: Текст вопроса Returns: str: Тип вопроса (factual, calculation, list, date_time, etc.) """ # Простая эвристическая классификация question_lower = question.lower() if any(word in question_lower for word in ["calculate", "sum", "product", "divide", "multiply", "add", "subtract", "how many"]): return "calculation" elif any(word in question_lower for word in ["list", "enumerate", "items", "elements"]): return "list" elif any(word in question_lower for word in ["date", "time", "day", "month", "year", "when"]): return "date_time" else: return "factual" def _format_answer(self, raw_answer: str, question_type: str) -> str: """ Форматирует ответ в соответствии с типом вопроса Args: raw_answer: Необработанный ответ от модели question_type: Тип вопроса Returns: str: Отформатированный ответ """ # Удаляем лишние пробелы и переносы строк answer = raw_answer.strip() # Удаляем префиксы, которые часто добавляет модель prefixes = ["Answer:", "The answer is:", "I think", "I believe", "According to", "Based on"] for prefix in prefixes: if answer.startswith(prefix): answer = answer[len(prefix):].strip() # Специфическое форматирование в зависимости от типа вопроса if question_type == "calculation": # Для числовых ответов удаляем лишний текст # Оставляем только числа, если они есть import re numbers = re.findall(r'-?\d+\.?\d*', answer) if numbers: answer = numbers[0] elif question_type == "list": # Для списков убеждаемся, что элементы разделены запятыми if "," not in answer and " " in answer: items = [item.strip() for item in answer.split() if item.strip()] answer = ", ".join(items) return answer def __call__(self, question: str, task_id: Optional[str] = None) -> str: """ Обрабатывает вопрос и возвращает ответ Args: question: Текст вопроса task_id: Идентификатор задачи (опционально) Returns: str: Ответ в формате JSON с ключом final_answer """ # Создаем ключ для кэша (используем task_id, если доступен) cache_key = task_id if task_id else question # Проверяем наличие ответа в кэше if self.use_cache and cache_key in self.cache: print(f"Cache hit for question: {question[:50]}...") return self.cache[cache_key] # Классифицируем вопрос question_type = self._classify_question(question) print(f"Processing question: {question[:100]}...") print(f"Classified as: {question_type}") try: # Генерируем ответ с помощью модели inputs = self.tokenizer(question, return_tensors="pt") outputs = self.model.generate(**inputs, max_length=100) raw_answer = self.tokenizer.decode(outputs[0], skip_special_tokens=True) # Форматируем ответ formatted_answer = self._format_answer(raw_answer, question_type) # Формируем JSON-ответ result = {"final_answer": formatted_answer} json_response = json.dumps(result) # Сохраняем в кэш if self.use_cache: self.cache[cache_key] = json_response self._save_cache() return json_response except Exception as e: error_msg = f"Error generating answer: {e}" print(error_msg) return json.dumps({"final_answer": f"AGENT ERROR: {e}"}) class EvaluationRunner: """ Обрабатывает процесс оценки: получение вопросов, запуск агента, и отправку ответов на сервер оценки. """ def __init__(self, api_url=DEFAULT_API_URL): """Инициализация с API endpoints.""" self.api_url = api_url self.questions_url = f"{api_url}/questions" self.submit_url = f"{api_url}/submit" self.results_url = f"{api_url}/results" self.correct_answers = 0 self.total_questions = 0 def run_evaluation(self, agent: Callable[[str], str], username: str, agent_code_url: str) -> tuple[str, pd.DataFrame]: """ Запускает полный процесс оценки: 1. Получает вопросы 2. Запускает агента на всех вопросах 3. Отправляет ответы 4. Возвращает результаты """ # Получаем вопросы questions_data = self._fetch_questions() if isinstance(questions_data, str): # Сообщение об ошибке return questions_data, None # Запускаем агента на всех вопросах results_log, answers_payload = self._run_agent_on_questions(agent, questions_data) if not answers_payload: return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) # Отправляем ответы с логикой повторных попыток submission_result = self._submit_answers(username, agent_code_url, answers_payload) # Возвращаем результаты return submission_result, pd.DataFrame(results_log) def _fetch_questions(self) -> Union[List[Dict[str, Any]], str]: """Получает вопросы с сервера оценки.""" print(f"Fetching questions from: {self.questions_url}") try: response = requests.get(self.questions_url, timeout=15) response.raise_for_status() questions_data = response.json() if not questions_data: error_msg = "Fetched questions list is empty or invalid format." print(error_msg) return error_msg self.total_questions = len(questions_data) print(f"Successfully fetched {self.total_questions} questions.") return questions_data except requests.exceptions.RequestException as e: error_msg = f"Error fetching questions: {e}" print(error_msg) return error_msg except requests.exceptions.JSONDecodeError as e: error_msg = f"Error decoding JSON response from questions endpoint: {e}" print(error_msg) print(f"Response text: {response.text[:500]}") return error_msg except Exception as e: error_msg = f"An unexpected error occurred fetching questions: {e}" print(error_msg) return error_msg def _run_agent_on_questions(self, agent: Any, questions_data: List[Dict[str, Any]]) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: """Запускает агента на всех вопросах и собирает результаты.""" results_log = [] answers_payload = [] print(f"Running agent on {len(questions_data)} questions...") for item in questions_data: task_id = item.get("task_id") question_text = item.get("question") if not task_id or question_text is None: print(f"Skipping item with missing task_id or question: {item}") continue try: # Вызываем агента с task_id для правильного форматирования json_response = agent(question_text, task_id) # Парсим JSON-ответ response_obj = json.loads(json_response) # Извлекаем final_answer для отправки submitted_answer = response_obj.get("final_answer", "") answers_payload.append({ "task_id": task_id, "submitted_answer": submitted_answer }) results_log.append({ "Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer, "Full Response": json_response }) except Exception as e: print(f"Error running agent on task {task_id}: {e}") results_log.append({ "Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}" }) return results_log, answers_payload def _submit_answers(self, username: str, agent_code_url: str, answers_payload: List[Dict[str, Any]]) -> str: """Отправляет ответы на сервер оценки.""" # ИСПРАВЛЕНО: Используем agent_code вместо agent_code_url submission_data = { "username": username.strip(), "agent_code": agent_code_url.strip(), # Имя переменной осталось прежним, но поле изменено "answers": answers_payload } print(f"Submitting {len(answers_payload)} answers to: {self.submit_url}") max_retries = MAX_RETRIES retry_delay = RETRY_DELAY for attempt in range(1, max_retries + 1): try: print(f"Submission attempt {attempt} of {max_retries}...") response = requests.post( self.submit_url, json=submission_data, headers={"Content-Type": "application/json"}, timeout=30 ) response.raise_for_status() try: result = response.json() score = result.get("score") max_score = result.get("max_score") if score is not None and max_score is not None: self.correct_answers = score # Обновляем счетчик правильных ответов return f"Evaluation complete! Score: {score}/{max_score}" else: print(f"Received N/A results. Waiting {retry_delay} seconds before retry...") time.sleep(retry_delay) continue except requests.exceptions.JSONDecodeError: print(f"Submission attempt {attempt}: Response was not JSON. Response: {response.text}") if attempt < max_retries: print(f"Waiting {retry_delay} seconds before retry...") time.sleep(retry_delay) else: return f"Submission successful, but response was not JSON. Response: {response.text}" except requests.exceptions.RequestException as e: print(f"Submission attempt {attempt} failed: {e}") if attempt < max_retries: print(f"Waiting {retry_delay} seconds before retry...") time.sleep(retry_delay) else: return f"Error submitting answers after {max_retries} attempts: {e}" # Если мы здесь, все попытки не удались, но не вызвали исключений return "Submission Successful, but results are pending!" def _check_results(self, username: str) -> None: """Проверяет результаты для подсчета правильных ответов.""" try: results_url = f"{self.results_url}?username={username}" print(f"Checking results at: {results_url}") response = requests.get(results_url, timeout=15) if response.status_code == 200: try: data = response.json() if isinstance(data, dict): score = data.get("score") if score is not None: self.correct_answers = int(score) print(f"✓ Correct answers: {self.correct_answers}/{self.total_questions}") else: print("Score information not available in results") else: print("Results data is not in expected format") except: print("Could not parse results JSON") else: print(f"Could not fetch results, status code: {response.status_code}") except Exception as e: print(f"Error checking results: {e}") def get_correct_answers_count(self) -> int: """Возвращает количество правильных ответов.""" return self.correct_answers def get_total_questions_count(self) -> int: """Возвращает общее количество вопросов.""" return self.total_questions def print_evaluation_summary(self, username: str) -> None: """Выводит сводку результатов оценки.""" print("\n===== EVALUATION SUMMARY =====") print(f"User: {username}") print(f"Overall Score: {self.correct_answers}/{self.total_questions}") print(f"Correct Answers: {self.correct_answers}") print(f"Total Questions: {self.total_questions}") print(f"Accuracy: {(self.correct_answers / self.total_questions * 100) if self.total_questions > 0 else 0:.1f}%") print("=============================\n") def run_evaluation(username: str, agent_code_url: str, model_name: str = "google/flan-t5-small", use_cache: bool = True) -> Tuple[str, int, int, str, str, str]: """ Запускает полный процесс оценки с поддержкой кэширования Args: username: Имя пользователя Hugging Face agent_code_url: URL кода агента (или код агента) model_name: Название модели для использования use_cache: Использовать ли кэширование ответов Returns: Tuple[str, int, int, str, str, str]: Кортеж из 6 значений: - result_text: Текстовый результат оценки - correct_answers: Количество правильных ответов - total_questions: Общее количество вопросов - elapsed_time: Время выполнения - results_url: URL для проверки результатов - cache_status: Статус кэширования """ start_time = time.time() # Инициализируем агента с поддержкой кэширования agent = EnhancedGAIAAgent(model_name=model_name, use_cache=use_cache) # Инициализируем runner с исправленным полем agent_code runner = EvaluationRunner(api_url=DEFAULT_API_URL) # Запускаем оценку result, results_log = runner.run_evaluation(agent, username, agent_code_url) # Проверяем результаты runner._check_results(username) # Выводим сводку runner.print_evaluation_summary(username) # Вычисляем время выполнения elapsed_time = time.time() - start_time elapsed_time_str = f"{elapsed_time:.2f} seconds" # Формируем URL результатов results_url = f"{DEFAULT_API_URL}/results?username={username}" # Формируем статус кэширования cache_status = "Cache enabled and used" if use_cache else "Cache disabled" # ИСПРАВЛЕНО: Возвращаем 6 отдельных значений вместо словаря return ( result, # result_text runner.get_correct_answers_count(), # correct_answers runner.get_total_questions_count(), # total_questions elapsed_time_str, # elapsed_time results_url, # results_url cache_status # cache_status ) def create_gradio_interface(): """ Создает Gradio интерфейс для запуска оценки """ with gr.Blocks(title="GAIA Agent Evaluation") as demo: gr.Markdown("# GAIA Agent Evaluation with Caching") with gr.Row(): with gr.Column(): username = gr.Textbox(label="Hugging Face Username") agent_code_url = gr.Textbox(label="Agent Code URL or Code", lines=10) model_name = gr.Dropdown( label="Model", choices=["google/flan-t5-small", "google/flan-t5-base", "google/flan-t5-large"], value="google/flan-t5-small" ) use_cache = gr.Checkbox(label="Use Answer Cache", value=True) run_button = gr.Button("Run Evaluation & Submit All Answers") with gr.Column(): result_text = gr.Textbox(label="Result", lines=2) correct_answers = gr.Number(label="Correct Answers") total_questions = gr.Number(label="Total Questions") elapsed_time = gr.Textbox(label="Elapsed Time") results_url = gr.Textbox(label="Results URL") cache_status = gr.Textbox(label="Cache Status") run_button.click( fn=run_evaluation, inputs=[username, agent_code_url, model_name, use_cache], outputs=[ result_text, correct_answers, total_questions, elapsed_time, results_url, cache_status ] ) return demo if __name__ == "__main__": # Создаем и запускаем Gradio интерфейс demo = create_gradio_interface() demo.launch(share=True)