import json import os from typing import List, Dict, Any, Optional class AnswerDataManager: """ A class to handle saving and reading already answered task data to/from a JSON file to avoid calling costly agent twice. """ def __init__(self, filename: str = "already_answered.json"): """ Initialize the AnswerDataManager. Args: filename (str): The name of the JSON file to use for storage. """ self.filename = filename self.data: List[Dict[str, Any]] = [] def load_data(self) -> List[Dict[str, Any]]: """ Load data from the JSON file. Returns: List[Dict[str, Any]]: The loaded data, or empty list if file doesn't exist. """ try: if os.path.exists(self.filename): with open(self.filename, 'r', encoding='utf-8') as f: self.data = json.load(f) else: self.data = [] except (json.JSONDecodeError, IOError) as e: print(f"Error loading data from {self.filename}: {e}") self.data = [] return self.data def save_data(self, data: Optional[List[Dict[str, Any]]] = None) -> bool: """ Save data to the JSON file. Args: data (Optional[List[Dict[str, Any]]]): Data to save. If None, saves current self.data. Returns: bool: True if successful, False otherwise. """ if data is not None: self.data = data try: with open(self.filename, 'w', encoding='utf-8') as f: json.dump(self.data, f, indent=4, ensure_ascii=False) return True except IOError as e: print(f"Error saving data to {self.filename}: {e}") return False def add_answer(self, task_id: str, question: str, submitted_answer: str) -> bool: """ Add a new answer to the data structure. Args: task_id (str): The task identifier. question (str): The question text. submitted_answer (str): The submitted answer. Returns: bool: True if successful, False otherwise. """ new_answer = { "task_id": task_id, "question": question, "submitted_answer": submitted_answer } self.data.append(new_answer) return self.save_data() def get_answer_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]: """ Retrieve an answer by task ID. Args: task_id (str): The task identifier to search for. Returns: Optional[Dict[str, Any]]: The answer data if found, None otherwise. """ for answer in self.data: if answer.get("task_id") == task_id: return answer return None def update_answer(self, task_id: str, question: str = None, submitted_answer: str = None) -> bool: """ Update an existing answer by task ID. Args: task_id (str): The task identifier. question (str, optional): New question text. submitted_answer (str, optional): New submitted answer. Returns: bool: True if successful, False if task_id not found. """ for answer in self.data: if answer.get("task_id") == task_id: if question is not None: answer["question"] = question if submitted_answer is not None: answer["submitted_answer"] = submitted_answer return self.save_data() return False def remove_answer(self, task_id: str) -> bool: """ Remove an answer by task ID. Args: task_id (str): The task identifier to remove. Returns: bool: True if successful, False if task_id not found. """ original_length = len(self.data) self.data = [answer for answer in self.data if answer.get("task_id") != task_id] if len(self.data) < original_length: return self.save_data() return False def get_all_answers(self) -> List[Dict[str, Any]]: """ Get all answers in the current data structure. Returns: List[Dict[str, Any]]: All answer data. """ return self.data.copy() def clear_all_data(self) -> bool: """ Clear all data from memory and file. Returns: bool: True if successful, False otherwise. """ self.data = [] return self.save_data()