1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
import asyncio
import functools
import inspect
from fsspec.asyn import AsyncFileSystem
def async_wrapper(func, obj=None):
"""
Wraps a synchronous function to make it awaitable.
Parameters
----------
func : callable
The synchronous function to wrap.
obj : object, optional
The instance to bind the function to, if applicable.
Returns
-------
coroutine
An awaitable version of the function.
"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
return await asyncio.to_thread(func, *args, **kwargs)
return wrapper
class AsyncFileSystemWrapper(AsyncFileSystem):
"""
A wrapper class to convert a synchronous filesystem into an asynchronous one.
This class takes an existing synchronous filesystem implementation and wraps all
its methods to provide an asynchronous interface.
Parameters
----------
sync_fs : AbstractFileSystem
The synchronous filesystem instance to wrap.
"""
def __init__(self, sync_fs, *args, **kwargs):
super().__init__(*args, **kwargs)
self.asynchronous = True
self.sync_fs = sync_fs
self.protocol = self.sync_fs.protocol
self._wrap_all_sync_methods()
@property
def fsid(self):
return f"async_{self.sync_fs.fsid}"
def _wrap_all_sync_methods(self):
"""
Wrap all synchronous methods of the underlying filesystem with asynchronous versions.
"""
for method_name in dir(self.sync_fs):
if method_name.startswith("_"):
continue
attr = inspect.getattr_static(self.sync_fs, method_name)
if isinstance(attr, property):
continue
method = getattr(self.sync_fs, method_name)
if callable(method) and not asyncio.iscoroutinefunction(method):
async_method = async_wrapper(method, obj=self)
setattr(self, f"_{method_name}", async_method)
@classmethod
def wrap_class(cls, sync_fs_class):
"""
Create a new class that can be used to instantiate an AsyncFileSystemWrapper
with lazy instantiation of the underlying synchronous filesystem.
Parameters
----------
sync_fs_class : type
The class of the synchronous filesystem to wrap.
Returns
-------
type
A new class that wraps the provided synchronous filesystem class.
"""
class GeneratedAsyncFileSystemWrapper(cls):
def __init__(self, *args, **kwargs):
sync_fs = sync_fs_class(*args, **kwargs)
super().__init__(sync_fs)
GeneratedAsyncFileSystemWrapper.__name__ = (
f"Async{sync_fs_class.__name__}Wrapper"
)
return GeneratedAsyncFileSystemWrapper
|