mrradix committed on
Commit
052657c
·
verified ·
1 Parent(s): 8be45a9

Update utils/storage.py

Browse files
Files changed (1) hide show
  1. utils/storage.py +268 -84
utils/storage.py CHANGED
@@ -1,171 +1,355 @@
 
 
 
 
 
1
  import json
2
  import os
3
  import pickle
 
4
  from pathlib import Path
5
- from typing import Any, Dict, Optional, Union
6
- import pandas as pd
7
 
8
  from utils.error_handling import handle_data_exceptions, DataError, ValidationError
9
- from utils.logging import get_logger
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
 
11
- logger = get_logger(__name__)
12
 
13
- # Create data directory if it doesn't exist
14
- DATA_DIR = Path("data")
15
- DATA_DIR.mkdir(exist_ok=True)
16
 
17
  @handle_data_exceptions
18
- def save_data(data: Any, filename: str, format_type: str = "json") -> bool:
19
  """
20
- Save data to file
21
 
22
  Args:
23
  data: Data to save
24
- filename: Name of the file
25
- format_type: Format to save ('json', 'pickle', 'csv', 'excel')
26
-
 
27
  Returns:
28
- bool: True if successful, False otherwise
 
 
 
 
29
  """
 
 
 
 
 
 
30
  try:
31
- filepath = DATA_DIR / filename
32
 
33
  if format_type == "json":
34
- with open(filepath, 'w', encoding='utf-8') as f:
35
  json.dump(data, f, indent=2, ensure_ascii=False, default=str)
36
 
37
  elif format_type == "pickle":
38
- with open(filepath, 'wb') as f:
39
  pickle.dump(data, f)
40
 
41
  elif format_type == "csv":
42
- if isinstance(data, pd.DataFrame):
43
- data.to_csv(filepath, index=False)
44
- else:
45
- raise DataError("CSV format requires pandas DataFrame")
46
-
47
- elif format_type == "excel":
48
- if isinstance(data, pd.DataFrame):
49
- data.to_excel(filepath, index=False)
50
- else:
51
- raise DataError("Excel format requires pandas DataFrame")
 
 
 
52
 
53
- else:
54
- raise DataError(f"Unsupported format: {format_type}")
 
 
 
 
55
 
56
- logger.info(f"Data saved successfully to {filepath}")
57
  return True
58
 
59
  except Exception as e:
60
- logger.error(f"Failed to save data to {filename}: {str(e)}")
61
- raise DataError(f"Failed to save data: {str(e)}")
62
 
63
  @handle_data_exceptions
64
- def load_data(filename: str, format_type: str = "json") -> Any:
65
  """
66
- Load data from file
67
 
68
  Args:
69
- filename: Name of the file
70
- format_type: Format to load ('json', 'pickle', 'csv', 'excel')
71
-
 
 
72
  Returns:
73
- Any: Loaded data or None if failed
 
 
 
 
74
  """
 
 
 
 
 
 
75
  try:
76
- filepath = DATA_DIR / filename
77
 
78
- if not filepath.exists():
79
- logger.warning(f"File not found: {filepath}")
80
- return None
 
 
 
81
 
82
  if format_type == "json":
83
- with open(filepath, 'r', encoding='utf-8') as f:
84
  data = json.load(f)
85
 
86
  elif format_type == "pickle":
87
- with open(filepath, 'rb') as f:
88
  data = pickle.load(f)
89
 
90
  elif format_type == "csv":
91
- data = pd.read_csv(filepath)
92
-
93
- elif format_type == "excel":
94
- data = pd.read_excel(filepath)
 
 
 
 
 
 
 
 
 
 
 
95
 
96
- else:
97
- raise DataError(f"Unsupported format: {format_type}")
 
98
 
99
- logger.info(f"Data loaded successfully from {filepath}")
100
  return data
101
 
102
  except Exception as e:
103
- logger.error(f"Failed to load data from {filename}: {str(e)}")
104
- raise DataError(f"Failed to load data: {str(e)}")
 
 
 
105
 
106
  @handle_data_exceptions
107
- def delete_data(filename: str) -> bool:
108
  """
109
- Delete data file
110
 
111
  Args:
112
  filename: Name of the file to delete
113
-
 
114
  Returns:
115
- bool: True if successful, False otherwise
 
 
 
116
  """
117
  try:
118
- filepath = DATA_DIR / filename
119
 
120
- if filepath.exists():
121
- filepath.unlink()
122
- logger.info(f"File deleted successfully: {filepath}")
123
  return True
124
  else:
125
- logger.warning(f"File not found for deletion: {filepath}")
126
  return False
127
 
128
  except Exception as e:
