Spaces:
Sleeping
Sleeping
File size: 4,841 Bytes
0339155 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
import json
import os
from typing import List, Dict, Any, Optional
class AnswerDataManager:
    """
    Persist already answered tasks in a JSON file so a costly agent is not
    called twice for the same task.

    The file holds a JSON list of objects with the keys ``task_id``,
    ``question`` and ``submitted_answer``.

    Note:
        The constructor does not read the file. Call ``load_data()`` before
        adding or updating answers; otherwise the first save overwrites the
        file with only the entries held in memory.
    """

    def __init__(self, filename: str = "already_answered.json"):
        """
        Initialize the AnswerDataManager.

        Args:
            filename (str): The name of the JSON file to use for storage.
        """
        self.filename = filename
        # In-memory copy of the stored answers; stays empty until load_data()
        # is called or answers are added.
        self.data: List[Dict[str, Any]] = []

    def load_data(self) -> List[Dict[str, Any]]:
        """
        Load data from the JSON file into ``self.data``.

        A missing, unreadable, or malformed file yields an empty list, so a
        damaged cache never breaks the caller.

        Returns:
            List[Dict[str, Any]]: The loaded data, or an empty list if the
            file doesn't exist or cannot be used.
        """
        if not os.path.exists(self.filename):
            self.data = []
            return self.data

        try:
            with open(self.filename, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError
            # (a file that is not valid UTF-8); OSError covers I/O failures.
            print(f"Error loading data from {self.filename}: {e}")
            self.data = []
            return self.data

        if not isinstance(loaded, list):
            # Every other method appends to / iterates over a list.
            print(
                f"Error loading data from {self.filename}: "
                f"expected a JSON list, got {type(loaded).__name__}"
            )
            self.data = []
            return self.data

        # Lookups call .get() on every entry, so keep only JSON objects.
        entries = [entry for entry in loaded if isinstance(entry, dict)]
        skipped = len(loaded) - len(entries)
        if skipped:
            print(f"Ignored {skipped} malformed entries in {self.filename}")
        self.data = entries
        return self.data

    def save_data(self, data: Optional[List[Dict[str, Any]]] = None) -> bool:
        """
        Save data to the JSON file.

        Args:
            data (Optional[List[Dict[str, Any]]]): Data to save. If given, it
                also replaces ``self.data``. If None, saves current
                ``self.data``.

        Returns:
            bool: True if successful, False otherwise.
        """
        if data is not None:
            self.data = data

        try:
            # Serialize before opening the file: opening with 'w' truncates
            # it, so a value that cannot be serialized must fail first or the
            # existing cache would be wiped.
            serialized = json.dumps(self.data, indent=4, ensure_ascii=False)
            with open(self.filename, 'w', encoding='utf-8') as f:
                f.write(serialized)
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving data to {self.filename}: {e}")
            return False
        return True

    def add_answer(self, task_id: str, question: str, submitted_answer: str) -> bool:
        """
        Append a new answer and persist the whole list.

        No check is made for an existing entry with the same ``task_id``;
        ``get_answer_by_task_id`` returns the first match.

        Args:
            task_id (str): The task identifier.
            question (str): The question text.
            submitted_answer (str): The submitted answer.

        Returns:
            bool: True if the file was saved, False otherwise. The answer
            stays in memory even when saving fails.
        """
        new_answer = {
            "task_id": task_id,
            "question": question,
            "submitted_answer": submitted_answer
        }
        self.data.append(new_answer)
        return self.save_data()

    def get_answer_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve an answer by task ID.

        Args:
            task_id (str): The task identifier to search for.

        Returns:
            Optional[Dict[str, Any]]: The first stored entry with that task
            ID (the stored dict itself, not a copy), or None if not found.
        """
        for answer in self.data:
            if answer.get("task_id") == task_id:
                return answer
        return None

    def update_answer(
        self,
        task_id: str,
        question: Optional[str] = None,
        submitted_answer: Optional[str] = None,
    ) -> bool:
        """
        Update the first existing answer with the given task ID.

        Args:
            task_id (str): The task identifier.
            question (Optional[str]): New question text; None leaves it as is.
            submitted_answer (Optional[str]): New submitted answer; None
                leaves it as is.

        Returns:
            bool: True if the entry was found and saved, False if the task ID
            was not found or saving failed.
        """
        answer = self.get_answer_by_task_id(task_id)
        if answer is None:
            return False
        if question is not None:
            answer["question"] = question
        if submitted_answer is not None:
            answer["submitted_answer"] = submitted_answer
        return self.save_data()

    def remove_answer(self, task_id: str) -> bool:
        """
        Remove every answer with the given task ID.

        Args:
            task_id (str): The task identifier to remove.

        Returns:
            bool: True if at least one entry was removed and the file was
            saved, False if the task ID was not found or saving failed.
        """
        remaining = [answer for answer in self.data if answer.get("task_id") != task_id]
        if len(remaining) == len(self.data):
            return False
        self.data = remaining
        return self.save_data()

    def get_all_answers(self) -> List[Dict[str, Any]]:
        """
        Get all answers in the current data structure.

        Returns:
            List[Dict[str, Any]]: A shallow copy of the list; the entry dicts
            are shared with the manager.
        """
        return self.data.copy()

    def clear_all_data(self) -> bool:
        """
        Clear all data from memory and file.

        Returns:
            bool: True if successful, False otherwise.
        """
        self.data = []
        return self.save_data()