|
8 | 8 |
|
9 | 9 | import asyncssh |
10 | 10 | from asyncssh.sftp import SFTPOpUnsupported |
11 | | -from fsspec.asyn import AsyncFileSystem, async_methods, sync, sync_wrapper |
| 11 | +from fsspec.asyn import ( |
| 12 | + AsyncFileSystem, |
| 13 | + FSTimeoutError, |
| 14 | + async_methods, |
| 15 | + sync, |
| 16 | + sync_wrapper, |
| 17 | +) |
12 | 18 | from fsspec.utils import infer_storage_options |
13 | 19 |
|
14 | 20 | from sshfs.file import SSHFile |
@@ -71,7 +77,12 @@ def __init__( |
71 | 77 | **_client_args, |
72 | 78 | ) |
73 | 79 | weakref.finalize( |
74 | | - self, sync, self.loop, self._finalize, self._pool, self._stack |
| 80 | + self, |
| 81 | + self._finalize, |
| 82 | + self.loop, |
| 83 | + self._client, |
| 84 | + self._pool, |
| 85 | + self._stack, |
75 | 86 | ) |
76 | 87 |
|
77 | 88 | @classmethod |
@@ -101,15 +112,34 @@ async def _connect( |
101 | 112 | connect = sync_wrapper(_connect) |
102 | 113 |
|
103 | 114 | @staticmethod |
104 | | - async def _finalize(pool, stack): |
105 | | - await pool.close() |
106 | | - |
107 | | - # If an error occurs while the SSHFile is trying to |
108 | | - # open the native file, then the client might get broken |
109 | | - # due to partial initialization. We are just going to ignore |
110 | | - # the errors that arises on the finalization layer |
111 | | - with suppress(BrokenPipeError): |
112 | | - await stack.aclose() |
| 115 | + def _finalize(loop, client, pool, stack): |
| 116 | + async def close(): |
| 117 | + await pool.close() |
| 118 | + # If an error occurs while the SSHFile is trying to |
| 119 | + # open the native file, then the client might get broken |
| 120 | + # due to partial initialization. We are just going to ignore |
| 121 | + # the errors that arises on the finalization layer |
| 122 | + with suppress(BrokenPipeError): |
| 123 | + await stack.aclose() |
| 124 | + |
| 125 | + if loop is not None and loop.is_running(): |
| 126 | + try: |
| 127 | + loop = asyncio.get_running_loop() |
| 128 | + loop.create_task(close()) |
| 129 | + return |
| 130 | + except RuntimeError: |
| 131 | + pass |
| 132 | + |
| 133 | + try: |
| 134 | + sync(loop, close, timeout=0.1) |
| 135 | + return |
| 136 | + except FSTimeoutError: |
| 137 | + pass |
| 138 | + |
| 139 | + try: |
| 140 | + client.abort() # try to abort the connection if no loop is running |
| 141 | + except Exception: |
| 142 | + pass |
113 | 143 |
|
114 | 144 | @property |
115 | 145 | def client(self): |
|
0 commit comments