"""
Storage utilities for data persistence

Helpers for reading and writing JSON, pickle, and pandas DataFrame files under
the project's data directory, plus thin wrappers around Streamlit session state
"""

import json
import pickle
import shutil
from pathlib import Path
from typing import Any, Dict, Optional

import pandas as pd
import streamlit as st

from utils.error_handling import handle_data_exceptions, DataError, ValidationError


DATA_DIR = Path("data")
CACHE_DIR = Path("cache")

# Create the storage directories at import time so the helpers below can rely
# on them existing.
DATA_DIR.mkdir(exist_ok=True)
CACHE_DIR.mkdir(exist_ok=True)


@handle_data_exceptions
def load_data(filename: str, data_dir: Optional[Path] = None) -> Optional[Dict[str, Any]]:
    """
    Load JSON data from a file in data_dir (defaults to DATA_DIR)
    """
    if data_dir is None:
        data_dir = DATA_DIR

    filepath = data_dir / filename

    if not filepath.exists():
        raise DataError(f"File {filepath} does not exist")

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except json.JSONDecodeError as e:
        raise DataError(f"Invalid JSON in file {filepath}: {e}") from e
    except Exception as e:
        raise DataError(f"Error loading data from {filepath}: {e}") from e


@handle_data_exceptions
def save_data(data: Dict[str, Any], filename: str, data_dir: Optional[Path] = None) -> bool:
    """
    Save a dictionary to a JSON file, creating parent directories as needed
    """
    if data_dir is None:
        data_dir = DATA_DIR

    if not isinstance(data, dict):
        raise ValidationError("Data must be a dictionary")

    filepath = data_dir / filename
    filepath.parent.mkdir(parents=True, exist_ok=True)

    try:
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        return True
    except Exception as e:
        raise DataError(f"Error saving data to {filepath}: {e}") from e


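# Illustrative round trip with the JSON helpers above (a sketch only; the
# filename and payload are hypothetical, not part of the application):
#
#     save_data({"theme": "dark", "page_size": 25}, "settings.json")
#     settings = load_data("settings.json")
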


@handle_data_exceptions
def load_dataframe(filename: str, data_dir: Optional[Path] = None) -> Optional[pd.DataFrame]:
    """
    Load a DataFrame from a .csv, .xlsx, .json or .pkl file
    """
    if data_dir is None:
        data_dir = DATA_DIR

    filepath = data_dir / filename

    if not filepath.exists():
        raise DataError(f"File {filepath} does not exist")

    file_extension = filepath.suffix.lower()
    if file_extension not in ('.csv', '.xlsx', '.json', '.pkl'):
        raise DataError(f"Unsupported file format: {file_extension}")

    try:
        if file_extension == '.csv':
            df = pd.read_csv(filepath)
        elif file_extension == '.xlsx':
            df = pd.read_excel(filepath)
        elif file_extension == '.json':
            df = pd.read_json(filepath)
        else:  # '.pkl'
            df = pd.read_pickle(filepath)
        return df
    except Exception as e:
        raise DataError(f"Error loading dataframe from {filepath}: {e}") from e


@handle_data_exceptions
def save_dataframe(df: pd.DataFrame, filename: str, data_dir: Optional[Path] = None,
                   file_format: str = 'csv') -> bool:
    """
    Save a DataFrame to csv, xlsx, json or pkl, appending the extension if missing
    """
    if data_dir is None:
        data_dir = DATA_DIR

    if not isinstance(df, pd.DataFrame):
        raise ValidationError("Data must be a pandas DataFrame")

    if df.empty:
        raise ValidationError("DataFrame cannot be empty")

    if file_format not in ('csv', 'xlsx', 'json', 'pkl'):
        raise DataError(f"Unsupported file format: {file_format}")

    if not filename.endswith(f'.{file_format}'):
        filename = f"{filename}.{file_format}"

    filepath = data_dir / filename
    filepath.parent.mkdir(parents=True, exist_ok=True)

    try:
        if file_format == 'csv':
            df.to_csv(filepath, index=False)
        elif file_format == 'xlsx':
            df.to_excel(filepath, index=False)
        elif file_format == 'json':
            df.to_json(filepath, orient='records', indent=2)
        else:  # 'pkl'
            df.to_pickle(filepath)
        return True
    except Exception as e:
        raise DataError(f"Error saving dataframe to {filepath}: {e}") from e


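# Sketch of a DataFrame round trip with the helpers above (hypothetical data,
# shown for illustration only):
#
#     df = pd.DataFrame({"ticker": ["AAPL", "MSFT"], "price": [189.5, 402.1]})
#     save_dataframe(df, "prices", file_format="csv")   # writes data/prices.csv
#     prices = load_dataframe("prices.csv")
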


@handle_data_exceptions
def load_pickle(filename: str, data_dir: Optional[Path] = None) -> Optional[Any]:
    """
    Load pickled data from file (only unpickle files this application wrote itself)
    """
    if data_dir is None:
        data_dir = DATA_DIR

    filepath = data_dir / filename

    if not filepath.exists():
        raise DataError(f"File {filepath} does not exist")

    try:
        with open(filepath, 'rb') as f:
            return pickle.load(f)
    except Exception as e:
        raise DataError(f"Error loading pickle from {filepath}: {e}") from e


@handle_data_exceptions
def save_pickle(data: Any, filename: str, data_dir: Optional[Path] = None) -> bool:
    """
    Save data to a pickle file, appending the .pkl extension if missing
    """
    if data_dir is None:
        data_dir = DATA_DIR

    if not filename.endswith('.pkl'):
        filename = f"{filename}.pkl"

    filepath = data_dir / filename
    filepath.parent.mkdir(parents=True, exist_ok=True)

    try:
        with open(filepath, 'wb') as f:
            pickle.dump(data, f)
        return True
    except Exception as e:
        raise DataError(f"Error saving pickle to {filepath}: {e}") from e


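# Illustrative pickle round trip (hypothetical object, shown as a sketch only;
# pickle.load can execute arbitrary code, so these helpers should only be
# pointed at files written by this application):
#
#     save_pickle({"weights": [0.2, 0.8], "version": 3}, "model_state")
#     state = load_pickle("model_state.pkl")
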


def list_files(data_dir: Optional[Path] = None, pattern: str = "*") -> list[Path]:
    """
    List files in data directory
    """
    if data_dir is None:
        data_dir = DATA_DIR

    if not data_dir.exists():
        return []

    return list(data_dir.glob(pattern))


def file_exists(filename: str, data_dir: Optional[Path] = None) -> bool:
    """
    Check if file exists
    """
    if data_dir is None:
        data_dir = DATA_DIR

    filepath = data_dir / filename
    return filepath.exists()


def delete_file(filename: str, data_dir: Optional[Path] = None) -> bool:
    """
    Delete file
    """
    if data_dir is None:
        data_dir = DATA_DIR

    filepath = data_dir / filename

    if filepath.exists():
        try:
            filepath.unlink()
            return True
        except Exception as e:
            st.error(f"Error deleting file {filepath}: {e}")
            return False
    return False


def get_file_size(filename: str, data_dir: Optional[Path] = None) -> Optional[int]:
    """
    Get file size in bytes
    """
    if data_dir is None:
        data_dir = DATA_DIR

    filepath = data_dir / filename

    if filepath.exists():
        return filepath.stat().st_size
    return None


def create_backup(filename: str, data_dir: Optional[Path] = None) -> bool:
    """
    Create a backup copy of a file (suffixed _backup) in the same directory
    """
    if data_dir is None:
        data_dir = DATA_DIR

    filepath = data_dir / filename

    if not filepath.exists():
        return False

    backup_filename = f"{filepath.stem}_backup{filepath.suffix}"
    backup_filepath = data_dir / backup_filename

    try:
        shutil.copy2(filepath, backup_filepath)
        return True
    except Exception as e:
        st.error(f"Error creating backup: {e}")
        return False


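# Example of the file-management helpers above (hypothetical filename, shown
# as a sketch only):
#
#     if file_exists("settings.json"):
#         create_backup("settings.json")   # copies to data/settings_backup.json
#     print(get_file_size("settings.json"), list_files(pattern="*.json"))
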


def init_session_state(key: str, default_value: Any = None):
    """
    Initialize session state variable if it doesn't exist
    """
    if key not in st.session_state:
        st.session_state[key] = default_value


def get_session_state(key: str, default_value: Any = None) -> Any:
    """
    Get value from session state
    """
    return st.session_state.get(key, default_value)


def set_session_state(key: str, value: Any):
    """
    Set value in session state
    """
    st.session_state[key] = value


def clear_session_state(key: Optional[str] = None):
    """
    Clear session state (specific key or all)
    """
    if key:
        if key in st.session_state:
            del st.session_state[key]
    else:
        st.session_state.clear()


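# The session-state wrappers above are meant to be called from inside a running
# Streamlit app, e.g. (hypothetical keys, shown as a sketch only):
#
#     init_session_state("filters", default_value={})
#     set_session_state("selected_year", 2024)
#     year = get_session_state("selected_year", default_value=2023)
#     clear_session_state("filters")


if __name__ == "__main__":
    # Minimal smoke test of the file helpers, assuming the module is run from
    # the project root so that utils.error_handling is importable. It writes a
    # small scratch file under data/ and removes it again.
    save_data({"smoke_test": True}, "storage_smoke_test.json")
    print(load_data("storage_smoke_test.json"))
    print(get_file_size("storage_smoke_test.json"), "bytes")
    delete_file("storage_smoke_test.json")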