From f3f43d39e192b803f8f34d706fb9b0466ba9fa88 Mon Sep 17 00:00:00 2001 From: Philip Chmielowiec Date: Sun, 29 Sep 2024 18:16:48 -0500 Subject: [PATCH 1/6] create partition variables, update topo agg --- uxarray/core/aggregation.py | 129 ++++++++++++++---------------------- uxarray/grid/grid.py | 15 +++++ uxarray/grid/partition.py | 122 ++++++++++++++++++++++++++++++++++ 3 files changed, 186 insertions(+), 80 deletions(-) create mode 100644 uxarray/grid/partition.py diff --git a/uxarray/core/aggregation.py b/uxarray/core/aggregation.py index 23aaa6ba0..7349dd8c8 100644 --- a/uxarray/core/aggregation.py +++ b/uxarray/core/aggregation.py @@ -1,23 +1,16 @@ import numpy as np +import dask import dask.array as da -from uxarray.grid.connectivity import get_face_node_partitions import uxarray.core.dataarray -NUMPY_AGGREGATIONS = { - "mean": np.mean, - "max": np.max, - "min": np.min, - "prod": np.prod, - "sum": np.sum, - "std": np.std, - "var": np.var, - "median": np.median, - "all": np.all, - "any": np.any, -} +def result_array(arr): + if isinstance(arr, np.ndarray): + return np.empty + if isinstance(arr, dask.array.core.Array): + return da.empty def _uxda_grid_aggregate(uxda, destination, aggregation, **kwargs): @@ -79,18 +72,10 @@ def _node_to_face_aggregation(uxda, aggregation, aggregation_func_kwargs): f"{uxda.uxgrid.n_face}." ) - if isinstance(uxda.data, np.ndarray): - # apply aggregation using numpy - aggregated_var = _apply_node_to_face_aggregation_numpy( - uxda, NUMPY_AGGREGATIONS[aggregation], aggregation_func_kwargs - ) - elif isinstance(uxda.data, da.Array): - # apply aggregation on dask array, TODO: - aggregated_var = _apply_node_to_face_aggregation_numpy( - uxda, NUMPY_AGGREGATIONS[aggregation], aggregation_func_kwargs - ) - else: - raise ValueError + # TODO: + aggregated_var = _apply_node_to_face_aggregation( + uxda, aggregation, aggregation_func_kwargs + ) return uxarray.core.dataarray.UxDataArray( uxgrid=uxda.uxgrid, @@ -100,41 +85,37 @@ def _node_to_face_aggregation(uxda, aggregation, aggregation_func_kwargs): ).rename({"n_node": "n_face"}) -def _apply_node_to_face_aggregation_numpy( - uxda, aggregation_func, aggregation_func_kwargs +def _apply_node_to_face_aggregation( + uxda, aggregation_func, aggregation_func_kwargs, result_array_kwargs=None ): - """Applies a Node to Face Topological aggregation on a Numpy array.""" - data = uxda.values - face_node_conn = uxda.uxgrid.face_node_connectivity.values - n_nodes_per_face = uxda.uxgrid.n_nodes_per_face.values - - ( - change_ind, - n_nodes_per_face_sorted_ind, - element_sizes, - size_counts, - ) = get_face_node_partitions(n_nodes_per_face) - - result = np.empty(shape=(data.shape[:-1]) + (uxda.uxgrid.n_face,)) - - for e, start, end in zip(element_sizes, change_ind[:-1], change_ind[1:]): - face_inds = n_nodes_per_face_sorted_ind[start:end] - face_nodes_par = face_node_conn[face_inds, 0:e] - - # apply aggregation function to current face node partition - aggregation_par = aggregation_func( - data[..., face_nodes_par], axis=-1, **aggregation_func_kwargs - ) + """TODO:""" - # store current aggregation - result[..., face_inds] = aggregation_par + # shape [..., n_face] since data is being aggregated onto the faces + result = result_array(uxda.data)( + shape=(uxda.data.shape[:-1]) + (uxda.uxgrid.n_face,) + ) - return result + for ( + cur_face_node_partition, + cur_original_face_indices, + ) in uxda.uxgrid.partitioned_face_node_connectivity: + # index array using flattened connectivity (to avoid Dask errors) + data_flat = uxda.data[..., cur_face_node_partition.flatten()] + + # reshape index data back to desired shape [..., n_face_geom, geom_size] + data_reshaped = data_flat.reshape( + (uxda.data.shape[:-1]) + cur_face_node_partition.shape + ) + + # apply aggregation on current partition of elements + aggregation_par = getattr(data_reshaped, aggregation_func)( + axis=-1, **aggregation_func_kwargs + ) + # store computed aggregation using original face indices + result[..., cur_original_face_indices] = aggregation_par -def _apply_node_to_face_aggregation_dask(*args, **kwargs): - """Applies a Node to Face Topological aggregation on a Dask array.""" - pass + return result def _node_to_edge_aggregation(uxda, aggregation, aggregation_func_kwargs): @@ -145,39 +126,27 @@ def _node_to_edge_aggregation(uxda, aggregation, aggregation_func_kwargs): f"{uxda.uxgrid.n_face}." ) - if isinstance(uxda.data, np.ndarray): - # apply aggregation using numpy - aggregation_var = _apply_node_to_edge_aggregation_numpy( - uxda, NUMPY_AGGREGATIONS[aggregation], aggregation_func_kwargs - ) - elif isinstance(uxda.data, da.Array): - # apply aggregation on dask array, TODO: - aggregation_var = _apply_node_to_edge_aggregation_numpy( - uxda, NUMPY_AGGREGATIONS[aggregation], aggregation_func_kwargs - ) - else: - raise ValueError + # TODO: + aggregated_var = _apply_node_to_edge_aggregation_( + uxda, aggregation, aggregation_func_kwargs + ) return uxarray.core.dataarray.UxDataArray( uxgrid=uxda.uxgrid, - data=aggregation_var, + data=aggregated_var, dims=uxda.dims, name=uxda.name, ).rename({"n_node": "n_edge"}) -def _apply_node_to_edge_aggregation_numpy( - uxda, aggregation_func, aggregation_func_kwargs +def _apply_node_to_edge_aggregation_( + uxda, aggregation_func, aggregation_func_kwargs, result_array_kwargs=None ): - """Applies a Node to Edge topological aggregation on a numpy array.""" - data = uxda.values - edge_node_conn = uxda.uxgrid.edge_node_connectivity.values - result = aggregation_func( - data[..., edge_node_conn], axis=-1, **aggregation_func_kwargs + """TODO:""" + + data_flat = uxda.data[..., uxda.uxgrid.edge_node_connectivity.flatten()] + data_reshaped = data_flat.reshape((uxda.data.shape[:-1]) + (uxda.uxgrid.n_edge, 2)) + result = getattr(data_reshaped, aggregation_func)( + axis=-1, **aggregation_func_kwargs ) return result - - -def _apply_node_to_edge_aggregation_dask(*args, **kwargs): - """Applies a Node to Edge topological aggregation on a dask array.""" - pass diff --git a/uxarray/grid/grid.py b/uxarray/grid/grid.py index 64d6ee653..0cae3d715 100644 --- a/uxarray/grid/grid.py +++ b/uxarray/grid/grid.py @@ -76,6 +76,8 @@ _check_normalization, ) +from uxarray.grid.partition import _build_partitioned_face_node_connectivity + from xarray.core.utils import UncachedAccessor from warnings import warn @@ -158,6 +160,9 @@ def __init__( # internal xarray dataset for storing grid variables self._ds = grid_ds + # TODO: + self._ds_partitioned = xr.Dataset() + # initialize attributes self._antimeridian_face_indices = None self._ds.assign_attrs({"source_grid_spec": self.source_grid_spec}) @@ -1151,6 +1156,16 @@ def hole_edge_indices(self): ) return self._ds["hole_edge_indices"] + # TODO: =================== + @property + def partitioned_face_node_connectivity(self): + if not hasattr(self, "_partitioned_face_node_connectivity"): + self._partitioned_face_node_connectivity = ( + _build_partitioned_face_node_connectivity(self) + ) + + return self._partitioned_face_node_connectivity + def chunk(self, n_node="auto", n_edge="auto", n_face="auto"): """Converts all arrays to dask arrays with given chunks across grid dimensions in-place. diff --git a/uxarray/grid/partition.py b/uxarray/grid/partition.py new file mode 100644 index 000000000..cb0c5da47 --- /dev/null +++ b/uxarray/grid/partition.py @@ -0,0 +1,122 @@ +import numpy as np + + +def get_face_partitions(n_nodes_per_face): + # sort number of nodes per face in ascending order + n_nodes_per_face_sorted_indices = np.argsort(n_nodes_per_face) + + # unique element sizes and their respective counts + element_sizes, size_counts = np.unique(n_nodes_per_face, return_counts=True) + element_sizes_sorted_ind = np.argsort(element_sizes) + + # sort elements by their size + element_sizes = element_sizes[element_sizes_sorted_ind] + size_counts = size_counts[element_sizes_sorted_ind] + + # find the index at the point where the geometry changes from one shape to another + change_ind = np.cumsum(size_counts) + change_ind = np.concatenate((np.array([0]), change_ind)) + + return change_ind, n_nodes_per_face_sorted_indices, element_sizes, size_counts + + +def initialize_face_partition_variables(uxgrid): + # number of nodes and edges for a face are equal + + ( + face_geometry_inflections, + n_nodes_per_face_sorted_indices, + face_geometries, + face_geometries_counts, + ) = get_face_partitions(uxgrid.n_nodes_per_face.values) + + uxgrid._face_geometry_inflections = face_geometry_inflections + uxgrid._n_nodes_per_face_sorted_indices = n_nodes_per_face_sorted_indices + uxgrid._face_geometries = face_geometries + uxgrid._face_geometries_counts = face_geometries_counts + + +def _build_partitioned_face_node_connectivity(uxgrid): + if not hasattr(uxgrid, "_face_geometry_inflections"): + # TODO: + initialize_face_partition_variables(uxgrid) + + face_node_connectivity = uxgrid.face_node_connectivity.values + face_geometry_inflections = uxgrid._face_geometry_inflections + n_nodes_per_face_sorted_indices = uxgrid._n_nodes_per_face_sorted_indices + face_geometries = uxgrid._face_geometries + face_geometries_counts = uxgrid._face_geometries + + # TODO: + partitioned_face_node_connectivity = PartitionedFaceNodeConnectivity( + face_geometries, + connectivity_name="face_node_connectivity", + indices_name="original_face_indices", + ) + + for e, e_count, start, end in zip( + face_geometries, + face_geometries_counts, + face_geometry_inflections[:-1], + face_geometry_inflections[1:], + ): + original_face_indices = n_nodes_per_face_sorted_indices[start:end] + face_nodes_par = face_node_connectivity[original_face_indices, 0:e] + + # TODO + setattr( + partitioned_face_node_connectivity, + f"face_node_connectivity_{e}", + face_nodes_par, + ) + setattr( + partitioned_face_node_connectivity, + f"original_face_indices_{e}", + original_face_indices, + ) + + return partitioned_face_node_connectivity + + +class BasePartitionedConnectivity: + def __init__(self, geometries, connectivity_name="", indices_name=""): + # TODO + self._geometries = set(geometries) + self._connectivity_name = connectivity_name + self._indices_name = indices_name + + def __getitem__(self, geometry): + return ( + getattr(self, f"{self._connectivity_name}_{geometry}"), + getattr(self, f"{self._indices_name}_{geometry}"), + ) + + def __iter__(self): + for geom in self.geometries: + yield self[str(geom)] + + @property + def geometries(self): + # TODO: + return self._geometries + + @property + def partitions(self): + return [self[geom] for geom in self.geometries] + + @property + def original_face_indices(self): + return getattr(f"original_face_indices_{geom}" for geom in self.geometries) + + def chunk(self, chunk_size): + pass + + +class PartitionedFaceNodeConnectivity(BasePartitionedConnectivity): + def __repr__(self): + repr_str = "Partitioned Face Node Connectivity\n" + repr_str += "----------------------------------\n" + for geom in self.geometries: + repr_str += f" - {geom}: {self[geom][0].shape}\n" + + return repr_str From c6f49f785defe3ab55ab3558a08f33ce3f2e35ae Mon Sep 17 00:00:00 2001 From: Philip Chmielowiec Date: Sun, 29 Sep 2024 19:08:59 -0500 Subject: [PATCH 2/6] update dask implementation --- uxarray/core/aggregation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uxarray/core/aggregation.py b/uxarray/core/aggregation.py index 7349dd8c8..77628bf89 100644 --- a/uxarray/core/aggregation.py +++ b/uxarray/core/aggregation.py @@ -144,7 +144,7 @@ def _apply_node_to_edge_aggregation_( ): """TODO:""" - data_flat = uxda.data[..., uxda.uxgrid.edge_node_connectivity.flatten()] + data_flat = uxda.data[..., uxda.uxgrid.edge_node_connectivity.data.flatten()] data_reshaped = data_flat.reshape((uxda.data.shape[:-1]) + (uxda.uxgrid.n_edge, 2)) result = getattr(data_reshaped, aggregation_func)( axis=-1, **aggregation_func_kwargs From d088780faecabaf8d2029b5fa6f4a081300e4e96 Mon Sep 17 00:00:00 2001 From: Philip Chmielowiec Date: Sun, 29 Sep 2024 19:58:13 -0500 Subject: [PATCH 3/6] add chunking to partitioned connectivity --- uxarray/grid/partition.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/uxarray/grid/partition.py b/uxarray/grid/partition.py index cb0c5da47..33a0fa372 100644 --- a/uxarray/grid/partition.py +++ b/uxarray/grid/partition.py @@ -1,4 +1,5 @@ import numpy as np +import dask.array as da def get_face_partitions(n_nodes_per_face): @@ -108,8 +109,25 @@ def partitions(self): def original_face_indices(self): return getattr(f"original_face_indices_{geom}" for geom in self.geometries) - def chunk(self, chunk_size): - pass + def chunk(self, chunks=-1): + for geom in self.geometries: + partition = da.from_array( + getattr(self, f"{self._connectivity_name}_{geom}"), chunks=chunks + ) + face_indices = da.from_array( + getattr(self, f"{self._indices_name}_{geom}"), chunks=chunks + ) + + setattr(self, f"{self._connectivity_name}_{geom}", partition) + setattr(self, f"{self._indices_name}_{geom}", face_indices) + + def persist(self): + for geom in self.geometries: + partition = getattr(self, f"{self._connectivity_name}_{geom}").persist() + face_indices = getattr(self, f"{self._indices_name}_{geom}").persist() + + setattr(self, f"{self._connectivity_name}_{geom}", partition) + setattr(self, f"{self._indices_name}_{geom}", face_indices) class PartitionedFaceNodeConnectivity(BasePartitionedConnectivity): From e44038acb46db5ce5a0191abf90d70e054a8364a Mon Sep 17 00:00:00 2001 From: Philip Chmielowiec Date: Mon, 30 Sep 2024 11:47:49 -0500 Subject: [PATCH 4/6] add numba aggs --- uxarray/core/aggregation.py | 50 +++++++ uxarray/core/numba_aggregation.py | 210 ++++++++++++++++++++++++++++++ uxarray/grid/grid.py | 23 +++- uxarray/grid/partition.py | 86 ++++++------ 4 files changed, 326 insertions(+), 43 deletions(-) create mode 100644 uxarray/core/numba_aggregation.py diff --git a/uxarray/core/aggregation.py b/uxarray/core/aggregation.py index 77628bf89..56b77ebe1 100644 --- a/uxarray/core/aggregation.py +++ b/uxarray/core/aggregation.py @@ -6,6 +6,9 @@ import uxarray.core.dataarray +from uxarray.core.numba_aggregation import NUMBA_NODE_FACE_AGGS, NUMBA_NODE_EDGE_AGGS + + def result_array(arr): if isinstance(arr, np.ndarray): return np.empty @@ -90,6 +93,27 @@ def _apply_node_to_face_aggregation( ): """TODO:""" + if isinstance(uxda.data, np.ndarray): + _numba_agg_func = NUMBA_NODE_FACE_AGGS[aggregation_func] + result = _numba_agg_func( + uxda.data, + uxda.uxgrid.face_node_connectivity.values, + uxda.uxgrid.n_nodes_per_face.values, + uxda.uxgrid.n_face, + ) + elif isinstance(uxda.data, dask.array.core.Array): + result = _node_to_face_aggregation_dask( + uxda, + aggregation_func, + aggregation_func_kwargs, + ) + else: + raise ValueError("TODO") + + return result + + +def _node_to_face_aggregation_dask(uxda, aggregation_func, aggregation_func_kwargs): # shape [..., n_face] since data is being aggregated onto the faces result = result_array(uxda.data)( shape=(uxda.data.shape[:-1]) + (uxda.uxgrid.n_face,) @@ -143,6 +167,32 @@ def _apply_node_to_edge_aggregation_( uxda, aggregation_func, aggregation_func_kwargs, result_array_kwargs=None ): """TODO:""" + if isinstance(uxda.data, np.ndarray): + _numba_agg_func = NUMBA_NODE_EDGE_AGGS[aggregation_func] + result = _numba_agg_func( + uxda.data, uxda.uxgrid.edge_node_connectivity.values, uxda.uxgrid.n_face + ) + elif isinstance(uxda.data, dask.array.core.Array): + result = _node_to_edge_aggregation_dask( + uxda, + aggregation_func, + aggregation_func_kwargs, + ) + else: + raise ValueError("TODO") + + return result + + data_flat = uxda.data[..., uxda.uxgrid.edge_node_connectivity.data.flatten()] + data_reshaped = data_flat.reshape((uxda.data.shape[:-1]) + (uxda.uxgrid.n_edge, 2)) + result = getattr(data_reshaped, aggregation_func)( + axis=-1, **aggregation_func_kwargs + ) + return result + + +def _node_to_edge_aggregation_dask(uxda, aggregation_func, aggregation_func_kwargs): + """TODO:""" data_flat = uxda.data[..., uxda.uxgrid.edge_node_connectivity.data.flatten()] data_reshaped = data_flat.reshape((uxda.data.shape[:-1]) + (uxda.uxgrid.n_edge, 2)) diff --git a/uxarray/core/numba_aggregation.py b/uxarray/core/numba_aggregation.py new file mode 100644 index 000000000..108402052 --- /dev/null +++ b/uxarray/core/numba_aggregation.py @@ -0,0 +1,210 @@ +import numpy as np +from numba import njit, prange + + +@njit(parallel=True) +def _node_to_face_mean_numba(data, face_node_connectivity, n_nodes_per_face, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.mean( + data[..., face_node_connectivity[i, : n_nodes_per_face[i]]] + ) + return result + + +@njit(parallel=True) +def _node_to_face_max_numba(data, face_node_connectivity, n_nodes_per_face, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.max( + data[..., face_node_connectivity[i, : n_nodes_per_face[i]]] + ) + return result + + +@njit(parallel=True) +def _node_to_face_min_numba(data, face_node_connectivity, n_nodes_per_face, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.min( + data[..., face_node_connectivity[i, : n_nodes_per_face[i]]] + ) + return result + + +@njit(parallel=True) +def _node_to_face_prod_numba(data, face_node_connectivity, n_nodes_per_face, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.prod( + data[..., face_node_connectivity[i, : n_nodes_per_face[i]]] + ) + return result + + +@njit(parallel=True) +def _node_to_face_sum_numba(data, face_node_connectivity, n_nodes_per_face, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.sum( + data[..., face_node_connectivity[i, : n_nodes_per_face[i]]] + ) + return result + + +@njit(parallel=True) +def _node_to_face_std_numba(data, face_node_connectivity, n_nodes_per_face, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.std( + data[..., face_node_connectivity[i, : n_nodes_per_face[i]]] + ) + return result + + +@njit(parallel=True) +def _node_to_face_var_numba(data, face_node_connectivity, n_nodes_per_face, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.var( + data[..., face_node_connectivity[i, : n_nodes_per_face[i]]] + ) + return result + + +@njit(parallel=True) +def _node_to_face_median_numba(data, face_node_connectivity, n_nodes_per_face, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.median( + data[..., face_node_connectivity[i, : n_nodes_per_face[i]]] + ) + return result + + +@njit(parallel=True) +def _node_to_face_all_numba(data, face_node_connectivity, n_nodes_per_face, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.all( + data[..., face_node_connectivity[i, : n_nodes_per_face[i]]] + ) + return result + + +@njit(parallel=True) +def _node_to_face_any_numba(data, face_node_connectivity, n_nodes_per_face, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.any( + data[..., face_node_connectivity[i, : n_nodes_per_face[i]]] + ) + return result + + +NUMBA_NODE_FACE_AGGS = { + "mean": _node_to_face_mean_numba, + "max": _node_to_face_max_numba, + "min": _node_to_face_min_numba, + "prod": _node_to_face_prod_numba, + "sum": _node_to_face_sum_numba, + "std": _node_to_face_std_numba, + "var": _node_to_face_var_numba, + "median": _node_to_face_median_numba, + "all": _node_to_face_all_numba, + "any": _node_to_face_any_numba, +} + + +@njit(parallel=True) +def _node_to_edge_mean_numba(data, edge_node_connectivity, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.mean(data[..., edge_node_connectivity[i]]) + return result + + +@njit(parallel=True) +def _node_to_edge_max_numba(data, edge_node_connectivity, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.max(data[..., edge_node_connectivity[i]]) + return result + + +@njit(parallel=True) +def _node_to_edge_min_numba(data, edge_node_connectivity, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.min(data[..., edge_node_connectivity[i]]) + return result + + +@njit(parallel=True) +def _node_to_edge_prod_numba(data, edge_node_connectivity, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.prod(data[..., edge_node_connectivity[i]]) + return result + + +@njit(parallel=True) +def _node_to_edge_sum_numba(data, edge_node_connectivity, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.sum(data[..., edge_node_connectivity[i]]) + return result + + +@njit(parallel=True) +def _node_to_edge_std_numba(data, edge_node_connectivity, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.std(data[..., edge_node_connectivity[i]]) + return result + + +@njit(parallel=True) +def _node_to_edge_var_numba(data, edge_node_connectivity, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.var(data[..., edge_node_connectivity[i]]) + return result + + +@njit(parallel=True) +def _node_to_edge_median_numba(data, edge_node_connectivity, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.median(data[..., edge_node_connectivity[i]]) + return result + + +@njit(parallel=True) +def _node_to_edge_all_numba(data, edge_node_connectivity, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.all(data[..., edge_node_connectivity[i]]) + return result + + +@njit(parallel=True) +def _node_to_edge_any_numba(data, edge_node_connectivity, n_face): + result = np.empty(shape=(data.shape[:-1]) + (n_face,)) + for i in prange(n_face): + result[..., i] = np.any(data[..., edge_node_connectivity[i]]) + return result + + +NUMBA_NODE_EDGE_AGGS = { + "mean": _node_to_edge_mean_numba, + "max": _node_to_edge_max_numba, + "min": _node_to_edge_min_numba, + "prod": _node_to_edge_prod_numba, + "sum": _node_to_edge_sum_numba, + "std": _node_to_edge_std_numba, + "var": _node_to_edge_var_numba, + "median": _node_to_edge_median_numba, + "all": _node_to_edge_all_numba, + "any": _node_to_edge_any_numba, +} diff --git a/uxarray/grid/grid.py b/uxarray/grid/grid.py index 0cae3d715..b33ac9af2 100644 --- a/uxarray/grid/grid.py +++ b/uxarray/grid/grid.py @@ -76,7 +76,7 @@ _check_normalization, ) -from uxarray.grid.partition import _build_partitioned_face_node_connectivity +from uxarray.grid.partition import _build_partitioned_face_connectivity from xarray.core.utils import UncachedAccessor @@ -1159,13 +1159,32 @@ def hole_edge_indices(self): # TODO: =================== @property def partitioned_face_node_connectivity(self): + """TODO:""" if not hasattr(self, "_partitioned_face_node_connectivity"): self._partitioned_face_node_connectivity = ( - _build_partitioned_face_node_connectivity(self) + _build_partitioned_face_connectivity(self, "face_node_connectivity") ) return self._partitioned_face_node_connectivity + @partitioned_face_node_connectivity.setter + def partitioned_face_node_connectivity(self, value): + self._partitioned_face_node_connectivity = value + + @property + def partitioned_face_edge_connectivity(self): + """TODO:""" + if not hasattr(self, "_partitioned_face_edge_connectivity"): + self._partitioned_face_edge_connectivity = ( + _build_partitioned_face_connectivity(self, "face_edge_connectivity") + ) + + return self._partitioned_face_edge_connectivity + + @partitioned_face_edge_connectivity.setter + def partitioned_face_edge_connectivity(self, value): + self._partitioned_face_edge_connectivity = value + def chunk(self, n_node="auto", n_edge="auto", n_face="auto"): """Converts all arrays to dask arrays with given chunks across grid dimensions in-place. diff --git a/uxarray/grid/partition.py b/uxarray/grid/partition.py index 33a0fa372..7c0dbdaa5 100644 --- a/uxarray/grid/partition.py +++ b/uxarray/grid/partition.py @@ -1,8 +1,9 @@ import numpy as np -import dask.array as da +import xarray as xr def get_face_partitions(n_nodes_per_face): + """TODO:""" # sort number of nodes per face in ascending order n_nodes_per_face_sorted_indices = np.argsort(n_nodes_per_face) @@ -22,7 +23,7 @@ def get_face_partitions(n_nodes_per_face): def initialize_face_partition_variables(uxgrid): - # number of nodes and edges for a face are equal + """TODO:""" ( face_geometry_inflections, @@ -37,21 +38,34 @@ def initialize_face_partition_variables(uxgrid): uxgrid._face_geometries_counts = face_geometries_counts -def _build_partitioned_face_node_connectivity(uxgrid): +def _build_partitioned_face_connectivity(uxgrid, connectivity_name): + """TODO:""" if not hasattr(uxgrid, "_face_geometry_inflections"): # TODO: initialize_face_partition_variables(uxgrid) - face_node_connectivity = uxgrid.face_node_connectivity.values + if connectivity_name == "face_node_connectivity": + conn = uxgrid.face_node_connectivity.values + elif connectivity_name == "face_edge_connectivity": + conn = uxgrid.face_edge_connectivity.values + else: + raise ValueError("TODO: ") + face_geometry_inflections = uxgrid._face_geometry_inflections n_nodes_per_face_sorted_indices = uxgrid._n_nodes_per_face_sorted_indices face_geometries = uxgrid._face_geometries face_geometries_counts = uxgrid._face_geometries - # TODO: - partitioned_face_node_connectivity = PartitionedFaceNodeConnectivity( + if connectivity_name == "face_node_connectivity": + partition_obj = PartitionedFaceNodeConnectivity + elif connectivity_name == "face_edge_connectivity": + partition_obj = PartitionedFaceEdgeConnectivity + else: + raise ValueError() + + partitioned_connectivity = partition_obj( face_geometries, - connectivity_name="face_node_connectivity", + connectivity_name=connectivity_name, indices_name="original_face_indices", ) @@ -62,21 +76,20 @@ def _build_partitioned_face_node_connectivity(uxgrid): face_geometry_inflections[1:], ): original_face_indices = n_nodes_per_face_sorted_indices[start:end] - face_nodes_par = face_node_connectivity[original_face_indices, 0:e] + conn_par = conn[original_face_indices, 0:e] - # TODO - setattr( - partitioned_face_node_connectivity, - f"face_node_connectivity_{e}", - face_nodes_par, - ) - setattr( - partitioned_face_node_connectivity, - f"original_face_indices_{e}", - original_face_indices, + # Store partitioned face node connectivity + partitioned_connectivity._ds[f"{connectivity_name}_{e}"] = xr.DataArray( + data=conn_par, dims=[f"n_face_{str(e)}", str(e)] ) - return partitioned_face_node_connectivity + if f"original_face_indices_{e}" not in partitioned_connectivity._ds: + # Store original face indices (avoid duplicates) + partitioned_connectivity._ds[f"original_face_indices_{e}"] = xr.DataArray( + data=original_face_indices, dims=[f"n_face_{str(e)}"] + ) + + return partitioned_connectivity class BasePartitionedConnectivity: @@ -85,11 +98,12 @@ def __init__(self, geometries, connectivity_name="", indices_name=""): self._geometries = set(geometries) self._connectivity_name = connectivity_name self._indices_name = indices_name + self._ds = xr.Dataset() def __getitem__(self, geometry): return ( - getattr(self, f"{self._connectivity_name}_{geometry}"), - getattr(self, f"{self._indices_name}_{geometry}"), + self._ds[f"{self._connectivity_name}_{geometry}"].data, + self._ds[f"{self._indices_name}_{geometry}"].data, ) def __iter__(self): @@ -103,36 +117,26 @@ def geometries(self): @property def partitions(self): - return [self[geom] for geom in self.geometries] + return [self[geom][0].data for geom in self.geometries] @property def original_face_indices(self): - return getattr(f"original_face_indices_{geom}" for geom in self.geometries) - - def chunk(self, chunks=-1): - for geom in self.geometries: - partition = da.from_array( - getattr(self, f"{self._connectivity_name}_{geom}"), chunks=chunks - ) - face_indices = da.from_array( - getattr(self, f"{self._indices_name}_{geom}"), chunks=chunks - ) + return [self[geom][1].data for geom in self.geometries] - setattr(self, f"{self._connectivity_name}_{geom}", partition) - setattr(self, f"{self._indices_name}_{geom}", face_indices) - def persist(self): +class PartitionedFaceNodeConnectivity(BasePartitionedConnectivity): + def __repr__(self): + repr_str = "Partitioned Face Node Connectivity\n" + repr_str += "----------------------------------\n" for geom in self.geometries: - partition = getattr(self, f"{self._connectivity_name}_{geom}").persist() - face_indices = getattr(self, f"{self._indices_name}_{geom}").persist() + repr_str += f" - {geom}: {self[geom][0].shape}\n" - setattr(self, f"{self._connectivity_name}_{geom}", partition) - setattr(self, f"{self._indices_name}_{geom}", face_indices) + return repr_str -class PartitionedFaceNodeConnectivity(BasePartitionedConnectivity): +class PartitionedFaceEdgeConnectivity(BasePartitionedConnectivity): def __repr__(self): - repr_str = "Partitioned Face Node Connectivity\n" + repr_str = "Partitioned Face Edge Connectivity\n" repr_str += "----------------------------------\n" for geom in self.geometries: repr_str += f" - {geom}: {self[geom][0].shape}\n" From 435eb3f25da1c329409a28bb236e0951bb6c6bf0 Mon Sep 17 00:00:00 2001 From: Philip Chmielowiec Date: Mon, 30 Sep 2024 20:30:35 -0500 Subject: [PATCH 5/6] add docstrings --- uxarray/grid/grid.py | 6 +- uxarray/grid/partition.py | 156 +++++++++++++++++++++++++++++++++++--- 2 files changed, 148 insertions(+), 14 deletions(-) diff --git a/uxarray/grid/grid.py b/uxarray/grid/grid.py index b33ac9af2..3e33ef20e 100644 --- a/uxarray/grid/grid.py +++ b/uxarray/grid/grid.py @@ -160,9 +160,6 @@ def __init__( # internal xarray dataset for storing grid variables self._ds = grid_ds - # TODO: - self._ds_partitioned = xr.Dataset() - # initialize attributes self._antimeridian_face_indices = None self._ds.assign_attrs({"source_grid_spec": self.source_grid_spec}) @@ -1156,10 +1153,9 @@ def hole_edge_indices(self): ) return self._ds["hole_edge_indices"] - # TODO: =================== @property def partitioned_face_node_connectivity(self): - """TODO:""" + """TODO.""" if not hasattr(self, "_partitioned_face_node_connectivity"): self._partitioned_face_node_connectivity = ( _build_partitioned_face_connectivity(self, "face_node_connectivity") diff --git a/uxarray/grid/partition.py b/uxarray/grid/partition.py index 7c0dbdaa5..c508f22ea 100644 --- a/uxarray/grid/partition.py +++ b/uxarray/grid/partition.py @@ -3,7 +3,27 @@ def get_face_partitions(n_nodes_per_face): - """TODO:""" + """Calculate the partitions of faces based on the number of nodes per face. + + This function organizes faces by their size, creating partitions that can be used + for operations requiring uniform face geometry. It returns information about + the boundaries of each partition and the sorted order of nodes. + + Parameters + ---------- + n_nodes_per_face : array-like + An array specifying the number of nodes per face for each face in the grid. + + Returns + ------- + tuple + - change_ind (np.ndarray): Indices indicating where the geometry changes + from one shape to another in the sorted `n_nodes_per_face` array. + - n_nodes_per_face_sorted_indices (np.ndarray): Indices that sort the `n_nodes_per_face` + array in ascending order. + - element_sizes (np.ndarray): The unique sizes of the faces in the grid. + - size_counts (np.ndarray): The count of faces corresponding to each unique size. + """ # sort number of nodes per face in ascending order n_nodes_per_face_sorted_indices = np.argsort(n_nodes_per_face) @@ -23,7 +43,14 @@ def get_face_partitions(n_nodes_per_face): def initialize_face_partition_variables(uxgrid): - """TODO:""" + """Initialize partitioning variables for the face connectivity based on the + number of nodes per face. + + This function sets internal variables on the provided `uxgrid` + object, which are used to define partitions for faces of different + geometries. These partitions are necessary for efficient operations + on unstructured grid data. + """ ( face_geometry_inflections, @@ -39,9 +66,30 @@ def initialize_face_partition_variables(uxgrid): def _build_partitioned_face_connectivity(uxgrid, connectivity_name): - """TODO:""" + """Partitions the face connectivity (either face_node_connectivity or + face_edge_connectivity) into a partitioned format. + + This function creates partitions of a dense face connectivity variable, such as `face_node_connectivity` + or `face_edge_connectivity`, into a partitioned form based on the face geometries. The partitioned form + allows for vectorized operations on individual face partitions. + + Parameters + ---------- + uxgrid : ux.Grid + Grid object + + connectivity_name : str + Name of the connectivity variable to partition. Must be either `"face_node_connectivity"` or + `"face_edge_connectivity"`. + + Returns + ------- + PartitionedFaceNodeConnectivity or PartitionedFaceEdgeConnectivity + An object containing the partitioned face connectivity and original face indices for each + geometry in the grid. + """ if not hasattr(uxgrid, "_face_geometry_inflections"): - # TODO: + # initialize partition variables initialize_face_partition_variables(uxgrid) if connectivity_name == "face_node_connectivity": @@ -49,7 +97,7 @@ def _build_partitioned_face_connectivity(uxgrid, connectivity_name): elif connectivity_name == "face_edge_connectivity": conn = uxgrid.face_edge_connectivity.values else: - raise ValueError("TODO: ") + raise ValueError(f"Unknown connectivity name: {connectivity_name}") face_geometry_inflections = uxgrid._face_geometry_inflections n_nodes_per_face_sorted_indices = uxgrid._n_nodes_per_face_sorted_indices @@ -93,52 +141,142 @@ def _build_partitioned_face_connectivity(uxgrid, connectivity_name): class BasePartitionedConnectivity: + """A class for storing partitioned connectivity variables and original + indices for geometries to enable efficient vectorized operations on each + block of partitions. + + This class eliminates the need to store fill values by representing data in a partitioned format, + making it easier to perform operations directly on the partitioned data. Each geometry is associated + with a unique connectivity and index pair, stored in an internal xarray dataset. + + Parameters + ---------- + geometries : list or set + A collection of geometry identifiers. Each identifier corresponds to a distinct partition + of face node connectivity and original face indices. + connectivity_name : str, optional + Name of the connectivity variable used in the internal dataset. Default is an empty string. + indices_name : str, optional + Name of the indices variable used in the internal dataset. Default is an empty string. + + Attributes + ---------- + _geometries : set + Set of geometry identifiers, representing each partition for which connectivity and indices are stored. + _connectivity_name : str + The name used for the partitioned connectivity variable. + _indices_name : str + The name used for the original face indices variable. + _ds : xr.Dataset + Internal xarray dataset used to store the connectivity and indices variables for each geometry. + """ + def __init__(self, geometries, connectivity_name="", indices_name=""): - # TODO + """Class for storing partitioned connectivity variables without needing + to store fill values.""" self._geometries = set(geometries) self._connectivity_name = connectivity_name self._indices_name = indices_name self._ds = xr.Dataset() def __getitem__(self, geometry): + """Retrieve the partitioned connectivity and original indices for a + given geometry. + + Parameters + ---------- + geometry : str or int + The geometry identifier. + + Returns + ------- + tuple + A tuple containing: + - `cur_partition` (any): The partitioned connectivity data for the specified geometry. + - `cur_original__indices` (any): The original indices corresponding to the specified geometry. + + Example + ------- + >>> cur_face_node_partition, cur_original_face_indices = uxgrid.partitioned_face_node_connectivity["3"] + """ return ( self._ds[f"{self._connectivity_name}_{geometry}"].data, self._ds[f"{self._indices_name}_{geometry}"].data, ) def __iter__(self): + """Custom iterator that yields the partitioned connectivity and + original indices for each geometry in the collection. + + Yields + ------ + tuple + A tuple containing: + - `cur_partition` (any): Partitioned connectivity for the current geometry. + - `cur_original_indices` (any): Original ndices corresponding to the current geometry. + + Example + ------- + >>> for cur_face_node_partition, cur_original_face_indices in uxgrid.partitioned_face_node_connectivity: + ... # Access the partition and original indices for a given geometry (i.e. 3, 4, etc.) + """ for geom in self.geometries: yield self[str(geom)] @property def geometries(self): - # TODO: + """The unique geometries present in the partitioned connectivity. + + Returns + ------- + set + A set of unique geometry identifiers representing each partition for which connectivity + and original indices are stored. + """ return self._geometries @property def partitions(self): + """Retrieve the partitioned connectivity for each geometry. + + Returns + ------- + list + A list where each entry is the partitioned connectivity data for a specific geometry. + """ return [self[geom][0].data for geom in self.geometries] @property def original_face_indices(self): + """Retrieve the original indices that map each partitioned connectivity + entry back to the index in the source connectivity. + + Returns + ------- + list + A list where each element is the array of original face indices for a specific geometry, + allowing for reconstruction or indexing of the original connectivity + """ return [self[geom][1].data for geom in self.geometries] class PartitionedFaceNodeConnectivity(BasePartitionedConnectivity): + """Represents the ``face_node_connectivity`` in a partitioned manner.""" + def __repr__(self): repr_str = "Partitioned Face Node Connectivity\n" repr_str += "----------------------------------\n" for geom in self.geometries: repr_str += f" - {geom}: {self[geom][0].shape}\n" - return repr_str class PartitionedFaceEdgeConnectivity(BasePartitionedConnectivity): + """Represents the ``face_node_connectivity`` in a partitioned manner.""" + def __repr__(self): repr_str = "Partitioned Face Edge Connectivity\n" repr_str += "----------------------------------\n" for geom in self.geometries: repr_str += f" - {geom}: {self[geom][0].shape}\n" - return repr_str From 664e535e971a2bc24b0a823dc3689ad0bd250569 Mon Sep 17 00:00:00 2001 From: Philip Chmielowiec Date: Mon, 30 Sep 2024 20:33:30 -0500 Subject: [PATCH 6/6] add docstrings to properties --- uxarray/grid/grid.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/uxarray/grid/grid.py b/uxarray/grid/grid.py index 3e33ef20e..efa53de69 100644 --- a/uxarray/grid/grid.py +++ b/uxarray/grid/grid.py @@ -1155,7 +1155,18 @@ def hole_edge_indices(self): @property def partitioned_face_node_connectivity(self): - """TODO.""" + """Partitioned representation of face_node_connectivity. + + This property constructs the partitioned form of the face_node_connectivity using + the number of nodes per face. The partitioned structure enables efficient operations on + each block of partitions based on unique face geometries. + + Returns + ------- + PartitionedFaceNodeConnectivity + An object representing the partitioned face_node_connectivity for the grid, containing + partitioned data and original face indices for each unique geometry. + """ if not hasattr(self, "_partitioned_face_node_connectivity"): self._partitioned_face_node_connectivity = ( _build_partitioned_face_connectivity(self, "face_node_connectivity") @@ -1169,7 +1180,18 @@ def partitioned_face_node_connectivity(self, value): @property def partitioned_face_edge_connectivity(self): - """TODO:""" + """Partitioned representation of face_edge_connectivity. + + This property constructs the partitioned form of the face_edge_connectivity using + the number of nodes per face. The partitioned structure enables efficient operations on + each block of partitions based on unique face geometries. + + Returns + ------- + PartitionedFaceEdgeConnectivity + An object representing the partitioned face_node_connectivity for the grid, containing + partitioned data and original face indices for each unique geometry. + """ if not hasattr(self, "_partitioned_face_edge_connectivity"): self._partitioned_face_edge_connectivity = ( _build_partitioned_face_connectivity(self, "face_edge_connectivity")