Skip to content

Commit

Permalink
feat!: support mv query in fetch() method
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Nov 17, 2023
1 parent c967f20 commit c783ba8
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 12 deletions.
34 changes: 22 additions & 12 deletions numalogic/connectors/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ def build_query(metric: str, filters: dict[str, str]) -> str:

def fetch(
self,
metric_name: str,
start: datetime,
end: Optional[datetime] = None,
metric_name: str = "",
filters: Optional[dict[str, str]] = None,
return_labels: Optional[list[str]] = None,
aggregate: bool = True,
Expand All @@ -70,9 +70,9 @@ def fetch(
Args:
-------
metric_name: Prometheus metric name
start: Start time
end: End time
metric_name: Prometheus metric name (default="")
filters: Prometheus label filters
return_labels: Prometheus label names as columns to return
aggregate: Whether to aggregate the data
Expand All @@ -96,22 +96,32 @@ def fetch(
results = self.query_range(query, start_ts, end_ts)

df = pd.json_normalize(results)
return_labels = [f"metric.{label}" for label in return_labels or []]
if df.empty:
LOGGER.warning("Query returned no results")
return df

df = self._consolidate_df(df, metric_name, return_labels)
if aggregate and return_labels:
df = self._agg_df(df, [metric_name])
extra_labels = [f"metric.{label}" for label in return_labels or []]
if metric_name:
metric_names = [metric_name]
else:
metric_names = self._extract_metric_names(df)

try:
df.set_index("timestamp", inplace=True)
except KeyError:
pass
df.sort_values(by="timestamp", inplace=True)
df.set_index(_METRIC_KEY, inplace=True)

return df
dfs = []
for metric_name in metric_names:
_df = self._consolidate_df(df.loc[[metric_name]], metric_name, extra_labels)
dfs.append(_df.set_index(["timestamp", *extra_labels]))

df = dfs[0].join(dfs[1:]).reset_index().set_index("timestamp")

if return_labels:
df.rename(columns=dict(zip(extra_labels, return_labels)), inplace=True)

if aggregate:
df = self._agg_df(df, metric_names)

return df.sort_values(by=["timestamp"])

def raw_fetch(
self,
Expand Down
36 changes: 36 additions & 0 deletions tests/connectors/test_prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,42 @@ def test_fetch_return_labels(self):
self.assertEqual(df.index.name, "timestamp")
self.assertListEqual([10.5, 12.5], df[metric].to_list())

@patch.object(PrometheusFetcher, "_api_query_range", Mock(return_value=_mock_mv()))
def test_fetch_mv_01(self):
df = self.fetcher.fetch(
filters={"namespace": "odl-odlgraphql-usw2-e2e", "numalogic": "true"},
start=datetime.now() - timedelta(minutes=3),
aggregate=True,
)
self.assertEqual(df.shape, (7, 3))
self.assertListEqual(
df.columns.to_list(),
[
"namespace_app_rollouts_cpu_utilization",
"namespace_app_rollouts_http_request_error_rate",
"namespace_app_rollouts_memory_utilization",
],
)
self.assertEqual(df.index.name, "timestamp")

@patch.object(PrometheusFetcher, "_api_query_range", Mock(return_value=_mock_mv()))
def test_fetch_mv_02(self):
df = self.fetcher.fetch(
filters={"namespace": "odl-odlgraphql-usw2-e2e", "numalogic": "true"},
start=datetime.now() - timedelta(minutes=3),
aggregate=False,
)
self.assertEqual(df.shape, (10, 3))
self.assertListEqual(
df.columns.to_list(),
[
"namespace_app_rollouts_cpu_utilization",
"namespace_app_rollouts_http_request_error_rate",
"namespace_app_rollouts_memory_utilization",
],
)
self.assertEqual(df.index.name, "timestamp")

@patch.object(PrometheusFetcher, "_api_query_range", Mock(return_value=[]))
def test_fetch_no_data(self):
df = self.fetcher.fetch(
Expand Down

0 comments on commit c783ba8

Please sign in to comment.