Spaces:
Sleeping
Sleeping
| import dask | |
| from distributed.client import Client, _get_global_client | |
| from distributed.worker import Worker | |
| from fsspec import filesystem | |
| from fsspec.spec import AbstractBufferedFile, AbstractFileSystem | |
| from fsspec.utils import infer_storage_options | |
def _get_client(client):
    """Resolve ``client`` to a ``distributed`` ``Client`` object.

    An existing ``Client`` is handed back untouched, ``None`` yields whatever
    ``_get_global_client()`` returns, and any other value is passed to the
    ``Client`` constructor.
    """
    if isinstance(client, Client):
        return client
    if client is None:
        return _get_global_client()
    # Anything else (e.g., a connection string) is used to build a new Client.
    return Client(client)
def _in_worker():
    """Return True when ``Worker._instances`` is non-empty in this process.

    Callers use this as the signal that the code is running on a worker.
    """
    return True if Worker._instances else False
class DaskWorkerFileSystem(AbstractFileSystem):
    """View files accessible to a worker as any other remote file-system

    When instances are run on the worker, uses the real filesystem. When
    run on the client, they call the worker to provide information or data.

    **Warning** this implementation is experimental, and read-only for now.
    """

    def __init__(
        self, target_protocol=None, target_options=None, fs=None, client=None, **kwargs
    ):
        """Store the configuration and decide whether we run on a worker.

        Parameters
        ----------
        target_protocol : str, optional
            Protocol name passed to ``filesystem`` to build the real
            filesystem when running on a worker.
        target_options : dict, optional
            Keyword arguments for that ``filesystem`` call.
        fs : filesystem instance, optional
            Ready-made filesystem to use instead of ``target_protocol``.
        client : Client, connection string or None
            Resolved through ``_get_client`` when not running on a worker.
        **kwargs
            Forwarded to ``AbstractFileSystem.__init__``.

        Raises
        ------
        ValueError
            If both or neither of ``fs`` and ``target_protocol`` are given.
        """
        super().__init__(**kwargs)
        # ``not`` binds looser than ``^``: this rejects the case where both
        # arguments are given as well as the case where neither is.
        if not (fs is None) ^ (target_protocol is None):
            raise ValueError(
                "Please provide one of filesystem instance (fs) or"
                " target_protocol, not both"
            )
        self.target_protocol = target_protocol
        self.target_options = target_options
        self.worker = None  # set to True/False by _determine_worker()
        self.client = client
        self.fs = fs
        self._determine_worker()

    # Static: the function takes no ``self``, so it has to behave the same
    # whether it is looked up on the class or on an instance.
    @staticmethod
    def _get_kwargs_from_urls(path):
        """Derive constructor kwargs from the host and port found in ``path``.

        Returns ``{"client": "<host>:<port>"}`` when ``infer_storage_options``
        reports both a host and a port, otherwise an empty dict.
        """
        so = infer_storage_options(path)
        if "host" in so and "port" in so:
            return {"client": f"{so['host']}:{so['port']}"}
        else:
            return {}

    def _determine_worker(self):
        """Set ``self.worker`` and prepare the matching backend.

        On a worker, the real filesystem is instantiated from
        ``target_protocol``/``target_options`` unless ``fs`` was supplied.
        Otherwise the client is resolved and ``self.rfs`` becomes a
        ``dask.delayed`` wrapper around this instance, through which method
        calls are computed remotely.
        """
        if _in_worker():
            self.worker = True
            if self.fs is None:
                self.fs = filesystem(
                    self.target_protocol, **(self.target_options or {})
                )
        else:
            self.worker = False
            self.client = _get_client(self.client)
            self.rfs = dask.delayed(self)

    def mkdir(self, *args, **kwargs):
        """Run ``mkdir`` on the real filesystem, directly or via ``rfs``."""
        if self.worker:
            self.fs.mkdir(*args, **kwargs)
        else:
            self.rfs.mkdir(*args, **kwargs).compute()

    def rm(self, *args, **kwargs):
        """Run ``rm`` on the real filesystem, directly or via ``rfs``."""
        if self.worker:
            self.fs.rm(*args, **kwargs)
        else:
            self.rfs.rm(*args, **kwargs).compute()

    def copy(self, *args, **kwargs):
        """Run ``copy`` on the real filesystem, directly or via ``rfs``."""
        if self.worker:
            self.fs.copy(*args, **kwargs)
        else:
            self.rfs.copy(*args, **kwargs).compute()

    def mv(self, *args, **kwargs):
        """Run ``mv`` on the real filesystem, directly or via ``rfs``."""
        if self.worker:
            self.fs.mv(*args, **kwargs)
        else:
            self.rfs.mv(*args, **kwargs).compute()

    def ls(self, *args, **kwargs):
        """Return the ``ls`` result of the real filesystem, directly or via ``rfs``."""
        if self.worker:
            return self.fs.ls(*args, **kwargs)
        else:
            return self.rfs.ls(*args, **kwargs).compute()

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Open ``path``.

        On a worker this delegates to the real filesystem's ``_open``;
        otherwise it returns a ``DaskFile`` bound to this filesystem. All
        arguments are passed through unchanged in both cases.
        """
        if self.worker:
            return self.fs._open(
                path,
                mode=mode,
                block_size=block_size,
                autocommit=autocommit,
                cache_options=cache_options,
                **kwargs,
            )
        else:
            return DaskFile(
                fs=self,
                path=path,
                mode=mode,
                block_size=block_size,
                autocommit=autocommit,
                cache_options=cache_options,
                **kwargs,
            )

    def fetch_range(self, path, mode, start, end):
        """Return the bytes of ``path`` from offset ``start`` up to ``end``.

        On a worker the file is opened with ``mode``, positioned at ``start``
        and ``end - start`` bytes are read; otherwise the same call is
        computed remotely through ``rfs``.
        """
        if self.worker:
            with self._open(path, mode) as f:
                f.seek(start)
                return f.read(end - start)
        else:
            return self.rfs.fetch_range(path, mode, start, end).compute()
class DaskFile(AbstractBufferedFile):
    """Read-only buffered file that pulls its bytes via ``fs.fetch_range``."""

    def __init__(self, mode="rb", **kwargs):
        """Accept only ``mode="rb"``; other kwargs go to the parent class.

        Raises ``ValueError`` for any other mode.
        """
        read_only = mode == "rb"
        if not read_only:
            raise ValueError('Remote dask files can only be opened in "rb" mode')
        super().__init__(**kwargs)

    def _upload_chunk(self, final=False):
        """Do nothing; this file type is never opened for writing."""
        return None

    def _initiate_upload(self):
        """Create remote file/upload"""
        return None

    def _fetch_range(self, start, end):
        """Get the specified set of bytes from remote"""
        owner = self.fs
        return owner.fetch_range(self.path, self.mode, start, end)