Zarr-v3 Consolidated Metadata (#2113)
* Fixed MemoryStore.list_dir

Ensures that nested children are listed properly.

* fixup s3

* recursive Group.members

This PR adds a recursive=True flag to Group.members, for recursively
listing the members of some hierarchy.

This is useful for Consolidated Metadata, which needs to recursively
inspect children. IMO, it's useful (and simple) enough to include
in the public API.

* Zarr-v3 Consolidated Metadata

Implements the optional Consolidated Metadata feature of zarr-v3.

* fixup

* read zarr-v2 consolidated metadata

* check writable

* Handle non-root paths

* Some error handling

* cleanup

* refactor open

* remove dupe file

* v2 getitem

* fixup

* Optimized members

* Impl flatten

* Fixups

* doc

* nest the tests

* fixup

* Fixups

* fixup

* fixup

* fixup

* fixup

* consistent open_consolidated handling

* fixup

* make clear that flat_to_nested mutates

* fixup

* fixup

* Fixup

* fixup

* fixup

* fixup

* fixup

* added docs

* fixup

* Ensure empty dict

* fixed name

* fixup nested

* removed dupe tests

* fixup

* doc fix

* fixups

* fixup

* fixup

* v2 writer

* fixup

* fixup

* path fix

* Fixed v2 use_consolidated=False

* fixup

* Special case object dtype

Closes #2315

* fixup

* docs

* pr review

* must_understand

* Updated from_dict checking

* cleanup

* cleanup

* Fixed fill_value

* fixup
TomAugspurger authored Oct 10, 2024
1 parent 6b11bb8 commit 3964eab
Showing 14 changed files with 1,732 additions and 67 deletions.
74 changes: 74 additions & 0 deletions docs/consolidated_metadata.rst
@@ -0,0 +1,74 @@
Consolidated Metadata
=====================

Zarr-Python implements the `Consolidated Metadata`_ extension to the Zarr Spec.
Consolidated metadata can reduce the time needed to load the metadata for an
entire hierarchy, especially when the metadata is being served over a network.
Consolidated metadata essentially stores all the metadata for a hierarchy in the
metadata of the root Group.

Usage
-----

If consolidated metadata is present in a Zarr Group's metadata then it is used
by default. The initial read to open the group will need to communicate with
the store (reading from a file for a :class:`zarr.store.LocalStore`, or making a
network request for a :class:`zarr.store.RemoteStore`). After that, any subsequent
metadata reads to get child Group or Array nodes will *not* require reads from the store.

In Python, the consolidated metadata is available on the ``.consolidated_metadata``
attribute of the ``GroupMetadata`` object.

.. code-block:: python

   >>> import zarr
   >>> store = zarr.store.MemoryStore({}, mode="w")
   >>> group = zarr.open_group(store=store)
   >>> group.create_array(shape=(1,), name="a")
   >>> group.create_array(shape=(2, 2), name="b")
   >>> group.create_array(shape=(3, 3, 3), name="c")
   >>> zarr.consolidate_metadata(store)

If we open that group, the Group's metadata has a :class:`zarr.ConsolidatedMetadata`
that can be used.

.. code-block:: python

   >>> consolidated = zarr.open_group(store=store)
   >>> consolidated.metadata.consolidated_metadata.metadata
   {'b': ArrayV3Metadata(shape=(2, 2), fill_value=np.float64(0.0), ...),
    'a': ArrayV3Metadata(shape=(1,), fill_value=np.float64(0.0), ...),
    'c': ArrayV3Metadata(shape=(3, 3, 3), fill_value=np.float64(0.0), ...)}

Operations on the group to get children automatically use the consolidated metadata.

.. code-block:: python

   >>> consolidated["a"]  # no read / HTTP request to the Store is required
   <Array memory://.../a shape=(1,) dtype=float64>

With nested groups, the consolidated metadata is available on the children, recursively.

.. code-block:: python

   >>> child = group.create_group("child", attributes={"kind": "child"})
   >>> grandchild = child.create_group("child", attributes={"kind": "grandchild"})
   >>> consolidated = zarr.consolidate_metadata(store)

   >>> consolidated["child"].metadata.consolidated_metadata
   ConsolidatedMetadata(metadata={'child': GroupMetadata(attributes={'kind': 'grandchild'}, zarr_format=3, )}, ...)

Synchronization and Concurrency
-------------------------------

Consolidated metadata is intended for read-heavy use cases on slowly changing
hierarchies. For hierarchies where new nodes are constantly being added,
removed, or modified, consolidated metadata may not be desirable.

1. It will add some overhead to each update operation, since the metadata
   would need to be re-consolidated to keep it in sync with the store (see the
   sketch below).
2. Readers using consolidated metadata will regularly see a "past" version of
   the hierarchy, frozen at the time they read the root node with its
   consolidated metadata.
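
If the hierarchy does change, the consolidated metadata must be rewritten before
readers relying on it will see the update. A minimal sketch, reusing the in-memory
``store`` and ``group`` from the examples above (the new group name is illustrative):

.. code-block:: python

   >>> group.create_group("new-child")                  # not yet visible via the stale consolidated metadata
   >>> consolidated = zarr.consolidate_metadata(store)  # re-consolidate so readers of the root see it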

.. _Consolidated Metadata: https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#consolidated-metadata
1 change: 1 addition & 0 deletions docs/index.rst
@@ -10,6 +10,7 @@ Zarr-Python

getting_started
tutorial
consolidated_metadata
api/index
spec
release
112 changes: 104 additions & 8 deletions src/zarr/api/asynchronous.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import dataclasses
import warnings
from typing import TYPE_CHECKING, Any, Literal, cast

@@ -9,9 +10,17 @@

from zarr.abc.store import Store
from zarr.core.array import Array, AsyncArray, get_array_metadata
from zarr.core.common import JSON, AccessModeLiteral, ChunkCoords, MemoryOrder, ZarrFormat
from zarr.core.buffer import NDArrayLike
from zarr.core.chunk_key_encodings import ChunkKeyEncoding
from zarr.core.common import (
    JSON,
    AccessModeLiteral,
    ChunkCoords,
    MemoryOrder,
    ZarrFormat,
)
from zarr.core.config import config
from zarr.core.group import AsyncGroup
from zarr.core.group import AsyncGroup, ConsolidatedMetadata, GroupMetadata
from zarr.core.metadata import ArrayMetadataDict, ArrayV2Metadata, ArrayV3Metadata
from zarr.errors import NodeTypeValidationError
from zarr.storage import (
@@ -132,8 +141,64 @@ def _default_zarr_version() -> ZarrFormat:
    return cast(ZarrFormat, int(config.get("default_zarr_version", 3)))


async def consolidate_metadata(*args: Any, **kwargs: Any) -> AsyncGroup:
    raise NotImplementedError
async def consolidate_metadata(
    store: StoreLike,
    path: str | None = None,
    zarr_format: ZarrFormat | None = None,
) -> AsyncGroup:
    """
    Consolidate the metadata of all nodes in a hierarchy.

    Upon completion, the metadata of the root node in the Zarr hierarchy will be
    updated to include all the metadata of child nodes.

    Parameters
    ----------
    store: StoreLike
        The store-like object whose metadata you wish to consolidate.
    path: str, optional
        A path to a group in the store to consolidate at. Only children
        below that group will be consolidated.

        By default, the root node is used so all the metadata in the
        store is consolidated.
    zarr_format : {2, 3, None}, optional
        The zarr format of the hierarchy. By default the zarr format
        is inferred.

    Returns
    -------
    group: AsyncGroup
        The group, with the ``consolidated_metadata`` field set to include
        the metadata of each child node.
    """
    store_path = await make_store_path(store)

    if path is not None:
        store_path = store_path / path

    group = await AsyncGroup.open(store_path, zarr_format=zarr_format, use_consolidated=False)
    group.store_path.store._check_writable()

    members_metadata = {k: v.metadata async for k, v in group.members(max_depth=None)}

    # While consolidating, we want to be explicit about when child groups
    # are empty by inserting an empty dict for consolidated_metadata.metadata
    for k, v in members_metadata.items():
        if isinstance(v, GroupMetadata) and v.consolidated_metadata is None:
            v = dataclasses.replace(v, consolidated_metadata=ConsolidatedMetadata(metadata={}))
            members_metadata[k] = v

    ConsolidatedMetadata._flat_to_nested(members_metadata)

    consolidated_metadata = ConsolidatedMetadata(metadata=members_metadata)
    metadata = dataclasses.replace(group.metadata, consolidated_metadata=consolidated_metadata)
    group = dataclasses.replace(
        group,
        metadata=metadata,
    )
    await group._save_metadata()
    return group


async def copy(*args: Any, **kwargs: Any) -> tuple[int, int, int]:
@@ -256,8 +321,18 @@ async def open(
return await open_group(store=store_path, zarr_format=zarr_format, **kwargs)


async def open_consolidated(*args: Any, **kwargs: Any) -> AsyncGroup:
    raise NotImplementedError
async def open_consolidated(
    *args: Any, use_consolidated: Literal[True] = True, **kwargs: Any
) -> AsyncGroup:
    """
    Alias for :func:`open_group` with ``use_consolidated=True``.
    """
    if use_consolidated is not True:
        raise TypeError(
            "'use_consolidated' must be 'True' in 'open_consolidated'. Use 'open' with "
            "'use_consolidated=False' to bypass consolidated metadata."
        )
    return await open_group(*args, use_consolidated=use_consolidated, **kwargs)


async def save(
@@ -549,6 +624,7 @@ async def open_group(
    zarr_format: ZarrFormat | None = None,
    meta_array: Any | None = None,  # not used
    attributes: dict[str, JSON] | None = None,
    use_consolidated: bool | str | None = None,
) -> AsyncGroup:
    """Open a group using file-mode-like semantics.
@@ -589,6 +665,22 @@
        to users. Use `numpy.empty(())` by default.
    attributes : dict
        A dictionary of JSON-serializable values with user-defined attributes.
    use_consolidated : bool or str, default None
        Whether to use consolidated metadata.

        By default, consolidated metadata is used if it's present in the
        store (in the ``zarr.json`` for Zarr v3 and in the ``.zmetadata`` file
        for Zarr v2).

        To explicitly require consolidated metadata, set ``use_consolidated=True``,
        which will raise an exception if consolidated metadata is not found.

        To explicitly *not* use consolidated metadata, set ``use_consolidated=False``,
        which will fall back to using the regular, non consolidated metadata.

        Zarr v2 allowed configuring the key storing the consolidated metadata
        (``.zmetadata`` by default). Specify the custom key as ``use_consolidated``
        to load consolidated metadata from a non-default key.

    Returns
    -------
@@ -615,7 +707,9 @@ async def open_group(
        attributes = {}

    try:
        return await AsyncGroup.open(store_path, zarr_format=zarr_format)
        return await AsyncGroup.open(
            store_path, zarr_format=zarr_format, use_consolidated=use_consolidated
        )
    except (KeyError, FileNotFoundError):
        return await AsyncGroup.from_store(
            store_path,
@@ -777,7 +871,9 @@ async def create(
            )
        else:
            warnings.warn(
                "dimension_separator is not yet implemented", RuntimeWarning, stacklevel=2
                "dimension_separator is not yet implemented",
                RuntimeWarning,
                stacklevel=2,
            )
    if write_empty_chunks:
        warnings.warn("write_empty_chunks is not yet implemented", RuntimeWarning, stacklevel=2)
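
A hedged usage sketch of the asynchronous entry points added above. The store
construction and array creation reuse calls shown in the docs diff; the variable
names and prints are illustrative only, not part of this change:

import asyncio

import zarr
import zarr.api.asynchronous as async_api

# Build a small hierarchy with the synchronous API (same calls as in the docs above).
store = zarr.store.MemoryStore({}, mode="w")
group = zarr.open_group(store=store)
group.create_array(shape=(1,), name="a")
group.create_array(shape=(2, 2), name="b")


async def main() -> None:
    # Write consolidated metadata into the root group's metadata document.
    consolidated = await async_api.consolidate_metadata(store)
    print(list(consolidated.metadata.consolidated_metadata.metadata))

    # Re-open requiring consolidated metadata; open_consolidated is an alias for
    # open_group(..., use_consolidated=True) and raises if the metadata is missing.
    reopened = await async_api.open_consolidated(store=store)
    print(type(reopened))


asyncio.run(main())
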
10 changes: 7 additions & 3 deletions src/zarr/api/synchronous.py
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal

import zarr.api.asynchronous as async_api
from zarr._compat import _deprecate_positional_args
@@ -90,8 +90,10 @@ def open(
return Group(obj)


def open_consolidated(*args: Any, **kwargs: Any) -> Group:
    return Group(sync(async_api.open_consolidated(*args, **kwargs)))
def open_consolidated(*args: Any, use_consolidated: Literal[True] = True, **kwargs: Any) -> Group:
    return Group(
        sync(async_api.open_consolidated(*args, use_consolidated=use_consolidated, **kwargs))
    )


def save(
@@ -208,6 +210,7 @@ def open_group(
    zarr_format: ZarrFormat | None = None,
    meta_array: Any | None = None,  # not used in async api
    attributes: dict[str, JSON] | None = None,
    use_consolidated: bool | str | None = None,
) -> Group:
    return Group(
        sync(
@@ -223,6 +226,7 @@ def open_group(
                zarr_format=zarr_format,
                meta_array=meta_array,
                attributes=attributes,
                use_consolidated=use_consolidated,
            )
        )
    )
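
A hedged sketch of the use_consolidated modes wired through the synchronous
wrappers above; the store setup mirrors the docs, and the variable names are
illustrative only:

import zarr

store = zarr.store.MemoryStore({}, mode="w")
group = zarr.open_group(store=store)
group.create_array(shape=(1,), name="a")
zarr.consolidate_metadata(store)

# Default (None): use consolidated metadata if it is present in the store.
g_default = zarr.open_group(store=store)

# True: require consolidated metadata; an error is raised if it is not found.
g_required = zarr.open_group(store=store, use_consolidated=True)

# False: ignore consolidated metadata and read each child's metadata directly.
g_plain = zarr.open_group(store=store, use_consolidated=False)

# open_consolidated is shorthand for open_group(..., use_consolidated=True).
g_alias = zarr.open_consolidated(store=store)

# For Zarr v2 data, a custom consolidated-metadata key (".zmetadata" by default)
# can be passed as a string, e.g. use_consolidated="custom-key" (illustrative).
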
4 changes: 4 additions & 0 deletions src/zarr/core/array.py
@@ -73,6 +73,7 @@
    ArrayV3MetadataDict,
    T_ArrayMetadata,
)
from zarr.core.metadata.v3 import parse_node_type_array
from zarr.core.sync import collect_aiterator, sync
from zarr.errors import MetadataValidationError
from zarr.registry import get_pipeline_class
@@ -165,6 +166,9 @@ async def get_array_metadata(
        # V3 arrays are comprised of a zarr.json object
        assert zarr_json_bytes is not None
        metadata_dict = json.loads(zarr_json_bytes.to_bytes())

        parse_node_type_array(metadata_dict.get("node_type"))

    return metadata_dict


2 changes: 2 additions & 0 deletions src/zarr/core/common.py
@@ -24,13 +24,15 @@
ZARRAY_JSON = ".zarray"
ZGROUP_JSON = ".zgroup"
ZATTRS_JSON = ".zattrs"
ZMETADATA_V2_JSON = ".zmetadata"

ByteRangeRequest = tuple[int | None, int | None]
BytesLike = bytes | bytearray | memoryview
ShapeLike = tuple[int, ...] | int
ChunkCoords = tuple[int, ...]
ChunkCoordsLike = Iterable[int]
ZarrFormat = Literal[2, 3]
NodeType = Literal["array", "group"]
JSON = None | str | int | float | Mapping[str, "JSON"] | tuple["JSON", ...]
MemoryOrder = Literal["C", "F"]
AccessModeLiteral = Literal["r", "r+", "a", "w", "w-"]