|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import json |
|
import warnings |
|
from contextlib import contextmanager |
|
from io import BytesIO, StringIO |
|
from pathlib import Path |
|
from typing import IO, Any, Generator, Iterator, Optional, Tuple, Union |
|
|
|
from cosmos_transfer1.utils.easy_io.backends import backends, prefix_to_backends |
|
from cosmos_transfer1.utils.easy_io.file_client import FileClient |
|
from cosmos_transfer1.utils.easy_io.handlers import file_handlers |
|
|
|
backend_instances: dict = {} |
|
|
|
|
|
def is_filepath(filepath): |
|
return isinstance(filepath, (str, Path)) |
|
|
|
|
|
def _parse_uri_prefix(uri: Union[str, Path]) -> str: |
|
"""Parse the prefix of uri. |
|
|
|
Args: |
|
uri (str or Path): Uri to be parsed that contains the file prefix. |
|
|
|
Examples: |
|
>>> _parse_uri_prefix('/home/path/of/your/file') |
|
'' |
|
>>> _parse_uri_prefix('s3://path/of/your/file') |
|
's3' |
|
>>> _parse_uri_prefix('clusterName:s3://path/of/your/file') |
|
's3' |
|
|
|
Returns: |
|
str: Return the prefix of uri if the uri contains '://'. Otherwise, |
|
return ''. |
|
""" |
|
assert is_filepath(uri) |
|
uri = str(uri) |
|
|
|
|
|
if "://" not in uri: |
|
return "" |
|
else: |
|
prefix, _ = uri.split("://") |
|
if ":" in prefix: |
|
_, prefix = prefix.split(":") |
|
return prefix |
|
|
|
|
|
def _get_file_backend(prefix: str, backend_args: dict): |
|
"""Return a file backend based on the prefix or backend_args. |
|
|
|
Args: |
|
prefix (str): Prefix of uri. |
|
backend_args (dict): Arguments to instantiate the corresponding |
|
backend. |
|
""" |
|
|
|
if "backend" in backend_args: |
|
|
|
backend_args_bak = backend_args.copy() |
|
backend_name = backend_args_bak.pop("backend") |
|
backend = backends[backend_name](**backend_args_bak) |
|
else: |
|
backend = prefix_to_backends[prefix](**backend_args) |
|
return backend |
|
|
|
|
|
def get_file_backend( |
|
uri: Union[str, Path, None] = None, |
|
*, |
|
backend_args: Optional[dict] = None, |
|
enable_singleton: bool = False, |
|
backend_key: Optional[str] = None, |
|
): |
|
"""Return a file backend based on the prefix of uri or backend_args. |
|
|
|
Args: |
|
uri (str or Path): Uri to be parsed that contains the file prefix. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
enable_singleton (bool): Whether to enable the singleton pattern. |
|
If it is True, the backend created will be reused if the |
|
signature is same with the previous one. Defaults to False. |
|
backend_key: str: The key to register the backend. Defaults to None. |
|
|
|
Returns: |
|
BaseStorageBackend: Instantiated Backend object. |
|
|
|
Examples: |
|
>>> # get file backend based on the prefix of uri |
|
>>> uri = 's3://path/of/your/file' |
|
>>> backend = get_file_backend(uri) |
|
>>> # get file backend based on the backend_args |
|
>>> backend = get_file_backend(backend_args={'backend': 's3'}) |
|
>>> # backend name has a higher priority if 'backend' in backend_args |
|
>>> backend = get_file_backend(uri, backend_args={'backend': 's3'}) |
|
""" |
|
global backend_instances |
|
if backend_key is not None: |
|
if backend_key in backend_instances: |
|
return backend_instances[backend_key] |
|
|
|
if backend_args is None: |
|
backend_args = {} |
|
|
|
if uri is None and "backend" not in backend_args and backend_key is None: |
|
raise ValueError( |
|
'uri should not be None when "backend" does not exist in ' "backend_args and backend_key is None" |
|
) |
|
|
|
if uri is not None: |
|
prefix = _parse_uri_prefix(uri) |
|
else: |
|
prefix = "" |
|
|
|
if enable_singleton: |
|
unique_key = f"{prefix}:{json.dumps(backend_args)}" |
|
if unique_key in backend_instances: |
|
return backend_instances[unique_key] |
|
|
|
backend = _get_file_backend(prefix, backend_args) |
|
backend_instances[unique_key] = backend |
|
if backend_key is not None: |
|
backend_instances[backend_key] = backend |
|
return backend |
|
else: |
|
backend = _get_file_backend(prefix, backend_args) |
|
return backend |
|
|
|
|
|
def get( |
|
filepath: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> bytes: |
|
"""Read bytes from a given ``filepath`` with 'rb' mode. |
|
|
|
Args: |
|
filepath (str or Path): Path to read data. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
backend_key (str, optional): The key to get the backend from register. |
|
|
|
Returns: |
|
bytes: Expected bytes object. |
|
|
|
Examples: |
|
>>> filepath = '/path/of/file' |
|
>>> get(filepath) |
|
b'hello world' |
|
""" |
|
backend = get_file_backend( |
|
filepath, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
return backend.get(filepath) |
|
|
|
|
|
def get_text( |
|
filepath: Union[str, Path], |
|
encoding="utf-8", |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> str: |
|
"""Read text from a given ``filepath`` with 'r' mode. |
|
|
|
Args: |
|
filepath (str or Path): Path to read data. |
|
encoding (str): The encoding format used to open the ``filepath``. |
|
Defaults to 'utf-8'. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
backend_key (str, optional): The key to get the backend from register. |
|
|
|
Returns: |
|
str: Expected text reading from ``filepath``. |
|
|
|
Examples: |
|
>>> filepath = '/path/of/file' |
|
>>> get_text(filepath) |
|
'hello world' |
|
""" |
|
backend = get_file_backend( |
|
filepath, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
return backend.get_text(filepath, encoding) |
|
|
|
|
|
def put( |
|
obj: bytes, |
|
filepath: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> None: |
|
"""Write bytes to a given ``filepath`` with 'wb' mode. |
|
|
|
Note: |
|
``put`` should create a directory if the directory of |
|
``filepath`` does not exist. |
|
|
|
Args: |
|
obj (bytes): Data to be written. |
|
filepath (str or Path): Path to write data. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
backend_key (str, optional): The key to get the backend from register. |
|
|
|
Examples: |
|
>>> filepath = '/path/of/file' |
|
>>> put(b'hello world', filepath) |
|
""" |
|
backend = get_file_backend( |
|
filepath, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
backend.put(obj, filepath) |
|
|
|
|
|
def put_text( |
|
obj: str, |
|
filepath: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> None: |
|
"""Write text to a given ``filepath`` with 'w' mode. |
|
|
|
Note: |
|
``put_text`` should create a directory if the directory of |
|
``filepath`` does not exist. |
|
|
|
Args: |
|
obj (str): Data to be written. |
|
filepath (str or Path): Path to write data. |
|
encoding (str, optional): The encoding format used to open the |
|
``filepath``. Defaults to 'utf-8'. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
backend_key (str, optional): The key to get the backend from register. |
|
|
|
Examples: |
|
>>> filepath = '/path/of/file' |
|
>>> put_text('hello world', filepath) |
|
""" |
|
backend = get_file_backend( |
|
filepath, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
backend.put_text(obj, filepath) |
|
|
|
|
|
def exists( |
|
filepath: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> bool: |
|
"""Check whether a file path exists. |
|
|
|
Args: |
|
filepath (str or Path): Path to be checked whether exists. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
backend_key (str, optional): The key to get the backend from register. |
|
|
|
Returns: |
|
bool: Return ``True`` if ``filepath`` exists, ``False`` otherwise. |
|
|
|
Examples: |
|
>>> filepath = '/path/of/file' |
|
>>> exists(filepath) |
|
True |
|
""" |
|
backend = get_file_backend( |
|
filepath, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
return backend.exists(filepath) |
|
|
|
|
|
def isdir( |
|
filepath: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> bool: |
|
"""Check whether a file path is a directory. |
|
|
|
Args: |
|
filepath (str or Path): Path to be checked whether it is a |
|
directory. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
backend_key (str, optional): The key to get the backend from register. |
|
|
|
Returns: |
|
bool: Return ``True`` if ``filepath`` points to a directory, |
|
``False`` otherwise. |
|
|
|
Examples: |
|
>>> filepath = '/path/of/dir' |
|
>>> isdir(filepath) |
|
True |
|
""" |
|
backend = get_file_backend( |
|
filepath, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
return backend.isdir(filepath) |
|
|
|
|
|
def isfile( |
|
filepath: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> bool: |
|
"""Check whether a file path is a file. |
|
|
|
Args: |
|
filepath (str or Path): Path to be checked whether it is a file. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
backend_key (str, optional): The key to get the backend from register. |
|
|
|
Returns: |
|
bool: Return ``True`` if ``filepath`` points to a file, ``False`` |
|
otherwise. |
|
|
|
Examples: |
|
>>> filepath = '/path/of/file' |
|
>>> isfile(filepath) |
|
True |
|
""" |
|
backend = get_file_backend( |
|
filepath, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
return backend.isfile(filepath) |
|
|
|
|
|
def join_path( |
|
filepath: Union[str, Path], |
|
*filepaths: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> Union[str, Path]: |
|
r"""Concatenate all file paths. |
|
|
|
Join one or more filepath components intelligently. The return value |
|
is the concatenation of filepath and any members of \*filepaths. |
|
|
|
Args: |
|
filepath (str or Path): Path to be concatenated. |
|
*filepaths (str or Path): Other paths to be concatenated. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
backend_key (str, optional): The key to get the backend from register. |
|
|
|
Returns: |
|
str: The result of concatenation. |
|
|
|
Examples: |
|
>>> filepath1 = '/path/of/dir1' |
|
>>> filepath2 = 'dir2' |
|
>>> filepath3 = 'path/of/file' |
|
>>> join_path(filepath1, filepath2, filepath3) |
|
'/path/of/dir/dir2/path/of/file' |
|
""" |
|
backend = get_file_backend( |
|
filepath, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
return backend.join_path(filepath, *filepaths) |
|
|
|
|
|
@contextmanager |
|
def get_local_path( |
|
filepath: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> Generator[Union[str, Path], None, None]: |
|
"""Download data from ``filepath`` and write the data to local path. |
|
|
|
``get_local_path`` is decorated by :meth:`contxtlib.contextmanager`. It |
|
can be called with ``with`` statement, and when exists from the |
|
``with`` statement, the temporary path will be released. |
|
|
|
Note: |
|
If the ``filepath`` is a local path, just return itself and it will |
|
not be released (removed). |
|
|
|
Args: |
|
filepath (str or Path): Path to be read data. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
|
|
Yields: |
|
Iterable[str]: Only yield one path. |
|
|
|
Examples: |
|
>>> with get_local_path('abc/def.jpg') as path: |
|
... # do something here |
|
""" |
|
backend = get_file_backend( |
|
filepath, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
with backend.get_local_path(str(filepath)) as local_path: |
|
yield local_path |
|
|
|
|
|
def copyfile( |
|
src: Union[str, Path], |
|
dst: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> Union[str, Path]: |
|
"""Copy a file src to dst and return the destination file. |
|
|
|
src and dst should have the same prefix. If dst specifies a directory, |
|
the file will be copied into dst using the base filename from src. If |
|
dst specifies a file that already exists, it will be replaced. |
|
|
|
Args: |
|
src (str or Path): A file to be copied. |
|
dst (str or Path): Copy file to dst. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
|
|
Returns: |
|
str: The destination file. |
|
|
|
Raises: |
|
SameFileError: If src and dst are the same file, a SameFileError will |
|
be raised. |
|
|
|
Examples: |
|
>>> # dst is a file |
|
>>> src = '/path/of/file' |
|
>>> dst = '/path1/of/file1' |
|
>>> # src will be copied to '/path1/of/file1' |
|
>>> copyfile(src, dst) |
|
'/path1/of/file1' |
|
|
|
>>> # dst is a directory |
|
>>> dst = '/path1/of/dir' |
|
>>> # src will be copied to '/path1/of/dir/file' |
|
>>> copyfile(src, dst) |
|
'/path1/of/dir/file' |
|
""" |
|
backend = get_file_backend(src, backend_args=backend_args, enable_singleton=True, backend_key=backend_key) |
|
return backend.copyfile(src, dst) |
|
|
|
|
|
def copytree( |
|
src: Union[str, Path], |
|
dst: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> Union[str, Path]: |
|
"""Recursively copy an entire directory tree rooted at src to a directory |
|
named dst and return the destination directory. |
|
|
|
src and dst should have the same prefix and dst must not already exist. |
|
|
|
Args: |
|
src (str or Path): A directory to be copied. |
|
dst (str or Path): Copy directory to dst. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
backend_key (str, optional): The key to get the backend from register. |
|
|
|
Returns: |
|
str: The destination directory. |
|
|
|
Raises: |
|
FileExistsError: If dst had already existed, a FileExistsError will be |
|
raised. |
|
|
|
Examples: |
|
>>> src = '/path/of/dir1' |
|
>>> dst = '/path/of/dir2' |
|
>>> copytree(src, dst) |
|
'/path/of/dir2' |
|
""" |
|
backend = get_file_backend(src, backend_args=backend_args, enable_singleton=True, backend_key=backend_key) |
|
return backend.copytree(src, dst) |
|
|
|
|
|
def copyfile_from_local( |
|
src: Union[str, Path], |
|
dst: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> Union[str, Path]: |
|
"""Copy a local file src to dst and return the destination file. |
|
|
|
Note: |
|
If the backend is the instance of LocalBackend, it does the same |
|
thing with :func:`copyfile`. |
|
|
|
Args: |
|
src (str or Path): A local file to be copied. |
|
dst (str or Path): Copy file to dst. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
|
|
Returns: |
|
str: If dst specifies a directory, the file will be copied into dst |
|
using the base filename from src. |
|
|
|
Examples: |
|
>>> # dst is a file |
|
>>> src = '/path/of/file' |
|
>>> dst = 's3://openmmlab/mmengine/file1' |
|
>>> # src will be copied to 's3://openmmlab/mmengine/file1' |
|
>>> copyfile_from_local(src, dst) |
|
s3://openmmlab/mmengine/file1 |
|
|
|
>>> # dst is a directory |
|
>>> dst = 's3://openmmlab/mmengine' |
|
>>> # src will be copied to 's3://openmmlab/mmengine/file'' |
|
>>> copyfile_from_local(src, dst) |
|
's3://openmmlab/mmengine/file' |
|
""" |
|
backend = get_file_backend(dst, backend_args=backend_args, enable_singleton=True, backend_key=backend_key) |
|
return backend.copyfile_from_local(src, dst) |
|
|
|
|
|
def copytree_from_local( |
|
src: Union[str, Path], |
|
dst: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> Union[str, Path]: |
|
"""Recursively copy an entire directory tree rooted at src to a directory |
|
named dst and return the destination directory. |
|
|
|
Note: |
|
If the backend is the instance of LocalBackend, it does the same |
|
thing with :func:`copytree`. |
|
|
|
Args: |
|
src (str or Path): A local directory to be copied. |
|
dst (str or Path): Copy directory to dst. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
|
|
Returns: |
|
str: The destination directory. |
|
|
|
Examples: |
|
>>> src = '/path/of/dir' |
|
>>> dst = 's3://openmmlab/mmengine/dir' |
|
>>> copyfile_from_local(src, dst) |
|
's3://openmmlab/mmengine/dir' |
|
""" |
|
backend = get_file_backend(dst, backend_args=backend_args, enable_singleton=True, backend_key=backend_key) |
|
return backend.copytree_from_local(src, dst) |
|
|
|
|
|
def copyfile_to_local( |
|
src: Union[str, Path], |
|
dst: Union[str, Path], |
|
dst_type: str, |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> Union[str, Path]: |
|
"""Copy the file src to local dst and return the destination file. |
|
|
|
If dst specifies a directory, the file will be copied into dst using |
|
the base filename from src. If dst specifies a file that already |
|
exists, it will be replaced. |
|
|
|
Note: |
|
If the backend is the instance of LocalBackend, it does the same |
|
thing with :func:`copyfile`. |
|
|
|
Args: |
|
src (str or Path): A file to be copied. |
|
dst (str or Path): Copy file to to local dst. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
|
|
Returns: |
|
str: If dst specifies a directory, the file will be copied into dst |
|
using the base filename from src. |
|
|
|
Examples: |
|
>>> # dst is a file |
|
>>> src = 's3://openmmlab/mmengine/file' |
|
>>> dst = '/path/of/file' |
|
>>> # src will be copied to '/path/of/file' |
|
>>> copyfile_to_local(src, dst) |
|
'/path/of/file' |
|
|
|
>>> # dst is a directory |
|
>>> dst = '/path/of/dir' |
|
>>> # src will be copied to '/path/of/dir/file' |
|
>>> copyfile_to_local(src, dst) |
|
'/path/of/dir/file' |
|
""" |
|
assert dst_type in ["file", "dir"] |
|
Path(dst).parent.mkdir(parents=True, exist_ok=True) |
|
backend = get_file_backend(src, backend_args=backend_args, enable_singleton=True, backend_key=backend_key) |
|
return backend.copyfile_to_local(src, dst, dst_type=dst_type) |
|
|
|
|
|
def copytree_to_local( |
|
src: Union[str, Path], |
|
dst: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> Union[str, Path]: |
|
"""Recursively copy an entire directory tree rooted at src to a local |
|
directory named dst and return the destination directory. |
|
|
|
Note: |
|
If the backend is the instance of LocalBackend, it does the same |
|
thing with :func:`copytree`. |
|
|
|
Args: |
|
src (str or Path): A directory to be copied. |
|
dst (str or Path): Copy directory to local dst. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
|
|
Returns: |
|
str: The destination directory. |
|
|
|
Examples: |
|
>>> src = 's3://openmmlab/mmengine/dir' |
|
>>> dst = '/path/of/dir' |
|
>>> copytree_to_local(src, dst) |
|
'/path/of/dir' |
|
""" |
|
Path(dst).parent.mkdir(parents=True, exist_ok=True) |
|
backend = get_file_backend(dst, backend_args=backend_args, enable_singleton=True, backend_key=backend_key) |
|
return backend.copytree_to_local(src, dst) |
|
|
|
|
|
def remove( |
|
filepath: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> None: |
|
"""Remove a file. |
|
|
|
Args: |
|
filepath (str, Path): Path to be removed. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
|
|
Raises: |
|
FileNotFoundError: If filepath does not exist, an FileNotFoundError |
|
will be raised. |
|
IsADirectoryError: If filepath is a directory, an IsADirectoryError |
|
will be raised. |
|
|
|
Examples: |
|
>>> filepath = '/path/of/file' |
|
>>> remove(filepath) |
|
""" |
|
backend = get_file_backend( |
|
filepath, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
backend.remove(filepath) |
|
|
|
|
|
def rmtree( |
|
dir_path: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> None: |
|
"""Recursively delete a directory tree. |
|
|
|
Args: |
|
dir_path (str or Path): A directory to be removed. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
|
|
Examples: |
|
>>> dir_path = '/path/of/dir' |
|
>>> rmtree(dir_path) |
|
""" |
|
backend = get_file_backend( |
|
dir_path, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
backend.rmtree(dir_path) |
|
|
|
|
|
def copy_if_symlink_fails( |
|
src: Union[str, Path], |
|
dst: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> bool: |
|
"""Create a symbolic link pointing to src named dst. |
|
|
|
If failed to create a symbolic link pointing to src, directory copy src to |
|
dst instead. |
|
|
|
Args: |
|
src (str or Path): Create a symbolic link pointing to src. |
|
dst (str or Path): Create a symbolic link named dst. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
|
|
Returns: |
|
bool: Return True if successfully create a symbolic link pointing to |
|
src. Otherwise, return False. |
|
|
|
Examples: |
|
>>> src = '/path/of/file' |
|
>>> dst = '/path1/of/file1' |
|
>>> copy_if_symlink_fails(src, dst) |
|
True |
|
>>> src = '/path/of/dir' |
|
>>> dst = '/path1/of/dir1' |
|
>>> copy_if_symlink_fails(src, dst) |
|
True |
|
""" |
|
backend = get_file_backend(src, backend_args=backend_args, enable_singleton=True, backend_key=backend_key) |
|
return backend.copy_if_symlink_fails(src, dst) |
|
|
|
|
|
def list_dir( |
|
dir_path: Union[str, Path], |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
): |
|
"""List all folders in an S3 bucket with a given prefix. |
|
|
|
Args: |
|
dir_path (str | Path): Path of the directory. |
|
|
|
Examples: |
|
>>> dir_path = '/path/of/dir' |
|
>>> for file_path in list_dir(dir_path): |
|
... print(file_path) |
|
""" |
|
if not dir_path.endswith("/"): |
|
dir_path += "/" |
|
backend = get_file_backend( |
|
dir_path, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
|
|
return backend.list_dir(dir_path) |
|
|
|
|
|
def list_dir_or_file( |
|
dir_path: Union[str, Path], |
|
list_dir: bool = True, |
|
list_file: bool = True, |
|
suffix: Optional[Union[str, Tuple[str]]] = None, |
|
recursive: bool = False, |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> Iterator[str]: |
|
"""Scan a directory to find the interested directories or files in |
|
arbitrary order. |
|
|
|
Note: |
|
:meth:`list_dir_or_file` returns the path relative to ``dir_path``. |
|
|
|
Args: |
|
dir_path (str or Path): Path of the directory. |
|
list_dir (bool): List the directories. Defaults to True. |
|
list_file (bool): List the path of files. Defaults to True. |
|
suffix (str or tuple[str], optional): File suffix that we are |
|
interested in. Defaults to None. |
|
recursive (bool): If set to True, recursively scan the directory. |
|
Defaults to False. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
|
|
Yields: |
|
Iterable[str]: A relative path to ``dir_path``. |
|
|
|
Examples: |
|
>>> dir_path = '/path/of/dir' |
|
>>> for file_path in list_dir_or_file(dir_path): |
|
... print(file_path) |
|
>>> # list those files and directories in current directory |
|
>>> for file_path in list_dir_or_file(dir_path): |
|
... print(file_path) |
|
>>> # only list files |
|
>>> for file_path in list_dir_or_file(dir_path, list_dir=False): |
|
... print(file_path) |
|
>>> # only list directories |
|
>>> for file_path in list_dir_or_file(dir_path, list_file=False): |
|
... print(file_path) |
|
>>> # only list files ending with specified suffixes |
|
>>> for file_path in list_dir_or_file(dir_path, suffix='.txt'): |
|
... print(file_path) |
|
>>> # list all files and directory recursively |
|
>>> for file_path in list_dir_or_file(dir_path, recursive=True): |
|
... print(file_path) |
|
""" |
|
backend = get_file_backend( |
|
dir_path, |
|
backend_args=backend_args, |
|
enable_singleton=True, |
|
backend_key=backend_key, |
|
) |
|
yield from backend.list_dir_or_file(dir_path, list_dir, list_file, suffix, recursive) |
|
|
|
|
|
def generate_presigned_url( |
|
url: str, |
|
client_method: str = "get_object", |
|
expires_in: int = 3600, |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
) -> str: |
|
"""Generate the presigned url of video stream which can be passed to |
|
mmcv.VideoReader. Now only work on s3 backend. |
|
|
|
Note: |
|
Now only work on s3 backend. |
|
|
|
Args: |
|
url (str): Url of video stream. |
|
client_method (str): Method of client, 'get_object' or |
|
'put_object'. Defaults to 'get_object'. |
|
expires_in (int): expires, in seconds. Defaults to 3600. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
corresponding backend. Defaults to None. |
|
|
|
Returns: |
|
str: Generated presigned url. |
|
""" |
|
backend = get_file_backend(url, backend_args=backend_args, enable_singleton=True, backend_key=backend_key) |
|
return backend.generate_presigned_url(url, client_method, expires_in) |
|
|
|
|
|
def load( |
|
file: Union[str, Path, IO[Any]], |
|
file_format: Optional[str] = None, |
|
file_client_args: Optional[dict] = None, |
|
fast_backend: bool = False, |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
**kwargs, |
|
): |
|
"""Load data from json/yaml/pickle files. |
|
|
|
This method provides a unified api for loading data from serialized files. |
|
|
|
``load`` supports loading data from serialized files those can be storaged |
|
in different backends. |
|
|
|
Args: |
|
file (str or :obj:`Path` or file-like object): Filename or a file-like |
|
object. |
|
file_format (str, optional): If not specified, the file format will be |
|
inferred from the file extension, otherwise use the specified one. |
|
Currently supported formats include "json", "yaml/yml" and |
|
"pickle/pkl". |
|
file_client_args (dict, optional): Arguments to instantiate a |
|
FileClient. See :class:`mmengine.fileio.FileClient` for details. |
|
Defaults to None. It will be deprecated in future. Please use |
|
``backend_args`` instead. |
|
fast_backend: bool: Whether to use multiprocess. Defaults to False. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
prefix of uri corresponding backend. Defaults to None. |
|
New in v0.2.0. |
|
|
|
Examples: |
|
>>> load('/path/of/your/file') # file is storaged in disk |
|
>>> load('https://path/of/your/file') # file is storaged in Internet |
|
>>> load('s3://path/of/your/file') # file is storaged in s3 |
|
|
|
Returns: |
|
The content from the file. |
|
""" |
|
if isinstance(file, Path): |
|
file = str(file) |
|
if file_format is None and isinstance(file, str): |
|
file_format = file.split(".")[-1] |
|
|
|
file_format = file_format.lower() |
|
if file_format not in file_handlers: |
|
raise TypeError(f"Unsupported format: {file_format}") |
|
|
|
if file_client_args is not None: |
|
warnings.warn( |
|
'"file_client_args" will be deprecated in future. ' 'Please use "backend_args" instead', |
|
DeprecationWarning, |
|
) |
|
if backend_args is not None: |
|
raise ValueError('"file_client_args and "backend_args" cannot be set at the ' "same time.") |
|
|
|
handler = file_handlers[file_format] |
|
if isinstance(file, str): |
|
if file_client_args is not None: |
|
file_client = FileClient.infer_client(file_client_args, file) |
|
file_backend = file_client |
|
else: |
|
file_backend = get_file_backend( |
|
file, |
|
backend_args=backend_args, |
|
backend_key=backend_key, |
|
enable_singleton=True, |
|
) |
|
|
|
if handler.str_like: |
|
with StringIO(file_backend.get_text(file)) as f: |
|
obj = handler.load_from_fileobj(f, **kwargs) |
|
else: |
|
if fast_backend: |
|
if hasattr(file_backend, "fast_get"): |
|
with BytesIO(file_backend.fast_get(file)) as f: |
|
obj = handler.load_from_fileobj(f, **kwargs) |
|
else: |
|
warnings.warn( |
|
f"fast_backend is not supported by the backend, type {type(file_backend)} fallback to normal get" |
|
) |
|
with BytesIO(file_backend.get(file)) as f: |
|
obj = handler.load_from_fileobj(f, **kwargs) |
|
else: |
|
with BytesIO(file_backend.get(file)) as f: |
|
obj = handler.load_from_fileobj(f, **kwargs) |
|
elif hasattr(file, "read"): |
|
obj = handler.load_from_fileobj(file, **kwargs) |
|
else: |
|
raise TypeError('"file" must be a filepath str or a file-object') |
|
return obj |
|
|
|
|
|
def dump( |
|
obj: Any, |
|
file: Union[str, Path, IO[Any], None] = None, |
|
file_format: Optional[str] = None, |
|
file_client_args: Optional[dict] = None, |
|
fast_backend: bool = False, |
|
backend_args: Optional[dict] = None, |
|
backend_key: Optional[str] = None, |
|
**kwargs, |
|
): |
|
"""Dump data to json/yaml/pickle strings or files. |
|
|
|
This method provides a unified api for dumping data as strings or to files, |
|
and also supports custom arguments for each file format. |
|
|
|
``dump`` supports dumping data as strings or to files which is saved to |
|
different backends. |
|
|
|
Args: |
|
obj (any): The python object to be dumped. |
|
file (str or :obj:`Path` or file-like object, optional): If not |
|
specified, then the object is dumped to a str, otherwise to a file |
|
specified by the filename or file-like object. |
|
file_format (str, optional): Same as :func:`load`. |
|
file_client_args (dict, optional): Arguments to instantiate a |
|
FileClient. See :class:`mmengine.fileio.FileClient` for details. |
|
Defaults to None. It will be deprecated in future. Please use |
|
``backend_args`` instead. |
|
fast_backend: bool: Whether to use multiprocess. Defaults to False. |
|
backend_args (dict, optional): Arguments to instantiate the |
|
prefix of uri corresponding backend. Defaults to None. |
|
New in v0.2.0. |
|
backend_key: str: The key to register the backend. Defaults to None. |
|
|
|
Examples: |
|
>>> dump('hello world', '/path/of/your/file') # disk |
|
>>> dump('hello world', 's3://path/of/your/file') # ceph or s3 |
|
|
|
Returns: |
|
bool: True for success, False otherwise. |
|
""" |
|
if isinstance(file, Path): |
|
file = str(file) |
|
if file_format is None: |
|
if isinstance(file, str): |
|
file_format = file.split(".")[-1] |
|
elif file is None: |
|
raise ValueError("file_format must be specified since file is None") |
|
|
|
file_format = file_format.lower() |
|
if file_format not in file_handlers: |
|
raise TypeError(f"Unsupported format: {file_format}") |
|
|
|
if file_client_args is not None: |
|
warnings.warn( |
|
'"file_client_args" will be deprecated in future. ' 'Please use "backend_args" instead', |
|
DeprecationWarning, |
|
) |
|
if backend_args is not None: |
|
raise ValueError('"file_client_args" and "backend_args" cannot be set at the ' "same time.") |
|
|
|
handler = file_handlers[file_format] |
|
if file is None: |
|
return handler.dump_to_str(obj, **kwargs) |
|
elif isinstance(file, str): |
|
if file_client_args is not None: |
|
file_client = FileClient.infer_client(file_client_args, file) |
|
file_backend = file_client |
|
else: |
|
file_backend = get_file_backend( |
|
file, |
|
backend_args=backend_args, |
|
backend_key=backend_key, |
|
enable_singleton=True, |
|
) |
|
|
|
if handler.str_like: |
|
with StringIO() as f: |
|
handler.dump_to_fileobj(obj, f, **kwargs) |
|
file_backend.put_text(f.getvalue(), file) |
|
else: |
|
with BytesIO() as f: |
|
handler.dump_to_fileobj(obj, f, **kwargs) |
|
if fast_backend: |
|
if hasattr(file_backend, "fast_put"): |
|
file_backend.fast_put(f, file) |
|
else: |
|
warnings.warn("fast_backend is not supported by the backend, fallback to normal put") |
|
file_backend.put(f, file) |
|
else: |
|
file_backend.put(f, file) |
|
elif hasattr(file, "write"): |
|
handler.dump_to_fileobj(obj, file, **kwargs) |
|
else: |
|
raise TypeError('"file" must be a filename str or a file-object') |
|
|