Spaces:
Running
Running
| from __future__ import annotations | |
| from abc import abstractmethod | |
| from typing import TYPE_CHECKING | |
| from langflow.services.base import Service | |
| if TYPE_CHECKING: | |
| from langflow.services.session.service import SessionService | |
| from langflow.services.settings.service import SettingsService | |
| class StorageService(Service): | |
| name = "storage_service" | |
| def __init__(self, session_service: SessionService, settings_service: SettingsService): | |
| self.settings_service = settings_service | |
| self.session_service = session_service | |
| self.set_ready() | |
| def build_full_path(self, flow_id: str, file_name: str) -> str: | |
| raise NotImplementedError | |
| def set_ready(self) -> None: | |
| self.ready = True | |
| async def save_file(self, flow_id: str, file_name: str, data) -> None: | |
| raise NotImplementedError | |
| async def get_file(self, flow_id: str, file_name: str) -> bytes: | |
| raise NotImplementedError | |
| async def list_files(self, flow_id: str) -> list[str]: | |
| raise NotImplementedError | |
| async def delete_file(self, flow_id: str, file_name: str) -> None: | |
| raise NotImplementedError | |
| async def teardown(self) -> None: | |
| raise NotImplementedError | |