Skip to content

Commit fe9e3ff

Browse files
authored
Merge pull request #11 from iterative/refactor-asyncfs
Refactor asyncfs
2 parents 6481513 + 8ed3ced commit fe9e3ff

File tree

3 files changed

+277
-84
lines changed

3 files changed

+277
-84
lines changed

setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ tests =
4040
pytest-sugar==0.9.5
4141
pytest-cov==3.0.0
4242
pytest-mock==3.8.2
43+
pytest-asyncio==0.19.0
4344
pylint==2.15.0
4445
mypy==0.971
4546
%(all)s

src/morefs/asyn_local.py

Lines changed: 64 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,21 @@
11
import asyncio
2-
import datetime
32
import errno
4-
import functools
53
import os
4+
import posixpath
65
import shutil
6+
from contextlib import asynccontextmanager
77

88
import aiofile
99
import aiofiles.os
1010
from aiofiles.os import wrap # type: ignore[attr-defined]
11-
from fsspec import AbstractFileSystem
12-
from fsspec.asyn import AbstractBufferedFile, AsyncFileSystem
11+
from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem
1312
from fsspec.implementations.local import LocalFileSystem
1413

15-
aiofiles.os.chmod = wrap(os.chmod) # type: ignore[attr-defined]
16-
aiofiles.os.utime = wrap(os.utime) # type: ignore[attr-defined]
17-
aiofiles.os.path.islink = wrap(os.path.islink) # type: ignore[attr-defined]
18-
aiofiles.os.path.lexists = wrap(os.path.lexists) # type: ignore[attr-defined]
19-
async_rmtree = wrap(shutil.rmtree) # type: ignore[attr-defined]
20-
async_move = wrap(shutil.move) # type: ignore[attr-defined]
21-
async_copyfile = wrap(shutil.copyfile) # type: ignore[attr-defined]
22-
23-
24-
def _copy_to_fobj(fs, path1, fdst):
25-
with fs.open(path1, "rb") as fsrc:
26-
shutil.copyfileobj(fsrc, fdst)
27-
28-
29-
async_copy_to_fobj = wrap(_copy_to_fobj)
14+
async_utime = wrap(os.utime)
15+
async_islink = wrap(os.path.islink)
16+
async_rmtree = wrap(shutil.rmtree)
17+
async_copyfile = wrap(shutil.copyfile)
18+
async_get_file = wrap(LocalFileSystem.get_file)
3019

3120

3221
async def copy_asyncfileobj(fsrc, fdst, length=shutil.COPY_BUFSIZE):
@@ -36,40 +25,31 @@ async def copy_asyncfileobj(fsrc, fdst, length=shutil.COPY_BUFSIZE):
3625
await fdst_write(buf)
3726

3827

39-
# pylint: disable=arguments-renamed
40-
28+
# pylint: disable=abstract-method
4129

42-
def wrapped(func):
43-
@functools.wraps(func)
44-
def inner(self, *args, **kwargs):
45-
return func(self, *args, **kwargs)
4630

47-
return inner
31+
class AsyncLocalFileSystem(AsyncFileSystem, LocalFileSystem):
32+
# temporary hack, upstream should support `mirror_sync_methods` instead.
33+
async_impl = False
4834

49-
50-
class AsyncLocalFileSystem(AsyncFileSystem): # pylint: disable=abstract-method
51-
find = wrapped(AbstractFileSystem.find)
52-
walk = wrapped(AbstractFileSystem.walk)
53-
exists = wrapped(AbstractFileSystem.exists)
54-
isdir = wrapped(AbstractFileSystem.isdir)
55-
isfile = wrapped(AbstractFileSystem.isfile)
56-
lexists = staticmethod(LocalFileSystem.lexists)
57-
58-
ls = wrapped(LocalFileSystem.ls)
59-
info = wrapped(LocalFileSystem.info)
35+
_chmod = wrap(LocalFileSystem.chmod)
36+
_created = wrap(LocalFileSystem.created)
6037
_info = wrap(LocalFileSystem.info)
38+
_lexists = wrap(LocalFileSystem.lexists)
39+
_makedirs = wrap(LocalFileSystem.makedirs)
40+
_modified = wrap(LocalFileSystem.modified)
41+
_mv_file = wrap(LocalFileSystem.mv_file)
42+
_rm_file = wrap(LocalFileSystem.rm_file)
43+
_rmdir = wrap(LocalFileSystem.rmdir)
6144

6245
async def _ls(self, path, detail=True, **kwargs):
46+
path = self._strip_protocol(path)
6347
if detail:
64-
entries = await aiofiles.os.scandir(path)
65-
return [await self._info(f) for f in entries]
66-
return [os.path.join(path, f) for f in await aiofiles.os.listdir(path)]
67-
68-
async def _rm_file(self, path, **kwargs):
69-
await aiofiles.os.remove(path)
70-
71-
async def _rmdir(self, path):
72-
await aiofiles.os.rmdir(path)
48+
with await aiofiles.os.scandir(path) as entries:
49+
return [await self._info(f) for f in entries]
50+
return [
51+
posixpath.join(path, f) for f in await aiofiles.os.listdir(path)
52+
]
7353

