refactor runtime_configuration
normanrz committed Jul 18, 2023
1 parent fce9c8b commit cbd5218
Showing 14 changed files with 337 additions and 233 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
@@ -22,7 +22,7 @@ jobs:
         poetry install
     - name: Type check with mypy
       run: |
-        poetry run python -m mypy .
+        poetry run mypy .
     - name: Lint with ruff
       run: |
         poetry run ruff check .
1 change: 1 addition & 0 deletions TODO.md
@@ -21,6 +21,7 @@
 - [ ] coalesce byte ranges in store
 - [x] valuehandle bytes
 - [ ] import asyncio loop
+- [x] index_codecs
 
 ## Non-priority
 
14 changes: 11 additions & 3 deletions check.sh
@@ -1,6 +1,14 @@
 #!/usr/bin/env bash
 set -eEuo pipefail
 
-poetry run black .
-poetry run isort .
-poetry run python -m mypy -p zarrita
+if [ "$#" = 1 ] && [ "$1" = "--fix" ]; then
+    poetry run black .
+    poetry run isort .
+    poetry run ruff check . --fix
+    poetry run mypy .
+else
+    poetry run black --check .
+    poetry run isort --check .
+    poetry run ruff check .
+    poetry run mypy .
+fi
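The script is now two-mode: `./check.sh --fix` rewrites files in place (black, isort, and `ruff check . --fix`), while a plain `./check.sh` runs the same tools in check-only mode (`black --check`, `isort --check`, `ruff check`) and fails without modifying anything; mypy runs identically in both branches.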
3 changes: 1 addition & 2 deletions tests/test_perf.py
@@ -11,8 +11,7 @@
 from numcodecs import Blosc
 from pytest import fixture
 
-from zarrita import Array, LocalStore, Store, codecs
-from zarrita.array import runtime_configuration
+from zarrita import Array, LocalStore, Store, codecs, runtime_configuration
 
 TEST_SIZE = int(os.environ.get("TEST_SIZE", "1024"))
 CHUNK_SIZE = 32
24 changes: 24 additions & 0 deletions tests/test_v3.py
@@ -147,6 +147,30 @@ def test_sharding_partial_overwrite(store: Store, l4_sample_data: np.ndarray):
     assert np.array_equal(data, read_data)
 
 
+def test_nested_sharding(store: Store, l4_sample_data: np.ndarray):
+    data = l4_sample_data
+
+    a = Array.create(
+        store / "l4_sample" / "color" / "1",
+        shape=data.shape,
+        chunk_shape=(64, 64, 64),
+        dtype=data.dtype,
+        fill_value=0,
+        codecs=[
+            codecs.sharding_codec(
+                (32, 32, 32),
+                [codecs.sharding_codec((16, 16, 16))],
+            )
+        ],
+    )
+
+    a[:, :, :] = data
+
+    read_data = a[0 : data.shape[0], 0 : data.shape[1], 0 : data.shape[2]]
+    assert data.shape == read_data.shape
+    assert np.array_equal(data, read_data)
+
+
 @pytest.mark.parametrize("input_order", ["F", "C"])
 @pytest.mark.parametrize("store_order", ["F", "C"])
 @pytest.mark.parametrize("runtime_write_order", ["F", "C"])
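The new `test_nested_sharding` covers a sharding codec whose inner codec list contains another sharding codec: each 64×64×64 chunk is stored as a shard of 32×32×32 inner chunks, each of which is itself a shard of 16×16×16 sub-chunks. The test round-trips a full write and read of the sample volume and asserts the data is unchanged.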
17 changes: 7 additions & 10 deletions zarrita/__init__.py
@@ -1,16 +1,13 @@
 from __future__ import annotations
 
-from typing import Optional, Union
+from typing import Union
 
 import zarrita.codecs  # noqa: F401
-from zarrita.array import (  # noqa: F401
-    Array,
-    ArrayRuntimeConfiguration,
-    runtime_configuration,
-)
+from zarrita.array import Array  # noqa: F401
 from zarrita.array_v2 import ArrayV2  # noqa: F401
 from zarrita.group import Group  # noqa: F401
 from zarrita.group_v2 import GroupV2  # noqa: F401
+from zarrita.metadata import RuntimeConfiguration, runtime_configuration  # noqa: F401
 from zarrita.store import (  # noqa: F401
     LocalStore,
     RemoteStore,
@@ -24,22 +21,22 @@
 
 async def open_auto_async(
     store: StoreLike,
-    runtime_configuration_: Optional[ArrayRuntimeConfiguration] = None,
+    runtime_configuration_: RuntimeConfiguration = RuntimeConfiguration(),
 ) -> Union[Array, ArrayV2, Group, GroupV2]:
     store_path = make_store_path(store)
     try:
         return await Group.open_or_array(
             store_path, runtime_configuration=runtime_configuration_
         )
     except KeyError:
-        return await GroupV2.open_or_array(store_path)
+        return await GroupV2.open_or_array(store_path, runtime_configuration_)
 
 
 def open_auto(
     store: StoreLike,
-    runtime_configuration_: Optional[ArrayRuntimeConfiguration] = None,
+    runtime_configuration_: RuntimeConfiguration = RuntimeConfiguration(),
 ) -> Union[Array, ArrayV2, Group, GroupV2]:
     return _sync(
         open_auto_async(store, runtime_configuration_),
-        runtime_configuration_.asyncio_loop if runtime_configuration_ else None,
+        runtime_configuration_.asyncio_loop,
     )
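`RuntimeConfiguration` and its `runtime_configuration` factory now live in `zarrita.metadata` and are re-exported from the package root, and the open functions accept a concrete `RuntimeConfiguration` default instead of `Optional[...] = None`. A minimal sketch of the resulting call pattern, assuming the factory keeps the `(order, concurrency)` signature of the deleted `zarrita.array` version (the store path and array name below are hypothetical):

    from zarrita import Array, LocalStore, runtime_configuration

    # Assumes the factory keeps its old (order, concurrency) parameters;
    # "testdata" and "my_array" are hypothetical names.
    config = runtime_configuration("C", concurrency=8)
    arr = Array.open(LocalStore("testdata") / "my_array", runtime_configuration=config)

Because the default is now a real `RuntimeConfiguration()` rather than `None`, `open_auto` can read `runtime_configuration_.asyncio_loop` unconditionally instead of guarding with `if runtime_configuration_ else None`.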
83 changes: 23 additions & 60 deletions zarrita/array.py
@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import json
-from asyncio import AbstractEventLoop
 from typing import Any, Dict, Iterable, Literal, Optional, Tuple, Union
 
 import attr
@@ -12,7 +11,6 @@
 from zarrita.codecs import CodecMetadata, CodecPipeline, endian_codec
 from zarrita.common import (
     ZARR_JSON,
-    BytesLike,
     ChunkCoords,
     Selection,
     SliceSelection,
@@ -27,6 +25,7 @@
     DefaultChunkKeyEncodingMetadata,
     RegularChunkGridConfigurationMetadata,
     RegularChunkGridMetadata,
+    RuntimeConfiguration,
     V2ChunkKeyEncodingConfigurationMetadata,
     V2ChunkKeyEncodingMetadata,
     dtype_to_data_type,
@@ -36,19 +35,6 @@
 from zarrita.sync import sync
 
 
-@frozen
-class ArrayRuntimeConfiguration:
-    order: Literal["C", "F"] = "C"
-    concurrency: Optional[int] = None
-    asyncio_loop: Optional[AbstractEventLoop] = None
-
-
-def runtime_configuration(
-    order: Literal["C", "F"], concurrency: Optional[int] = None
-) -> ArrayRuntimeConfiguration:
-    return ArrayRuntimeConfiguration(order=order, concurrency=concurrency)
-
-
 @frozen
 class _AsyncArrayProxy:
     array: Array
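The deleted class is relocated rather than removed: it resurfaces as `RuntimeConfiguration` in `zarrita.metadata` (see the new import above). The `zarrita/metadata.py` side of the move is not part of this excerpt; a sketch of what the relocated definition presumably looks like, mirroring the deleted one field for field:

    from asyncio import AbstractEventLoop
    from typing import Literal, Optional

    from attr import frozen

    # Presumed relocated definition in zarrita/metadata.py (not shown in this
    # diff); assumed identical to the deleted ArrayRuntimeConfiguration.
    @frozen
    class RuntimeConfiguration:
        order: Literal["C", "F"] = "C"
        concurrency: Optional[int] = None
        asyncio_loop: Optional[AbstractEventLoop] = None

Since `@frozen` makes instances immutable, the `RuntimeConfiguration()` parameter defaults introduced throughout this commit are safe even though Python evaluates defaults once at definition time: the shared instance cannot be mutated.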
@@ -79,7 +65,7 @@ def _json_convert(o):
 class Array:
     metadata: ArrayMetadata
     store_path: StorePath
-    runtime_configuration: ArrayRuntimeConfiguration
+    runtime_configuration: RuntimeConfiguration
     codec_pipeline: CodecPipeline
 
     @classmethod
@@ -98,7 +84,7 @@ async def create_async(
         codecs: Optional[Iterable[CodecMetadata]] = None,
         dimension_names: Optional[Iterable[str]] = None,
         attributes: Optional[Dict[str, Any]] = None,
-        runtime_configuration: Optional[ArrayRuntimeConfiguration] = None,
+        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
         exists_ok: bool = False,
     ) -> Array:
         store_path = make_store_path(store)
@@ -139,7 +125,7 @@ async def create_async(
             dimension_names=tuple(dimension_names) if dimension_names else None,
             attributes=attributes or {},
         )
-        runtime_configuration = runtime_configuration or ArrayRuntimeConfiguration()
+        runtime_configuration = runtime_configuration or RuntimeConfiguration()
 
         array = cls(
             metadata=metadata,
@@ -169,7 +155,7 @@ def create(
         codecs: Optional[Iterable[CodecMetadata]] = None,
         dimension_names: Optional[Iterable[str]] = None,
         attributes: Optional[Dict[str, Any]] = None,
-        runtime_configuration: Optional[ArrayRuntimeConfiguration] = None,
+        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
         exists_ok: bool = False,
     ) -> Array:
         return sync(
@@ -186,41 +172,41 @@ def create(
                 runtime_configuration=runtime_configuration,
                 exists_ok=exists_ok,
             ),
-            runtime_configuration.asyncio_loop if runtime_configuration else None,
+            runtime_configuration.asyncio_loop,
         )
 
     @classmethod
     async def open_async(
         cls,
         store: StoreLike,
-        runtime_configuration: Optional[ArrayRuntimeConfiguration] = None,
+        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
     ) -> Array:
         store_path = make_store_path(store)
         zarr_json_bytes = await (store_path / ZARR_JSON).get_async()
         assert zarr_json_bytes is not None
         return cls.from_json(
             store_path,
             json.loads(zarr_json_bytes),
-            runtime_configuration=runtime_configuration or ArrayRuntimeConfiguration(),
+            runtime_configuration=runtime_configuration or RuntimeConfiguration(),
         )
 
     @classmethod
     def open(
         cls,
         store: StoreLike,
-        runtime_configuration: Optional[ArrayRuntimeConfiguration] = None,
+        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
     ) -> Array:
         return sync(
             cls.open_async(store, runtime_configuration=runtime_configuration),
-            runtime_configuration.asyncio_loop if runtime_configuration else None,
+            runtime_configuration.asyncio_loop,
         )
 
     @classmethod
     def from_json(
         cls,
         store_path: StorePath,
         zarr_json: Any,
-        runtime_configuration: ArrayRuntimeConfiguration,
+        runtime_configuration: RuntimeConfiguration,
     ) -> Array:
         metadata = make_cattr().structure(zarr_json, ArrayMetadata)
         out = cls(
@@ -238,28 +224,27 @@ def from_json(
     async def open_auto_async(
         cls,
         store: StoreLike,
-        runtime_configuration: Optional[ArrayRuntimeConfiguration] = None,
+        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
     ) -> Union[Array, ArrayV2]:
         store_path = make_store_path(store)
         v3_metadata_bytes = await (store_path / ZARR_JSON).get_async()
         if v3_metadata_bytes is not None:
             return cls.from_json(
                 store_path,
                 json.loads(v3_metadata_bytes),
-                runtime_configuration=runtime_configuration
-                or ArrayRuntimeConfiguration(),
+                runtime_configuration=runtime_configuration or RuntimeConfiguration(),
             )
         return await ArrayV2.open_async(store_path)
 
     @classmethod
     def open_auto(
         cls,
         store: StoreLike,
-        runtime_configuration: Optional[ArrayRuntimeConfiguration] = None,
+        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
     ) -> Union[Array, ArrayV2]:
         return sync(
             cls.open_auto_async(store, runtime_configuration),
-            runtime_configuration.asyncio_loop if runtime_configuration else None,
+            runtime_configuration.asyncio_loop,
         )
 
     async def _save_metadata(self) -> None:
@@ -348,34 +333,14 @@ async def _read_chunk(
             else:
                 out[out_selection] = self.metadata.fill_value
         else:
-            chunk_array = await self._decode_chunk(store_path, chunk_selection)
-            if chunk_array is not None:
+            chunk_bytes = await store_path.get_async()
+            if chunk_bytes is not None:
+                chunk_array = await self.codec_pipeline.decode(chunk_bytes)
                 tmp = chunk_array[chunk_selection]
                 out[out_selection] = tmp
             else:
                 out[out_selection] = self.metadata.fill_value
 
-    async def _decode_chunk(
-        self, store_path: StorePath, selection: SliceSelection
-    ) -> Optional[np.ndarray]:
-        chunk_bytes = await store_path.get_async()
-        if chunk_bytes is None:
-            return None
-
-        chunk_array = await self.codec_pipeline.decode(chunk_bytes)
-
-        # ensure correct dtype
-        if chunk_array.dtype.name != self.metadata.data_type.name:
-            chunk_array = chunk_array.view(self.metadata.dtype)
-
-        # ensure correct chunk shape
-        if chunk_array.shape != self.metadata.chunk_grid.configuration.chunk_shape:
-            chunk_array = chunk_array.reshape(
-                self.metadata.chunk_grid.configuration.chunk_shape,
-            )
-
-        return chunk_array
-
     def __setitem__(self, selection: Selection, value: np.ndarray) -> None:
         sync(self._set_async(selection, value), self.runtime_configuration.asyncio_loop)
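With `_decode_chunk` inlined, the read path now fetches the raw bytes and hands them directly to `self.codec_pipeline.decode`. The dtype- and shape-normalization steps the helper used to perform are deleted outright rather than moved within this file, so the codec pipeline is presumably now expected to return chunks that already match the metadata's dtype and chunk shape.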

@@ -453,28 +418,26 @@ async def _write_chunk(
         else:
             # writing partial chunks
             # read chunk first
-            tmp = await self._decode_chunk(
-                store_path,
-                tuple(slice(0, c) for c in chunk_shape),
-            )
+            chunk_bytes = await store_path.get_async()
 
             # merge new value
-            if tmp is None:
+            if chunk_bytes is None:
                 chunk_array = np.empty(
                     chunk_shape,
                     dtype=self.metadata.dtype,
                 )
                 chunk_array.fill(self.metadata.fill_value)
             else:
-                chunk_array = tmp.copy()  # make a writable copy
+                chunk_array = (
+                    await self.codec_pipeline.decode(chunk_bytes)
+                ).copy()  # make a writable copy
             chunk_array[chunk_selection] = value[out_selection]
 
         await self._write_chunk_to_store(store_path, chunk_array)
 
     async def _write_chunk_to_store(
         self, store_path: StorePath, chunk_array: np.ndarray
     ):
-        chunk_bytes: Optional[BytesLike]
         if np.all(chunk_array == self.metadata.fill_value):
             # chunks that only contain fill_value will be removed
             await store_path.delete_async()
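The partial-chunk write keeps the same read-modify-write structure: fetch the stored bytes, decode them through the codec pipeline, `.copy()` to obtain a writable array (the decoded buffer may be read-only), merge the new values into the selection, and re-encode via `_write_chunk_to_store`. The now-unneeded `chunk_bytes: Optional[BytesLike]` pre-declaration is dropped, matching the removal of the `BytesLike` import above.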
(Diff truncated: 7 of the 14 changed files are shown above; the remaining files are not included.)
