File size: 4,841 Bytes
0339155
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import json
import os
from typing import List, Dict, Any, Optional


class AnswerDataManager:
    """
    Store already-answered task records in a JSON file so a costly agent does not have to be called twice.

    Records are kept in memory in ``self.data`` as a list of dicts and are
    written to ``self.filename`` by every mutating method. The constructor
    does NOT read the file; call ``load_data()`` to populate ``self.data``
    from disk.
    """

    def __init__(self, filename: str = "already_answered.json"):
        """
        Initialize the AnswerDataManager.

        Args:
            filename (str): The name of the JSON file to use for storage.
        """
        self.filename = filename
        # In-memory copy of the stored records; stays empty until load_data().
        self.data: List[Dict[str, Any]] = []

    def load_data(self) -> List[Dict[str, Any]]:
        """
        Load data from the JSON file into ``self.data``.

        A missing file silently yields an empty list. An unreadable file,
        invalid JSON, invalid UTF-8, or a top-level JSON value that is not a
        list prints an error and also yields an empty list. List entries that
        are not JSON objects are dropped (with a printed notice) because every
        lookup method calls ``.get`` on each record.

        Returns:
            List[Dict[str, Any]]: The loaded data (the same list object that
            is stored in ``self.data``).
        """
        try:
            # EAFP: open directly instead of checking os.path.exists first,
            # which would leave a window between the check and the open.
            with open(self.filename, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            # No cache file yet is the normal first-run state, not an error.
            self.data = []
            return self.data
        except (ValueError, OSError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Error loading data from {self.filename}: {e}")
            self.data = []
            return self.data

        if not isinstance(loaded, list):
            print(
                f"Error loading data from {self.filename}: "
                f"expected a JSON list, got {type(loaded).__name__}"
            )
            self.data = []
            return self.data

        records = [entry for entry in loaded if isinstance(entry, dict)]
        skipped = len(loaded) - len(records)
        if skipped:
            print(
                f"Error loading data from {self.filename}: "
                f"ignored {skipped} entries that are not JSON objects"
            )
        self.data = records
        return self.data

    def save_data(self, data: Optional[List[Dict[str, Any]]] = None) -> bool:
        """
        Save data to the JSON file.

        The data is serialized to a string before the file is opened, so a
        value that cannot be serialized never truncates the existing file.

        Args:
            data (Optional[List[Dict[str, Any]]]): Data to save. If None, saves current self.data.
                When given, it replaces ``self.data`` even if the write fails.

        Returns:
            bool: True if successful, False if serialization or writing failed.
        """
        if data is not None:
            self.data = data

        try:
            payload = json.dumps(self.data, indent=4, ensure_ascii=False)
        except (TypeError, ValueError) as e:
            # TypeError: non-serializable value; ValueError: circular reference.
            print(f"Error saving data to {self.filename}: {e}")
            return False

        try:
            with open(self.filename, 'w', encoding='utf-8') as f:
                f.write(payload)
        except OSError as e:
            print(f"Error saving data to {self.filename}: {e}")
            return False
        return True

    def add_answer(self, task_id: str, question: str, submitted_answer: str) -> bool:
        """
        Append a new answer record and save the file.

        The record is appended unconditionally; no check is made for an
        existing record with the same ``task_id``. The record stays in
        ``self.data`` even when saving fails.

        Args:
            task_id (str): The task identifier.
            question (str): The question text.
            submitted_answer (str): The submitted answer.

        Returns:
            bool: True if the file was saved, False otherwise.
        """
        self.data.append({
            "task_id": task_id,
            "question": question,
            "submitted_answer": submitted_answer,
        })
        return self.save_data()

    def get_answer_by_task_id(self, task_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve the first answer whose ``task_id`` matches.

        Args:
            task_id (str): The task identifier to search for.

        Returns:
            Optional[Dict[str, Any]]: The stored record itself (not a copy) if
            found, None otherwise.
        """
        for answer in self.data:
            if answer.get("task_id") == task_id:
                return answer
        return None

    def update_answer(
        self,
        task_id: str,
        question: Optional[str] = None,
        submitted_answer: Optional[str] = None,
    ) -> bool:
        """
        Update the first existing answer with the given task ID and save the file.

        Args:
            task_id (str): The task identifier.
            question (Optional[str]): New question text; None leaves it unchanged.
            submitted_answer (Optional[str]): New submitted answer; None leaves it unchanged.

        Returns:
            bool: True if the record was found and the file was saved, False if
            task_id was not found or saving failed.
        """
        answer = self.get_answer_by_task_id(task_id)
        if answer is None:
            return False
        if question is not None:
            answer["question"] = question
        if submitted_answer is not None:
            answer["submitted_answer"] = submitted_answer
        return self.save_data()

    def remove_answer(self, task_id: str) -> bool:
        """
        Remove every answer with the given task ID and save the file.

        Args:
            task_id (str): The task identifier to remove.

        Returns:
            bool: True if at least one record was removed and the file was
            saved, False if task_id was not found or saving failed.
        """
        remaining = [answer for answer in self.data if answer.get("task_id") != task_id]
        if len(remaining) == len(self.data):
            # Nothing matched: leave memory and file untouched.
            return False
        self.data = remaining
        return self.save_data()

    def get_all_answers(self) -> List[Dict[str, Any]]:
        """
        Get all answers in the current data structure.

        Returns:
            List[Dict[str, Any]]: A new list holding the stored records. The
            list is a shallow copy: the record dicts themselves are shared
            with ``self.data``.
        """
        return list(self.data)

    def clear_all_data(self) -> bool:
        """
        Clear all data from memory and overwrite the file with an empty list.

        Returns:
            bool: True if successful, False otherwise.
        """
        self.data = []
        return self.save_data()