From e06e7f11d5407f0535df2f3fef5c2c2cd83dc7b5 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sat, 12 Oct 2024 20:35:39 -0700 Subject: [PATCH 01/10] fix(remotestore): raise error if path includes scheme --- src/zarr/abc/store.py | 4 +- src/zarr/storage/remote.py | 59 ++++++++++++++++++++++++++---- tests/v3/test_store/test_remote.py | 30 +++++++++++++-- 3 files changed, 80 insertions(+), 13 deletions(-) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index 7771a1046..7fa1ccb7d 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -95,12 +95,12 @@ def with_mode(self, mode: AccessModeLiteral) -> Self: Parameters ---------- - mode: AccessModeLiteral + mode : AccessModeLiteral The new mode to use. Returns ------- - store: + store A new store of the same type with the new mode. Examples diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index 1adc7ee41..9e93d41d3 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -1,5 +1,6 @@ from __future__ import annotations +import warnings from typing import TYPE_CHECKING, Any, Self import fsspec @@ -42,15 +43,45 @@ def __init__( allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, ) -> None: """ + A remote Store based on FSSpec + Parameters ---------- - url: root of the datastore. In fsspec notation, this is usually like "protocol://path/to". - Can also be a upath.UPath instance/ - allowed_exceptions: when fetching data, these cases will be deemed to correspond to missing - keys, rather than some other IO failure - storage_options: passed on to fsspec to make the filesystem instance. If url is a UPath, - this must not be used. - + fs : AsyncFileSystem + The Async FSSpec filesystem to use with this store. + mode : AccessModeLiteral + The access mode to use. + path : str + The root path of the store. This should be a relative path and must not include the + filesystem scheme. + allowed_exceptions : tuple[type[Exception], ...] + When fetching data, these cases will be deemed to correspond to missing keys. + + Attributes + ---------- + fs + allowed_exceptions + supports_writes + supports_deletes + supports_partial_writes + supports_listing + + Raises + ------ + TypeError + If the Filesystem does not support async operations. + ValueError + If the path argument includes a scheme. + + Warns + ----- + UserWarning + If the file system (fs) was not created with `asynchronous=True`. + + See Also + -------- + RemoteStore.from_upath + RemoteStore.from_url """ super().__init__(mode=mode) self.fs = fs @@ -59,6 +90,14 @@ def __init__( if not self.fs.async_impl: raise TypeError("Filesystem needs to support async operations.") + if not self.fs.asynchronous: + warnings.warn( + f"fs ({fs}) was not created with `asynchronous=True`, this may lead to surprising behavior", + stacklevel=2, + ) + if "://" in path: + scheme, _ = path.split("://", maxsplit=1) + raise ValueError(f"path argument to RemoteStore must not include scheme ({scheme}://)") @classmethod def from_upath( @@ -82,7 +121,11 @@ def from_url( mode: AccessModeLiteral = "r", allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, ) -> RemoteStore: - fs, path = fsspec.url_to_fs(url, **storage_options) + opts = storage_options or {} + opts = {"asynchronous": True, **opts} + print("opts") + + fs, path = fsspec.url_to_fs(url, **opts) return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions) async def clear(self) -> None: diff --git a/tests/v3/test_store/test_remote.py b/tests/v3/test_store/test_remote.py index 083695ecc..3f6758127 100644 --- a/tests/v3/test_store/test_remote.py +++ b/tests/v3/test_store/test_remote.py @@ -86,10 +86,12 @@ def s3(s3_base: None) -> Generator[s3fs.S3FileSystem, None, None]: async def test_basic() -> None: store = RemoteStore.from_url( - f"s3://{test_bucket_name}", + f"s3://{test_bucket_name}/foo/spam/", mode="w", storage_options={"endpoint_url": endpoint_url, "anon": False}, ) + assert store.fs.asynchronous + assert store.path == f"{test_bucket_name}/foo/spam" assert await _collect_aiterator(store.list()) == () assert not await store.exists("foo") data = b"hello" @@ -109,7 +111,7 @@ class TestRemoteStoreS3(StoreTests[RemoteStore, cpu.Buffer]): @pytest.fixture def store_kwargs(self, request) -> dict[str, str | bool]: fs, path = fsspec.url_to_fs( - f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False + f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=True ) return {"fs": fs, "path": path, "mode": "r+"} @@ -183,6 +185,28 @@ async def test_remote_store_from_uri( assert dict(group.attrs) == {"key": "value-3"} def test_from_upath(self) -> None: - path = UPath(f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False) + path = UPath( + f"s3://{test_bucket_name}/foo/bar/", + endpoint_url=endpoint_url, + anon=False, + asynchronous=True, + ) result = RemoteStore.from_upath(path) assert result.fs.endpoint_url == endpoint_url + assert result.fs.asynchronous + assert result.path == f"{test_bucket_name}/foo/bar" + + def test_init_raises_if_path_has_scheme(self, store_kwargs) -> None: + store_kwargs["path"] = "s3://" + store_kwargs["path"] + with pytest.raises( + ValueError, match="path argument to RemoteStore must not include scheme .*" + ): + self.store_cls(**store_kwargs) + + def test_init_warns_if_fs_asynchronous_is_false(self) -> None: + fs, path = fsspec.url_to_fs( + f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=False + ) + store_kwargs = {"fs": fs, "path": path, "mode": "r+"} + with pytest.warns(UserWarning, match=r".* was not created with `asynchronous=True`.*"): + self.store_cls(**store_kwargs) From fddba26235e2687cab9e8e0362f3d58f4e8ce7b9 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sat, 12 Oct 2024 20:37:41 -0700 Subject: [PATCH 02/10] fixup --- src/zarr/storage/remote.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index 9e93d41d3..63790f987 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -123,7 +123,6 @@ def from_url( ) -> RemoteStore: opts = storage_options or {} opts = {"asynchronous": True, **opts} - print("opts") fs, path = fsspec.url_to_fs(url, **opts) return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions) From 8a0a38001dda8c9403df628b8f2938304b6e8898 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sat, 12 Oct 2024 20:43:16 -0700 Subject: [PATCH 03/10] fixup --- tests/v3/test_store/test_remote.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/v3/test_store/test_remote.py b/tests/v3/test_store/test_remote.py index 3f6758127..59a84f9fa 100644 --- a/tests/v3/test_store/test_remote.py +++ b/tests/v3/test_store/test_remote.py @@ -145,9 +145,7 @@ def test_store_supports_partial_writes(self, store: RemoteStore) -> None: def test_store_supports_listing(self, store: RemoteStore) -> None: assert store.supports_listing - async def test_remote_store_from_uri( - self, store: RemoteStore, store_kwargs: dict[str, str | bool] - ): + async def test_remote_store_from_uri(self, store: RemoteStore): storage_options = { "endpoint_url": endpoint_url, "anon": False, @@ -197,6 +195,7 @@ def test_from_upath(self) -> None: assert result.path == f"{test_bucket_name}/foo/bar" def test_init_raises_if_path_has_scheme(self, store_kwargs) -> None: + # regression test for https://github.com/zarr-developers/zarr-python/issues/2342 store_kwargs["path"] = "s3://" + store_kwargs["path"] with pytest.raises( ValueError, match="path argument to RemoteStore must not include scheme .*" From d0ef793b5d16f6297b24988ae4d33744bb5530ef Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sat, 12 Oct 2024 20:56:48 -0700 Subject: [PATCH 04/10] strip scheme in from_url in case fsspec fails to --- src/zarr/storage/remote.py | 6 ++++++ tests/v3/test_store/test_core.py | 5 +---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index 63790f987..f7f88f25b 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -125,6 +125,12 @@ def from_url( opts = {"asynchronous": True, **opts} fs, path = fsspec.url_to_fs(url, **opts) + + # fsspec is not consistent about removing the scheme from the path, so check and strip it here + # https://github.com/fsspec/filesystem_spec/issues/1722 + if "://" in path: + _, path = path.split("://", maxsplit=1) + return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions) async def clear(self) -> None: diff --git a/tests/v3/test_store/test_core.py b/tests/v3/test_store/test_core.py index b2a8292ea..545bb9f19 100644 --- a/tests/v3/test_store/test_core.py +++ b/tests/v3/test_store/test_core.py @@ -39,10 +39,7 @@ async def test_make_store_path(tmpdir: str) -> None: async def test_make_store_path_fsspec(monkeypatch) -> None: - import fsspec.implementations.memory - - monkeypatch.setattr(fsspec.implementations.memory.MemoryFileSystem, "async_impl", True) - store_path = await make_store_path("memory://") + store_path = await make_store_path("http://foo.com/bar") assert isinstance(store_path.store, RemoteStore) From ee1db65bc801f1c26efd35e08abf0296a15ca7a4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 13 Oct 2024 17:06:07 +0000 Subject: [PATCH 05/10] style: pre-commit fixes --- src/zarr/storage/remote.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index fbd8ee69e..b380e2ce3 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -157,7 +157,7 @@ def from_url( Returns ------- RemoteStore - """ + """ opts = storage_options or {} opts = {"asynchronous": True, **opts} @@ -167,7 +167,7 @@ def from_url( # https://github.com/fsspec/filesystem_spec/issues/1722 if "://" in path: _, path = path.split("://", maxsplit=1) - + return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions) async def clear(self) -> None: From fd088e02382e3e3b048d282a02e498219453ba5d Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sun, 13 Oct 2024 10:47:09 -0700 Subject: [PATCH 06/10] disable cache in listing ops --- src/zarr/storage/remote.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index 71f8bd7ea..26cf4f1fb 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -96,7 +96,8 @@ def __init__( f"fs ({fs}) was not created with `asynchronous=True`, this may lead to surprising behavior", stacklevel=2, ) - if "://" in path: + if "://" in path and not path.startswith("http"): + # `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯) scheme, _ = path.split("://", maxsplit=1) raise ValueError(f"path argument to RemoteStore must not include scheme ({scheme}://)") @@ -159,13 +160,14 @@ def from_url( RemoteStore """ opts = storage_options or {} - opts = {"asynchronous": True, **opts} + opts = {"asynchronous": True, "use_listings_cache": False, **opts} fs, path = fsspec.url_to_fs(url, **opts) # fsspec is not consistent about removing the scheme from the path, so check and strip it here # https://github.com/fsspec/filesystem_spec/issues/1722 - if "://" in path: + if "://" in path and not path.startswith("http"): + # `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯) _, path = path.split("://", maxsplit=1) return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions) @@ -173,7 +175,7 @@ def from_url( async def clear(self) -> None: # docstring inherited try: - for subpath in await self.fs._find(self.path, withdirs=True): + for subpath in await self.fs._find(self.path, withdirs=True, refresh=True): if subpath != self.path: await self.fs._rm(subpath, recursive=True) except FileNotFoundError: @@ -185,7 +187,7 @@ async def empty(self) -> bool: # TODO: it would be nice if we didn't have to list all keys here # it should be possible to stop after the first key is discovered try: - return not await self.fs._ls(self.path) + return not await self.fs._ls(self.path, refresh=True) except FileNotFoundError: return True @@ -319,7 +321,7 @@ async def set_partial_values( async def list(self) -> AsyncGenerator[str, None]: # docstring inherited - allfiles = await self.fs._find(self.path, detail=False, withdirs=False) + allfiles = await self.fs._find(self.path, detail=False, withdirs=False, refresh=True) for onefile in (a.replace(self.path + "/", "") for a in allfiles): yield onefile @@ -327,7 +329,7 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: # docstring inherited prefix = f"{self.path}/{prefix.rstrip('/')}" try: - allfiles = await self.fs._ls(prefix, detail=False) + allfiles = await self.fs._ls(prefix, detail=False, refresh=True) except FileNotFoundError: return for onefile in (a.replace(prefix + "/", "") for a in allfiles): @@ -336,5 +338,7 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: # docstring inherited find_str = f"{self.path}/{prefix}" - for onefile in await self.fs._find(find_str, detail=False, maxdepth=None, withdirs=False): + for onefile in await self.fs._find( + find_str, detail=False, maxdepth=None, withdirs=False, refresh=True + ): yield onefile.removeprefix(find_str) From 9281304b3e93ef4170a693dc827f5e99f7254a49 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sun, 13 Oct 2024 10:51:10 -0700 Subject: [PATCH 07/10] update docs --- docs/guide/storage.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/guide/storage.rst b/docs/guide/storage.rst index dfda553c4..0019f993a 100644 --- a/docs/guide/storage.rst +++ b/docs/guide/storage.rst @@ -72,7 +72,7 @@ that implements the `AbstractFileSystem` API, .. code-block:: python >>> import zarr - >>> store = zarr.storage.RemoteStore("gs://foo/bar", mode="r") + >>> store = zarr.storage.RemoteStore.from_url("gs://foo/bar", mode="r") >>> zarr.open(store=store) shape=(10, 20) dtype=float32> From d0e1928fb2b8dc157033029e1271faeaec8261c6 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Sun, 13 Oct 2024 20:43:04 -0700 Subject: [PATCH 08/10] use listings cache again --- src/zarr/storage/remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index 26cf4f1fb..0860e09e3 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -160,7 +160,7 @@ def from_url( RemoteStore """ opts = storage_options or {} - opts = {"asynchronous": True, "use_listings_cache": False, **opts} + opts = {"asynchronous": True, **opts} fs, path = fsspec.url_to_fs(url, **opts) From c09489170641d734a3d369bb522af390f0e5edb3 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Mon, 21 Oct 2024 09:55:19 -0700 Subject: [PATCH 09/10] no refresh --- src/zarr/storage/remote.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index af4b0cec2..55d6d7e24 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -167,14 +167,14 @@ def from_url( # https://github.com/fsspec/filesystem_spec/issues/1722 if "://" in path and not path.startswith("http"): # `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯) - _, path = path.split("://", maxsplit=1) + path = fs._strip_protocol(path) return cls(fs=fs, path=path, mode=mode, allowed_exceptions=allowed_exceptions) async def clear(self) -> None: # docstring inherited try: - for subpath in await self.fs._find(self.path, withdirs=True, refresh=True): + for subpath in await self.fs._find(self.path, withdirs=True): if subpath != self.path: await self.fs._rm(subpath, recursive=True) except FileNotFoundError: @@ -186,7 +186,7 @@ async def empty(self) -> bool: # TODO: it would be nice if we didn't have to list all keys here # it should be possible to stop after the first key is discovered try: - return not await self.fs._ls(self.path, refresh=True) + return not await self.fs._ls(self.path) except FileNotFoundError: return True @@ -320,7 +320,7 @@ async def set_partial_values( async def list(self) -> AsyncGenerator[str, None]: # docstring inherited - allfiles = await self.fs._find(self.path, detail=False, withdirs=False, refresh=True) + allfiles = await self.fs._find(self.path, detail=False, withdirs=False) for onefile in (a.replace(self.path + "/", "") for a in allfiles): yield onefile @@ -328,7 +328,7 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: # docstring inherited prefix = f"{self.path}/{prefix.rstrip('/')}" try: - allfiles = await self.fs._ls(prefix, detail=False, refresh=True) + allfiles = await self.fs._ls(prefix, detail=False) except FileNotFoundError: return for onefile in (a.replace(prefix + "/", "") for a in allfiles): @@ -337,7 +337,5 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: # docstring inherited find_str = f"{self.path}/{prefix}" - for onefile in await self.fs._find( - find_str, detail=False, maxdepth=None, withdirs=False, refresh=True - ): + for onefile in await self.fs._find(find_str, detail=False, maxdepth=None, withdirs=False): yield onefile.removeprefix(find_str) From b56c6c1ccfea76f9b75f9c1d4f1e1d012daf1949 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 22 Oct 2024 18:09:20 +0000 Subject: [PATCH 10/10] style: pre-commit fixes --- src/zarr/storage/remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index f6285c0e6..1f7d5f7a1 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -164,7 +164,7 @@ def from_url( opts = storage_options or {} opts = {"asynchronous": True, **opts} - + fs, path = url_to_fs(url, **opts) # fsspec is not consistent about removing the scheme from the path, so check and strip it here