Skip to content

Commit

Permalink
Create agg() function for dataframes (rapidsai#6483)
Browse files Browse the repository at this point in the history
Closes rapidsai#5247

Adds `agg` function for DataFrame

Authors:
  - Sheilah Kirui <[email protected]>
  - Sheilah Kirui <[email protected]>
  - Michael Wang <[email protected]>
  - skirui-source <[email protected]>
  - galipremsagar <[email protected]>
  - GALI PREM SAGAR <[email protected]>
  - Keith Kraus <[email protected]>
  - Ashwin Srinath <[email protected]>

Approvers:
  - Michael Wang
  - Michael Wang
  - Keith Kraus

URL: rapidsai#6483
  • Loading branch information
skirui-source authored Dec 4, 2020
1 parent cd7a0ad commit 30bbb39
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
- PR #6765 Cupy fallback for __array_function__ and __array_ufunc__ for cudf.Series
- PR #6817 Add support for scatter() on lists-of-struct columns
- PR #6805 Implement `cudf::detail::copy_if` for `decimal32` and `decimal64`
- PR #6483 Add `agg` function to aggregate dataframe using one or more operations
- PR #6726 Support selecting different hash functions in hash_partition
- PR #6619 Improve Dockerfile
- PR #6831 Added parquet chunked writing ability for list columns
Expand Down
135 changes: 133 additions & 2 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2018-2020, NVIDIA CORPORATION.
from __future__ import division, print_function
from __future__ import division

import inspect
import itertools
Expand All @@ -8,7 +8,7 @@
import sys
import warnings
from collections import OrderedDict, defaultdict
from collections.abc import Mapping, Sequence
from collections.abc import Iterable, Mapping, Sequence

import cupy
import numpy as np
Expand Down Expand Up @@ -3728,6 +3728,137 @@ def sort_values(
keep_index=not ignore_index,
)

def agg(self, aggs, axis=None):
    """
    Aggregate using one or more operations over the specified axis.

    Parameters
    ----------
    aggs : Iterable (set, list, string, tuple or dict)
        Function to use for aggregating data. Accepted types are:

        * string name, e.g. ``"sum"``
        * list of functions, e.g. ``["sum", "min", "max"]``
        * dict of axis labels specified operations per column,
          e.g. ``{"a": "sum"}`` or ``{"a": ["sum", "min"], "b": "max"}``
    axis : not yet supported

    Returns
    -------
    Aggregation Result : ``Series`` or ``DataFrame``
        When ``DataFrame.agg`` is called with single agg,
        ``Series`` is returned.
        When ``DataFrame.agg`` is called with several aggs,
        ``DataFrame`` is returned.

    Raises
    ------
    NotImplementedError
        If ``axis`` is not ``None``, if any requested aggregation is a
        callable, or if the frame contains string columns.
    AttributeError
        If a requested aggregation name is not an attribute of the
        frame (string / list input) or of the column (dict input).
    ValueError
        If ``aggs`` is not a string, iterable or dict, or if a dict
        value is neither a string nor an iterable.

    Notes
    -----
    Difference from pandas:
      * Not supporting: ``axis``, ``*args``, ``**kwargs``
    """
    callable_msg = "callable parameter is not implemented yet"

    # Validate the arguments first so that invalid calls fail before the
    # frame is inspected or copied.
    if axis is not None:
        raise NotImplementedError("axis not implemented yet")

    per_column = None
    if isinstance(aggs, str):
        pass
    elif isinstance(aggs, dict):
        # A callable anywhere among the values takes precedence over any
        # other problem with the dict.
        if any(callable(val) for val in aggs.values()):
            raise NotImplementedError(callable_msg)
        per_column = {}
        for key, val in aggs.items():
            # ``str`` is itself Iterable, so it must be tested first;
            # otherwise "sum" would be treated as ["s", "u", "m"].
            if isinstance(val, str):
                per_column[key] = val
            elif isinstance(val, Iterable):
                names = list(val)
                if any(callable(name) for name in names):
                    raise NotImplementedError(callable_msg)
                per_column[key] = names
            else:
                raise ValueError("values of dict must be a string or list")
    elif isinstance(aggs, Iterable):
        # Materialize once: the names are validated and then applied, and
        # a one-shot iterator would be exhausted by the first pass.
        aggs = list(aggs)
        if any(callable(name) for name in aggs):
            raise NotImplementedError(callable_msg)
    elif callable(aggs):
        raise NotImplementedError(callable_msg)
    else:
        raise ValueError("argument must be a string, list or dict")

    dtypes = [self[col].dtype for col in self._column_names]
    if any(is_string_dtype(dt) for dt in dtypes):
        raise NotImplementedError(
            "DataFrame.agg() is not supported for "
            "frames containing string columns"
        )

    # TODO: Remove the typecasting below once issue #6846 is fixed
    # link <https://github.com/rapidsai/cudf/issues/6846>
    common_dtype = cudf.utils.dtypes.find_common_type(dtypes)
    df_normalized = self.astype(common_dtype)

    if isinstance(aggs, str):
        if not hasattr(df_normalized, aggs):
            raise AttributeError(
                f"{aggs} is not a valid function for "
                f"'DataFrame' object"
            )
        result = cudf.DataFrame()
        result[aggs] = getattr(df_normalized, aggs)()
        result = result.iloc[:, 0]
        result.name = None
        return result

    if per_column is None:
        # List-like of aggregation names applied to the whole frame.
        for name in aggs:
            if not hasattr(df_normalized, name):
                raise AttributeError(
                    f"{name} is not a valid function for "
                    f"'DataFrame' object"
                )
        result = cudf.DataFrame()
        # TODO : Allow simultaneous pass for multi-aggregation as
        # a future optimization
        for name in aggs:
            result[name] = getattr(df_normalized, name)()
        # NOTE(review): sorting the columns may differ from pandas, which
        # keeps the frame's column order -- confirm this is intended.
        return result.T.sort_index(axis=1, ascending=True)

    cols = aggs.keys()

    if all(isinstance(val, str) for val in per_column.values()):
        # One aggregation per column -> Series indexed by column label.
        result = cudf.Series(index=cols)
        for key, name in per_column.items():
            col = df_normalized[key]
            if not hasattr(col, name):
                raise AttributeError(
                    f"{name} is not a valid function for "
                    f"'Series' object"
                )
            result[key] = getattr(col, name)()
        return result

    # At least one column requests several aggregations: promote lone
    # strings to one-element lists so every column is handled uniformly.
    per_column = {
        key: [val] if isinstance(val, str) else val
        for key, val in per_column.items()
    }
    idxs = sorted({name for names in per_column.values() for name in names})

    result = cudf.DataFrame(index=idxs, columns=cols)
    for key, names in per_column.items():
        col = df_normalized[key]
        # Masked placeholder so aggregations that were not requested for
        # this column keep no value (assumes ``masked=True`` yields nulls).
        col_empty = column_empty(len(idxs), dtype=col.dtype, masked=True)
        ans = cudf.Series(data=col_empty, index=idxs)
        # TODO : Allow simultaneous pass for multi-aggregation
        # as a future optimization
        for name in names:
            if not hasattr(col, name):
                raise AttributeError(
                    f"{name} is not a valid function for "
                    f"'Series' object"
                )
            ans[name] = getattr(col, name)()
        result[key] = ans

    return result

def nlargest(self, n, columns, keep="first"):
"""Get the rows of the DataFrame sorted by the n largest value of *columns*
Expand Down
102 changes: 102 additions & 0 deletions python/cudf/cudf/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7995,3 +7995,105 @@ def test_dataframe_from_pandas_duplicate_columns():
ValueError, match="Duplicate column names are not allowed"
):
gd.from_pandas(pdf)


@pytest.mark.parametrize(
    "data",
    [
        {"a": [1, 2, 3], "b": [3.0, 4.0, 5.0], "c": [True, True, False]},
        {"a": [1.0, 2.0, 3.0], "b": [3.0, 4.0, 5.0], "c": [True, True, False]},
        {"a": [1, 2, 3], "b": [3, 4, 5], "c": [True, True, False]},
        {"a": [1, 2, 3], "b": [True, True, False], "c": [False, True, False]},
        {
            "a": [1.0, 2.0, 3.0],
            "b": [True, True, False],
            "c": [False, True, False],
        },
        {"a": [1, 2, 3], "b": [3, 4, 5], "c": [2.0, 3.0, 4.0]},
        {"a": [1, 2, 3], "b": [2.0, 3.0, 4.0], "c": [5.0, 6.0, 4.0]},
    ],
)
@pytest.mark.parametrize(
    "aggs",
    [
        ["min", "sum", "max"],
        ("min", "sum", "max"),
        {"min", "sum", "max"},
        "sum",
        {"a": "sum", "b": "min", "c": "max"},
        {"a": ["sum"], "b": ["min"], "c": ["max"]},
        # NOTE: ``("sum")`` is a parenthesized string, not a 1-tuple, so
        # this case exercises the same input as the plain-string dict.
        {"a": ("sum"), "b": ("min"), "c": ("max")},
        {"a": {"sum"}, "b": {"min"}, "c": {"max"}},
        {"a": ["sum", "min"], "b": ["sum", "max"], "c": ["min", "max"]},
        {"a": ("sum", "min"), "b": ("sum", "max"), "c": ("min", "max")},
        {"a": {"sum", "min"}, "b": {"sum", "max"}, "c": {"min", "max"}},
    ],
)
def test_agg_for_dataframes(data, aggs):
    """``agg`` on a ``gd`` frame matches ``pd`` for every data/aggs pair.

    Dtypes are not compared (``check_dtype=False``).
    """
    expected = pd.DataFrame(data).agg(aggs)
    actual = gd.DataFrame(data).agg(aggs)

    assert_eq(expected, actual, check_dtype=False)


@pytest.mark.parametrize("aggs", [{"a": np.sum, "b": np.min, "c": np.max}])
def test_agg_for_unsupported_function(aggs):
    """Callable aggregations in a dict raise ``NotImplementedError``."""
    columns = {"a": [1, 2, 3], "b": [3.0, 4.0, 5.0], "c": [True, True, False]}
    frame = gd.DataFrame(columns)

    with pytest.raises(NotImplementedError):
        frame.agg(aggs)


@pytest.mark.parametrize("aggs", ["asdf"])
def test_agg_for_dataframe_with_invalid_function(aggs):
    """An unknown aggregation name raises a 'DataFrame' ``AttributeError``."""
    columns = {"a": [1, 2, 3], "b": [3.0, 4.0, 5.0], "c": [True, True, False]}
    frame = gd.DataFrame(columns)
    expected_message = (
        f"{aggs} is not a valid function for 'DataFrame' object"
    )

    with pytest.raises(AttributeError, match=expected_message):
        frame.agg(aggs)


@pytest.mark.parametrize("aggs", [{"a": "asdf"}])
def test_agg_for_series_with_invalid_function(aggs):
    """An unknown per-column name raises a 'Series' ``AttributeError``."""
    columns = {"a": [1, 2, 3], "b": [3.0, 4.0, 5.0], "c": [True, True, False]}
    frame = gd.DataFrame(columns)
    expected_message = (
        f"{aggs['a']} is not a valid function for 'Series' object"
    )

    with pytest.raises(AttributeError, match=expected_message):
        frame.agg(aggs)


@pytest.mark.parametrize(
    "aggs",
    [
        "sum",
        ["min", "sum", "max"],
        {"a": {"sum", "min"}, "b": {"sum", "max"}, "c": {"min", "max"}},
    ],
)
def test_agg_for_dataframe_with_string_columns(aggs):
    """``agg`` on a frame of string columns raises ``NotImplementedError``."""
    string_columns = {
        "a": ["m", "n", "o"],
        "b": ["t", "u", "v"],
        "c": ["x", "y", "z"],
    }
    frame = gd.DataFrame(string_columns, index=["a", "b", "c"])
    # The message contains regex metacharacters ("." and "()"), hence the
    # escape before it is used as a ``match`` pattern.
    expected_pattern = re.escape(
        "DataFrame.agg() is not supported for "
        "frames containing string columns"
    )

    with pytest.raises(NotImplementedError, match=expected_pattern):
        frame.agg(aggs)

0 comments on commit 30bbb39

Please sign in to comment.