129
- logger.error(f"Failed to delete file {filename}: {str(e)}")
130
- raise DataError(f"Failed to delete file: {str(e)}")
131
 
132
  @handle_data_exceptions
133
- def list_data_files() -> list:
134
  """
135
- List all data files in the data directory
136
 
 
 
 
 
137
  Returns:
138
- list: List of filenames
 
 
 
139
  """
140
  try:
141
- files = [f.name for f in DATA_DIR.iterdir() if f.is_file()]
142
- logger.info(f"Found {len(files)} data files")
143
- return files
 
 
 
 
 
 
 
 
 
 
 
 
144
 
145
  except Exception as e:
146
- logger.error(f"Failed to list data files: {str(e)}")
147
- return []
148
 
149
- def get_file_size(filename: str) -> Optional[int]:
 
150
  """
151
- Get file size in bytes
152
 
153
  Args:
154
- filename: Name of the file
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
155
 
 
 
 
 
156
  Returns:
157
- int: File size in bytes or None if file doesn't exist
 
 
 
158
  """
159
  try:
160
- filepath = DATA_DIR / filename
161
- if filepath.exists():
162
- return filepath.stat().st_size
163
- return None
 
 
 
 
 
 
 
 
 
 
 
 
164
  except Exception as e:
165
- logger.error(f"Failed to get file size for {filename}: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
166
  return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
167
 
168
- def ensure_data_directory():
169
- """Ensure data directory exists"""
170
- DATA_DIR.mkdir(exist_ok=True)
171
- return DATA_DIR
 
1
+ """
2
+ Storage utilities for the MONA application
3
+ Fixed version with proper error handling and data management
4
+ """
5
+
6
  import json
7
  import os
8
  import pickle
9
+ import csv
10
  from pathlib import Path
11
+ from typing import Any, Dict, List, Optional, Union
12
+ from datetime import datetime
13
 
14
  from utils.error_handling import handle_data_exceptions, DataError, ValidationError
15
+ from utils.logging import get_logger, log_error, log_info, log_warning
16
+
17
+
18
class StorageManager:
    """Manages data storage and retrieval operations.

    Every path handed out by the manager is located under ``base_path``.
    """

    def __init__(self, base_path: str = "data"):
        """Create the manager and make sure the base directory exists.

        Args:
            base_path: Root directory for stored files. It is created,
                together with any missing parent directories, if absent.
        """
        self.base_path = Path(base_path)
        # parents=True so a nested root such as "var/app/data" does not fail
        # when its parent directories are missing.
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.logger = get_logger(__name__)

    def get_file_path(self, filename: str, subfolder: Optional[str] = None) -> Path:
        """Get the full file path for a given filename.

        Args:
            filename: Name of the file.
            subfolder: Optional subfolder (may be nested, e.g. ``"a/b"``)
                below the base directory; it is created if it does not exist.

        Returns:
            Path: ``base_path / subfolder / filename`` when a subfolder is
            given, otherwise ``base_path / filename``.
        """
        if not subfolder:
            return self.base_path / filename

        folder = self.base_path / subfolder
        # parents=True allows multi-level subfolders to be created in one go.
        folder.mkdir(parents=True, exist_ok=True)
        return folder / filename
33
 
 
34
 
35
+ # Global storage manager instance
36
+ _storage_manager = StorageManager()
37
+
38
 
39
  @handle_data_exceptions
40
def save_data(data: Any, filename: str, format_type: str = "json", subfolder: Optional[str] = None) -> bool:
    """
    Save data to file in specified format

    Args:
        data: Data to save. For 'csv' this must be a list or tuple of rows;
            when the first row is a dict the rows are written with a header.
        filename: Name of the file to save
        format_type: Format to save in ('json', 'pickle', 'csv', 'txt')
        subfolder: Optional subfolder to save in

    Returns:
        bool: True if successful

    Raises:
        DataError: If saving fails
        ValidationError: If parameters are invalid
    """
    if not filename:
        raise ValidationError("Filename cannot be empty")

    if format_type not in ['json', 'pickle', 'csv', 'txt']:
        raise ValidationError(f"Unsupported format type: {format_type}")

    # Validated before the try block so the documented ValidationError reaches
    # the caller instead of being re-wrapped as a DataError below.
    if format_type == "csv" and not isinstance(data, (list, tuple)):
        raise ValidationError("CSV format requires list or tuple data")

    try:
        file_path = _storage_manager.get_file_path(filename, subfolder)

        if format_type == "json":
            with open(file_path, 'w', encoding='utf-8') as f:
                # default=str: values json cannot encode are written as str(value).
                json.dump(data, f, indent=2, ensure_ascii=False, default=str)

        elif format_type == "pickle":
            with open(file_path, 'wb') as f:
                pickle.dump(data, f)

        elif format_type == "csv":
            with open(file_path, 'w', newline='', encoding='utf-8') as f:
                if data and isinstance(data[0], dict):
                    # Dictionary data: use the union of all row keys (in
                    # first-seen order) so rows carrying keys the first row
                    # lacks do not make DictWriter raise. Missing values are
                    # written as empty cells.
                    fieldnames = list(dict.fromkeys(key for row in data for key in row))
                    writer = csv.DictWriter(f, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(data)
                else:
                    # List data
                    csv.writer(f).writerows(data)

        elif format_type == "txt":
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(data if isinstance(data, str) else str(data))

        log_info(f"Successfully saved data to {file_path}")
        return True

    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise DataError(f"Failed to save data to {filename}", details={"format": format_type, "error": str(e)}) from e
101
+
102
 
103
  @handle_data_exceptions
104
def load_data(filename: str, format_type: str = "json", subfolder: Optional[str] = None, default: Any = None) -> Any:
    """
    Load data from file in specified format

    Args:
        filename: Name of the file to load
        format_type: Format to load from ('json', 'pickle', 'csv', 'txt')
        subfolder: Optional subfolder to load from
        default: Value returned when the file doesn't exist or cannot be
            loaded. A default of None means "no default": the error is raised.

    Returns:
        Any: Loaded data or default value. CSV files yield a list of dicts
        when a header row is detected, otherwise a list of lists.

    Raises:
        DataError: If the file is missing or loading fails and no default is given
        ValidationError: If parameters are invalid
    """
    if not filename:
        raise ValidationError("Filename cannot be empty")

    if format_type not in ['json', 'pickle', 'csv', 'txt']:
        raise ValidationError(f"Unsupported format type: {format_type}")

    try:
        file_path = _storage_manager.get_file_path(filename, subfolder)

        if not file_path.exists():
            if default is not None:
                log_warning(f"File {file_path} not found, returning default value")
                return default
            raise DataError(f"File not found: {file_path}")

        if format_type == "json":
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

        elif format_type == "pickle":
            # NOTE(security): pickle.load can execute arbitrary code; only
            # load pickle files that come from a trusted source.
            with open(file_path, 'rb') as f:
                data = pickle.load(f)

        elif format_type == "csv":
            # newline='' lets the csv module handle embedded newlines itself.
            with open(file_path, 'r', newline='', encoding='utf-8') as f:
                # Try to detect if first row is header
                sample = f.read(1024)
                f.seek(0)

                try:
                    has_header = csv.Sniffer().has_header(sample)
                except csv.Error:
                    # The sniffer cannot determine a dialect for empty or
                    # single-column files; treat them as header-less rows.
                    has_header = False

                reader = csv.DictReader(f) if has_header else csv.reader(f)
                data = list(reader)

        elif format_type == "txt":
            with open(file_path, 'r', encoding='utf-8') as f:
                data = f.read()

        log_info(f"Successfully loaded data from {file_path}")
        return data

    except DataError:
        # Already a precise storage error (file not found); do not re-wrap it.
        raise
    except Exception as e:
        if default is not None:
            log_warning(f"Failed to load {filename}, returning default: {str(e)}")
            return default
        raise DataError(f"Failed to load data from {filename}", details={"format": format_type, "error": str(e)}) from e
174
+
175
 
176
  @handle_data_exceptions
177
def delete_file(filename: str, subfolder: Optional[str] = None) -> bool:
    """
    Delete a file

    Args:
        filename: Name of the file to delete
        subfolder: Optional subfolder

    Returns:
        bool: True if the file was deleted, False if it did not exist

    Raises:
        DataError: If deletion fails
    """
    try:
        file_path = _storage_manager.get_file_path(filename, subfolder)

        # Attempt the deletion directly instead of checking exists() first:
        # a file removed between the check and the unlink would otherwise be
        # reported as a failure rather than "did not exist".
        try:
            file_path.unlink()
        except FileNotFoundError:
            log_warning(f"File {file_path} does not exist")
            return False

        log_info(f"Successfully deleted {file_path}")
        return True

    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise DataError(f"Failed to delete {filename}", details={"error": str(e)}) from e
204
+
205
 
206
  @handle_data_exceptions
207
def list_files(subfolder: Optional[str] = None, extension: Optional[str] = None) -> List[str]:
    """
    List files in the storage directory

    Args:
        subfolder: Optional subfolder to list
        extension: Optional file extension filter, matched case-insensitively.
            Both ".json" and "json" are accepted.

    Returns:
        List[str]: Sorted list of filenames (directories are skipped); empty
        if the folder does not exist

    Raises:
        DataError: If listing fails
    """
    try:
        base = _storage_manager.base_path
        folder_path = base / subfolder if subfolder else base

        if not folder_path.exists():
            return []

        wanted = None
        if extension is not None:
            wanted = extension.lower()
            # Path.suffix always includes the leading dot, so add it when the
            # caller passed a bare extension such as "json".
            if wanted and not wanted.startswith("."):
                wanted = "." + wanted

        return sorted(
            entry.name
            for entry in folder_path.iterdir()
            if entry.is_file() and (wanted is None or entry.suffix.lower() == wanted)
        )

    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise DataError("Failed to list files", details={"subfolder": subfolder, "error": str(e)}) from e
240
+
241
 
242
+ @handle_data_exceptions
243
def file_exists(filename: str, subfolder: Optional[str] = None) -> bool:
    """
    Check if a file exists

    Args:
        filename: Name of the file to check
        subfolder: Optional subfolder

    Returns:
        bool: True if file exists; False if it does not or the check fails
    """
    try:
        # Build the path directly from the base directory: an existence
        # probe must not create the subfolder as a side effect.
        base = _storage_manager.base_path
        folder = base / subfolder if subfolder else base
        return (folder / filename).exists()
    except Exception as e:
        # Best-effort check: failures are logged and reported as "not there".
        log_error(f"Error checking file existence: {str(e)}", error=e)
        return False
260
+
261
+
262
+ @handle_data_exceptions
263
def get_file_info(filename: str, subfolder: Optional[str] = None) -> Dict:
    """
    Get information about a file

    Args:
        filename: Name of the file
        subfolder: Optional subfolder

    Returns:
        Dict: File information with the keys "name", "size" (bytes),
        "modified" and "created" (ISO-8601 local timestamps), "extension"
        and "path".

    Raises:
        DataError: If file doesn't exist or info retrieval fails
    """
    try:
        file_path = _storage_manager.get_file_path(filename, subfolder)

        if not file_path.exists():
            raise DataError(f"File not found: {filename}")

        stat = file_path.stat()

        return {
            "name": file_path.name,
            "size": stat.st_size,
            "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
            # NOTE: st_ctime is the creation time on Windows but the time of
            # the last metadata change on Unix.
            "created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
            "extension": file_path.suffix,
            "path": str(file_path)
        }

    except DataError:
        # Keep the precise "File not found" error instead of re-wrapping it.
        raise
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise DataError(f"Failed to get file info for {filename}", details={"error": str(e)}) from e
296
+
297
+
298
+ # Cache management
299
class DataCache:
    """Simple in-memory data cache.

    Holds at most ``max_size`` entries. When a new key is added to a full
    cache, the entry with the oldest access time is evicted.
    """

    def __init__(self, max_size: int = 100):
        """Create an empty cache.

        Args:
            max_size: Maximum number of entries kept at the same time.
        """
        self.cache: Dict[str, Any] = {}
        # Last time each key was read or written; drives eviction.
        self.access_times: Dict[str, datetime] = {}
        self.max_size = max_size

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache, or None if the key is not cached."""
        if key in self.cache:
            self.access_times[key] = datetime.now()
            return self.cache[key]
        return None

    def set(self, key: str, value: Any) -> None:
        """Set value in cache, evicting the least recently used entry if full."""
        if self.max_size <= 0:
            # A cache without capacity stores nothing (and must not try to
            # evict from an empty mapping).
            return

        if key not in self.cache:
            # Only a genuinely new key needs room; overwriting an existing
            # key must not push another entry out.
            while len(self.cache) >= self.max_size:
                oldest_key = min(self.access_times, key=self.access_times.get)
                del self.cache[oldest_key]
                del self.access_times[oldest_key]

        self.cache[key] = value
        self.access_times[key] = datetime.now()

    def clear(self) -> None:
        """Clear all cache"""
        self.cache.clear()
        self.access_times.clear()

    def remove(self, key: str) -> bool:
        """Remove specific key from cache; return True if it was present."""
        if key in self.cache:
            del self.cache[key]
            del self.access_times[key]
            return True
        return False
337
+
338
+
339
+ # Global cache instance
340
+ _data_cache = DataCache()
341
+
342
+
343
def get_cached_data(key: str) -> Optional[Any]:
    """Look up ``key`` in the module-level cache and return its ``get`` result."""
    cached_value = _data_cache.get(key)
    return cached_value
346
+
347
+
348
def set_cached_data(key: str, value: Any) -> None:
    """Store ``value`` under ``key`` in the module-level cache."""
    _data_cache.set(key=key, value=value)
351
+
352
 
353
def clear_cache() -> None:
    """Empty the module-level cache by calling its ``clear`` method."""
    _data_cache.clear()
    return None