Spaces:
Running
Running
| from pathlib import Path | |
| from aiofile import async_open | |
| from loguru import logger | |
| from .service import StorageService | |
| class LocalStorageService(StorageService): | |
| """A service class for handling local storage operations without aiofiles.""" | |
| def __init__(self, session_service, settings_service) -> None: | |
| """Initialize the local storage service with session and settings services.""" | |
| super().__init__(session_service, settings_service) | |
| self.data_dir = Path(settings_service.settings.config_dir) | |
| self.set_ready() | |
| def build_full_path(self, flow_id: str, file_name: str) -> str: | |
| """Build the full path of a file in the local storage.""" | |
| return str(self.data_dir / flow_id / file_name) | |
| async def save_file(self, flow_id: str, file_name: str, data: bytes) -> None: | |
| """Save a file in the local storage. | |
| :param flow_id: The identifier for the flow. | |
| :param file_name: The name of the file to be saved. | |
| :param data: The byte content of the file. | |
| :raises FileNotFoundError: If the specified flow does not exist. | |
| :raises IsADirectoryError: If the file name is a directory. | |
| :raises PermissionError: If there is no permission to write the file. | |
| """ | |
| folder_path = self.data_dir / flow_id | |
| folder_path.mkdir(parents=True, exist_ok=True) | |
| file_path = folder_path / file_name | |
| try: | |
| async with async_open(file_path, "wb") as f: | |
| await f.write(data) | |
| logger.info(f"File {file_name} saved successfully in flow {flow_id}.") | |
| except Exception: | |
| logger.exception(f"Error saving file {file_name} in flow {flow_id}") | |
| raise | |
| async def get_file(self, flow_id: str, file_name: str) -> bytes: | |
| """Retrieve a file from the local storage. | |
| :param flow_id: The identifier for the flow. | |
| :param file_name: The name of the file to be retrieved. | |
| :return: The byte content of the file. | |
| :raises FileNotFoundError: If the file does not exist. | |
| """ | |
| file_path = self.data_dir / flow_id / file_name | |
| if not file_path.exists(): | |
| logger.warning(f"File {file_name} not found in flow {flow_id}.") | |
| msg = f"File {file_name} not found in flow {flow_id}" | |
| raise FileNotFoundError(msg) | |
| async with async_open(file_path, "rb") as f: | |
| content = await f.read() | |
| logger.debug(f"File {file_name} retrieved successfully from flow {flow_id}.") | |
| return content | |
| async def list_files(self, flow_id: str): | |
| """List all files in a specified flow. | |
| :param flow_id: The identifier for the flow. | |
| :return: A list of file names. | |
| :raises FileNotFoundError: If the flow directory does not exist. | |
| """ | |
| folder_path = self.data_dir / flow_id | |
| if not folder_path.exists() or not folder_path.is_dir(): | |
| logger.warning(f"Flow {flow_id} directory does not exist.") | |
| msg = f"Flow {flow_id} directory does not exist." | |
| raise FileNotFoundError(msg) | |
| files = [file.name for file in folder_path.iterdir() if file.is_file()] | |
| logger.info(f"Listed {len(files)} files in flow {flow_id}.") | |
| return files | |
| async def delete_file(self, flow_id: str, file_name: str) -> None: | |
| """Delete a file from the local storage. | |
| :param flow_id: The identifier for the flow. | |
| :param file_name: The name of the file to be deleted. | |
| """ | |
| file_path = self.data_dir / flow_id / file_name | |
| if file_path.exists(): | |
| file_path.unlink() | |
| logger.info(f"File {file_name} deleted successfully from flow {flow_id}.") | |
| else: | |
| logger.warning(f"Attempted to delete non-existent file {file_name} in flow {flow_id}.") | |
| async def teardown(self) -> None: | |
| """Perform any cleanup operations when the service is being torn down.""" | |
| # No specific teardown actions required for local | |