Spaces:
Runtime error
Runtime error
File size: 6,789 Bytes
63deadc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
import asyncio
import inspect
import io
import os
import time
import pytest
import fsspec
import fsspec.asyn
from fsspec.asyn import _run_coros_in_chunks
def test_sync_methods():
    """AsyncFileSystem keeps ``_info`` as a coroutine and exposes a blocking ``info``."""
    fs = fsspec.asyn.AsyncFileSystem()

    # The coroutine implementation is left untouched ...
    assert inspect.iscoroutinefunction(fs._info)

    # ... and a synchronous mirror derived from it is available as ``info``.
    assert hasattr(fs, "info")
    assert not inspect.iscoroutinefunction(fs.info)
    assert fs.info.__qualname__ == "AsyncFileSystem._info"
def test_when_sync_methods_are_disabled():
    """With ``mirror_sync_methods = False`` the inherited sync ``info`` is kept."""

    class TestFS(fsspec.asyn.AsyncFileSystem):
        mirror_sync_methods = False

    fs = TestFS()
    assert inspect.iscoroutinefunction(fs._info)

    # No mirroring happened: ``info`` is AbstractFileSystem's own method.
    assert not inspect.iscoroutinefunction(fs.info)
    assert fs.info.__qualname__ == "AbstractFileSystem.info"
def test_interrupt():
    """_dump_running_tasks(with_task=True) leaves a pending coroutine finished.

    A never-ending coroutine is submitted to fsspec's loop from this thread;
    after dumping, both its asyncio task and the concurrent future must be
    done, and the future must carry FSSpecCoroutineCancel.
    """
    fsspec_loop = fsspec.asyn.get_loop()

    async def sleep_forever():
        await asyncio.sleep(1000000)
        return True

    future = asyncio.run_coroutine_threadsafe(sleep_forever(), fsspec_loop)
    # Let the task start. NOTE(review): a fixed delay is timing-dependent and
    # could be flaky on a heavily loaded machine.
    time.sleep(0.01)

    dumped = fsspec.asyn._dump_running_tasks(with_task=True)
    task = dumped[0]["task"]
    assert task.done()
    assert future.done()
    assert isinstance(future.exception(), fsspec.asyn.FSSpecCoroutineCancel)
class _DummyAsyncKlass:
    """Test helper exposing blocking wrappers around two coroutine methods.

    ``dummy_func`` wraps a coroutine that sleeps for one second, giving the
    timeout tests something to race against. ``bad_multiple_sync_func`` nests
    one sync wrapper inside another.
    """

    def __init__(self):
        # Loop the sync wrappers run on (presumably read by
        # fsspec.asyn.sync_wrapper via ``self.loop`` -- confirm).
        self.loop = fsspec.asyn.get_loop()

    async def _dummy_async_func(self):
        # Takes one second so short timeouts fail and long ones succeed.
        await asyncio.sleep(1)
        return True

    async def _bad_multiple_sync(self):
        # Invokes a blocking wrapper from inside a coroutine that is itself run
        # through a wrapper; test_sync_wrapper_bad_multiple_sync expects this
        # to raise NotImplementedError.
        blocking_call = fsspec.asyn.sync_wrapper(_DummyAsyncKlass._dummy_async_func)
        blocking_call(self)
        return True

    # Blocking counterparts of the coroutines above (callers pass ``timeout=``).
    dummy_func = fsspec.asyn.sync_wrapper(_dummy_async_func)
    bad_multiple_sync_func = fsspec.asyn.sync_wrapper(_bad_multiple_sync)
def test_sync_wrapper_timeout_on_less_than_expected_wait_time_not_finish_function():
    """A timeout shorter than the one-second coroutine raises FSTimeoutError."""
    obj = _DummyAsyncKlass()
    with pytest.raises(fsspec.FSTimeoutError):
        obj.dummy_func(timeout=0.1)
def test_sync_wrapper_timeout_on_more_than_expected_wait_time_will_finish_function():
    """A timeout longer than the coroutine's runtime lets it complete."""
    obj = _DummyAsyncKlass()
    result = obj.dummy_func(timeout=5)
    assert result
def test_sync_wrapper_timeout_none_will_wait_func_finished():
    """``timeout=None`` waits for the coroutine to finish."""
    obj = _DummyAsyncKlass()
    result = obj.dummy_func(timeout=None)
    assert result
def test_sync_wrapper_treat_timeout_0_as_none():
    """``timeout=0`` behaves like no timeout rather than an immediate expiry."""
    obj = _DummyAsyncKlass()
    result = obj.dummy_func(timeout=0)
    assert result
def test_sync_wrapper_bad_multiple_sync():
    """Nesting a blocking wrapper inside a wrapped coroutine is rejected."""
    obj = _DummyAsyncKlass()
    with pytest.raises(NotImplementedError):
        obj.bad_multiple_sync_func(timeout=5)