7454
async def _mkdir(self, path, create_parents=True, **kwargs):
7555
if create_parents:
@@ -78,11 +58,9 @@ async def _mkdir(self, path, create_parents=True, **kwargs):
7858
errno.EEXIST, os.strerror(errno.EEXIST), path
7959
)
8060
return await self._makedirs(path, exist_ok=True)
61+
path = self._strip_protocol(path)
8162
await aiofiles.os.mkdir(path)
8263

83-
async def _makedirs(self, path, exist_ok=False):
84-
await aiofiles.os.makedirs(path, exist_ok=exist_ok)
85-
8664
async def _cat_file(self, path, start=None, end=None, **kwargs):
8765
async with self.open_async(path, "rb") as f:
8866
if start is not None:
@@ -100,76 +78,78 @@ async def _pipe_file(self, path, value, **kwargs):
10078
async with self.open_async(path, "wb") as f:
10179
await f.write(value)
10280

103-
async def _put_file(self, path1, path2, **kwargs):
104-
await self._cp_file(path1, path2, **kwargs)
105-
106-
async def _get_file(self, path1, path2, **kwargs):
81+
async def _get_file( # pylint: disable=arguments-renamed
82+
self, path1, path2, **kwargs
83+
):
10784
write_method = getattr(path2, "write", None)
10885
if not write_method:
10986
return await self._cp_file(path1, path2, **kwargs)
11087
if isinstance(
111-
path2, AbstractBufferedFile
88+
path2, AbstractAsyncStreamedFile
11289
) or asyncio.iscoroutinefunction(write_method):
11390
async with self.open_async(path1, "rb") as fsrc:
114-
return await async_copy_to_fobj(fsrc, path2)
115-
return await async_copy_to_fobj(path1, path2)
91+
return await copy_asyncfileobj(fsrc, path2)
92+
93+
path1 = self._strip_protocol(path1)
94+
return await async_get_file(self, path1, path2)
11695

11796
async def _cp_file(self, path1, path2, **kwargs):
97+
path1 = self._strip_protocol(path1)
98+
path2 = self._strip_protocol(path2)
99+
if self.auto_mkdir:
100+
await self._makedirs(self._parent(path2), exist_ok=True)
118101
if await self._isfile(path1):
119102
return await async_copyfile(path1, path2)
120103
if await self._isdir(path1):
121104
return await self._makedirs(path2, exist_ok=True)
122-
raise FileNotFoundError
123-
124-
async def _mv_file(self, path1, path2, **kwargs):
125-
await async_move(path1, path2)
126-
127-
async def _lexists(self, path, **kwargs):
128-
return await aiofiles.os.path.lexists(path)
105+
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path1)
129106

130-
async def _created(self, path):
131-
info = await self._info(path=path)
132-
return datetime.datetime.utcfromtimestamp(info["created"])
133-
134-
async def _modified(self, path):
135-
info = await self._info(path=path)
136-
return datetime.datetime.utcfromtimestamp(info["mtime"])
107+
_put_file = _cp_file
137108

138109
async def _rm(
139-
self, path, recursive=False, maxdepth=None
140-
): # pylint: disable=arguments-differ, unused-argument
141-
if isinstance(path, str):
110+
self, path, recursive=False, batch_size=None, maxdepth=None, **kwargs
111+
):
112+
if isinstance(path, (str, os.PathLike)):
142113
path = [path]
143114

115+
assert not maxdepth and not batch_size
144116
for p in path:
117+
p = self._strip_protocol(p)
145118
if recursive and await self._isdir(p):
146119
if os.path.abspath(p) == os.getcwd():
147120
raise ValueError("Cannot delete current working directory")
148121
await async_rmtree(p)
149122
else:
150123
await aiofiles.os.remove(p)
151124

152-
async def _chmod(self, path, mode):
153-
await aiofiles.os.chmod(path, mode)
154-
155125
async def _link(self, src, dst):
126+
src = self._strip_protocol(src)
127+
dst = self._strip_protocol(dst)
156128
await aiofiles.os.link(src, dst)
157129

158130
async def _symlink(self, src, dst):
131+
src = self._strip_protocol(src)
132+
dst = self._strip_protocol(dst)
159133
await aiofiles.os.symlink(src, dst)
160134

161135
async def _islink(self, path):
162-
return await aiofiles.os.path.islink(path)
136+
path = self._strip_protocol(path)
137+
return await async_islink(path)
163138

164139
async def _touch(self, path, **kwargs):
165-
if self._exists(path):
166-
return await aiofiles.os.utime(path, None)
140+
if self.auto_mkdir:
141+
await self._makedirs(self._parent(path), exist_ok=True)
142+
if await self._exists(path):
143+
path = self._strip_protocol(path)
144+
return await async_utime(path, None)
167145
async with self.open_async(path, "a"):
168146
pass
169147

170-
_open = LocalFileSystem._open # pylint: disable=protected-access
148+
@asynccontextmanager
149+
async def open_async(self, path, mode="rb", **kwargs):
150+
path = self._strip_protocol(path)
151+
if self.auto_mkdir and "w" in mode:
152+
await self._makedirs(self._parent(path), exist_ok=True)
171153

172-
def open_async( # pylint: disable=invalid-overridden-method
173-
self, path, mode="rb", **kwargs
174-
):
175-
return aiofile.async_open(path, mode, **kwargs)
154+
async with aiofile.async_open(path, mode, **kwargs) as f:
155+
yield f

0 commit comments

Comments
 (0)