Skip to content

Commit

Permalink
feat: agg from conf for multi pivot (#378)
Browse files Browse the repository at this point in the history
Explain what this PR does.

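Adds an `agg` field (a list of aggregation function names, defaulting to `["sum"]`) to the `Pivot` config so that each column in a multi-column pivot can use its own aggregation instead of the previously hard-coded `"sum"`. The list is indexed by the position of the column in `columns`, so supply one `agg` entry per pivot column.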

---------

Signed-off-by: Nandita Koppisetty <[email protected]>
  • Loading branch information
nkoppisetty committed May 7, 2024
1 parent 402aabf commit 984650b
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 11 deletions.
1 change: 1 addition & 0 deletions numalogic/connectors/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Pivot:
index: str = "timestamp"
columns: list[str] = field(default_factory=list)
value: list[str] = field(default_factory=lambda: ["count"])
agg: list[str] = field(default_factory=lambda : ["sum"])


@dataclass
Expand Down
8 changes: 4 additions & 4 deletions numalogic/connectors/druid/_druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,12 @@ def fetch(
# TODO: performance review
if pivot:
pivoted_frames = []
for column in pivot.columns:
for idx, column in enumerate(pivot.columns):
_df = df.pivot_table(
index=pivot.index,
columns=[column],
values=pivot.value,
aggfunc="sum"
aggfunc=pivot.agg[idx]
)
pivoted_frames.append(_df)

Expand Down Expand Up @@ -304,12 +304,12 @@ def chunked_fetch(

if pivot:
pivoted_frames = []
for column in pivot.columns:
for idx, column in enumerate(pivot.columns):
_df = df.pivot_table(
index=pivot.index,
columns=[column],
values=pivot.value,
aggfunc="sum"
aggfunc=pivot.agg[idx]
)
pivoted_frames.append(_df)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "numalogic"
version = "0.9.1a10"
version = "0.9.1a11"
description = "Collection of operational Machine Learning models and tools."
authors = ["Numalogic Developers"]
packages = [{ include = "numalogic" }]
Expand Down
16 changes: 10 additions & 6 deletions tests/connectors/test_druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def group_by(*_, **__):
"env": "prod",
"status": 200,
"http_status": "2xx",
"gw_gen": "T",
"count": 20,
},
"timestamp": "2023-09-06T07:50:00.000Z",
Expand All @@ -120,6 +121,7 @@ def group_by(*_, **__):
"env": "prod",
"status": 500,
"http_status": "5xx",
"gw_gen": "T",
"count": 10,
},
"timestamp": "2023-09-06T07:53:00.000Z",
Expand Down Expand Up @@ -293,18 +295,20 @@ def test_chunked_fetch_err(get_args):
def test_multi_column_pivot(setup, mock_group_by_multi_column):
start, end, fetcher = setup
_out = fetcher.fetch(
filter_keys=["service_alias"],
filter_values=["identity.authn.signin"],
dimensions=["http_status", "status"],
filter_keys=["authtype", "slane"],
filter_values=["browserUserAgent", "sw1"],
dimensions=["http_status", "status", "gw_gen"],
datasource="ip-apigw-telegraf-druid",
aggregations={"count": aggregators.doublesum("count")},
group_by=["timestamp", "http_status", "status"],
group_by=["timestamp", "http_status", "status", "gw_gen"],
hours=2,
pivot=Pivot(
index="timestamp",
columns=["http_status", "status"],
columns=["http_status", "status", "gw_gen"],
value=["count"],
agg=["sum", "sum", "count"]
),

)
print(_out)
assert (2, 5) == _out.shape
assert (2, 6) == _out.shape

0 comments on commit 984650b

Please sign in to comment.