import base64
import urllib.parse

import requests
import requests.exceptions
from requests.adapters import HTTPAdapter, Retry

from fsspec import AbstractFileSystem
from fsspec.spec import AbstractBufferedFile


class DatabricksException(Exception):
    """
    Helper class for exceptions raised in this module.
    """

    def __init__(self, error_code, message):
        """Create a new DatabricksException"""
        super().__init__(message)

        self.error_code = error_code
        self.message = message


class DatabricksFileSystem(AbstractFileSystem):
    """
    Get access to the Databricks filesystem implementation over HTTP.
    Can be used inside and outside of a databricks cluster.
    """

    def __init__(self, instance, token, **kwargs):
        """
        Create a new DatabricksFileSystem.

        Parameters
        ----------
        instance: str
            The instance URL of the databricks cluster.
            For example for an Azure databricks cluster, this
            has the form adb-<some-number>.<two digits>.azuredatabricks.net.
        token: str
            Your personal token. Find out more
            here: https://docs.databricks.com/dev-tools/api/latest/authentication.html
        """
        self.instance = instance
        self.token = token

        self.session = requests.Session()
        self.retries = Retry(
            total=10,
            backoff_factor=0.05,
            status_forcelist=[408, 429, 500, 502, 503, 504],
        )

        self.session.mount("https://", HTTPAdapter(max_retries=self.retries))
        self.session.headers.update({"Authorization": f"Bearer {self.token}"})

        super().__init__(**kwargs)

    def ls(self, path, detail=True, **kwargs):
        """
        List the contents of the given path.

        Parameters
        ----------
        path: str
            Absolute path
        detail: bool
            Return not only the list of filenames,
            but also additional information on file sizes
            and types.
        """
        out = self._ls_from_cache(path)
        if not out:
            try:
                r = self._send_to_api(
                    method="get", endpoint="list", json={"path": path}
                )
            except DatabricksException as e:
                if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                    raise FileNotFoundError(e.message)

                raise e
            files = r["files"]
            out = [
                {
                    "name": o["path"],
                    "type": "directory" if o["is_dir"] else "file",
                    "size": o["file_size"],
                }
                for o in files
            ]
            self.dircache[path] = out

        if detail:
            return out
        return [o["name"] for o in out]

    def makedirs(self, path, exist_ok=True):
        """
        Create a given absolute path and all of its parents.

        Parameters
        ----------
        path: str
            Absolute path to create
        exist_ok: bool
            If false, checks if the folder
            exists before creating it (and raises an
            Exception if this is the case)
        """
        if not exist_ok:
            try:
                # If the following succeeds, the path is already present
                self._send_to_api(
                    method="get", endpoint="get-status", json={"path": path}
                )

                raise FileExistsError(f"Path {path} already exists")
            except DatabricksException as e:
                if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                    pass

        try:
            self._send_to_api(method="post", endpoint="mkdirs", json={"path": path})
        except DatabricksException as e:
            if e.error_code == "RESOURCE_ALREADY_EXISTS":
                raise FileExistsError(e.message)

            raise e
        self.invalidate_cache(self._parent(path))

    def mkdir(self, path, create_parents=True, **kwargs):
        """
        Create a given absolute path and all of its parents.

        Parameters
        ----------
        path: str
            Absolute path to create
        create_parents: bool
            Whether to create all parents or not.
            "False" is not implemented so far.
        """
        if not create_parents:
            raise NotImplementedError

        self.mkdirs(path, **kwargs)

    def rm(self, path, recursive=False, **kwargs):
        """
        Remove the file or folder at the given absolute path.

        Parameters
        ----------
        path: str
            Absolute path of what to remove
        recursive: bool
            Recursively delete all files in a folder.
        """
        try:
            self._send_to_api(
                method="post",
                endpoint="delete",
                json={"path": path, "recursive": recursive},
            )
        except DatabricksException as e:
            # This is not really an exception, it just means
            # not everything was deleted so far
            if e.error_code == "PARTIAL_DELETE":
                # Retry the deletion; the recursive call also takes care of
                # invalidating the cache on success, so do not re-raise here.
                self.rm(path=path, recursive=recursive)
                return
            elif e.error_code == "IO_ERROR":
                # Using the same exception as the os module would use here
                raise OSError(e.message)

            raise e
        self.invalidate_cache(self._parent(path))

    def mv(
        self, source_path, destination_path, recursive=False, maxdepth=None, **kwargs
    ):
        """
        Move a source to a destination path.

        A note from the original [databricks API manual]
        (https://docs.databricks.com/dev-tools/api/latest/dbfs.html#move):

        When moving a large number of files the API call will time out after
        approximately 60s, potentially resulting in partially moved data.
        Therefore, for operations that move more than 10k files, we strongly
        discourage using the DBFS REST API.

        Parameters
        ----------
        source_path: str
            From where to move (absolute path)
        destination_path: str
            To where to move (absolute path)
        recursive: bool
            Not implemented so far.
        maxdepth:
            Not implemented so far.
        """
        if recursive:
            raise NotImplementedError
        if maxdepth:
            raise NotImplementedError

        try:
            self._send_to_api(
                method="post",
                endpoint="move",
                json={"source_path": source_path, "destination_path": destination_path},
            )
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)
            elif e.error_code == "RESOURCE_ALREADY_EXISTS":
                raise FileExistsError(e.message)

            raise e
        self.invalidate_cache(self._parent(source_path))
        self.invalidate_cache(self._parent(destination_path))

    def _open(self, path, mode="rb", block_size="default", **kwargs):
        """
        Override the base class method to make sure a DatabricksFile is created.
        All arguments are copied from the base method.

        Only the default blocksize is allowed.
        """
        return DatabricksFile(self, path, mode=mode, block_size=block_size, **kwargs)

    def _send_to_api(self, method, endpoint, json):
        """
        Send the given json to the DBFS API
        using a get or post request (specified by the argument `method`).

        Parameters
        ----------
        method: str
            Which http method to use for communication; "get" or "post".
        endpoint: str
            Where to send the request to (last part of the API URL)
        json: dict
            Dictionary of information to send
        """
        if method == "post":
            session_call = self.session.post
        elif method == "get":
            session_call = self.session.get
        else:
            raise ValueError(f"Do not understand method {method}")

        url = urllib.parse.urljoin(f"https://{self.instance}/api/2.0/dbfs/", endpoint)

        r = session_call(url, json=json)

        # The DBFS API will return a json, also in case of an exception.
        # We want to preserve this information as well as possible.
        try:
            r.raise_for_status()
        except requests.HTTPError as e:
            # try to extract json error message
            # if that fails, fall back to the original exception
            try:
                exception_json = e.response.json()
            except Exception:
                raise e

            raise DatabricksException(**exception_json)

        return r.json()

    def _create_handle(self, path, overwrite=True):
        """
        Internal function to create a handle, which can be used to
        write blocks of a file to DBFS.
        A handle has a unique identifier which needs to be passed
        with every write during this transaction.
        The handle is active for 10 minutes; after that a new
        write transaction needs to be created.
        Make sure to close the handle after you are finished.

        Parameters
        ----------
        path: str
            Absolute path for this file.
        overwrite: bool
            If a file already exists at this location, either overwrite
            it or raise an exception.
        """
        try:
            r = self._send_to_api(
                method="post",
                endpoint="create",
                json={"path": path, "overwrite": overwrite},
            )
            return r["handle"]
        except DatabricksException as e:
            if e.error_code == "RESOURCE_ALREADY_EXISTS":
                raise FileExistsError(e.message)

            raise e

    def _close_handle(self, handle):
        """
        Close a handle, which was opened by :func:`_create_handle`.

        Parameters
        ----------
        handle: str
            Which handle to close.
        """
        try:
            self._send_to_api(method="post", endpoint="close", json={"handle": handle})
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)

            raise e

    def _add_data(self, handle, data):
        """
        Upload data to an already opened file handle
        (opened by :func:`_create_handle`).
        The maximal allowed data size is 1MB after
        conversion to base64.
        Remember to close the handle when you are finished.

        Parameters
        ----------
        handle: str
            Which handle to upload data to.
        data: bytes
            Block of data to add to the handle.
        """
        data = base64.b64encode(data).decode()
        try:
            self._send_to_api(
                method="post",
                endpoint="add-block",
                json={"handle": handle, "data": data},
            )
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)
            elif e.error_code == "MAX_BLOCK_SIZE_EXCEEDED":
                raise ValueError(e.message)

            raise e

    def _get_data(self, path, start, end):
        """
        Download a block of data in bytes from a given absolute path,
        covering the byte range [start, end).
        The maximum number of allowed bytes to read is 1MB.

        Parameters
        ----------
        path: str
            Absolute path to download data from
        start: int
            Start position of the block
        end: int
            End position of the block
        """
        try:
            r = self._send_to_api(
                method="get",
                endpoint="read",
                json={"path": path, "offset": start, "length": end - start},
            )
            return base64.b64decode(r["data"])
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)
            elif e.error_code in ["INVALID_PARAMETER_VALUE", "MAX_READ_SIZE_EXCEEDED"]:
                raise ValueError(e.message)

            raise e

    def invalidate_cache(self, path=None):
        if path is None:
            self.dircache.clear()
        else:
            self.dircache.pop(path, None)
        super().invalidate_cache(path)
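
# Note: DatabricksFile below builds fsspec's buffered file interface on top of
# the helper methods above. Writes go through _create_handle, repeated
# _add_data calls (at most 1MB per block after base64 encoding) and a final
# _close_handle; reads are served by ranged _get_data requests of at most one
# block each.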


class DatabricksFile(AbstractBufferedFile):
    """
    Helper class for files referenced in the DatabricksFileSystem.
    """

    DEFAULT_BLOCK_SIZE = 1 * 2**20  # only allowed block size

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        block_size="default",
        autocommit=True,
        cache_type="readahead",
        cache_options=None,
        **kwargs,
    ):
        """
        Create a new instance of the DatabricksFile.

        The blocksize needs to be the default one.
        """
        if block_size is None or block_size == "default":
            block_size = self.DEFAULT_BLOCK_SIZE

        assert (
            block_size == self.DEFAULT_BLOCK_SIZE
        ), f"Only the default block size is allowed, not {block_size}"

        super().__init__(
            fs,
            path,
            mode=mode,
            block_size=block_size,
            autocommit=autocommit,
            cache_type=cache_type,
            cache_options=cache_options or {},
            **kwargs,
        )

    def _initiate_upload(self):
        """Internal function to start a file upload"""
        self.handle = self.fs._create_handle(self.path)

    def _upload_chunk(self, final=False):
        """Internal function to add a chunk of data to a started upload"""
        self.buffer.seek(0)
        data = self.buffer.getvalue()

        data_chunks = [
            data[start:end] for start, end in self._to_sized_blocks(len(data))
        ]

        for data_chunk in data_chunks:
            self.fs._add_data(handle=self.handle, data=data_chunk)

        if final:
            self.fs._close_handle(handle=self.handle)
            return True

    def _fetch_range(self, start, end):
        """Internal function to download a block of data"""
        return_buffer = b""
        length = end - start
        for chunk_start, chunk_end in self._to_sized_blocks(length, start):
            return_buffer += self.fs._get_data(
                path=self.path, start=chunk_start, end=chunk_end
            )

        return return_buffer

    def _to_sized_blocks(self, length, start=0):
        """Helper function to split the range [start, start+length) into blocksizes"""
        end = start + length
        for data_chunk in range(start, end, self.blocksize):
            data_start = data_chunk
            data_end = min(end, data_chunk + self.blocksize)
            yield data_start, data_end
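

# The block below is a minimal usage sketch, not part of the library code.
# It assumes a reachable Databricks workspace; the instance URL, token and
# file path are placeholders that must be replaced with real values.
if __name__ == "__main__":
    fs = DatabricksFileSystem(
        instance="adb-1234567890123456.7.azuredatabricks.net",  # placeholder
        token="<personal-access-token>",  # placeholder
    )

    # List the DBFS root; detail=True returns dicts with name, type and size.
    print(fs.ls("/", detail=True))

    # Write a small file; the data is buffered and uploaded in 1MB base64
    # blocks through the create/add-block/close handle endpoints.
    with fs.open("/tmp/fsspec-dbfs-example.txt", "wb") as f:
        f.write(b"hello from fsspec")

    # Read it back via ranged "read" requests of at most one block each.
    with fs.open("/tmp/fsspec-dbfs-example.txt", "rb") as f:
        print(f.read())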