diff --git a/docs/releases.rst b/docs/releases.rst index ae28fbbe..ee1ae402 100644 --- a/docs/releases.rst +++ b/docs/releases.rst @@ -67,6 +67,8 @@ Internal Changes - Refactored internal structure significantly to split up everything to do with reading references from that to do with writing references. (:issue:`229`) (:pull:`231`) By `Tom Nicholas `_. +- Refactored readers to consider every filetype as a separate reader, all standardized to present the same `open_virtual_dataset` interface internally. + (:pull:`261`) By `Tom Nicholas `_. .. _v1.0.0: diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py index 4da9e896..0322f604 100644 --- a/virtualizarr/backend.py +++ b/virtualizarr/backend.py @@ -1,25 +1,39 @@ -import os import warnings -from collections.abc import Iterable, Mapping, MutableMapping +from collections.abc import Iterable, Mapping from enum import Enum, auto -from io import BufferedIOBase +from pathlib import Path from typing import ( Any, - Hashable, Optional, - cast, ) -import xarray as xr -from xarray.backends import AbstractDataStore, BackendArray -from xarray.core.indexes import Index, PandasIndex -from xarray.core.variable import IndexVariable +from xarray import Dataset +from xarray.core.indexes import Index from virtualizarr.manifests import ManifestArray -from virtualizarr.types.kerchunk import KerchunkStoreRefs -from virtualizarr.utils import _FsspecFSFromFilepath - -XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore +from virtualizarr.readers import ( + DMRPPVirtualBackend, + FITSVirtualBackend, + HDF5VirtualBackend, + KerchunkVirtualBackend, + NetCDF3VirtualBackend, + TIFFVirtualBackend, + ZarrV3VirtualBackend, +) +from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions + +# TODO add entrypoint to allow external libraries to add to this mapping +VIRTUAL_BACKENDS = { + "kerchunk": KerchunkVirtualBackend, + "zarr_v3": ZarrV3VirtualBackend, + "dmrpp": DMRPPVirtualBackend, + # all the below call one of the kerchunk backends internally (https://fsspec.github.io/kerchunk/reference.html#file-format-backends) + "netcdf3": NetCDF3VirtualBackend, + "hdf5": HDF5VirtualBackend, + "netcdf4": HDF5VirtualBackend, # note this is the same as for hdf5 + "tiff": TIFFVirtualBackend, + "fits": FITSVirtualBackend, +} class AutoName(Enum): @@ -43,10 +57,49 @@ class FileType(AutoName): kerchunk = auto() -class ManifestBackendArray(ManifestArray, BackendArray): - """Using this prevents xarray from wrapping the KerchunkArray in ExplicitIndexingAdapter etc.""" +def automatically_determine_filetype( + *, + filepath: str, + reader_options: Optional[dict[str, Any]] = {}, +) -> FileType: + """ + Attempt to automatically infer the correct reader for this filetype. + + Uses magic bytes and file / directory suffixes. + """ - ... + # TODO this should ideally handle every filetype that we have a reader for, not just kerchunk + + # TODO how do we handle kerchunk json / parquet here? + if Path(filepath).suffix == ".zarr": + # TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one... 
+ raise NotImplementedError() + + # Read magic bytes from local or remote file + fpath = _FsspecFSFromFilepath( + filepath=filepath, reader_options=reader_options + ).open_file() + magic_bytes = fpath.read(8) + fpath.close() + + if magic_bytes.startswith(b"CDF"): + filetype = FileType.netcdf3 + elif magic_bytes.startswith(b"\x0e\x03\x13\x01"): + raise NotImplementedError("HDF4 formatted files not supported") + elif magic_bytes.startswith(b"\x89HDF"): + filetype = FileType.hdf5 + elif magic_bytes.startswith(b"GRIB"): + filetype = FileType.grib + elif magic_bytes.startswith(b"II*"): + filetype = FileType.tiff + elif magic_bytes.startswith(b"SIMPLE"): + filetype = FileType.fits + else: + raise NotImplementedError( + f"Unrecognised file based on header bytes: {magic_bytes}" + ) + + return filetype def open_virtual_dataset( @@ -61,7 +114,7 @@ def open_virtual_dataset( indexes: Mapping[str, Index] | None = None, virtual_array_class=ManifestArray, reader_options: Optional[dict] = None, -) -> xr.Dataset: +) -> Dataset: """ Open a file or store as an xarray Dataset wrapping virtualized zarr arrays. @@ -69,7 +122,6 @@ def open_virtual_dataset( Xarray indexes can optionally be created (the default behaviour). To avoid creating any xarray indexes pass ``indexes={}``. - Parameters ---------- filepath : str, default None @@ -112,217 +164,38 @@ def open_virtual_dataset( stacklevel=2, ) - loadable_vars: dict[str, xr.Variable] - virtual_vars: dict[str, xr.Variable] - vars: dict[str, xr.Variable] - - if drop_variables is None: - drop_variables = [] - elif isinstance(drop_variables, str): - drop_variables = [drop_variables] - else: - drop_variables = list(drop_variables) - if loadable_variables is None: - loadable_variables = [] - elif isinstance(loadable_variables, str): - loadable_variables = [loadable_variables] - else: - loadable_variables = list(loadable_variables) - common = set(drop_variables).intersection(set(loadable_variables)) - if common: - raise ValueError(f"Cannot both load and drop variables {common}") + drop_variables, loadable_variables = check_for_collisions( + drop_variables, + loadable_variables, + ) if virtual_array_class is not ManifestArray: raise NotImplementedError() - # if filetype is user defined, convert to FileType + if reader_options is None: + reader_options = {} if filetype is not None: + # if filetype is user defined, convert to FileType filetype = FileType(filetype) - - if filetype == FileType.kerchunk: - from virtualizarr.readers.kerchunk import dataset_from_kerchunk_refs - - fs = _FsspecFSFromFilepath(filepath=filepath, reader_options=reader_options) - - # The kerchunk .parquet storage format isn't actually a parquet, but a directory that contains named parquets for each group/variable. - if fs.filepath.endswith("ref.parquet"): - from fsspec.implementations.reference import LazyReferenceMapper - - lrm = LazyReferenceMapper(filepath, fs.fs) - - # build reference dict from KV pairs in LazyReferenceMapper - # is there a better / more preformant way to extract this? 
- array_refs = {k: lrm[k] for k in lrm.keys()} - - full_reference = {"refs": array_refs} - - return dataset_from_kerchunk_refs(KerchunkStoreRefs(full_reference)) - - # JSON has no magic bytes, but the Kerchunk version 1 spec starts with 'version': - # https://fsspec.github.io/kerchunk/spec.html - elif fs.read_bytes(9).startswith(b'{"version'): - import ujson - - with fs.open_file() as of: - refs = ujson.load(of) - - return dataset_from_kerchunk_refs(KerchunkStoreRefs(refs)) - - else: - raise ValueError( - "The input Kerchunk reference did not seem to be in Kerchunk's JSON or Parquet spec: https://fsspec.github.io/kerchunk/spec.html. The Kerchunk format autodetection is quite flaky, so if your reference matches the Kerchunk spec feel free to open an issue: https://github.com/zarr-developers/VirtualiZarr/issues" - ) - - if filetype == FileType.zarr_v3: - # TODO is there a neat way of auto-detecting this? - from virtualizarr.readers.zarr import open_virtual_dataset_from_v3_store - - return open_virtual_dataset_from_v3_store( - storepath=filepath, drop_variables=drop_variables, indexes=indexes - ) - elif filetype == FileType.dmrpp: - from virtualizarr.readers.dmrpp import DMRParser - - if loadable_variables != [] or indexes is None: - raise NotImplementedError( - "Specifying `loadable_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files." - ) - - fpath = _FsspecFSFromFilepath( - filepath=filepath, reader_options=reader_options - ).open_file() - parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp")) - vds = parser.parse_dataset() - vds.drop_vars(drop_variables) - return vds else: - # we currently read every other filetype using kerchunks various file format backends - from virtualizarr.readers.kerchunk import ( - fully_decode_arr_refs, - read_kerchunk_references_from_file, - virtual_vars_from_kerchunk_refs, - ) - - if reader_options is None: - reader_options = {} - - # this is the only place we actually always need to use kerchunk directly - # TODO avoid even reading byte ranges for variables that will be dropped later anyway? - vds_refs = read_kerchunk_references_from_file( - filepath=filepath, - filetype=filetype, - group=group, - reader_options=reader_options, - ) - virtual_vars = virtual_vars_from_kerchunk_refs( - vds_refs, - drop_variables=drop_variables + loadable_variables, - virtual_array_class=virtual_array_class, - ) - ds_attrs = fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {}) - coord_names = ds_attrs.pop("coordinates", []) - - if indexes is None or len(loadable_variables) > 0: - # TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables... - # TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references - # TODO really we probably want a dedicated xarray backend that iterates over all variables only once - fpath = _FsspecFSFromFilepath( - filepath=filepath, reader_options=reader_options - ).open_file() - - # fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any. - # We'll (hopefully safely) cast it to what xarray is expecting, but this might let errors through. 
- - ds = xr.open_dataset( - cast(XArrayOpenT, fpath), - drop_variables=drop_variables, - group=group, - decode_times=decode_times, - ) - - if indexes is None: - warnings.warn( - "Specifying `indexes=None` will create in-memory pandas indexes for each 1D coordinate, but concatenation of ManifestArrays backed by pandas indexes is not yet supported (see issue #18)." - "You almost certainly want to pass `indexes={}` to `open_virtual_dataset` instead." - ) - - # add default indexes by reading data from file - indexes = {name: index for name, index in ds.xindexes.items()} - elif indexes != {}: - # TODO allow manual specification of index objects - raise NotImplementedError() - else: - indexes = dict(**indexes) # for type hinting: to allow mutation - - loadable_vars = { - str(name): var - for name, var in ds.variables.items() - if name in loadable_variables - } - - # if we only read the indexes we can just close the file right away as nothing is lazy - if loadable_vars == {}: - ds.close() - else: - loadable_vars = {} - indexes = {} - - vars = {**virtual_vars, **loadable_vars} - - data_vars, coords = separate_coords(vars, indexes, coord_names) - - vds = xr.Dataset( - data_vars, - coords=coords, - # indexes={}, # TODO should be added in a later version of xarray - attrs=ds_attrs, + filetype = automatically_determine_filetype( + filepath=filepath, reader_options=reader_options ) - # TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened - - return vds - - -def separate_coords( - vars: Mapping[str, xr.Variable], - indexes: MutableMapping[str, Index], - coord_names: Iterable[str] | None = None, -) -> tuple[dict[str, xr.Variable], xr.Coordinates]: - """ - Try to generate a set of coordinates that won't cause xarray to automatically build a pandas.Index for the 1D coordinates. - - Currently requires this function as a workaround unless xarray PR #8124 is merged. - - Will also preserve any loaded variables and indexes it is passed. - """ - - if coord_names is None: - coord_names = [] - - # split data and coordinate variables (promote dimension coordinates) - data_vars = {} - coord_vars: dict[ - str, tuple[Hashable, Any, dict[Any, Any], dict[Any, Any]] | xr.Variable - ] = {} - for name, var in vars.items(): - if name in coord_names or var.dims == (name,): - # use workaround to avoid creating IndexVariables described here https://github.com/pydata/xarray/pull/8107#discussion_r1311214263 - if len(var.dims) == 1: - dim1d, *_ = var.dims - coord_vars[name] = (dim1d, var.data, var.attrs, var.encoding) + backend_cls = VIRTUAL_BACKENDS.get(filetype.name.lower()) - if isinstance(var, IndexVariable): - # unless variable actually already is a loaded IndexVariable, - # in which case we need to keep it and add the corresponding indexes explicitly - coord_vars[str(name)] = var - # TODO this seems suspect - will it handle datetimes? 
- indexes[name] = PandasIndex(var, dim1d) - else: - coord_vars[name] = var - else: - data_vars[name] = var + if backend_cls is None: + raise NotImplementedError(f"Unsupported file type: {filetype.name}") - coords = xr.Coordinates(coord_vars, indexes=indexes) + vds = backend_cls.open_virtual_dataset( + filepath, + group=group, + drop_variables=drop_variables, + loadable_variables=loadable_variables, + decode_times=decode_times, + indexes=indexes, + reader_options=reader_options, + ) - return data_vars, coords + return vds diff --git a/virtualizarr/manifests/array.py b/virtualizarr/manifests/array.py index 5ac0aef0..179bcf1c 100644 --- a/virtualizarr/manifests/array.py +++ b/virtualizarr/manifests/array.py @@ -3,10 +3,13 @@ import numpy as np -from ..types.kerchunk import KerchunkArrRefs -from ..zarr import ZArray -from .array_api import MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS, _isnan -from .manifest import ChunkManifest +from virtualizarr.manifests.array_api import ( + MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS, + _isnan, +) +from virtualizarr.manifests.manifest import ChunkManifest +from virtualizarr.types.kerchunk import KerchunkArrRefs +from virtualizarr.zarr import ZArray class ManifestArray: @@ -61,7 +64,7 @@ def __init__( @classmethod def _from_kerchunk_refs(cls, arr_refs: KerchunkArrRefs) -> "ManifestArray": - from virtualizarr.readers.kerchunk import ( + from virtualizarr.translators.kerchunk import ( fully_decode_arr_refs, parse_array_refs, ) diff --git a/virtualizarr/manifests/array_api.py b/virtualizarr/manifests/array_api.py index 18f15933..f5cf220b 100644 --- a/virtualizarr/manifests/array_api.py +++ b/virtualizarr/manifests/array_api.py @@ -2,7 +2,7 @@ import numpy as np -from virtualizarr.zarr import Codec, ceildiv +from virtualizarr.zarr import Codec, determine_chunk_grid_shape from .manifest import ChunkManifest @@ -293,10 +293,7 @@ def broadcast_to(x: "ManifestArray", /, shape: tuple[int, ...]) -> "ManifestArra ) # find new chunk grid shape by dividing new array shape by new chunk shape - new_chunk_grid_shape = tuple( - ceildiv(axis_length, chunk_length) - for axis_length, chunk_length in zip(new_shape, new_chunk_shape) - ) + new_chunk_grid_shape = determine_chunk_grid_shape(new_shape, new_chunk_shape) # do broadcasting of entries in manifest broadcasted_paths = cast( # `np.broadcast_to` apparently is type hinted as if the output could have Any dtype diff --git a/virtualizarr/readers/__init__.py b/virtualizarr/readers/__init__.py new file mode 100644 index 00000000..0f83ba39 --- /dev/null +++ b/virtualizarr/readers/__init__.py @@ -0,0 +1,17 @@ +from virtualizarr.readers.dmrpp import DMRPPVirtualBackend +from virtualizarr.readers.fits import FITSVirtualBackend +from virtualizarr.readers.hdf5 import HDF5VirtualBackend +from virtualizarr.readers.kerchunk import KerchunkVirtualBackend +from virtualizarr.readers.netcdf3 import NetCDF3VirtualBackend +from virtualizarr.readers.tiff import TIFFVirtualBackend +from virtualizarr.readers.zarr_v3 import ZarrV3VirtualBackend + +__all__ = [ + "DMRPPVirtualBackend", + "FITSVirtualBackend", + "HDF5VirtualBackend", + "KerchunkVirtualBackend", + "NetCDF3VirtualBackend", + "TIFFVirtualBackend", + "ZarrV3VirtualBackend", +] diff --git a/virtualizarr/readers/common.py b/virtualizarr/readers/common.py new file mode 100644 index 00000000..54aedfe2 --- /dev/null +++ b/virtualizarr/readers/common.py @@ -0,0 +1,195 @@ +import os +import warnings +from abc import ABC +from collections.abc import Iterable, Mapping, MutableMapping +from io import 
BufferedIOBase +from typing import ( + TYPE_CHECKING, + Any, + Hashable, + Optional, + cast, +) + +import xarray as xr +from xarray import Dataset +from xarray.backends import AbstractDataStore, BackendArray +from xarray.core.indexes import Index, PandasIndex +from xarray.core.variable import IndexVariable, Variable + +from virtualizarr.manifests import ManifestArray +from virtualizarr.utils import _FsspecFSFromFilepath + +XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore + +if TYPE_CHECKING: + try: + from xarray import DataTree # type: ignore[attr-defined] + except ImportError: + DataTree = Any + + +class ManifestBackendArray(ManifestArray, BackendArray): + """Using this prevents xarray from wrapping the KerchunkArray in ExplicitIndexingAdapter etc.""" + + ... + + +def open_loadable_vars_and_indexes( + filepath: str, + loadable_variables, + reader_options, + drop_variables, + indexes, + group, + decode_times, +) -> tuple[Mapping[str, Variable], Mapping[str, Index]]: + """ + Open selected variables and indexes using xarray. + + Relies on xr.open_dataset and its auto-detection of filetypes to find the correct installed backend. + """ + + # TODO get rid of this if? + if indexes is None or len(loadable_variables) > 0: + # TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables... + # TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references + # TODO really we probably want a dedicated xarray backend that iterates over all variables only once + fpath = _FsspecFSFromFilepath( + filepath=filepath, reader_options=reader_options + ).open_file() + + # fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any. + # We'll (hopefully safely) cast it to what xarray is expecting, but this might let errors through. + + ds = xr.open_dataset( + cast(XArrayOpenT, fpath), + drop_variables=drop_variables, + group=group, + decode_times=decode_times, + ) + + if indexes is None: + warnings.warn( + "Specifying `indexes=None` will create in-memory pandas indexes for each 1D coordinate, but concatenation of ManifestArrays backed by pandas indexes is not yet supported (see issue #18)." + "You almost certainly want to pass `indexes={}` to `open_virtual_dataset` instead." 
+            )
+
+            # add default indexes by reading data from file
+            indexes = {name: index for name, index in ds.xindexes.items()}
+        elif indexes != {}:
+            # TODO allow manual specification of index objects
+            raise NotImplementedError()
+        else:
+            indexes = dict(**indexes)  # for type hinting: to allow mutation
+
+        # TODO we should drop these earlier by using drop_variables
+        loadable_vars = {
+            str(name): var
+            for name, var in ds.variables.items()
+            if name in loadable_variables
+        }
+
+        # if we only read the indexes we can just close the file right away as nothing is lazy
+        if loadable_vars == {}:
+            ds.close()
+    else:
+        loadable_vars = {}
+        indexes = {}
+
+    return loadable_vars, indexes
+
+
+def construct_virtual_dataset(
+    virtual_vars,
+    loadable_vars,
+    indexes,
+    coord_names,
+    attrs,
+) -> Dataset:
+    """Construct a virtual Dataset from constituent parts."""
+
+    vars = {**virtual_vars, **loadable_vars}
+
+    data_vars, coords = separate_coords(vars, indexes, coord_names)
+
+    vds = xr.Dataset(
+        data_vars,
+        coords=coords,
+        # indexes={},  # TODO should be added in a later version of xarray
+        attrs=attrs,
+    )
+
+    # TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened
+
+    return vds
+
+
+def separate_coords(
+    vars: Mapping[str, xr.Variable],
+    indexes: MutableMapping[str, Index],
+    coord_names: Iterable[str] | None = None,
+) -> tuple[dict[str, xr.Variable], xr.Coordinates]:
+    """
+    Try to generate a set of coordinates that won't cause xarray to automatically build a pandas.Index for the 1D coordinates.
+
+    Currently requires this function as a workaround unless xarray PR #8124 is merged.
+
+    Will also preserve any loaded variables and indexes it is passed.
+    """
+
+    if coord_names is None:
+        coord_names = []
+
+    # split data and coordinate variables (promote dimension coordinates)
+    data_vars = {}
+    coord_vars: dict[
+        str, tuple[Hashable, Any, dict[Any, Any], dict[Any, Any]] | xr.Variable
+    ] = {}
+    for name, var in vars.items():
+        if name in coord_names or var.dims == (name,):
+            # use workaround to avoid creating IndexVariables described here https://github.com/pydata/xarray/pull/8107#discussion_r1311214263
+            if len(var.dims) == 1:
+                dim1d, *_ = var.dims
+                coord_vars[name] = (dim1d, var.data, var.attrs, var.encoding)
+
+            if isinstance(var, IndexVariable):
+                # unless variable actually already is a loaded IndexVariable,
+                # in which case we need to keep it and add the corresponding indexes explicitly
+                coord_vars[str(name)] = var
+                # TODO this seems suspect - will it handle datetimes?
+ indexes[name] = PandasIndex(var, dim1d) + else: + coord_vars[name] = var + else: + data_vars[name] = var + + coords = xr.Coordinates(coord_vars, indexes=indexes) + + return data_vars, coords + + +class VirtualBackend(ABC): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + raise NotImplementedError() + + @staticmethod + def open_virtual_datatree( + path: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> "DataTree": + raise NotImplementedError() diff --git a/virtualizarr/readers/dmrpp.py b/virtualizarr/readers/dmrpp.py index fa66205a..766b1c62 100644 --- a/virtualizarr/readers/dmrpp.py +++ b/virtualizarr/readers/dmrpp.py @@ -2,18 +2,55 @@ import warnings from collections import defaultdict from collections.abc import Mapping -from typing import Any, Optional +from typing import Any, Iterable, Optional from xml.etree import ElementTree as ET import numpy as np -import xarray as xr +from xarray import Coordinates, Dataset from xarray.core.indexes import Index +from xarray.core.variable import Variable from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.readers.common import VirtualBackend from virtualizarr.types import ChunkKey +from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions from virtualizarr.zarr import ZArray +class DMRPPVirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + loadable_variables, drop_variables = check_for_collisions( + drop_variables=drop_variables, + loadable_variables=loadable_variables, + ) + + if loadable_variables != [] or decode_times or indexes is None: + raise NotImplementedError( + "Specifying `loadable_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files." + ) + + if group: + raise NotImplementedError() + + fpath = _FsspecFSFromFilepath( + filepath=filepath, reader_options=reader_options + ).open_file() + + parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp")) + vds = parser.parse_dataset() + + return vds.drop_vars(drop_variables) + + class DMRParser: """ Parser for the OPeNDAP DMR++ XML format. @@ -69,9 +106,7 @@ def __init__(self, dmr: str, data_filepath: Optional[str] = None): data_filepath if data_filepath is not None else self.root.attrib["name"] ) - def parse_dataset( - self, group=None, indexes: Mapping[str, Index] = {} - ) -> xr.Dataset: + def parse_dataset(self, group=None, indexes: Mapping[str, Index] = {}) -> Dataset: """ Parses the given file and creates a virtual xr.Dataset with ManifestArrays. @@ -128,7 +163,7 @@ def _parse_netcdf4_dataset( root: ET.Element, group: Optional[str] = None, indexes: Mapping[str, Index] = {}, - ) -> xr.Dataset: + ) -> Dataset: """ Parse the dataset from the netcdf4 based dmrpp with groups, starting at the given group. 
Set root to the given group. @@ -201,7 +236,7 @@ def _parse_hdf5_dataset( root: ET.Element, group: Optional[str] = None, indexes: Mapping[str, Index] = {}, - ) -> xr.Dataset: + ) -> Dataset: """ Parse the dataset from the HDF5 based dmrpp with groups, starting at the given group. Set root to the given group. @@ -331,7 +366,7 @@ def _split_hdf5(self, root: ET.Element) -> dict[str, ET.Element]: def _parse_dataset( self, root: ET.Element, indexes: Mapping[str, Index] = {} - ) -> xr.Dataset: + ) -> Dataset: """ Parse the dataset using the root element of the DMR file. @@ -353,8 +388,8 @@ def _parse_dataset( if len(coord_names) == 0 or len(coord_names) < len(dataset_dims): coord_names = set(dataset_dims.keys()) # Seperate and parse coords + data variables - coord_vars: dict[str, xr.Variable] = {} - data_vars: dict[str, xr.Variable] = {} + coord_vars: dict[str, Variable] = {} + data_vars: dict[str, Variable] = {} for var_tag in self._find_var_tags(root): variable = self._parse_variable(var_tag, dataset_dims) if var_tag.attrib["name"] in coord_names: @@ -365,9 +400,9 @@ def _parse_dataset( attrs: dict[str, str] = {} for attr_tag in self.root.iterfind("dap:Attribute", self._ns): attrs.update(self._parse_attribute(attr_tag)) - return xr.Dataset( + return Dataset( data_vars=data_vars, - coords=xr.Coordinates(coords=coord_vars, indexes=indexes), + coords=Coordinates(coords=coord_vars, indexes=indexes), attrs=attrs, ) @@ -484,7 +519,7 @@ def _parse_multi_dims( def _parse_variable( self, var_tag: ET.Element, dataset_dims: dict[str, int] - ) -> xr.Variable: + ) -> Variable: """ Parse a variable from a DMR tag. @@ -542,7 +577,7 @@ def _parse_variable( ) marr = ManifestArray(zarray=zarray, chunkmanifest=chunkmanifest) encoding = {k: attrs.get(k) for k in self._encoding_keys if k in attrs} - return xr.Variable( + return Variable( dims=dim_shapes.keys(), data=marr, attrs=attrs, encoding=encoding ) diff --git a/virtualizarr/readers/fits.py b/virtualizarr/readers/fits.py new file mode 100644 index 00000000..618d81cd --- /dev/null +++ b/virtualizarr/readers/fits.py @@ -0,0 +1,59 @@ +from typing import Iterable, Mapping, Optional + +from xarray import Dataset +from xarray.core.indexes import Index + +from virtualizarr.readers.common import ( + VirtualBackend, + construct_virtual_dataset, + open_loadable_vars_and_indexes, +) +from virtualizarr.translators.kerchunk import ( + extract_group, + virtual_vars_and_metadata_from_kerchunk_refs, +) +from virtualizarr.types.kerchunk import KerchunkStoreRefs + + +class FITSVirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + from kerchunk.fits import process_file + + # handle inconsistency in kerchunk, see GH issue https://github.com/zarr-developers/VirtualiZarr/issues/160 + refs = KerchunkStoreRefs({"refs": process_file(filepath, **reader_options)}) + + refs = extract_group(refs, group) + + virtual_vars, attrs, coord_names = virtual_vars_and_metadata_from_kerchunk_refs( + refs, + loadable_variables, + drop_variables, + ) + + # TODO this wouldn't work until you had an xarray backend for FITS installed + loadable_vars, indexes = open_loadable_vars_and_indexes( + filepath, + loadable_variables=loadable_variables, + reader_options=reader_options, + 
drop_variables=drop_variables, + indexes=indexes, + group=group, + decode_times=decode_times, + ) + + return construct_virtual_dataset( + virtual_vars=virtual_vars, + loadable_vars=loadable_vars, + indexes=indexes, + coord_names=coord_names, + attrs=attrs, + ) diff --git a/virtualizarr/readers/hdf5.py b/virtualizarr/readers/hdf5.py new file mode 100644 index 00000000..c0d38e20 --- /dev/null +++ b/virtualizarr/readers/hdf5.py @@ -0,0 +1,64 @@ +from typing import Iterable, Mapping, Optional + +from xarray import Dataset +from xarray.core.indexes import Index + +from virtualizarr.readers.common import ( + VirtualBackend, + construct_virtual_dataset, + open_loadable_vars_and_indexes, +) +from virtualizarr.translators.kerchunk import ( + extract_group, + virtual_vars_and_metadata_from_kerchunk_refs, +) +from virtualizarr.utils import check_for_collisions + + +class HDF5VirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + from kerchunk.hdf import SingleHdf5ToZarr + + drop_variables, loadable_variables = check_for_collisions( + drop_variables, + loadable_variables, + ) + + refs = SingleHdf5ToZarr( + filepath, inline_threshold=0, **reader_options + ).translate() + + refs = extract_group(refs, group) + + virtual_vars, attrs, coord_names = virtual_vars_and_metadata_from_kerchunk_refs( + refs, + loadable_variables, + drop_variables, + ) + + loadable_vars, indexes = open_loadable_vars_and_indexes( + filepath, + loadable_variables=loadable_variables, + reader_options=reader_options, + drop_variables=drop_variables, + indexes=indexes, + group=group, + decode_times=decode_times, + ) + + return construct_virtual_dataset( + virtual_vars=virtual_vars, + loadable_vars=loadable_vars, + indexes=indexes, + coord_names=coord_names, + attrs=attrs, + ) diff --git a/virtualizarr/readers/kerchunk.py b/virtualizarr/readers/kerchunk.py index a8740b19..35fa4932 100644 --- a/virtualizarr/readers/kerchunk.py +++ b/virtualizarr/readers/kerchunk.py @@ -1,323 +1,69 @@ -import warnings -from pathlib import Path -from typing import Any, MutableMapping, Optional, cast +from typing import Iterable, Mapping, Optional +import ujson from xarray import Dataset from xarray.core.indexes import Index -from xarray.core.variable import Variable -from virtualizarr.backend import FileType, separate_coords -from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.readers.common import VirtualBackend +from virtualizarr.translators.kerchunk import dataset_from_kerchunk_refs from virtualizarr.types.kerchunk import ( - KerchunkArrRefs, KerchunkStoreRefs, ) -from virtualizarr.utils import _FsspecFSFromFilepath -from virtualizarr.zarr import ZArray, ZAttrs, ceildiv - - -# TODO shouldn't this live in backend.py? Because it's not just useful for the kerchunk-specific readers... -def _automatically_determine_filetype( - *, - filepath: str, - reader_options: Optional[dict[str, Any]] = {}, -) -> FileType: - if Path(filepath).suffix == ".zarr": - # TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one... 
- raise NotImplementedError() - - # Read magic bytes from local or remote file - fpath = _FsspecFSFromFilepath( - filepath=filepath, reader_options=reader_options - ).open_file() - magic_bytes = fpath.read(8) - fpath.close() - - if magic_bytes.startswith(b"CDF"): - filetype = FileType.netcdf3 - elif magic_bytes.startswith(b"\x0e\x03\x13\x01"): - raise NotImplementedError("HDF4 formatted files not supported") - elif magic_bytes.startswith(b"\x89HDF"): - filetype = FileType.hdf5 - elif magic_bytes.startswith(b"GRIB"): - filetype = FileType.grib - elif magic_bytes.startswith(b"II*"): - filetype = FileType.tiff - elif magic_bytes.startswith(b"SIMPLE"): - filetype = FileType.fits - else: - raise NotImplementedError( - f"Unrecognised file based on header bytes: {magic_bytes}" +from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions + + +class KerchunkVirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + """Reads existing kerchunk references (in JSON or parquet) format.""" + + if group: + raise NotImplementedError() + + loadable_variables, drop_variables = check_for_collisions( + drop_variables=drop_variables, + loadable_variables=loadable_variables, ) - return filetype + if loadable_variables or indexes or decode_times: + raise NotImplementedError() + fs = _FsspecFSFromFilepath(filepath=filepath, reader_options=reader_options) -def read_kerchunk_references_from_file( - filepath: str, - filetype: FileType | None, - group: str | None, - reader_options: Optional[dict[str, Any]] = None, -) -> KerchunkStoreRefs: - """ - Read a single legacy file and return kerchunk references to its contents. - - Parameters - ---------- - filepath : str, default: None - File path to open as a set of virtualized zarr arrays. - filetype : FileType, default: None - Type of file to be opened. Used to determine which kerchunk file format backend to use. - If not provided will attempt to automatically infer the correct filetype from the the filepath's extension. - group : str, default is None - Path to the HDF5/netCDF4 group in the given file to open. Given as a str, supported by filetypes “netcdf4” and “hdf5”. - Dict passed into Kerchunk file readers. Note: Each Kerchunk file reader has distinct arguments, - so ensure reader_options match selected Kerchunk reader arguments. - """ - - if reader_options is None: - reader_options = {} - - if filetype is None: - filetype = _automatically_determine_filetype( - filepath=filepath, reader_options=reader_options - ) + # The kerchunk .parquet storage format isn't actually a parquet, but a directory that contains named parquets for each group/variable. 
+ if fs.filepath.endswith("ref.parquet"): + from fsspec.implementations.reference import LazyReferenceMapper - # if filetype is user defined, convert to FileType - filetype = FileType(filetype) + lrm = LazyReferenceMapper(filepath, fs.fs) - if filetype.name.lower() == "netcdf3": - from kerchunk.netCDF3 import NetCDF3ToZarr - - refs = NetCDF3ToZarr(filepath, inline_threshold=0, **reader_options).translate() - - elif filetype.name.lower() == "hdf5" or filetype.name.lower() == "netcdf4": - from kerchunk.hdf import SingleHdf5ToZarr - - refs = SingleHdf5ToZarr( - filepath, inline_threshold=0, **reader_options - ).translate() - - refs = extract_group(refs, group) - - elif filetype.name.lower() == "grib": - # TODO Grib files should be handled as a DataTree object - # see https://github.com/TomNicholas/VirtualiZarr/issues/11 - raise NotImplementedError(f"Unsupported file type: {filetype}") - elif filetype.name.lower() == "tiff": - from kerchunk.tiff import tiff_to_zarr - - reader_options.pop("storage_options", {}) - warnings.warn( - "storage_options have been dropped from reader_options as they are not supported by kerchunk.tiff.tiff_to_zarr", - UserWarning, - ) + # build reference dict from KV pairs in LazyReferenceMapper + # is there a better / more preformant way to extract this? + array_refs = {k: lrm[k] for k in lrm.keys()} - # handle inconsistency in kerchunk, see GH issue https://github.com/zarr-developers/VirtualiZarr/issues/160 - refs = {"refs": tiff_to_zarr(filepath, **reader_options)} - elif filetype.name.lower() == "fits": - from kerchunk.fits import process_file + full_reference = {"refs": array_refs} - # handle inconsistency in kerchunk, see GH issue https://github.com/zarr-developers/VirtualiZarr/issues/160 - refs = {"refs": process_file(filepath, **reader_options)} - else: - raise NotImplementedError(f"Unsupported file type: {filetype.name}") + vds = dataset_from_kerchunk_refs(KerchunkStoreRefs(full_reference)) - # TODO validate the references that were read before returning? - return refs + # JSON has no magic bytes, but the Kerchunk version 1 spec starts with 'version': + # https://fsspec.github.io/kerchunk/spec.html + elif fs.read_bytes(9).startswith(b'{"version'): + with fs.open_file() as of: + refs = ujson.load(of) + vds = dataset_from_kerchunk_refs(KerchunkStoreRefs(refs)) -def extract_group(vds_refs: KerchunkStoreRefs, group: str | None) -> KerchunkStoreRefs: - """Extract only the part of the kerchunk reference dict that is relevant to a single HDF group""" - hdf_groups = [ - k.removesuffix(".zgroup") for k in vds_refs["refs"].keys() if ".zgroup" in k - ] - if len(hdf_groups) == 1: - return vds_refs - else: - if group is None: + else: raise ValueError( - f"Multiple HDF Groups found. Must specify group= keyword to select one of {hdf_groups}" + "The input Kerchunk reference did not seem to be in Kerchunk's JSON or Parquet spec: https://fsspec.github.io/kerchunk/spec.html. 
The Kerchunk format autodetection is quite flaky, so if your reference matches the Kerchunk spec feel free to open an issue: https://github.com/zarr-developers/VirtualiZarr/issues" ) - else: - # Ensure supplied group kwarg is consistent with kerchunk keys - if not group.endswith("/"): - group += "/" - if group.startswith("/"): - group = group.removeprefix("/") - - if group not in hdf_groups: - raise ValueError(f'Group "{group}" not found in {hdf_groups}') - - # Filter by group prefix and remove prefix from all keys - groupdict = { - k.removeprefix(group): v - for k, v in vds_refs["refs"].items() - if k.startswith(group) - } - # Also remove group prefix from _ARRAY_DIMENSIONS - for k, v in groupdict.items(): - if isinstance(v, str): - groupdict[k] = v.replace("\\/", "/").replace(group, "") - - vds_refs["refs"] = groupdict - - return KerchunkStoreRefs(vds_refs) - - -def virtual_vars_from_kerchunk_refs( - refs: KerchunkStoreRefs, - drop_variables: list[str] | None = None, - virtual_array_class=ManifestArray, -) -> dict[str, Variable]: - """ - Translate a store-level kerchunk reference dict into aaset of xarray Variables containing virtualized arrays. - - Parameters - ---------- - drop_variables: list[str], default is None - Variables in the file to drop before returning. - virtual_array_class - Virtual array class to use to represent the references to the chunks in each on-disk array. - Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. - """ - - var_names = find_var_names(refs) - if drop_variables is None: - drop_variables = [] - var_names_to_keep = [ - var_name for var_name in var_names if var_name not in drop_variables - ] - - vars = { - var_name: variable_from_kerchunk_refs(refs, var_name, virtual_array_class) - for var_name in var_names_to_keep - } - return vars - - -def dataset_from_kerchunk_refs( - refs: KerchunkStoreRefs, - drop_variables: list[str] = [], - virtual_array_class: type = ManifestArray, - indexes: MutableMapping[str, Index] | None = None, -) -> Dataset: - """ - Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays. - - drop_variables: list[str], default is None - Variables in the file to drop before returning. - virtual_array_class - Virtual array class to use to represent the references to the chunks in each on-disk array. - Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. 
- """ - - vars = virtual_vars_from_kerchunk_refs(refs, drop_variables, virtual_array_class) - ds_attrs = fully_decode_arr_refs(refs["refs"]).get(".zattrs", {}) - coord_names = ds_attrs.pop("coordinates", []) - - if indexes is None: - indexes = {} - data_vars, coords = separate_coords(vars, indexes, coord_names) - - vds = Dataset( - data_vars, - coords=coords, - # indexes={}, # TODO should be added in a later version of xarray - attrs=ds_attrs, - ) - - return vds - - -def determine_chunk_grid_shape(zarray): - return tuple( - ceildiv(length, chunksize) - for length, chunksize in zip(zarray.shape, zarray.chunks) - ) - - -def variable_from_kerchunk_refs( - refs: KerchunkStoreRefs, var_name: str, virtual_array_class -) -> Variable: - """Create a single xarray Variable by reading specific keys of a kerchunk references dict.""" - - arr_refs = extract_array_refs(refs, var_name) - chunk_dict, zarray, zattrs = parse_array_refs(arr_refs) - # we want to remove the _ARRAY_DIMENSIONS from the final variables' .attrs - dims = zattrs.pop("_ARRAY_DIMENSIONS") - if chunk_dict: - manifest = ChunkManifest._from_kerchunk_chunk_dict(chunk_dict) - varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) - elif len(zarray.shape) != 0: - # empty variables don't have physical chunks, but zarray shows that the variable - # is at least 1D - shape = determine_chunk_grid_shape(zarray) - manifest = ChunkManifest(entries={}, shape=shape) - varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) - else: - # This means we encountered a scalar variable of dimension 0, - # very likely that it actually has no numeric value and its only purpose - # is to communicate dataset attributes. - varr = zarray.fill_value - - return Variable(data=varr, dims=dims, attrs=zattrs) - - -def find_var_names(ds_reference_dict: KerchunkStoreRefs) -> list[str]: - """Find the names of zarr variables in this store/group.""" - - refs = ds_reference_dict["refs"] - found_var_names = {key.split("/")[0] for key in refs.keys() if "/" in key} - - return list(found_var_names) - - -def extract_array_refs( - ds_reference_dict: KerchunkStoreRefs, var_name: str -) -> KerchunkArrRefs: - """Extract only the part of the kerchunk reference dict that is relevant to this one zarr array""" - - found_var_names = find_var_names(ds_reference_dict) - - refs = ds_reference_dict["refs"] - if var_name in found_var_names: - # TODO these function probably have more loops in them than they need to... - - arr_refs = { - key.split("/")[1]: refs[key] - for key in refs.keys() - if var_name == key.split("/")[0] - } - - return fully_decode_arr_refs(arr_refs) - - else: - raise KeyError( - f"Could not find zarr array variable name {var_name}, only {found_var_names}" - ) - - -def parse_array_refs( - arr_refs: KerchunkArrRefs, -) -> tuple[dict, ZArray, ZAttrs]: - zarray = ZArray.from_kerchunk_refs(arr_refs.pop(".zarray")) - zattrs = arr_refs.pop(".zattrs", {}) - chunk_dict = arr_refs - - return chunk_dict, zarray, zattrs - - -def fully_decode_arr_refs(d: dict) -> KerchunkArrRefs: - """ - Only have to do this because kerchunk.SingleHdf5ToZarr apparently doesn't bother converting .zarray and .zattrs contents to dicts, see https://github.com/fsspec/kerchunk/issues/415 . 
- """ - import ujson - - sanitized = d.copy() - for k, v in d.items(): - if k.startswith("."): - # ensure contents of .zattrs and .zarray are python dictionaries - sanitized[k] = ujson.loads(v) - return cast(KerchunkArrRefs, sanitized) + # TODO would be more efficient to drop these before converting them into ManifestArrays, i.e. drop them from the kerchunk refs dict + return vds.drop_vars(drop_variables) diff --git a/virtualizarr/readers/netcdf3.py b/virtualizarr/readers/netcdf3.py new file mode 100644 index 00000000..30c6746e --- /dev/null +++ b/virtualizarr/readers/netcdf3.py @@ -0,0 +1,62 @@ +from typing import Iterable, Mapping, Optional + +from xarray import Dataset +from xarray.core.indexes import Index + +from virtualizarr.readers.common import ( + VirtualBackend, + construct_virtual_dataset, + open_loadable_vars_and_indexes, +) +from virtualizarr.translators.kerchunk import ( + extract_group, + virtual_vars_and_metadata_from_kerchunk_refs, +) +from virtualizarr.utils import check_for_collisions + + +class NetCDF3VirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + from kerchunk.netCDF3 import NetCDF3ToZarr + + drop_variables, loadable_variables = check_for_collisions( + drop_variables, + loadable_variables, + ) + + refs = NetCDF3ToZarr(filepath, inline_threshold=0, **reader_options).translate() + + refs = extract_group(refs, group) + + virtual_vars, attrs, coord_names = virtual_vars_and_metadata_from_kerchunk_refs( + refs, + loadable_variables, + drop_variables, + ) + + loadable_vars, indexes = open_loadable_vars_and_indexes( + filepath, + loadable_variables=loadable_variables, + reader_options=reader_options, + drop_variables=drop_variables, + indexes=indexes, + group=group, + decode_times=decode_times, + ) + + return construct_virtual_dataset( + virtual_vars=virtual_vars, + loadable_vars=loadable_vars, + indexes=indexes, + coord_names=coord_names, + attrs=attrs, + ) diff --git a/virtualizarr/readers/tiff.py b/virtualizarr/readers/tiff.py new file mode 100644 index 00000000..bb32e647 --- /dev/null +++ b/virtualizarr/readers/tiff.py @@ -0,0 +1,73 @@ +import warnings +from typing import Iterable, Mapping, Optional + +from xarray import Dataset +from xarray.core.indexes import Index + +from virtualizarr.readers.common import ( + VirtualBackend, + construct_virtual_dataset, + open_loadable_vars_and_indexes, +) +from virtualizarr.translators.kerchunk import ( + extract_group, + virtual_vars_and_metadata_from_kerchunk_refs, +) +from virtualizarr.types.kerchunk import KerchunkStoreRefs +from virtualizarr.utils import check_for_collisions + + +class TIFFVirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + from kerchunk.tiff import tiff_to_zarr + + drop_variables, loadable_variables = check_for_collisions( + drop_variables=drop_variables, loadable_variables=loadable_variables + ) + + if reader_options is None: + reader_options = {} + + reader_options.pop("storage_options", {}) + 
warnings.warn( + "storage_options have been dropped from reader_options as they are not supported by kerchunk.tiff.tiff_to_zarr", + UserWarning, + ) + + # handle inconsistency in kerchunk, see GH issue https://github.com/zarr-developers/VirtualiZarr/issues/160 + refs = KerchunkStoreRefs({"refs": tiff_to_zarr(filepath, **reader_options)}) + + refs = extract_group(refs, group) + + virtual_vars, attrs, coord_names = virtual_vars_and_metadata_from_kerchunk_refs( + refs, + loadable_variables, + drop_variables, + ) + + loadable_vars, indexes = open_loadable_vars_and_indexes( + filepath, + loadable_variables=loadable_variables, + reader_options=reader_options, + drop_variables=drop_variables, + indexes=indexes, + group=group, + decode_times=decode_times, + ) + + return construct_virtual_dataset( + virtual_vars=virtual_vars, + loadable_vars=loadable_vars, + indexes=indexes, + coord_names=coord_names, + attrs=attrs, + ) diff --git a/virtualizarr/readers/zarr.py b/virtualizarr/readers/zarr.py deleted file mode 100644 index 168faa2b..00000000 --- a/virtualizarr/readers/zarr.py +++ /dev/null @@ -1,131 +0,0 @@ -import json -from pathlib import Path -from typing import Mapping - -import numcodecs -import numpy as np -from xarray import Dataset -from xarray.core.indexes import Index -from xarray.core.variable import Variable - -from virtualizarr.backend import separate_coords -from virtualizarr.manifests import ChunkManifest, ManifestArray -from virtualizarr.zarr import ZArray - - -def open_virtual_dataset_from_v3_store( - storepath: str, - drop_variables: list[str] = [], - indexes: Mapping[str, Index] | None = None, -) -> Dataset: - """ - Read a Zarr v3 store and return an xarray Dataset containing virtualized arrays. - """ - _storepath = Path(storepath) - - ds_attrs = attrs_from_zarr_group_json(_storepath / "zarr.json") - coord_names = ds_attrs.pop("coordinates", []) - - # TODO recursive glob to create a datatree - # Note: this .is_file() check should not be necessary according to the pathlib docs, but tests fail on github CI without it - # see https://github.com/TomNicholas/VirtualiZarr/pull/45#discussion_r1547833166 - all_paths = _storepath.glob("*/") - directory_paths = [p for p in all_paths if not p.is_file()] - - vars = {} - for array_dir in directory_paths: - var_name = array_dir.name - if var_name in drop_variables: - break - - zarray, dim_names, attrs = metadata_from_zarr_json(array_dir / "zarr.json") - manifest = ChunkManifest.from_zarr_json(str(array_dir / "manifest.json")) - - marr = ManifestArray(chunkmanifest=manifest, zarray=zarray) - var = Variable(data=marr, dims=dim_names, attrs=attrs) - vars[var_name] = var - - if indexes is None: - raise NotImplementedError() - elif indexes != {}: - # TODO allow manual specification of index objects - raise NotImplementedError() - else: - indexes = dict(**indexes) # for type hinting: to allow mutation - - data_vars, coords = separate_coords(vars, indexes, coord_names) - - ds = Dataset( - data_vars, - coords=coords, - # indexes={}, # TODO should be added in a later version of xarray - attrs=ds_attrs, - ) - - return ds - - -def attrs_from_zarr_group_json(filepath: Path) -> dict: - with open(filepath) as metadata_file: - attrs = json.load(metadata_file) - return attrs["attributes"] - - -def metadata_from_zarr_json(filepath: Path) -> tuple[ZArray, list[str], dict]: - with open(filepath) as metadata_file: - metadata = json.load(metadata_file) - - if { - "name": "chunk-manifest-json", - "configuration": { - "manifest": "./manifest.json", - }, - } not in 
metadata.get("storage_transformers", []): - raise ValueError( - "Can only read byte ranges from Zarr v3 stores which implement the manifest storage transformer ZEP." - ) - - attrs = metadata.pop("attributes") - dim_names = metadata.pop("dimension_names") - - chunk_shape = tuple(metadata["chunk_grid"]["configuration"]["chunk_shape"]) - shape = tuple(metadata["shape"]) - zarr_format = metadata["zarr_format"] - - if metadata["fill_value"] is None: - raise ValueError( - "fill_value must be specified https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#fill-value" - ) - else: - fill_value = metadata["fill_value"] - - all_codecs = [ - codec - for codec in metadata["codecs"] - if codec["name"] not in ("transpose", "bytes") - ] - compressor, *filters = [ - _configurable_to_num_codec_config(_filter) for _filter in all_codecs - ] - zarray = ZArray( - chunks=chunk_shape, - compressor=compressor, - dtype=np.dtype(metadata["data_type"]), - fill_value=fill_value, - filters=filters or None, - order="C", - shape=shape, - zarr_format=zarr_format, - ) - - return zarray, dim_names, attrs - - -def _configurable_to_num_codec_config(configurable: dict) -> dict: - """ - Convert a zarr v3 configurable into a numcodecs codec. - """ - configurable_copy = configurable.copy() - codec_id = configurable_copy.pop("name") - configuration = configurable_copy.pop("configuration") - return numcodecs.get_codec({"id": codec_id, **configuration}).get_config() diff --git a/virtualizarr/readers/zarr_v3.py b/virtualizarr/readers/zarr_v3.py new file mode 100644 index 00000000..6da81581 --- /dev/null +++ b/virtualizarr/readers/zarr_v3.py @@ -0,0 +1,154 @@ +import json +from pathlib import Path +from typing import Iterable, Mapping, Optional + +import numcodecs +import numpy as np +from xarray import Dataset +from xarray.core.indexes import Index +from xarray.core.variable import Variable + +from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.readers.common import VirtualBackend, separate_coords +from virtualizarr.zarr import ZArray + + +class ZarrV3VirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> Dataset: + """ + Read a Zarr v3 store containing chunk manifests and return an xarray Dataset containing virtualized arrays. + + This is experimental - chunk manifests are not part of the Zarr v3 Spec. 
+ """ + storepath = Path(filepath) + + if group: + raise NotImplementedError() + + if loadable_variables or decode_times: + raise NotImplementedError() + + if reader_options: + raise NotImplementedError() + + drop_vars: list[str] + if drop_variables is None: + drop_vars = [] + else: + drop_vars = list(drop_variables) + + ds_attrs = attrs_from_zarr_group_json(storepath / "zarr.json") + coord_names = ds_attrs.pop("coordinates", []) + + # TODO recursive glob to create a datatree + # Note: this .is_file() check should not be necessary according to the pathlib docs, but tests fail on github CI without it + # see https://github.com/TomNicholas/VirtualiZarr/pull/45#discussion_r1547833166 + all_paths = storepath.glob("*/") + directory_paths = [p for p in all_paths if not p.is_file()] + + vars = {} + for array_dir in directory_paths: + var_name = array_dir.name + if var_name in drop_vars: + break + + zarray, dim_names, attrs = metadata_from_zarr_json(array_dir / "zarr.json") + manifest = ChunkManifest.from_zarr_json(str(array_dir / "manifest.json")) + + marr = ManifestArray(chunkmanifest=manifest, zarray=zarray) + var = Variable(data=marr, dims=dim_names, attrs=attrs) + vars[var_name] = var + + if indexes is None: + raise NotImplementedError() + elif indexes != {}: + # TODO allow manual specification of index objects + raise NotImplementedError() + else: + indexes = dict(**indexes) # for type hinting: to allow mutation + + data_vars, coords = separate_coords(vars, indexes, coord_names) + + ds = Dataset( + data_vars, + coords=coords, + # indexes={}, # TODO should be added in a later version of xarray + attrs=ds_attrs, + ) + + return ds + + +def attrs_from_zarr_group_json(filepath: Path) -> dict: + with open(filepath) as metadata_file: + attrs = json.load(metadata_file) + return attrs["attributes"] + + +def metadata_from_zarr_json(filepath: Path) -> tuple[ZArray, list[str], dict]: + with open(filepath) as metadata_file: + metadata = json.load(metadata_file) + + if { + "name": "chunk-manifest-json", + "configuration": { + "manifest": "./manifest.json", + }, + } not in metadata.get("storage_transformers", []): + raise ValueError( + "Can only read byte ranges from Zarr v3 stores which implement the manifest storage transformer ZEP." + ) + + attrs = metadata.pop("attributes") + dim_names = metadata.pop("dimension_names") + + chunk_shape = tuple(metadata["chunk_grid"]["configuration"]["chunk_shape"]) + shape = tuple(metadata["shape"]) + zarr_format = metadata["zarr_format"] + + if metadata["fill_value"] is None: + raise ValueError( + "fill_value must be specified https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#fill-value" + ) + else: + fill_value = metadata["fill_value"] + + all_codecs = [ + codec + for codec in metadata["codecs"] + if codec["name"] not in ("transpose", "bytes") + ] + compressor, *filters = [ + _configurable_to_num_codec_config(_filter) for _filter in all_codecs + ] + zarray = ZArray( + chunks=chunk_shape, + compressor=compressor, + dtype=np.dtype(metadata["data_type"]), + fill_value=fill_value, + filters=filters or None, + order="C", + shape=shape, + zarr_format=zarr_format, + ) + + return zarray, dim_names, attrs + + +def _configurable_to_num_codec_config(configurable: dict) -> dict: + """ + Convert a zarr v3 configurable into a numcodecs codec. 
+ """ + configurable_copy = configurable.copy() + codec_id = configurable_copy.pop("name") + configuration = configurable_copy.pop("configuration") + return numcodecs.get_codec({"id": codec_id, **configuration}).get_config() diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py index 81a23e0c..43a6bbd8 100644 --- a/virtualizarr/tests/test_backend.py +++ b/virtualizarr/tests/test_backend.py @@ -9,9 +9,8 @@ from xarray.core.indexes import Index from virtualizarr import open_virtual_dataset -from virtualizarr.backend import FileType +from virtualizarr.backend import FileType, automatically_determine_filetype from virtualizarr.manifests import ManifestArray -from virtualizarr.readers.kerchunk import _automatically_determine_filetype from virtualizarr.tests import ( has_astropy, has_tifffile, @@ -34,10 +33,10 @@ def test_automatically_determine_filetype_netcdf3_netcdf4(): ds.to_netcdf(netcdf3_file_path, engine="scipy", format="NETCDF3_CLASSIC") ds.to_netcdf(netcdf4_file_path, engine="h5netcdf") - assert FileType("netcdf3") == _automatically_determine_filetype( + assert FileType("netcdf3") == automatically_determine_filetype( filepath=netcdf3_file_path ) - assert FileType("hdf5") == _automatically_determine_filetype( + assert FileType("hdf5") == automatically_determine_filetype( filepath=netcdf4_file_path ) @@ -56,7 +55,7 @@ def test_valid_filetype_bytes(tmp_path, filetype, headerbytes): filepath = tmp_path / "file.abc" with open(filepath, "wb") as f: f.write(headerbytes) - assert FileType(filetype) == _automatically_determine_filetype(filepath=filepath) + assert FileType(filetype) == automatically_determine_filetype(filepath=filepath) def test_notimplemented_filetype(tmp_path): @@ -65,7 +64,7 @@ def test_notimplemented_filetype(tmp_path): with open(filepath, "wb") as f: f.write(headerbytes) with pytest.raises(NotImplementedError): - _automatically_determine_filetype(filepath=filepath) + automatically_determine_filetype(filepath=filepath) def test_FileType(): @@ -326,7 +325,8 @@ def test_group_kwarg(self, hdf5_groups_file): if name in vars_to_load: xrt.assert_identical(vds.variables[name], full_ds.variables[name]) - @patch("virtualizarr.readers.kerchunk.read_kerchunk_references_from_file") + @pytest.mark.xfail(reason="patches a function which no longer exists") + @patch("virtualizarr.translators.kerchunk.read_kerchunk_references_from_file") def test_open_virtual_dataset_passes_expected_args( self, mock_read_kerchunk, netcdf4_file ): diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py index 434d12d7..c9e3e302 100644 --- a/virtualizarr/tests/test_integration.py +++ b/virtualizarr/tests/test_integration.py @@ -5,11 +5,11 @@ from virtualizarr import open_virtual_dataset from virtualizarr.manifests import ChunkManifest, ManifestArray -from virtualizarr.readers.kerchunk import ( +from virtualizarr.tests import requires_kerchunk +from virtualizarr.translators.kerchunk import ( dataset_from_kerchunk_refs, find_var_names, ) -from virtualizarr.tests import requires_kerchunk from virtualizarr.zarr import ZArray diff --git a/virtualizarr/tests/test_writers/test_zarr.py b/virtualizarr/tests/test_writers/test_zarr.py index 278b2d78..67af401a 100644 --- a/virtualizarr/tests/test_writers/test_zarr.py +++ b/virtualizarr/tests/test_writers/test_zarr.py @@ -8,7 +8,7 @@ from virtualizarr import ManifestArray, open_virtual_dataset from virtualizarr.backend import FileType from virtualizarr.manifests.manifest import ChunkManifest -from 
diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py
index 434d12d7..c9e3e302 100644
--- a/virtualizarr/tests/test_integration.py
+++ b/virtualizarr/tests/test_integration.py
@@ -5,11 +5,11 @@
 from virtualizarr import open_virtual_dataset
 from virtualizarr.manifests import ChunkManifest, ManifestArray
-from virtualizarr.readers.kerchunk import (
+from virtualizarr.tests import requires_kerchunk
+from virtualizarr.translators.kerchunk import (
     dataset_from_kerchunk_refs,
     find_var_names,
 )
-from virtualizarr.tests import requires_kerchunk
 from virtualizarr.zarr import ZArray
diff --git a/virtualizarr/tests/test_writers/test_zarr.py b/virtualizarr/tests/test_writers/test_zarr.py
index 278b2d78..67af401a 100644
--- a/virtualizarr/tests/test_writers/test_zarr.py
+++ b/virtualizarr/tests/test_writers/test_zarr.py
@@ -8,7 +8,7 @@
 from virtualizarr import ManifestArray, open_virtual_dataset
 from virtualizarr.backend import FileType
 from virtualizarr.manifests.manifest import ChunkManifest
-from virtualizarr.readers.zarr import metadata_from_zarr_json
+from virtualizarr.readers.zarr_v3 import metadata_from_zarr_json
 from virtualizarr.writers.zarr import dataset_to_zarr
diff --git a/virtualizarr/translators/__init__.py b/virtualizarr/translators/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/virtualizarr/translators/kerchunk.py b/virtualizarr/translators/kerchunk.py
new file mode 100644
index 00000000..f2d2f5df
--- /dev/null
+++ b/virtualizarr/translators/kerchunk.py
@@ -0,0 +1,223 @@
+from typing import Any, Mapping, MutableMapping, cast
+
+from xarray import Dataset
+from xarray.core.indexes import Index
+from xarray.core.variable import Variable
+
+from virtualizarr.manifests import ChunkManifest, ManifestArray
+from virtualizarr.readers.common import separate_coords
+from virtualizarr.types.kerchunk import (
+    KerchunkArrRefs,
+    KerchunkStoreRefs,
+)
+from virtualizarr.zarr import ZArray, ZAttrs, determine_chunk_grid_shape
+
+
+def virtual_vars_and_metadata_from_kerchunk_refs(
+    vds_refs: KerchunkStoreRefs,
+    loadable_variables,
+    drop_variables,
+    virtual_array_class=ManifestArray,
+) -> tuple[Mapping[str, Variable], dict[str, Any], list[str]]:
+    """
+    Parses all useful information from a set of kerchunk references (for a single group).
+    """
+
+    virtual_vars = virtual_vars_from_kerchunk_refs(
+        vds_refs,
+        drop_variables=drop_variables + loadable_variables,
+        virtual_array_class=virtual_array_class,
+    )
+    ds_attrs = fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {})
+    coord_names = ds_attrs.pop("coordinates", [])
+
+    return virtual_vars, ds_attrs, coord_names
+
+
+def extract_group(vds_refs: KerchunkStoreRefs, group: str | None) -> KerchunkStoreRefs:
+    """Extract only the part of the kerchunk reference dict that is relevant to a single HDF group"""
+    hdf_groups = [
+        k.removesuffix(".zgroup") for k in vds_refs["refs"].keys() if ".zgroup" in k
+    ]
+    if len(hdf_groups) == 1:
+        return vds_refs
+    else:
+        if group is None:
+            raise ValueError(
+                f"Multiple HDF Groups found. Must specify group= keyword to select one of {hdf_groups}"
+            )
+        else:
+            # Ensure supplied group kwarg is consistent with kerchunk keys
+            if not group.endswith("/"):
+                group += "/"
+            if group.startswith("/"):
+                group = group.removeprefix("/")
+
+            if group not in hdf_groups:
+                raise ValueError(f'Group "{group}" not found in {hdf_groups}')
+
+        # Filter by group prefix and remove prefix from all keys
+        groupdict = {
+            k.removeprefix(group): v
+            for k, v in vds_refs["refs"].items()
+            if k.startswith(group)
+        }
+        # Also remove group prefix from _ARRAY_DIMENSIONS
+        for k, v in groupdict.items():
+            if isinstance(v, str):
+                groupdict[k] = v.replace("\\/", "/").replace(group, "")
+
+        vds_refs["refs"] = groupdict
+
+        return KerchunkStoreRefs(vds_refs)
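
(To make the group-name normalisation above concrete: a hypothetical, heavily truncated kerchunk reference dict.)

    from virtualizarr.translators.kerchunk import extract_group

    refs = {
        "version": 1,
        "refs": {
            ".zgroup": '{"zarr_format": 2}',
            "group1/.zgroup": '{"zarr_format": 2}',
            "group1/air/.zarray": "...",  # value truncated for illustration
        },
    }

    # "/group1" is normalised to "group1/", matched against the .zgroup keys,
    # and the prefix is stripped from the keys that survive filtering
    subset = extract_group(refs, group="/group1")
    assert "air/.zarray" in subset["refs"]
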
+ """ + + var_names = find_var_names(refs) + if drop_variables is None: + drop_variables = [] + var_names_to_keep = [ + var_name for var_name in var_names if var_name not in drop_variables + ] + + vars = { + var_name: variable_from_kerchunk_refs(refs, var_name, virtual_array_class) + for var_name in var_names_to_keep + } + return vars + + +def dataset_from_kerchunk_refs( + refs: KerchunkStoreRefs, + drop_variables: list[str] = [], + virtual_array_class: type = ManifestArray, + indexes: MutableMapping[str, Index] | None = None, +) -> Dataset: + """ + Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays. + + drop_variables: list[str], default is None + Variables in the file to drop before returning. + virtual_array_class + Virtual array class to use to represent the references to the chunks in each on-disk array. + Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that. + """ + + vars = virtual_vars_from_kerchunk_refs(refs, drop_variables, virtual_array_class) + ds_attrs = fully_decode_arr_refs(refs["refs"]).get(".zattrs", {}) + coord_names = ds_attrs.pop("coordinates", []) + + if indexes is None: + indexes = {} + data_vars, coords = separate_coords(vars, indexes, coord_names) + + vds = Dataset( + data_vars, + coords=coords, + # indexes={}, # TODO should be added in a later version of xarray + attrs=ds_attrs, + ) + + return vds + + +def variable_from_kerchunk_refs( + refs: KerchunkStoreRefs, var_name: str, virtual_array_class +) -> Variable: + """Create a single xarray Variable by reading specific keys of a kerchunk references dict.""" + + arr_refs = extract_array_refs(refs, var_name) + chunk_dict, zarray, zattrs = parse_array_refs(arr_refs) + # we want to remove the _ARRAY_DIMENSIONS from the final variables' .attrs + dims = zattrs.pop("_ARRAY_DIMENSIONS") + if chunk_dict: + manifest = ChunkManifest._from_kerchunk_chunk_dict(chunk_dict) + varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) + elif len(zarray.shape) != 0: + # empty variables don't have physical chunks, but zarray shows that the variable + # is at least 1D + shape = determine_chunk_grid_shape(zarray.shape, zarray.chunks) + manifest = ChunkManifest(entries={}, shape=shape) + varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) + else: + # This means we encountered a scalar variable of dimension 0, + # very likely that it actually has no numeric value and its only purpose + # is to communicate dataset attributes. + varr = zarray.fill_value + + return Variable(data=varr, dims=dims, attrs=zattrs) + + +def find_var_names(ds_reference_dict: KerchunkStoreRefs) -> list[str]: + """Find the names of zarr variables in this store/group.""" + + refs = ds_reference_dict["refs"] + found_var_names = {key.split("/")[0] for key in refs.keys() if "/" in key} + + return list(found_var_names) + + +def extract_array_refs( + ds_reference_dict: KerchunkStoreRefs, var_name: str +) -> KerchunkArrRefs: + """Extract only the part of the kerchunk reference dict that is relevant to this one zarr array""" + + found_var_names = find_var_names(ds_reference_dict) + + refs = ds_reference_dict["refs"] + if var_name in found_var_names: + # TODO these function probably have more loops in them than they need to... 
+def extract_array_refs(
+    ds_reference_dict: KerchunkStoreRefs, var_name: str
+) -> KerchunkArrRefs:
+    """Extract only the part of the kerchunk reference dict that is relevant to this one zarr array"""
+
+    found_var_names = find_var_names(ds_reference_dict)
+
+    refs = ds_reference_dict["refs"]
+    if var_name in found_var_names:
+        # TODO these functions probably have more loops in them than they need to...
+
+        arr_refs = {
+            key.split("/")[1]: refs[key]
+            for key in refs.keys()
+            if var_name == key.split("/")[0]
+        }
+
+        return fully_decode_arr_refs(arr_refs)
+
+    else:
+        raise KeyError(
+            f"Could not find zarr array variable name {var_name}, only {found_var_names}"
+        )
+
+
+def parse_array_refs(
+    arr_refs: KerchunkArrRefs,
+) -> tuple[dict, ZArray, ZAttrs]:
+    zarray = ZArray.from_kerchunk_refs(arr_refs.pop(".zarray"))
+    zattrs = arr_refs.pop(".zattrs", {})
+    chunk_dict = arr_refs
+
+    return chunk_dict, zarray, zattrs
+
+
+def fully_decode_arr_refs(d: dict) -> KerchunkArrRefs:
+    """
+    Only have to do this because kerchunk.SingleHdf5ToZarr apparently doesn't bother converting .zarray and .zattrs contents to dicts, see https://github.com/fsspec/kerchunk/issues/415 .
+    """
+    import ujson
+
+    sanitized = d.copy()
+    for k, v in d.items():
+        if k.startswith("."):
+            # ensure contents of .zattrs and .zarray are python dictionaries
+            sanitized[k] = ujson.loads(v)
+
+    return cast(KerchunkArrRefs, sanitized)
diff --git a/virtualizarr/utils.py b/virtualizarr/utils.py
index 1721a3e7..c9260aa6 100644
--- a/virtualizarr/utils.py
+++ b/virtualizarr/utils.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 import io
-from typing import TYPE_CHECKING, Optional, Union
+from typing import TYPE_CHECKING, Iterable, Optional, Union
 
 if TYPE_CHECKING:
     import fsspec.core
@@ -61,3 +61,28 @@ def __post_init__(self) -> None:
         storage_options = self.reader_options.get("storage_options", {})  # type: ignore
 
         self.fs = fsspec.filesystem(protocol, **storage_options)
+
+
+def check_for_collisions(
+    drop_variables: Iterable[str] | None,
+    loadable_variables: Iterable[str] | None,
+) -> tuple[list[str], list[str]]:
+    if drop_variables is None:
+        drop_variables = []
+    elif isinstance(drop_variables, str):
+        drop_variables = [drop_variables]
+    else:
+        drop_variables = list(drop_variables)
+
+    if loadable_variables is None:
+        loadable_variables = []
+    elif isinstance(loadable_variables, str):
+        loadable_variables = [loadable_variables]
+    else:
+        loadable_variables = list(loadable_variables)
+
+    common = set(drop_variables).intersection(set(loadable_variables))
+    if common:
+        raise ValueError(f"Cannot both load and drop variables {common}")
+
+    return drop_variables, loadable_variables
diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py
index cd83a67d..4b3fdd53 100644
--- a/virtualizarr/zarr.py
+++ b/virtualizarr/zarr.py
@@ -210,6 +210,12 @@ def ceildiv(a: int, b: int) -> int:
     return -(a // -b)
 
 
+def determine_chunk_grid_shape(
+    shape: tuple[int, ...], chunks: tuple[int, ...]
+) -> tuple[int, ...]:
+    return tuple(ceildiv(length, chunksize) for length, chunksize in zip(shape, chunks))
+
+
 def _num_codec_config_to_configurable(num_codec: dict) -> dict:
     """
     Convert a numcodecs codec into a zarr v3 configurable.
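
(A worked example of the new helper, with arbitrarily chosen sizes: a 10 x 5 array split into 4 x 5 chunks needs a 3 x 1 grid of chunks, because ceildiv rounds up.)

    from virtualizarr.zarr import determine_chunk_grid_shape

    assert determine_chunk_grid_shape(shape=(10, 5), chunks=(4, 5)) == (3, 1)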