Skip to content

Commit

Permalink
Demo branch (#335)
Browse files Browse the repository at this point in the history
Demo branch

---------

Signed-off-by: s0nicboOm <[email protected]>
  • Loading branch information
s0nicboOm committed Dec 13, 2023
1 parent 28fa28f commit 19ddc8e
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 31 deletions.
2 changes: 1 addition & 1 deletion numalogic/backtest/_prom.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def generate_scores(

for idx, arr in enumerate(ds):
x_recon[idx] = nn_udf.compute(model=artifacts["model"], input_=arr)
anomaly_scores[idx] = postproc_udf.compute(
_, anomaly_scores[idx] = postproc_udf.compute(
model=artifacts["threshold_clf"],
input_=x_recon[idx],
postproc_clf=postproc_func,
Expand Down
28 changes: 27 additions & 1 deletion numalogic/monitoring/metrics.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import Optional

from prometheus_client import Counter, Info, Summary
from prometheus_client import Counter, Info, Summary, Gauge

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -92,3 +92,29 @@ def add_observation(self, *label_values, value: float) -> None:
if len(label_values) != len(self.label_keys):
raise ValueError(f"Labels mismatch with the definition: {self.label_keys}")
self.summary.labels(*label_values).observe(amount=value)


class PromGaugeMetric(_BaseMetric):
"""Class is used to create an info object and increment it."""

__slots__ = "info"

def __init__(self, name: str, description: str, label_keys: Optional[list[str]]) -> None:
super().__init__(name, description, label_keys)
self.info = Gauge(name, description, label_keys)

def set_gauge(
self,
*label_values,
data: float,
) -> None:
"""
Utility function is used to increment the info.
Args:
*label_values: List of labels
data: float data
"""
if len(label_values) != len(self.label_keys):
raise ValueError(f"Labels mismatch with the definition: {self.label_keys}")
self.info.labels(*label_values).set(data)
39 changes: 20 additions & 19 deletions numalogic/udfs/README.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
| Metric Name | Type | Labels | Description |
|:-------------------------:|:---------:|:----------------------------------------:|:-----------------------------------------------:|
| MSG_IN_COUNTER | Counter | vertex, composite_key, config_id | Count msgs flowing in |
| MSG_PROCESSED_COUNTER | Counter | vertex, composite_key, config_id | Count msgs processed |
| SOURCE_COUNTER | Counter | source, composite_key, config_id | Count artifact source (registry or cache) calls |
| INSUFFICIENT_DATA_COUNTER | Counter | composite_key, config_id | Count insufficient data while Training |
| MODEL_STATUS_COUNTER | Counter | status, vertex, composite_key, config_id | Count status of the model |
| DATASHAPE_ERROR_COUNTER | Counter | composite_key, config_id | Count datashape errors in preprocess |
| MSG_DROPPED_COUNTER | Counter | vertex, composite_key, config_id | Count dropped msgs |
| REDIS_ERROR_COUNTER | Counter | vertex, composite_key, config_id | Count redis errors |
| EXCEPTION_COUNTER | Counter | vertex, composite_key, config_id | Count exceptions |
| RUNTIME_ERROR_COUNTER | Counter | vertex, composite_key, config_id | Count runtime errors |
| FETCH_EXCEPTION_COUNTER | Counter | composite_key, config_id | Count exceptions during train data fetch calls |
| DATAFRAME_SHAPE_SUMMARY | Summary | composite_key, config_id | len of dataframe for training |
| NAN_SUMMARY | Summary | composite_key, config_id | Count nan's in train data |
| INF_SUMMARY | Summary | composite_key, config_id | Count inf's in train data |
| FETCH_TIME_SUMMARY | Summary | composite_key, config_id | Train Data Fetch time |
| MODEL_INFO | Info | composite_key, config_id | Model info |
| UDF_TIME | Histogram | composite_key, config_id | Histogram for udf processing time |
| Metric Name | Type | Labels | Description |
|:-------------------------:|:---------:|:---------------------------------------------:|:-----------------------------------------------:|
| MSG_IN_COUNTER | Counter | vertex, composite_key, config_id | Count msgs flowing in |
| MSG_PROCESSED_COUNTER | Counter | vertex, composite_key, config_id | Count msgs processed |
| SOURCE_COUNTER | Counter | source, composite_key, config_id | Count artifact source (registry or cache) calls |
| INSUFFICIENT_DATA_COUNTER | Counter | composite_key, config_id | Count insufficient data while Training |
| MODEL_STATUS_COUNTER | Counter | status, vertex, composite_key, config_id | Count status of the model |
| DATASHAPE_ERROR_COUNTER | Counter | composite_key, config_id | Count datashape errors in preprocess |
| MSG_DROPPED_COUNTER | Counter | vertex, composite_key, config_id | Count dropped msgs |
| REDIS_ERROR_COUNTER | Counter | vertex, composite_key, config_id | Count redis errors |
| EXCEPTION_COUNTER | Counter | vertex, composite_key, config_id | Count exceptions |
| RUNTIME_ERROR_COUNTER | Counter | vertex, composite_key, config_id | Count runtime errors |
| FETCH_EXCEPTION_COUNTER | Counter | composite_key, config_id | Count exceptions during train data fetch calls |
| DATAFRAME_SHAPE_SUMMARY | Summary | composite_key, config_id | len of dataframe for training |
| NAN_SUMMARY | Summary | composite_key, config_id | Count nan's in train data |
| INF_SUMMARY | Summary | composite_key, config_id | Count inf's in train data |
| FETCH_TIME_SUMMARY | Summary | composite_key, config_id | Train Data Fetch time |
| MODEL_INFO | Info | composite_key, config_id | Model info |
| RECORDED_DATA_INFO | Info | vertex, composite_key, config_id, metric name | Recorded data info |
| UDF_TIME | Histogram | composite_key, config_id | Histogram for udf processing time |
30 changes: 27 additions & 3 deletions numalogic/udfs/_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

from prometheus_client import Histogram

from numalogic.monitoring.metrics import PromCounterMetric, PromInfoMetric, PromSummaryMetric
from numalogic.monitoring.metrics import (
PromCounterMetric,
PromInfoMetric,
PromSummaryMetric,
PromGaugeMetric,
)

# Define metrics

Expand Down Expand Up @@ -67,15 +72,22 @@
["composite_key", "config_id"],
)
NAN_SUMMARY = PromSummaryMetric(
"numalogic_nan_counter", "Count nan's in train data", ["composite_key", "config_id"]
"numalogic_nan_summary", "Count nan's in train data", ["composite_key", "config_id"]
)
INF_SUMMARY = PromSummaryMetric(
"numalogic_inf_counter", "Count inf's in train data", ["composite_key", "config_id"]
"numalogic_inf_summary", "Count inf's in train data", ["composite_key", "config_id"]
)
FETCH_TIME_SUMMARY = PromSummaryMetric(
"numalogic_fetch_time_summary", "Train data fetch time", ["composite_key", "config_id"]
)

# Gauge Metric
RECORDED_DATA_GAUGE = PromGaugeMetric(
"numalogic_recorded_value_gauge",
"Gauge metric to observe the mean value of the window",
["vertex", "composite_key", "config_id", "metric_name"],
)

# Info
MODEL_INFO = PromInfoMetric("numalogic_model_info", "Model info", ["composite_key", "config_id"])

Expand Down Expand Up @@ -132,6 +144,18 @@ def _add_info(info: PromInfoMetric, labels: Sequence[str], data: dict) -> None:
info.add_info(*labels, data=data)


def _set_gauge(gauge: PromGaugeMetric, labels: Sequence[str], data: float) -> None:
"""
Utility function is used to add the info.
Args:
gauge: Gauge object
labels: Sequence of labels
data: data
"""
gauge.set_gauge(*labels, data=data)


def _add_summary(summary: PromSummaryMetric, labels: Sequence[str], data: float) -> None:
"""
Utility function is used to add the summary.
Expand Down
3 changes: 2 additions & 1 deletion numalogic/udfs/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
UDF_TIME,
_increment_counter,
)
from numalogic.udfs.tools import _load_artifact
from numalogic.udfs.tools import _load_artifact, _update_info_metric

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -152,6 +152,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
# Perform inference
try:
x_inferred = self.compute(artifact_data.artifact, payload.get_data())
_update_info_metric(x_inferred, payload.metrics, _metric_label_values)
except RuntimeError:
_increment_counter(counter=RUNTIME_ERROR_COUNTER, labels=_metric_label_values)
_LOGGER.exception(
Expand Down
9 changes: 5 additions & 4 deletions numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf
from numalogic.udfs.entities import StreamPayload, Header, Status, TrainerPayload, OutputPayload
from numalogic.udfs.tools import _load_artifact
from numalogic.udfs.tools import _load_artifact, _update_info_metric

# TODO: move to config
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
Expand Down Expand Up @@ -118,11 +118,12 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
# Postprocess payload
if payload.status in (Status.ARTIFACT_FOUND, Status.ARTIFACT_STALE) and thresh_artifact:
try:
anomaly_scores = self.compute(
x_scaled, anomaly_scores = self.compute(
model=thresh_artifact.artifact,
input_=payload.get_data(),
postproc_clf=postproc_clf,
)
_update_info_metric(x_scaled, payload.metrics, _metric_label_values)
except RuntimeError:
_increment_counter(RUNTIME_ERROR_COUNTER, _metric_label_values)
_LOGGER.exception(
Expand Down Expand Up @@ -187,7 +188,7 @@ def _per_feature_score(feat_names: list[str], scores: NDArray[float]) -> dict[st
@classmethod
def compute(
cls, model: artifact_t, input_: NDArray[float], postproc_clf=None, **_
) -> NDArray[float]:
) -> tuple[NDArray[float], NDArray[float]]:
"""
Compute the postprocess function.
Expand All @@ -214,4 +215,4 @@ def compute(
_LOGGER.debug(
"Time taken in postprocess compute: %.4f sec", time.perf_counter() - _start_time
)
return score.reshape(-1)
return y_score, score.reshape(-1)
5 changes: 3 additions & 2 deletions numalogic/udfs/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf
from numalogic.udfs.entities import Status, Header
from numalogic.udfs.tools import make_stream_payload, get_df, _load_artifact
from numalogic.udfs.tools import make_stream_payload, get_df, _load_artifact, _update_info_metric

# TODO: move to config
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
Expand Down Expand Up @@ -158,6 +158,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
payload = replace(payload, status=Status.ARTIFACT_FOUND)
try:
x_scaled = self.compute(model=preproc_clf, input_=payload.get_data())
_update_info_metric(x_scaled, payload.metrics, _metric_label_values)
payload = replace(
payload,
data=x_scaled,
Expand All @@ -177,7 +178,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
labels=_metric_label_values,
)
_LOGGER.exception(
"%s - Runtime inference error! Keys: %s, Metric: %s",
"%s - Runtime preprocess error! Keys: %s, Metric: %s",
payload.uuid,
payload.composite_keys,
payload.metrics,
Expand Down
22 changes: 22 additions & 0 deletions numalogic/udfs/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from dataclasses import replace
import time
from typing import Optional, NamedTuple
from collections.abc import Sequence

import numpy as np
import pandas as pd
Expand All @@ -20,6 +21,8 @@
EXCEPTION_COUNTER,
_increment_counter,
_add_info,
RECORDED_DATA_GAUGE,
_set_gauge,
)

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -91,6 +94,25 @@ def _get_artifact_stats(artifact_data):
}


def _update_info_metric(
data: np.ndarray, metric_name: Sequence[str], labels: Sequence[str]
) -> None:
"""
Utility function is used to update the gauge metric.
Args:
data: data
metric_name: metric name in the payload
labels: labels.
"""
for _data, _metric_name in zip(data.T, metric_name):
_set_gauge(
gauge=RECORDED_DATA_GAUGE,
labels=(*labels, _metric_name),
data=np.mean(_data).squeeze(),
)


def _load_artifact(
skeys: KEYS,
dkeys: KEYS,
Expand Down

0 comments on commit 19ddc8e

Please sign in to comment.