Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEA] Support named aggregations in df.groupby().agg() #16528

Merged
merged 16 commits into from
Aug 15, 2024
Merged
7 changes: 2 additions & 5 deletions python/cudf/cudf/core/column_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ def _pad_key(self, key: Any, pad_value="") -> Any:
return key + (pad_value,) * (self.nlevels - len(key))

def rename_levels(
self, mapper: Mapping[Any, Any] | Callable, level: int | None
self, mapper: Mapping[Any, Any] | Callable, level: int | None = None
) -> ColumnAccessor:
"""
Rename the specified levels of the given ColumnAccessor
Expand Down Expand Up @@ -653,10 +653,7 @@ def rename_column(x):
return x

if level is None:
raise NotImplementedError(
"Renaming columns with a MultiIndex and level=None is"
"not supported"
)
level = 0
new_col_names = (rename_column(k) for k in self.keys())

else:
Expand Down
41 changes: 26 additions & 15 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ def _groupby(self):
)

@_performance_tracking
def agg(self, func, *args, engine=None, engine_kwargs=None, **kwargs):
def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs):
"""
Apply aggregation(s) to the groups.

Expand Down Expand Up @@ -648,11 +648,10 @@ def agg(self, func, *args, engine=None, engine_kwargs=None, **kwargs):
raise NotImplementedError(
"Passing args to func is currently not supported."
)
if kwargs:
raise NotImplementedError(
"Passing kwargs to func is currently not supported."
)
column_names, columns, normalized_aggs = self._normalize_aggs(func)

column_names, columns, normalized_aggs = self._normalize_aggs(
func, **kwargs
)
orig_dtypes = tuple(c.dtype for c in columns)

# Note: When there are no key columns, the below produces
Expand Down Expand Up @@ -1266,11 +1265,11 @@ def _grouped(self, *, include_groups: bool = True):
return (group_names, offsets, grouped_keys, grouped_values)

def _normalize_aggs(
self, aggs: MultiColumnAggType
self, aggs: MultiColumnAggType, **kwargs
) -> tuple[Iterable[Any], tuple[ColumnBase, ...], list[list[AggType]]]:
"""
Normalize aggs to a list of list of aggregations, where `out[i]`
is a list of aggregations for column `self.obj[i]`. We support three
is a list of aggregations for column `self.obj[i]`. We support four
different form of `aggs` input here:
- A single agg, such as "sum". This agg is applied to all value
columns.
Expand All @@ -1279,18 +1278,30 @@ def _normalize_aggs(
- A mapping of column name to aggs, such as
{"a": ["sum"], "b": ["mean"]}, the aggs are applied to specified
column.
- Pairs of column name and agg tuples passed as kwargs
eg. col1=("a", "sum"), col2=("b", "prod"). The output column names are
the keys. The aggs are applied to the corresponding column in the tuple.
Each agg can be string or lambda functions.
"""

aggs_per_column: Iterable[AggType | Iterable[AggType]]
if isinstance(aggs, dict):
column_names, aggs_per_column = aggs.keys(), aggs.values()
columns = tuple(self.obj._data[col] for col in column_names)
# TODO: Remove isinstance condition when the legacy dask_cudf API is removed.
# See https://github.com/rapidsai/cudf/pull/16528#discussion_r1715482302 for information.
if aggs or isinstance(aggs, dict):
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(aggs, dict):
column_names, aggs_per_column = aggs.keys(), aggs.values()
columns = tuple(self.obj._data[col] for col in column_names)
else:
values = self.grouping.values
column_names = values._column_names
columns = values._columns
aggs_per_column = (aggs,) * len(columns)
elif not aggs and kwargs:
column_names, aggs_per_column = kwargs.keys(), kwargs.values()
columns = tuple(self.obj._data[x[0]] for x in kwargs.values())
aggs_per_column = tuple(x[1] for x in kwargs.values())
else:
values = self.grouping.values
column_names = values._column_names
columns = values._columns
aggs_per_column = (aggs,) * len(columns)
raise TypeError("Must provide at least one aggregation function.")

# is_list_like performs type narrowing but type-checkers don't
# know it. One could add a TypeGuard annotation to
Expand Down
30 changes: 30 additions & 0 deletions python/cudf/cudf/tests/groupby/test_agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

import cudf
from cudf.testing import assert_eq


@pytest.mark.parametrize(
Expand All @@ -26,3 +27,32 @@ def test_series_agg(attr):
pd_agg = getattr(pdf.groupby(["a"])["a"], attr)("count")

assert agg.ndim == pd_agg.ndim


@pytest.mark.parametrize("func", ["sum", "prod", "mean", "count"])
@pytest.mark.parametrize("attr", ["agg", "aggregate"])
def test_dataframe_agg(attr, func):
df = cudf.DataFrame({"a": [1, 2, 1, 2], "b": [0, 0, 0, 0]})
pdf = df.to_pandas()

agg = getattr(df.groupby("a"), attr)(func)
pd_agg = getattr(pdf.groupby(["a"]), attr)(func)

assert_eq(agg, pd_agg)

agg = getattr(df.groupby("a"), attr)({"b": func})
pd_agg = getattr(pdf.groupby(["a"]), attr)({"b": func})

assert_eq(agg, pd_agg)

agg = getattr(df.groupby("a"), attr)([func])
pd_agg = getattr(pdf.groupby(["a"]), attr)([func])

assert_eq(agg, pd_agg)

agg = getattr(df.groupby("a"), attr)(foo=("b", func), bar=("a", func))
pd_agg = getattr(pdf.groupby(["a"]), attr)(
foo=("b", func), bar=("a", func)
)

assert_eq(agg, pd_agg)
4 changes: 4 additions & 0 deletions python/cudf/cudf/tests/test_column_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ def test_replace_level_values_MultiColumn():
got = ca.rename_levels(mapper={"a": "f"}, level=0)
check_ca_equal(expect, got)

# passing without level kwarg assumes level=0
got = ca.rename_levels(mapper={"a": "f"})
check_ca_equal(expect, got)


def test_clear_nrows_empty_before():
ca = ColumnAccessor({})
Expand Down
1 change: 0 additions & 1 deletion python/cudf/cudf/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9409,7 +9409,6 @@ def test_rename_for_level_RangeIndex_dataframe():
assert_eq(expect, got)


@pytest_xfail(reason="level=None not implemented yet")
def test_rename_for_level_is_None_MC():
gdf = cudf.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
gdf.columns = pd.MultiIndex.from_tuples([("a", 1), ("a", 2), ("b", 1)])
Expand Down
Loading