Agents_Final_Assignment / answer_data_manager.py
iQuentin's picture
Create answer_data_manager.py
0339155 verified
import json
import os
from typing import List, Dict, Any, Optional
class AnswerDataManager:
"""
A class to handle saving and reading already answered task data to/from a JSON file to avoid calling costly agent twice.
"""
def __init__(self, filename: str = "already_answered.json"):
"""
Initialize the AnswerDataManager.
Args:
filename (str): The name of the JSON file to use for storage.
"""
self.filename = filename
self.data: List[Dict[str, Any]] = []
def load_data(self) -> List[Dict[str, Any]]:
"""
Load data from the JSON file.
Returns:
List[Dict[str, Any]]: The loaded data, or empty list if file doesn't exist.
"""
try:
if os.path.exists(self.filename):
with open(self.filename, 'r', encoding='utf-8') as f:
self.data = json.load(f)
else:
self.data = []
except (json.JSONDecodeError, IOError) as e:
print(f"Error loading data from {self.filename}: {e}")
self.data = []
return self.data
def save_data(self, data: Optional[List[Dict[str, Any]]] = None) -> bool:
"""
Save data to the JSON file.
Args:
data (Optional[List[Dict[str, Any]]]): Data to save. If None, saves current self.data.
Returns:
bool: True if successful, False otherwise.
"""
if data is not None:
self.data = data
try:
with open(self.filename, 'w', encoding='utf-8') as f:
json.dump(self.data, f, indent=4, ensure_ascii=False)
return True
except IOError as e:
print(f"Error saving data to {self.filename}: {e}")
return False
def add_answer(self, task_id: str, question: str, submitted_answer: str) -> bool:
"""
Add a new answer to the data structure.
Args:
task_id (str): The task identifier.
question (str): The question text.
submitted_answer (str): The submitted answer.
Returns:
bool: True if successful, False otherwise.
"""
new_answer = {
"task_id": task_id,
"question": question,
"submitted_answer": submitted_answer
}
self.data.append(new_answer)
return self.save_data()
def get_answer_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]:
"""
Retrieve an answer by task ID.
Args:
task_id (str): The task identifier to search for.
Returns:
Optional[Dict[str, Any]]: The answer data if found, None otherwise.
"""
for answer in self.data:
if answer.get("task_id") == task_id:
return answer
return None
def update_answer(self, task_id: str, question: str = None, submitted_answer: str = None) -> bool:
"""
Update an existing answer by task ID.
Args:
task_id (str): The task identifier.
question (str, optional): New question text.
submitted_answer (str, optional): New submitted answer.
Returns:
bool: True if successful, False if task_id not found.
"""
for answer in self.data:
if answer.get("task_id") == task_id:
if question is not None:
answer["question"] = question
if submitted_answer is not None:
answer["submitted_answer"] = submitted_answer
return self.save_data()
return False
def remove_answer(self, task_id: str) -> bool:
"""
Remove an answer by task ID.
Args:
task_id (str): The task identifier to remove.
Returns:
bool: True if successful, False if task_id not found.
"""
original_length = len(self.data)
self.data = [answer for answer in self.data if answer.get("task_id") != task_id]
if len(self.data) < original_length:
return self.save_data()
return False
def get_all_answers(self) -> List[Dict[str, Any]]:
"""
Get all answers in the current data structure.
Returns:
List[Dict[str, Any]]: All answer data.
"""
return self.data.copy()
def clear_all_data(self) -> bool:
"""
Clear all data from memory and file.
Returns:
bool: True if successful, False otherwise.
"""
self.data = []
return self.save_data()