From f8cabaa6ddc7642d4c10a2c6ef3fd3c9cc7b2e58 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Wed, 17 Nov 2021 12:33:13 +0100 Subject: [PATCH 1/5] initial sharding prototype --- zarr/_storage/sharded_store.py | 102 +++++++++++++++++++++++++++++++++ zarr/_storage/store.py | 1 + zarr/core.py | 37 ++++++++++-- zarr/creation.py | 6 +- zarr/meta.py | 8 ++- zarr/storage.py | 10 +++- zarr/util.py | 33 +++++++++++ 7 files changed, 186 insertions(+), 11 deletions(-) create mode 100644 zarr/_storage/sharded_store.py diff --git a/zarr/_storage/sharded_store.py b/zarr/_storage/sharded_store.py new file mode 100644 index 000000000..440ec20a2 --- /dev/null +++ b/zarr/_storage/sharded_store.py @@ -0,0 +1,102 @@ +from functools import reduce +from itertools import product +from typing import Any, Iterable, Iterator, Optional, Tuple + +from zarr._storage.store import BaseStore, Store +from zarr.storage import StoreLike, array_meta_key, attrs_key, group_meta_key + + +def _cum_prod(x: Iterable[int]) -> Iterable[int]: + prod = 1 + yield prod + for i in x[:-1]: + prod *= i + yield prod + + +class ShardedStore(Store): + """This class should not be used directly, + but is added to an Array as a wrapper when needed automatically.""" + + def __init__( + self, store: + StoreLike, + shards: Tuple[int, ...], + dimension_separator: str, + chunk_has_constant_size: bool, + fill_value: bytes, + value_len: Optional[int], + ) -> None: + self._store: BaseStore = BaseStore._ensure_store(store) + self._shards = shards + # This defines C/F-order + self._shards_cumprod = tuple(_cum_prod(shards)) + self._num_chunks_per_shard = reduce(lambda x, y: x*y, shards, 1) + self._dimension_separator = dimension_separator + # TODO: add jumptable for compressed data + assert not chunk_has_constant_size, "Currently only uncompressed data can be used." 
+ self._chunk_has_constant_size = chunk_has_constant_size + if not chunk_has_constant_size: + assert value_len is not None + self._fill_chunk = fill_value * value_len + else: + self._fill_chunk = None + + # TODO: add warnings for ineffective reads/writes: + # * warn if partial reads are not available + # * optionally warn on unaligned writes if no partial writes are available + + def __key_to_sharded__(self, key: str) -> Tuple[str, int]: + # TODO: allow to be in a group (aka only use last parts for dimensions) + subkeys = map(int, key.split(self._dimension_separator)) + + shard_tuple, index_tuple = zip(*((subkey // shard_i, subkey % shard_i) for subkey, shard_i in zip(subkeys, self._shards))) + shard_key = self._dimension_separator.join(map(str, shard_tuple)) + index = sum(i * j for i, j in zip(index_tuple, self._shards_cumprod)) + return shard_key, index + + def __get_chunk_slice__(self, shard_key: str, shard_index: int) -> Tuple[int, int]: + # TODO: here we would use the jumptable for compression + start = shard_index * len(self._fill_chunk) + return slice(start, start + len(self._fill_chunk)) + + def __getitem__(self, key: str) -> bytes: + shard_key, shard_index = self.__key_to_sharded__(key) + chunk_slice = self.__get_chunk_slice__(shard_key, shard_index) + # TODO use partial reads if available + full_shard_value = self._store[shard_key] + return full_shard_value[chunk_slice] + + def __setitem__(self, key: str, value: bytes) -> None: + shard_key, shard_index = self.__key_to_sharded__(key) + if shard_key in self._store: + full_shard_value = bytearray(self._store[shard_key]) + else: + full_shard_value = bytearray(self._fill_chunk * self._num_chunks_per_shard) + chunk_slice = self.__get_chunk_slice__(shard_key, shard_index) + # TODO use partial writes if available + full_shard_value[chunk_slice] = value + self._store[shard_key] = full_shard_value + + def __delitem__(self, key) -> None: + # TODO not implemented yet + # For uncompressed chunks, deleting the "last" chunk might need to be detected. + raise NotImplementedError("Deletion is not yet implemented") + + def __iter__(self) -> Iterator[str]: + for shard_key in self._store.__iter__(): + if any(shard_key.endswith(i) for i in (array_meta_key, group_meta_key, attrs_key)): + yield shard_key + else: + # TODO: allow to be in a group (aka only use last parts for dimensions) + subkeys = tuple(map(int, shard_key.split(self._dimension_separator))) + for offset in product(*(range(i) for i in self._shards)): + original_key = (subkeys_i * shards_i + offset_i for subkeys_i, offset_i, shards_i in zip(subkeys, offset, self._shards)) + yield self._dimension_separator.join(map(str, original_key)) + + def __len__(self) -> int: + return sum(1 for _ in self.keys()) + + # TODO: For efficient reads and writes, we need to implement + # getitems, setitems & delitems + # and combine writes/reads/deletions to the same shard. diff --git a/zarr/_storage/store.py b/zarr/_storage/store.py index 6f5bf78e2..6714e729f 100644 --- a/zarr/_storage/store.py +++ b/zarr/_storage/store.py @@ -110,6 +110,7 @@ def _ensure_store(store: Any): class Store(BaseStore): + # TODO: document methods which allow optimizations, e.g. delitems, setitems, getitems, listdir, … """Abstract store class used by implementations following the Zarr v2 spec. Adds public `listdir`, `rename`, and `rmdir` methods on top of BaseStore. 
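Note on the key arithmetic above: `__key_to_sharded__` splits each chunk key into a shard key plus a flat index inside that shard. A minimal standalone sketch of the same computation, assuming C-order strides, `shards=(2, 2)`, and `.` as the dimension separator (the helper name `key_to_shard` is illustrative, not part of the patch):

from typing import Sequence, Tuple

def key_to_shard(key: str, shards: Sequence[int], sep: str = ".") -> Tuple[str, int]:
    # "3.1" -> per-dimension chunk coordinates (3, 1)
    subkeys = [int(k) for k in key.split(sep)]
    # cumulative products of the shard shape, as in _cum_prod above
    strides = [1]
    for s in shards[:-1]:
        strides.append(strides[-1] * s)
    # which shard the chunk lives in, and its offset within that shard
    shard_tuple = [k // s for k, s in zip(subkeys, shards)]
    index_tuple = [k % s for k, s in zip(subkeys, shards)]
    shard_key = sep.join(str(t) for t in shard_tuple)
    index = sum(i * j for i, j in zip(index_tuple, strides))
    return shard_key, index

# chunk (3, 1) with shards=(2, 2) lands in shard "1.0" at intra-shard index 3
assert key_to_shard("3.1", (2, 2)) == ("1.0", 3)

The store then reads or writes `len(self._fill_chunk)` bytes at offset `index * len(self._fill_chunk)` inside the shard object.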
diff --git a/zarr/core.py b/zarr/core.py
index d36613942..562e75607 100644
--- a/zarr/core.py
+++ b/zarr/core.py
@@ -10,6 +10,7 @@
 from numcodecs.compat import ensure_bytes, ensure_ndarray
 from collections.abc import MutableMapping
 
+from zarr._storage.sharded_store import ShardedStore
 from zarr.attrs import Attributes
 from zarr.codecs import AsType, get_codec
@@ -213,6 +214,7 @@ def _load_metadata_nosync(self):
             self._meta = meta
             self._shape = meta['shape']
             self._chunks = meta['chunks']
+            self._shards = meta.get('shards')
             self._dtype = meta['dtype']
             self._fill_value = meta['fill_value']
             self._order = meta['order']
@@ -264,7 +266,9 @@ def _flush_metadata_nosync(self):
             filters_config = None
         meta = dict(shape=self._shape, chunks=self._chunks, dtype=self._dtype,
                     compressor=compressor_config, fill_value=self._fill_value,
-                    order=self._order, filters=filters_config)
+                    order=self._order, filters=filters_config, shards=self._shards)
+        if self._shards is not None:
+            meta['shards'] = self._shards
         mkey = self._key_prefix + array_meta_key
         self._store[mkey] = self._store._metadata_class.encode_array_metadata(meta)
@@ -307,11 +311,26 @@ def read_only(self, value):
 
     @property
     def chunk_store(self):
-        """A MutableMapping providing the underlying storage for array chunks."""
         if self._chunk_store is None:
-            return self._store
+            chunk_store = self._store
+        else:
+            chunk_store = self._chunk_store
+        """A MutableMapping providing the underlying storage for array chunks."""
+        if self._shards is None:
+            return chunk_store
         else:
-            return self._chunk_store
+            try:
+                return self._cached_sharded_store
+            except AttributeError:
+                self._cached_sharded_store = BaseStore._ensure_store(ShardedStore(
+                    chunk_store,
+                    shards=self._shards,
+                    dimension_separator=self._dimension_separator,
+                    chunk_has_constant_size = self._compressor is not None,  # TODO add exceptions, e.g. dtype==object
+                    fill_value = np.full(1, fill_value=self._fill_value or 0, dtype=self._dtype).tobytes(),
+                    value_len = reduce(operator.mul, self._chunks, 1),
+                ))
+                return self._cached_sharded_store
 
     @property
     def shape(self):
@@ -332,6 +351,12 @@ def chunks(self):
         chunk of the array."""
         return self._chunks
 
+    @property
+    def shards(self):
+        """A tuple of integers describing the number of chunks in each shard
+        of the array."""
+        return self._shards
+
     @property
     def dtype(self):
         """The NumPy data type."""
@@ -1899,7 +1924,7 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
             and hasattr(self._compressor, "decode_partial")
             and not fields
             and self.dtype != object
-            and hasattr(self.chunk_store, "getitems")
+            and hasattr(self.chunk_store, "getitems")  # TODO: this should rather check for read_block or similar
         ):
             partial_read_decode = True
             cdatas = {
@@ -2236,6 +2261,7 @@ def digest(self, hashname="sha1"):
 
         h = hashlib.new(hashname)
 
+        # TODO: operate on shards here if available:
         for i in itertools.product(*[range(s) for s in self.cdata_shape]):
             h.update(self.chunk_store.get(self._chunk_key(i), b""))
 
@@ -2362,6 +2388,7 @@ def _resize_nosync(self, *args):
             except KeyError:
                 # chunk not initialized
                 pass
+        # TODO: collect all chunks to delete and use _chunk_delitems
 
     def append(self, data, axis=0):
         """Append `data` to `axis`.
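Note on the `ShardedStore` construction above: for uncompressed data, `fill_value` (one element rendered to bytes) times `value_len` (elements per chunk) yields the byte pattern of a chunk that was never written, which `__setitem__` uses to pre-fill fresh shards. A small sketch under assumed metadata (float64, fill value 0, 3x3 chunks; values are illustrative):

import operator
from functools import reduce

import numpy as np

# assumed example values; the chunk_store property above reads them from the array metadata
dtype = np.dtype("f8")
fill_value = 0
chunks = (3, 3)

binary_fill_value = np.full(1, fill_value=fill_value or 0, dtype=dtype).tobytes()
value_len = reduce(operator.mul, chunks, 1)   # elements per chunk
fill_chunk = binary_fill_value * value_len    # bytes of an untouched chunk
assert len(fill_chunk) == value_len * dtype.itemsize  # 9 * 8 = 72 bytes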
diff --git a/zarr/creation.py b/zarr/creation.py index 244a9b080..b669c4241 100644 --- a/zarr/creation.py +++ b/zarr/creation.py @@ -1,3 +1,4 @@ +from typing import Optional, Tuple, Union from warnings import warn import numpy as np @@ -19,7 +20,8 @@ def create(shape, chunks=True, dtype=None, compressor='default', fill_value=0, order='C', store=None, synchronizer=None, overwrite=False, path=None, chunk_store=None, filters=None, cache_metadata=True, cache_attrs=True, read_only=False, - object_codec=None, dimension_separator=None, write_empty_chunks=True, **kwargs): + object_codec=None, dimension_separator=None, write_empty_chunks=True, + shards: Union[int, Tuple[int, ...], None]=None, **kwargs): """Create an array. Parameters @@ -145,7 +147,7 @@ def create(shape, chunks=True, dtype=None, compressor='default', init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor, fill_value=fill_value, order=order, overwrite=overwrite, path=path, chunk_store=chunk_store, filters=filters, object_codec=object_codec, - dimension_separator=dimension_separator) + dimension_separator=dimension_separator, shards=shards) # instantiate array z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer, diff --git a/zarr/meta.py b/zarr/meta.py index c292b09a1..964858de4 100644 --- a/zarr/meta.py +++ b/zarr/meta.py @@ -51,6 +51,7 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A object_codec = None dimension_separator = meta.get("dimension_separator", None) + shards = meta.get("shards", None) fill_value = cls.decode_fill_value(meta['fill_value'], dtype, object_codec) meta = dict( zarr_format=meta["zarr_format"], @@ -64,6 +65,8 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A ) if dimension_separator: meta['dimension_separator'] = dimension_separator + if shards: + meta['shards'] = tuple(shards) except Exception as e: raise MetadataError("error decoding metadata") from e else: @@ -77,6 +80,7 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: dtype, sdshape = dtype.subdtype dimension_separator = meta.get("dimension_separator") + shards = meta.get("shards") if dtype.hasobject: import numcodecs object_codec = numcodecs.get_codec(meta['filters'][0]) @@ -96,8 +100,8 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: if dimension_separator: meta['dimension_separator'] = dimension_separator - if dimension_separator: - meta["dimension_separator"] = dimension_separator + if shards: + meta['shards'] = shards return json_dumps(meta) diff --git a/zarr/storage.py b/zarr/storage.py index 7170eeaf2..e5e07cc23 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -54,7 +54,7 @@ from zarr.util import (buffer_size, json_loads, nolock, normalize_chunks, normalize_dimension_separator, normalize_dtype, normalize_fill_value, normalize_order, - normalize_shape, normalize_storage_path, retry_call) + normalize_shape, normalize_shards, normalize_storage_path, retry_call) from zarr._storage.absstore import ABSStore # noqa: F401 from zarr._storage.store import (_listdir_from_keys, @@ -236,6 +236,7 @@ def init_array( filters=None, object_codec=None, dimension_separator=None, + shards: Union[int, Tuple[int, ...], None]=None, ): """Initialize an array store with the given configuration. Note that this is a low-level function and there should be no need to call this directly from user code. 
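The new `shards` argument of `init_array` above accepts the same convenience forms as `chunks`. A sketch of the intended normalization rules, mirroring the `normalize_shards` helper added to zarr/util.py further below (this re-implementation is illustrative only, not the patch's code):

from numbers import Integral
from typing import Optional, Tuple, Union

def normalize_shards_sketch(
    shards: Union[int, Tuple[Optional[int], ...], None],
    shape: Tuple[int, ...],
) -> Optional[Tuple[int, ...]]:
    if shards is None:
        return None                # sharding disabled
    if isinstance(shards, Integral):
        shards = tuple(int(shards) for _ in shape)   # 2 -> (2, 2) for a 2-d array
    if len(shards) < len(shape):
        shards += (1,) * (len(shape) - len(shards))  # pad trailing dimensions with 1
    # -1 or None means one shard spanning the whole dimension
    return tuple(s if c in (-1, None) else int(c) for s, c in zip(shape, shards))

assert normalize_shards_sketch(2, (20, 3)) == (2, 2)
assert normalize_shards_sketch((4,), (20, 3)) == (4, 1)
assert normalize_shards_sketch((None, 2), (20, 3)) == (20, 2)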
@@ -353,7 +354,8 @@ def init_array(
                          order=order, overwrite=overwrite, path=path, chunk_store=chunk_store,
                          filters=filters, object_codec=object_codec,
-                         dimension_separator=dimension_separator)
+                         dimension_separator=dimension_separator,
+                         shards=shards)
 
 
 def _init_array_metadata(
@@ -370,6 +372,7 @@
     filters=None,
     object_codec=None,
     dimension_separator=None,
+    shards:Union[int, Tuple[int, ...], None] = None,
 ):
 
     # guard conditions
@@ -388,6 +391,7 @@
         shape = normalize_shape(shape) + dtype.shape
         dtype = dtype.base
     chunks = normalize_chunks(chunks, shape, dtype.itemsize)
+    shards = normalize_shards(shards, shape)
     order = normalize_order(order)
     fill_value = normalize_fill_value(fill_value, dtype)
 
@@ -445,6 +449,8 @@
             compressor=compressor_config, fill_value=fill_value, order=order,
             filters=filters_config, dimension_separator=dimension_separator)
+    if shards is not None:
+        meta["shards"] = shards
     key = _path_to_prefix(path) + array_meta_key
     if hasattr(store, '_metadata_class'):
         store[key] = store._metadata_class.encode_array_metadata(meta)  # type: ignore
diff --git a/zarr/util.py b/zarr/util.py
index d092ffe0d..738e4ae4e 100644
--- a/zarr/util.py
+++ b/zarr/util.py
@@ -149,6 +149,38 @@ def normalize_chunks(
     return tuple(chunks)
 
 
+def normalize_shards(
+        shards: Optional[Tuple[int, ...]], shape: Tuple[int, ...],
+) -> Tuple[int, ...]:
+    """Convenience function to normalize the `shards` argument for an array
+    with the given `shape`."""
+
+    # N.B., expect shape already normalized
+
+    if shards is None:
+        return None
+
+    # handle 1D convenience form
+    if isinstance(shards, numbers.Integral):
+        shards = tuple(int(shards) for _ in shape)
+
+    # handle bad dimensionality
+    if len(shards) > len(shape):
+        raise ValueError('too many dimensions in shards')
+
+    # handle underspecified shards
+    if len(shards) < len(shape):
+        # assume single shards across remaining dimensions
+        shards += (1, ) * (len(shape) - len(shards))
+
+    # handle None or -1 in shards
+    if -1 in shards or None in shards:
+        shards = tuple(s if c == -1 or c is None else int(c)
+                       for s, c in zip(shape, shards))
+
+    return tuple(shards)
+
+
 def normalize_dtype(dtype: Union[str, np.dtype], object_codec) -> Tuple[np.dtype, Any]:
 
     # convenience API for object arrays
@@ -560,6 +592,7 @@ def __init__(self, store_key, chunk_store):
         # is it fsstore or an actual fsspec map object
         assert hasattr(self.chunk_store, "map")
         self.map = self.chunk_store.map
+        # maybe use partial_read here also
         self.fs = self.chunk_store.fs
         self.store_key = store_key
         self.buff = None

From 8290f1e9f0525555dd0b8fef3094c858ad4b6a6e Mon Sep 17 00:00:00 2001
From: Jonathan Striebel
Date: Wed, 17 Nov 2021 12:33:34 +0100
Subject: [PATCH 2/5] add small script to test chunking

---
 chunking_test.py | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)
 create mode 100644 chunking_test.py

diff --git a/chunking_test.py b/chunking_test.py
new file mode 100644
index 000000000..1cb9ea6ee
--- /dev/null
+++ b/chunking_test.py
@@ -0,0 +1,24 @@
+import json
+import os
+
+import zarr
+
+store = zarr.DirectoryStore("data/chunking_test.zarr")
+z = zarr.zeros((20, 3), chunks=(3, 3), shards=(2, 2), store=store, overwrite=True, compressor=None)
+z[...]
= 42 +z[15, 1] = 389 +z[19, 2] = 1 +z[0, 1] = -4.2 + +print("ONDISK", sorted(os.listdir("data/chunking_test.zarr"))) +assert json.loads(store[".zarray"].decode()) ["shards"] == [2, 2] + +print("STORE", list(store)) +print("CHUNKSTORE (SHARDED)", list(z.chunk_store)) + +z_reopened = zarr.open("data/chunking_test.zarr") +assert z_reopened.shards == (2, 2) +assert z_reopened[15, 1] == 389 +assert z_reopened[19, 2] == 1 +assert z_reopened[0, 1] == -4.2 +assert z_reopened[0, 0] == 42 From a44b2e5802d4fe85859b55effa3d6f1b14431de8 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Wed, 17 Nov 2021 14:12:57 +0100 Subject: [PATCH 3/5] Update util.py --- zarr/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zarr/util.py b/zarr/util.py index 738e4ae4e..b603edd1e 100644 --- a/zarr/util.py +++ b/zarr/util.py @@ -592,7 +592,7 @@ def __init__(self, store_key, chunk_store): # is it fsstore or an actual fsspec map object assert hasattr(self.chunk_store, "map") self.map = self.chunk_store.map - # maybe use partial_read here also + # TODO maybe use partial_read here also self.fs = self.chunk_store.fs self.store_key = store_key self.buff = None From 97a9368ddb021cf01be5b7b1676c8801bdde5663 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Thu, 18 Nov 2021 14:24:44 +0100 Subject: [PATCH 4/5] implement feedback --- zarr/_storage/sharded_store.py | 31 +++++++++++++++++++------------ zarr/core.py | 31 +++++++++++++++++-------------- zarr/util.py | 4 ++-- 3 files changed, 38 insertions(+), 28 deletions(-) diff --git a/zarr/_storage/sharded_store.py b/zarr/_storage/sharded_store.py index 440ec20a2..87206755b 100644 --- a/zarr/_storage/sharded_store.py +++ b/zarr/_storage/sharded_store.py @@ -2,6 +2,8 @@ from itertools import product from typing import Any, Iterable, Iterator, Optional, Tuple +import numpy as np + from zarr._storage.store import BaseStore, Store from zarr.storage import StoreLike, array_meta_key, attrs_key, group_meta_key @@ -19,26 +21,28 @@ class ShardedStore(Store): but is added to an Array as a wrapper when needed automatically.""" def __init__( - self, store: - StoreLike, + self, + store: StoreLike, shards: Tuple[int, ...], dimension_separator: str, - chunk_has_constant_size: bool, - fill_value: bytes, - value_len: Optional[int], + are_chunks_compressed: bool, + dtype: np.dtype, + fill_value: Any, + chunk_size: int, ) -> None: self._store: BaseStore = BaseStore._ensure_store(store) self._shards = shards # This defines C/F-order - self._shards_cumprod = tuple(_cum_prod(shards)) + self._shard_strides = tuple(_cum_prod(shards)) self._num_chunks_per_shard = reduce(lambda x, y: x*y, shards, 1) self._dimension_separator = dimension_separator # TODO: add jumptable for compressed data - assert not chunk_has_constant_size, "Currently only uncompressed data can be used." + chunk_has_constant_size = not are_chunks_compressed and not dtype == object + assert chunk_has_constant_size, "Currently only uncompressed, fixed-length data can be used." 
self._chunk_has_constant_size = chunk_has_constant_size - if not chunk_has_constant_size: - assert value_len is not None - self._fill_chunk = fill_value * value_len + if chunk_has_constant_size: + binary_fill_value = np.full(1, fill_value=fill_value or 0, dtype=dtype).tobytes() + self._fill_chunk = binary_fill_value * chunk_size else: self._fill_chunk = None @@ -52,11 +56,11 @@ def __key_to_sharded__(self, key: str) -> Tuple[str, int]: shard_tuple, index_tuple = zip(*((subkey // shard_i, subkey % shard_i) for subkey, shard_i in zip(subkeys, self._shards))) shard_key = self._dimension_separator.join(map(str, shard_tuple)) - index = sum(i * j for i, j in zip(index_tuple, self._shards_cumprod)) + index = sum(i * j for i, j in zip(index_tuple, self._shard_strides)) return shard_key, index def __get_chunk_slice__(self, shard_key: str, shard_index: int) -> Tuple[int, int]: - # TODO: here we would use the jumptable for compression + # TODO: here we would use the jumptable for compression, which uses shard_key start = shard_index * len(self._fill_chunk) return slice(start, start + len(self._fill_chunk)) @@ -86,8 +90,11 @@ def __delitem__(self, key) -> None: def __iter__(self) -> Iterator[str]: for shard_key in self._store.__iter__(): if any(shard_key.endswith(i) for i in (array_meta_key, group_meta_key, attrs_key)): + # Special keys such as ".zarray" are passed on as-is yield shard_key else: + # For each shard key in the wrapped store, all corresponding chunks are yielded. + # TODO: For compressed chunks we might yield only the actualy contained chunks by reading the jumptables. # TODO: allow to be in a group (aka only use last parts for dimensions) subkeys = tuple(map(int, shard_key.split(self._dimension_separator))) for offset in product(*(range(i) for i in self._shards)): diff --git a/zarr/core.py b/zarr/core.py index 562e75607..01ba6a058 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -5,6 +5,7 @@ import operator import re from functools import reduce +from typing import Optional, Tuple import numpy as np from numcodecs.compat import ensure_bytes, ensure_ndarray @@ -192,6 +193,9 @@ def __init__( self._oindex = OIndex(self) self._vindex = VIndex(self) + # the sharded store is only initialized when needed + self._cached_sharded_store = None + def _load_metadata(self): """(Re)load metadata from store.""" if self._synchronizer is None: @@ -264,11 +268,11 @@ def _flush_metadata_nosync(self): filters_config = [f.get_config() for f in self._filters] else: filters_config = None + # Possible (unrelated) bug: + # should the dimension_separator also be included in this dict? 
meta = dict(shape=self._shape, chunks=self._chunks, dtype=self._dtype, compressor=compressor_config, fill_value=self._fill_value, order=self._order, filters=filters_config, shards=self._shards) - if self._shards is not None: - meta['shards'] = self._shards mkey = self._key_prefix + array_meta_key self._store[mkey] = self._store._metadata_class.encode_array_metadata(meta) @@ -311,26 +315,25 @@ def read_only(self, value): @property def chunk_store(self): + """A MutableMapping providing the underlying storage for array chunks.""" if self._chunk_store is None: chunk_store = self._store else: chunk_store = self._chunk_store - """A MutableMapping providing the underlying storage for array chunks.""" if self._shards is None: return chunk_store else: - try: - return self._cached_sharded_store - except AttributeError: - self._cached_sharded_store = BaseStore._ensure_store(ShardedStore( + if self._cached_sharded_store is None: + self._cached_sharded_store = ShardedStore( chunk_store, shards=self._shards, dimension_separator=self._dimension_separator, - chunk_has_constant_size = self._compressor is not None, # TODO add exceptions, e.g. dtype==object - fill_value = np.full(1, fill_value=self._fill_value or 0, dtype=self._dtype).tobytes(), - value_len = reduce(operator.mul, self._chunks, 1), - )) - return self._cached_sharded_store + are_chunks_compressed=self._compressor is not None, + dtype=self._dtype, + fill_value=self._fill_value or 0, + chunk_size=reduce(operator.mul, self._chunks, 1), + ) + return self._cached_sharded_store @property def shape(self): @@ -346,9 +349,9 @@ def shape(self, value): self.resize(value) @property - def chunks(self): + def chunks(self) -> Optional[Tuple[int, ...]]: """A tuple of integers describing the length of each dimension of a - chunk of the array.""" + chunk of the array, or None.""" return self._chunks @property diff --git a/zarr/util.py b/zarr/util.py index b603edd1e..220e49cbd 100644 --- a/zarr/util.py +++ b/zarr/util.py @@ -150,8 +150,8 @@ def normalize_chunks( def normalize_shards( - shards: Optional[Tuple[int, ...]], shape: Tuple[int, ...], -) -> Tuple[int, ...]: + shards: Optional[Tuple[Optional[int], ...]], shape: Tuple[int, ...], +) -> Optional[Tuple[int, ...]]: """Convenience function to normalize the `shards` argument for an array with the given `shape`.""" From 7e2768ac8f44a1cac32707a26ecbb49a1171a2aa Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Fri, 26 Nov 2021 13:44:43 +0100 Subject: [PATCH 5/5] make shard_format configurable, add bitmask for uncompressed chunks --- chunking_test.py | 9 +-- zarr/_storage/sharded_store.py | 119 ++++++++++++++++++++++----------- zarr/core.py | 14 ++-- zarr/creation.py | 4 +- zarr/meta.py | 7 +- zarr/storage.py | 6 +- 6 files changed, 107 insertions(+), 52 deletions(-) diff --git a/chunking_test.py b/chunking_test.py index 1cb9ea6ee..43491f677 100644 --- a/chunking_test.py +++ b/chunking_test.py @@ -4,17 +4,18 @@ import zarr store = zarr.DirectoryStore("data/chunking_test.zarr") -z = zarr.zeros((20, 3), chunks=(3, 3), shards=(2, 2), store=store, overwrite=True, compressor=None) -z[...] 
= 42 +z = zarr.zeros((20, 3), chunks=(3, 2), shards=(2, 2), store=store, overwrite=True, compressor=None) +z[:10, :] = 42 z[15, 1] = 389 z[19, 2] = 1 z[0, 1] = -4.2 +print(store[".zarray"].decode()) print("ONDISK", sorted(os.listdir("data/chunking_test.zarr"))) assert json.loads(store[".zarray"].decode()) ["shards"] == [2, 2] -print("STORE", list(store)) -print("CHUNKSTORE (SHARDED)", list(z.chunk_store)) +print("STORE", sorted(store)) +print("CHUNKSTORE (SHARDED)", sorted(z.chunk_store)) z_reopened = zarr.open("data/chunking_test.zarr") assert z_reopened.shards == (2, 2) diff --git a/zarr/_storage/sharded_store.py b/zarr/_storage/sharded_store.py index 87206755b..2857e738c 100644 --- a/zarr/_storage/sharded_store.py +++ b/zarr/_storage/sharded_store.py @@ -1,6 +1,7 @@ +from collections import defaultdict from functools import reduce -from itertools import product -from typing import Any, Iterable, Iterator, Optional, Tuple +import math +from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union import numpy as np @@ -16,7 +17,7 @@ def _cum_prod(x: Iterable[int]) -> Iterable[int]: yield prod -class ShardedStore(Store): +class MortonOrderShardedStore(Store): """This class should not be used directly, but is added to an Array as a wrapper when needed automatically.""" @@ -32,59 +33,97 @@ def __init__( ) -> None: self._store: BaseStore = BaseStore._ensure_store(store) self._shards = shards - # This defines C/F-order - self._shard_strides = tuple(_cum_prod(shards)) self._num_chunks_per_shard = reduce(lambda x, y: x*y, shards, 1) self._dimension_separator = dimension_separator - # TODO: add jumptable for compressed data + chunk_has_constant_size = not are_chunks_compressed and not dtype == object assert chunk_has_constant_size, "Currently only uncompressed, fixed-length data can be used." 
self._chunk_has_constant_size = chunk_has_constant_size if chunk_has_constant_size: binary_fill_value = np.full(1, fill_value=fill_value or 0, dtype=dtype).tobytes() self._fill_chunk = binary_fill_value * chunk_size - else: - self._fill_chunk = None + self._emtpy_meta = b"\x00" * math.ceil(self._num_chunks_per_shard / 8) + + # unused when using Morton order + self._shard_strides = tuple(_cum_prod(shards)) # TODO: add warnings for ineffective reads/writes: # * warn if partial reads are not available # * optionally warn on unaligned writes if no partial writes are available - - def __key_to_sharded__(self, key: str) -> Tuple[str, int]: + + def __get_meta__(self, shard_content: Union[bytes, bytearray]) -> int: + return int.from_bytes(shard_content[-len(self._emtpy_meta):], byteorder="big") + + def __set_meta__(self, shard_content: bytearray, meta: int) -> None: + shard_content[-len(self._emtpy_meta):] = meta.to_bytes(len(self._emtpy_meta), byteorder="big") + + # The following two methods define the order of the chunks in a shard + # TODO use morton order + def __chunk_key_to_shard_key_and_index__(self, chunk_key: str) -> Tuple[str, int]: # TODO: allow to be in a group (aka only use last parts for dimensions) - subkeys = map(int, key.split(self._dimension_separator)) + chunk_subkeys = map(int, chunk_key.split(self._dimension_separator)) - shard_tuple, index_tuple = zip(*((subkey // shard_i, subkey % shard_i) for subkey, shard_i in zip(subkeys, self._shards))) + shard_tuple, index_tuple = zip(*((subkey // shard_i, subkey % shard_i) for subkey, shard_i in zip(chunk_subkeys, self._shards))) shard_key = self._dimension_separator.join(map(str, shard_tuple)) index = sum(i * j for i, j in zip(index_tuple, self._shard_strides)) return shard_key, index - def __get_chunk_slice__(self, shard_key: str, shard_index: int) -> Tuple[int, int]: - # TODO: here we would use the jumptable for compression, which uses shard_key + def __shard_key_and_index_to_chunk_key__(self, shard_key_tuple: Tuple[int, ...], shard_index: int) -> str: + offset = tuple(shard_index % s2 // s1 for s1, s2 in zip(self._shard_strides, self._shard_strides[1:] + (self._num_chunks_per_shard,))) + original_key = (shard_key_i * shards_i + offset_i for shard_key_i, offset_i, shards_i in zip(shard_key_tuple, offset, self._shards)) + return self._dimension_separator.join(map(str, original_key)) + + def __keys_to_shard_groups__(self, keys: Iterable[str]) -> Dict[str, List[Tuple[str, str]]]: + shard_indices_per_shard_key = defaultdict(list) + for chunk_key in keys: + shard_key, shard_index = self.__chunk_key_to_shard_key_and_index__(chunk_key) + shard_indices_per_shard_key[shard_key].append((shard_index, chunk_key)) + return shard_indices_per_shard_key + + def __get_chunk_slice__(self, shard_index: int) -> Tuple[int, int]: start = shard_index * len(self._fill_chunk) return slice(start, start + len(self._fill_chunk)) def __getitem__(self, key: str) -> bytes: - shard_key, shard_index = self.__key_to_sharded__(key) - chunk_slice = self.__get_chunk_slice__(shard_key, shard_index) - # TODO use partial reads if available - full_shard_value = self._store[shard_key] - return full_shard_value[chunk_slice] + return self.getitems([key])[key] + + def getitems(self, keys: Iterable[str], **kwargs) -> Dict[str, bytes]: + result = {} + for shard_key, chunks_in_shard in self.__keys_to_shard_groups__(keys).items(): + # TODO use partial reads if available + full_shard_value = self._store[shard_key] + # TODO omit items if they don't exist + for shard_index, chunk_key 
in chunks_in_shard: + result[chunk_key] = full_shard_value[self.__get_chunk_slice__(shard_index)] + return result def __setitem__(self, key: str, value: bytes) -> None: - shard_key, shard_index = self.__key_to_sharded__(key) - if shard_key in self._store: - full_shard_value = bytearray(self._store[shard_key]) - else: - full_shard_value = bytearray(self._fill_chunk * self._num_chunks_per_shard) - chunk_slice = self.__get_chunk_slice__(shard_key, shard_index) - # TODO use partial writes if available - full_shard_value[chunk_slice] = value - self._store[shard_key] = full_shard_value + self.setitems({key: value}) + + def setitems(self, values: Dict[str, bytes]) -> None: + for shard_key, chunks_in_shard in self.__keys_to_shard_groups__(values.keys()).items(): + if len(chunks_in_shard) == self._num_chunks_per_shard: + # TODO shards at a non-dataset-size aligned surface are not captured here yet + full_shard_value = b"".join( + values[chunk_key] for _, chunk_key in sorted(chunks_in_shard) + ) + b"\xff" * len(self._emtpy_meta) + self._store[shard_key] = full_shard_value + else: + # TODO use partial writes if available + try: + full_shard_value = bytearray(self._store[shard_key]) + except KeyError: + full_shard_value = bytearray(self._fill_chunk * self._num_chunks_per_shard + self._emtpy_meta) + chunk_mask = self.__get_meta__(full_shard_value) + for shard_index, chunk_key in chunks_in_shard: + chunk_mask |= 1 << shard_index + full_shard_value[self.__get_chunk_slice__(shard_index)] = values[chunk_key] + self.__set_meta__(full_shard_value, chunk_mask) + self._store[shard_key] = full_shard_value def __delitem__(self, key) -> None: - # TODO not implemented yet - # For uncompressed chunks, deleting the "last" chunk might need to be detected. + # TODO not implemented yet, also delitems + # Deleting the "last" chunk in a shard needs to remove the whole shard raise NotImplementedError("Deletion is not yet implemented") def __iter__(self) -> Iterator[str]: @@ -94,16 +133,20 @@ def __iter__(self) -> Iterator[str]: yield shard_key else: # For each shard key in the wrapped store, all corresponding chunks are yielded. - # TODO: For compressed chunks we might yield only the actualy contained chunks by reading the jumptables. # TODO: allow to be in a group (aka only use last parts for dimensions) - subkeys = tuple(map(int, shard_key.split(self._dimension_separator))) - for offset in product(*(range(i) for i in self._shards)): - original_key = (subkeys_i * shards_i + offset_i for subkeys_i, offset_i, shards_i in zip(subkeys, offset, self._shards)) - yield self._dimension_separator.join(map(str, original_key)) + shard_key_tuple = tuple(map(int, shard_key.split(self._dimension_separator))) + mask = self.__get_meta__(self._store[shard_key]) + for i in range(self._num_chunks_per_shard): + if mask == 0: + break + if mask & 1: + yield self.__shard_key_and_index_to_chunk_key__(shard_key_tuple, i) + mask >>= 1 def __len__(self) -> int: return sum(1 for _ in self.keys()) - # TODO: For efficient reads and writes, we need to implement - # getitems, setitems & delitems - # and combine writes/reads/deletions to the same shard. 
+ +SHARDED_STORES = { + "morton_order": MortonOrderShardedStore, +} diff --git a/zarr/core.py b/zarr/core.py index 01ba6a058..2c5505079 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -11,7 +11,7 @@ from numcodecs.compat import ensure_bytes, ensure_ndarray from collections.abc import MutableMapping -from zarr._storage.sharded_store import ShardedStore +from zarr._storage.sharded_store import SHARDED_STORES from zarr.attrs import Attributes from zarr.codecs import AsType, get_codec @@ -219,6 +219,7 @@ def _load_metadata_nosync(self): self._shape = meta['shape'] self._chunks = meta['chunks'] self._shards = meta.get('shards') + self._shard_format = meta.get('shard_format') self._dtype = meta['dtype'] self._fill_value = meta['fill_value'] self._order = meta['order'] @@ -272,7 +273,8 @@ def _flush_metadata_nosync(self): # should the dimension_separator also be included in this dict? meta = dict(shape=self._shape, chunks=self._chunks, dtype=self._dtype, compressor=compressor_config, fill_value=self._fill_value, - order=self._order, filters=filters_config, shards=self._shards) + order=self._order, filters=filters_config, + shards=self._shards, shard_format=self._shard_format) mkey = self._key_prefix + array_meta_key self._store[mkey] = self._store._metadata_class.encode_array_metadata(meta) @@ -324,7 +326,7 @@ def chunk_store(self): return chunk_store else: if self._cached_sharded_store is None: - self._cached_sharded_store = ShardedStore( + self._cached_sharded_store = SHARDED_STORES[self._shard_format]( chunk_store, shards=self._shards, dimension_separator=self._dimension_separator, @@ -1731,7 +1733,7 @@ def _set_selection(self, indexer, value, fields=None): check_array_shape('value', value, sel_shape) # iterate over chunks in range - if not hasattr(self.store, "setitems") or self._synchronizer is not None \ + if not hasattr(self.chunk_store, "setitems") or self._synchronizer is not None \ or any(map(lambda x: x == 0, self.shape)): # iterative approach for chunk_coords, chunk_selection, out_selection in indexer: @@ -1974,8 +1976,8 @@ def _chunk_setitems(self, lchunk_coords, lchunk_selection, values, fields=None): self.chunk_store.setitems(to_store) def _chunk_delitems(self, ckeys): - if hasattr(self.store, "delitems"): - self.store.delitems(ckeys) + if hasattr(self.chunk_store, "delitems"): + self.chunk_store.delitems(ckeys) else: # pragma: no cover # exempting this branch from coverage as there are no extant stores # that will trigger this condition, but it's possible that they diff --git a/zarr/creation.py b/zarr/creation.py index b669c4241..d31860164 100644 --- a/zarr/creation.py +++ b/zarr/creation.py @@ -21,7 +21,7 @@ def create(shape, chunks=True, dtype=None, compressor='default', overwrite=False, path=None, chunk_store=None, filters=None, cache_metadata=True, cache_attrs=True, read_only=False, object_codec=None, dimension_separator=None, write_empty_chunks=True, - shards: Union[int, Tuple[int, ...], None]=None, **kwargs): + shards: Union[int, Tuple[int, ...], None]=None, shard_format: str="morton_order", **kwargs): """Create an array. 
Parameters @@ -147,7 +147,7 @@ def create(shape, chunks=True, dtype=None, compressor='default', init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor, fill_value=fill_value, order=order, overwrite=overwrite, path=path, chunk_store=chunk_store, filters=filters, object_codec=object_codec, - dimension_separator=dimension_separator, shards=shards) + dimension_separator=dimension_separator, shards=shards, shard_format=shard_format) # instantiate array z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer, diff --git a/zarr/meta.py b/zarr/meta.py index 964858de4..d63be624d 100644 --- a/zarr/meta.py +++ b/zarr/meta.py @@ -52,6 +52,7 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A dimension_separator = meta.get("dimension_separator", None) shards = meta.get("shards", None) + shard_format = meta.get("shard_format", None) fill_value = cls.decode_fill_value(meta['fill_value'], dtype, object_codec) meta = dict( zarr_format=meta["zarr_format"], @@ -67,6 +68,8 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A meta['dimension_separator'] = dimension_separator if shards: meta['shards'] = tuple(shards) + assert shard_format is not None + meta['shard_format'] = shard_format except Exception as e: raise MetadataError("error decoding metadata") from e else: @@ -81,6 +84,7 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: dimension_separator = meta.get("dimension_separator") shards = meta.get("shards") + shard_format = meta.get("shard_format") if dtype.hasobject: import numcodecs object_codec = numcodecs.get_codec(meta['filters'][0]) @@ -99,9 +103,10 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: ) if dimension_separator: meta['dimension_separator'] = dimension_separator - if shards: meta['shards'] = shards + assert shard_format is not None + meta['shard_format'] = shard_format return json_dumps(meta) diff --git a/zarr/storage.py b/zarr/storage.py index e5e07cc23..19709cc11 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -237,6 +237,7 @@ def init_array( object_codec=None, dimension_separator=None, shards: Union[int, Tuple[int, ...], None]=None, + shard_format: Optional[str]=None, ): """Initialize an array store with the given configuration. Note that this is a low-level function and there should be no need to call this directly from user code. @@ -355,7 +356,7 @@ def init_array( chunk_store=chunk_store, filters=filters, object_codec=object_codec, dimension_separator=dimension_separator, - shards=shards) + shards=shards, shard_format=shard_format) def _init_array_metadata( @@ -373,6 +374,7 @@ def _init_array_metadata( object_codec=None, dimension_separator=None, shards:Union[int, Tuple[int, ...], None] = None, + shard_format: Optional[str]=None, ): # guard conditions @@ -392,6 +394,7 @@ def _init_array_metadata( dtype = dtype.base chunks = normalize_chunks(chunks, shape, dtype.itemsize) shards = normalize_shards(shards, shape) + shard_format = shard_format or "morton_order" order = normalize_order(order) fill_value = normalize_fill_value(fill_value, dtype) @@ -451,6 +454,7 @@ def _init_array_metadata( dimension_separator=dimension_separator) if shards is not None: meta["shards"] = shards + meta["shard_format"] = shard_format key = _path_to_prefix(path) + array_meta_key if hasattr(store, '_metadata_class'): store[key] = store._metadata_class.encode_array_metadata(meta) # type: ignore
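A standalone sketch of the chunk mask introduced in patch 5: every uncompressed shard now carries ceil(num_chunks_per_shard / 8) trailing bytes in which bit i (big-endian integer interpretation) records whether intra-shard chunk i has been written; `__set_meta__` and `__get_meta__` above encode and decode it, and `__iter__` uses it to enumerate only existing chunks. Sizes below are illustrative:

import math

num_chunks_per_shard = 4                        # e.g. shards=(2, 2)
mask_len = math.ceil(num_chunks_per_shard / 8)  # one trailing byte per shard here

# mark intra-shard chunks 0 and 3 as present, as __setitem__/setitems would
mask = 0
for shard_index in (0, 3):
    mask |= 1 << shard_index
meta_bytes = mask.to_bytes(mask_len, byteorder="big")   # __set_meta__ equivalent
assert meta_bytes == b"\x09"

# __get_meta__ equivalent: recover the mask and test bits
restored = int.from_bytes(meta_bytes, byteorder="big")
present = [i for i in range(num_chunks_per_shard) if restored & (1 << i)]
assert present == [0, 3]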