def test_run_coros_in_chunks(monkeypatch):
    """_run_coros_in_chunks must not run more than the batch size concurrently.

    Each runner bumps a shared counter, yields once, and raises ValueError if
    more than four runners are in flight at the same time. The batch size is
    given explicitly or taken from ``fsspec.config.conf["gather_batch_size"]``.
    """
    in_flight = 0

    async def runner():
        nonlocal in_flight
        in_flight += 1
        # Yield to the event loop so other runners get a chance to start.
        await asyncio.sleep(0)
        if in_flight > 4:
            raise ValueError("More than 4 coroutines are running together")
        in_flight -= 1
        return 1

    async def main(**kwargs):
        nonlocal in_flight
        in_flight = 0
        results = await _run_coros_in_chunks(
            [runner() for _ in range(32)], **kwargs
        )
        # Re-raise the first exception returned in place of a result, if any.
        first_error = next((r for r in results if isinstance(r, Exception)), None)
        if first_error is not None:
            raise first_error
        return results

    def total(**kwargs):
        # Run the 32 runners to completion and add up their return values.
        return sum(asyncio.run(main(**kwargs)))

    # Explicit batch sizes: 4 fits; 5 and -1 (presumably "all at once") exceed it.
    assert total(batch_size=4) == 32
    with pytest.raises(ValueError):
        total(batch_size=5)
    with pytest.raises(ValueError):
        total(batch_size=-1)
    assert total(batch_size=4) == 32

    # Batch size from config: 5 is too many, but an explicit argument wins.
    monkeypatch.setitem(fsspec.config.conf, "gather_batch_size", 5)
    with pytest.raises(ValueError):
        total()
    assert total(batch_size=4) == 32  # explicit argument overrides config

    # A config value of 4 is honoured when no argument is given.
    monkeypatch.setitem(fsspec.config.conf, "gather_batch_size", 4)
    assert total() == 32
@pytest.mark.skipif(os.name != "nt", reason="only for windows")
def test_windows_policy():
    """On Windows fsspec's loop is selector-based and the global policy is untouched."""
    from asyncio.windows_events import SelectorEventLoop

    # Order matters: the loop must exist before the global policy is inspected.
    fsspec_loop = fsspec.asyn.get_loop()
    global_policy = asyncio.get_event_loop_policy()

    # fsspec's loop must always be a selector event loop.
    assert isinstance(fsspec_loop, SelectorEventLoop)

    # get_loop() only swaps in a selector-based policy temporarily while it
    # creates its loop; afterwards the process-wide policy must have been
    # restored to the default one.
    assert isinstance(global_policy, asyncio.DefaultEventLoopPolicy)
def test_running_async():
    """running_async() is false outside an event loop and true inside one."""

    async def inside_loop():
        assert fsspec.asyn.running_async()

    assert not fsspec.asyn.running_async()
    asyncio.run(inside_loop())
class DummyAsyncFS(fsspec.asyn.AsyncFileSystem):
    """Async filesystem stub that opens in-memory DummyAsyncStreamedFile objects."""

    _file_class = fsspec.asyn.AbstractAsyncStreamedFile

    async def _info(self, path, **kwargs):
        # Every path reports the same 100-byte file, regardless of ``path``.
        details = {"name": "misc/foo.txt", "type": "file", "size": 100}
        return details

    async def open_async(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Return a DummyAsyncStreamedFile for *path*; extra kwargs pass through."""
        positional = (self, path, mode, block_size, autocommit)
        return DummyAsyncStreamedFile(
            *positional, cache_options=cache_options, **kwargs
        )
class DummyAsyncStreamedFile(fsspec.asyn.AbstractAsyncStreamedFile):
    """In-memory streamed file used to exercise AbstractAsyncStreamedFile.

    Reads are served from ``temp_buffer``, pre-filled with ``b"foo-bar" * 20``.
    Writes are collected into a fresh ``temp_buffer`` created when the upload
    is initiated, and can be inspected afterwards with :meth:`get_data`.
    """

    def __init__(self, fs, path, mode, block_size, autocommit, **kwargs):
        super().__init__(fs, path, mode, block_size, autocommit, **kwargs)
        # Backing store for reads; replaced by _initiate_upload when writing.
        self.temp_buffer = io.BytesIO(b"foo-bar" * 20)

    async def _fetch_range(self, start, end):
        # Honour the requested offset rather than relying on the buffer's
        # current cursor, so non-sequential reads return the right bytes.
        self.temp_buffer.seek(start)
        return self.temp_buffer.read(end - start)

    async def _initiate_upload(self):
        # Reinitialize for new uploads.
        self.temp_buffer = io.BytesIO()

    async def _upload_chunk(self, final=False):
        # Append the pending write buffer to the in-memory "remote" store.
        self.temp_buffer.write(self.buffer.getbuffer())

    async def get_data(self):
        """Return the full contents of ``temp_buffer`` as bytes."""
        return self.temp_buffer.getvalue()
@pytest.mark.asyncio
async def test_async_streamed_file_write():
    """Writing ``2 * blocksize`` copies of b"foo-bar" round-trips through the file.

    An explicit small block size is used. With the library default (several
    MiB -- NOTE(review): per AbstractBufferedFile's default, confirm), the
    payload and its buffered copies would allocate tens of megabytes. The
    exercised path (one write that triggers a flush, then close) is unchanged.
    """
    test_fs = DummyAsyncFS()
    streamed_file = await test_fs.open_async(
        "misc/foo.txt", mode="wb", block_size=1024
    )
    inp_data = b"foo-bar" * streamed_file.blocksize * 2
    await streamed_file.write(inp_data)
    assert streamed_file.loc == len(inp_data)
    await streamed_file.close()
    out_data = await streamed_file.get_data()
    assert out_data.count(b"foo-bar") == streamed_file.blocksize * 2
@pytest.mark.asyncio
async def test_async_streamed_file_read():
    """Two consecutive reads return the whole pre-filled buffer in order."""
    dummy_fs = DummyAsyncFS()
    handle = await dummy_fs.open_async("misc/foo.txt", mode="rb")

    # 3 copies of b"foo-bar" first, then a request for the remaining 18
    # (only 17 are left, so the second read comes back short).
    head = await handle.read(7 * 3)
    tail = await handle.read(7 * 18)
    assert head + tail == b"foo-bar" * 20

    await handle.close()
|