"Source" tag for metrics (#338)
1. Add source tags for the metrics
2. Fix some logs

---------

Signed-off-by: s0nicboOm <[email protected]>
s0nicboOm committed Jan 17, 2024
1 parent b292553 commit 80ef431
Showing 16 changed files with 183 additions and 68 deletions.
1 change: 1 addition & 0 deletions numalogic/_constants.py
@@ -20,3 +20,4 @@
DEFAULT_BASE_CONF_PATH = os.path.join(BASE_CONF_DIR, "default-configs", "config.yaml")
DEFAULT_APP_CONF_PATH = os.path.join(BASE_CONF_DIR, "app-configs", "config.yaml")
DEFAULT_METRICS_PORT = 8490
NUMALOGIC_METRICS = "numalogic_metrics"
38 changes: 32 additions & 6 deletions numalogic/monitoring/metrics.py
@@ -1,7 +1,8 @@
import logging
from typing import Optional
from collections.abc import Sequence

from prometheus_client import Counter, Info, Summary
from prometheus_client import Counter, Info, Summary, Gauge

_LOGGER = logging.getLogger(__name__)

@@ -33,7 +34,7 @@ def __init__(self, name: str, description: str, label_keys: list[str]) -> None:
super().__init__(name, description, label_keys)
self.counter = Counter(name, description, label_keys)

def increment_counter(self, *label_values, amount: int = 1) -> None:
def increment_counter(self, *label_values: Sequence[str], amount: int = 1) -> None:
"""
Utility function is used to increment the counter.
@@ -42,7 +43,7 @@ def increment_counter(self, *label_values, amount: int = 1) -> None:
amount: Amount to increment the counter by
"""
if len(label_values) != len(self.label_keys):
raise ValueError(f"Labels mismatch with the definition: {self.label_keys}")
raise ValueError(f"Labels length mismatch with the definition: {self.label_keys}")
self.counter.labels(*label_values).inc(amount=amount)


@@ -57,7 +58,7 @@ def __init__(self, name: str, description: str, label_keys: Optional[list[str]])

def add_info(
self,
*label_values,
*label_values: Sequence[str],
data: dict,
) -> None:
"""
@@ -68,7 +69,7 @@ def add_info(
data: Dictionary of data
"""
if len(label_values) != len(self.label_keys):
raise ValueError(f"Labels mismatch with the definition: {self.label_keys}")
raise ValueError(f"Labels length mismatch with the definition: {self.label_keys}")
self.info.labels(*label_values).info(data)


@@ -90,5 +91,30 @@ def add_observation(self, *label_values, value: float) -> None:
value: Value to be updated
"""
if len(label_values) != len(self.label_keys):
raise ValueError(f"Labels mismatch with the definition: {self.label_keys}")
raise ValueError(f"Labels length mismatch with the definition: {self.label_keys}")
self.summary.labels(*label_values).observe(amount=value)


class PromGaugeMetric(_BaseMetric):
"""Class is used to create a gauge object and set its value."""

__slots__ = "gauge"

def __init__(self, name: str, description: str, label_keys: Optional[list[str]]) -> None:
super().__init__(name, description, label_keys)
self.gauge = Gauge(name, description, label_keys)

def set_gauge(
self,
*label_values: Sequence[str],
data: float,
) -> None:
"""
Utility function is used to set the gauge to the given value.
Args:
*label_values: List of labels
data: Value to set the gauge to
"""
if len(label_values) != len(self.label_keys):
raise ValueError(f"Labels length mismatch with the definition: {self.label_keys}")
self.gauge.labels(*label_values).set(data)
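As a usage sketch (not part of the commit; the metric name and label keys below are illustrative only), the new wrapper is created and set the same way as the existing counter/info/summary wrappers:

```python
from numalogic.monitoring.metrics import PromGaugeMetric

# Hypothetical gauge for illustration; name and label keys are not from this commit.
window_gauge = PromGaugeMetric(
    "example_window_mean_gauge",
    "Mean value of the current window",
    ["source", "metric_name"],
)

# Label values are positional and must match label_keys in order and length;
# a length mismatch raises ValueError.
window_gauge.set_gauge("numalogic_metrics", "cpu_usage", data=0.42)
```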
2 changes: 1 addition & 1 deletion numalogic/tools/types.py
@@ -35,7 +35,7 @@
META_T = TypeVar("META_T", bound=dict[str, Union[str, float, int, list, dict]])
META_VT = TypeVar("META_VT", str, int, float, list, dict)
EXTRA_T = TypeVar("EXTRA_T", bound=dict[str, Union[str, list, dict]])
KEYS = TypeVar("KEYS", bound=Sequence[str], covariant=True)
KEYS = TypeVar("KEYS", bound=Sequence[str], covariant=False)


class KeyedArtifact(NamedTuple):
39 changes: 20 additions & 19 deletions numalogic/udfs/README.md
@@ -1,19 +1,20 @@
| Metric Name | Type | Labels | Description |
|:-------------------------:|:---------:|:-----------------------------------------------------:|:-----------------------------------------------:|
| MSG_IN_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count msgs flowing in |
| MSG_PROCESSED_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count msgs processed |
| SOURCE_COUNTER | Counter | source, composite_key, config_id, pipeline_id | Count artifact source (registry or cache) calls |
| INSUFFICIENT_DATA_COUNTER | Counter | composite_key, config_id, pipeline_id | Count insufficient data while Training |
| MODEL_STATUS_COUNTER | Counter | status, vertex, composite_key, config_id, pipeline_id | Count status of the model |
| DATASHAPE_ERROR_COUNTER | Counter | composite_key, config_id, pipeline_id | Count datashape errors in preprocess |
| MSG_DROPPED_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count dropped msgs |
| REDIS_ERROR_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count redis errors |
| EXCEPTION_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count exceptions |
| RUNTIME_ERROR_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count runtime errors |
| FETCH_EXCEPTION_COUNTER | Counter | composite_key, config_id, pipeline_id | Count exceptions during train data fetch calls |
| DATAFRAME_SHAPE_SUMMARY | Summary | composite_key, config_id, pipeline_id | len of dataframe for training |
| NAN_SUMMARY | Summary | composite_key, config_id, pipeline_id | Count nan's in train data |
| INF_SUMMARY | Summary | composite_key, config_id, pipeline_id | Count inf's in train data |
| FETCH_TIME_SUMMARY | Summary | composite_key, config_id, pipeline_id | Train Data Fetch time |
| MODEL_INFO | Info | composite_key, config_id, pipeline_id | Model info |
| UDF_TIME | Histogram | composite_key, config_id, pipeline_id | Histogram for udf processing time |
| Metric Name               | Type      | Labels                                                                  | Description                                           |
|:-------------------------:|:---------:|:-----------------------------------------------------------------------:|:-----------------------------------------------------:|
| MSG_IN_COUNTER            | Counter   | source, vertex, composite_key, config_id, pipeline_id                  | Count msgs flowing in                                 |
| MSG_PROCESSED_COUNTER     | Counter   | source, vertex, composite_key, config_id, pipeline_id                  | Count msgs processed                                  |
| SOURCE_COUNTER            | Counter   | artifact_source, source, vertex, composite_key, config_id, pipeline_id | Count artifact source (registry or cache) calls       |
| INSUFFICIENT_DATA_COUNTER | Counter   | source, composite_key, config_id, pipeline_id                          | Count insufficient data while training                |
| MODEL_STATUS_COUNTER      | Counter   | source, status, vertex, composite_key, config_id, pipeline_id          | Count status of the model                             |
| DATASHAPE_ERROR_COUNTER   | Counter   | source, composite_key, config_id, pipeline_id                          | Count datashape errors in preprocess                  |
| MSG_DROPPED_COUNTER       | Counter   | source, vertex, composite_key, config_id, pipeline_id                  | Count dropped msgs                                    |
| REDIS_ERROR_COUNTER       | Counter   | source, vertex, composite_key, config_id, pipeline_id                  | Count redis errors                                    |
| EXCEPTION_COUNTER         | Counter   | source, vertex, composite_key, config_id, pipeline_id                  | Count exceptions                                      |
| RUNTIME_ERROR_COUNTER     | Counter   | source, vertex, composite_key, config_id, pipeline_id                  | Count runtime errors                                  |
| FETCH_EXCEPTION_COUNTER   | Counter   | source, composite_key, config_id, pipeline_id                          | Count exceptions during train data fetch calls        |
| DATAFRAME_SHAPE_SUMMARY   | Summary   | source, composite_key, config_id, pipeline_id                          | Length of dataframe for training                      |
| NAN_SUMMARY               | Summary   | source, composite_key, config_id, pipeline_id                          | Count NaNs in train data                              |
| INF_SUMMARY               | Summary   | source, composite_key, config_id, pipeline_id                          | Count infs in train data                              |
| FETCH_TIME_SUMMARY        | Summary   | source, composite_key, config_id, pipeline_id                          | Train data fetch time                                 |
| MODEL_INFO                | Info      | source, composite_key, config_id, pipeline_id                          | Model info                                            |
| UDF_TIME                  | Histogram | composite_key, config_id, pipeline_id                                  | Histogram for UDF processing time                     |
| RECORDED_DATA_GAUGE       | Gauge     | source, vertex, composite_key, config_id, pipeline_id, metric_name     | Gauge metric to observe the mean value of the window  |
56 changes: 39 additions & 17 deletions numalogic/udfs/_metrics.py
@@ -2,97 +2,108 @@

from prometheus_client import Histogram

from numalogic.monitoring.metrics import PromCounterMetric, PromInfoMetric, PromSummaryMetric
from numalogic.monitoring.metrics import (
PromCounterMetric,
PromInfoMetric,
PromSummaryMetric,
PromGaugeMetric,
)

# Define metrics

# COUNTERS
SOURCE_COUNTER = PromCounterMetric(
"numalogic_artifact_source_counter",
"Count artifact source calls",
["source", "vertex", "composite_key", "config_id", "pipeline_id"],
["artifact_source", "source", "vertex", "composite_key", "config_id", "pipeline_id"],
)

INSUFFICIENT_DATA_COUNTER = PromCounterMetric(
"numalogic_insufficient_data_counter",
"Count insufficient data while Training",
["composite_key", "config_id", "pipeline_id"],
["source", "composite_key", "config_id", "pipeline_id"],
)
MODEL_STATUS_COUNTER = PromCounterMetric(
"numalogic_new_model_counter",
"Count status of the model",
["status", "vertex", "composite_key", "config_id", "pipeline_id"],
["source", "status", "vertex", "composite_key", "config_id", "pipeline_id"],
)

DATASHAPE_ERROR_COUNTER = PromCounterMetric(
"numalogic_datashape_error_counter",
"Count datashape errors in preprocess",
["composite_key", "config_id", "pipeline_id"],
["source", "composite_key", "config_id", "pipeline_id"],
)
MSG_DROPPED_COUNTER = PromCounterMetric(
"numalogic_msg_dropped_counter",
"Count dropped msgs",
["vertex", "composite_key", "config_id", "pipeline_id"],
["source", "vertex", "composite_key", "config_id", "pipeline_id"],
)

REDIS_ERROR_COUNTER = PromCounterMetric(
"numalogic_redis_error_counter",
"Count redis errors",
["vertex", "composite_key", "config_id", "pipeline_id"],
["source", "vertex", "composite_key", "config_id", "pipeline_id"],
)
EXCEPTION_COUNTER = PromCounterMetric(
"numalogic_exception_counter",
"Count exceptions",
["vertex", "composite_key", "config_id", "pipeline_id"],
["source", "vertex", "composite_key", "config_id", "pipeline_id"],
)
RUNTIME_ERROR_COUNTER = PromCounterMetric(
"numalogic_runtime_error_counter",
"Count runtime errors",
["vertex", "composite_key", "config_id", "pipeline_id"],
["source", "vertex", "composite_key", "config_id", "pipeline_id"],
)

FETCH_EXCEPTION_COUNTER = PromCounterMetric(
"numalogic_fetch_exception_counter",
"count exceptions during fetch call",
["composite_key", "config_id", "pipeline_id"],
["source", "composite_key", "config_id", "pipeline_id"],
)

MSG_IN_COUNTER = PromCounterMetric(
"numalogic_msg_in_counter",
"Count msgs flowing in",
["vertex", "composite_key", "config_id", "pipeline_id"],
["source", "vertex", "composite_key", "config_id", "pipeline_id"],
)
MSG_PROCESSED_COUNTER = PromCounterMetric(
"numalogic_msg_processed_counter",
"Count msgs processed",
["vertex", "composite_key", "config_id", "pipeline_id"],
["source", "vertex", "composite_key", "config_id", "pipeline_id"],
)

# SUMMARY
DATAFRAME_SHAPE_SUMMARY = PromSummaryMetric(
"numalogic_dataframe_shape_summary",
"len of dataframe for training",
["composite_key", "config_id", "pipeline_id"],
["source", "composite_key", "config_id", "pipeline_id"],
)
NAN_SUMMARY = PromSummaryMetric(
"numalogic_nan_summary",
"Count nan's in train data",
["composite_key", "config_id", "pipeline_id"],
["source", "composite_key", "config_id", "pipeline_id"],
)
INF_SUMMARY = PromSummaryMetric(
"numalogic_inf_summary",
"Count inf's in train data",
["composite_key", "config_id", "pipeline_id"],
["source", "composite_key", "config_id", "pipeline_id"],
)
FETCH_TIME_SUMMARY = PromSummaryMetric(
"numalogic_fetch_time_summary",
"Train data fetch time",
["composite_key", "config_id", "pipeline_id"],
["source", "composite_key", "config_id", "pipeline_id"],
)
# Gauge Metrics
RECORDED_DATA_GAUGE = PromGaugeMetric(
"numalogic_recorded_value_gauge",
"Gauge metric to observe the mean value of the window",
["source", "vertex", "composite_key", "config_id", "pipeline_id", "metric_name"],
)

# Info
MODEL_INFO = PromInfoMetric(
"numalogic_model_info", "Model info", ["composite_key", "config_id", "pipeline_id"]
"numalogic_model_info", "Model info", ["source", "composite_key", "config_id", "pipeline_id"]
)

# HISTOGRAM
@@ -158,3 +169,14 @@ def _add_summary(summary: PromSummaryMetric, labels: Sequence[str], data: float)
data: Summary value
"""
summary.add_observation(*labels, value=data)


def _set_gauge(gauge: PromGaugeMetric, labels: Sequence[str], data: float) -> None:
"""
Utility function is used to set the gauge.
Args:
gauge: Gauge object
labels: Sequence of labels
data: Gauge value
"""
gauge.set_gauge(*labels, data=data)
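A short sketch of how the helper and the new gauge fit together (the label values are illustrative; their order must match the RECORDED_DATA_GAUGE definition above):

```python
from numalogic.udfs._metrics import RECORDED_DATA_GAUGE, _set_gauge

# Illustrative values for
# (source, vertex, composite_key, config_id, pipeline_id, metric_name):
labels = ("numalogic_metrics", "inference", "ns1:app1", "conf-1", "pl-1", "cpu_usage")
_set_gauge(gauge=RECORDED_DATA_GAUGE, labels=labels, data=0.42)
```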
9 changes: 9 additions & 0 deletions numalogic/udfs/entities.py
@@ -7,6 +7,8 @@
import numpy.typing as npt
import orjson

from numalogic._constants import NUMALOGIC_METRICS

Vector = list[float]
Matrix = Union[Vector, list[Vector], npt.ArrayLike]

@@ -64,6 +66,12 @@ class StreamPayload(_BasePayload):
artifact_versions: dict[str, dict] = field(default_factory=dict)
metadata: dict[str, Any] = field(default_factory=dict)

def __post_init__(self):
try:
_ = self.metadata["numalogic_opex_tags"]["source"]
except KeyError:
self.metadata["numalogic_opex_tags"] = {"source": NUMALOGIC_METRICS}

@property
def start_ts(self) -> int:
return int(self.timestamps[0])
@@ -88,6 +96,7 @@ def __str__(self) -> str:
f'"StreamPayload(header={self.header}, status={self.status}, '
f'composite_keys={self.composite_keys}, data={list(self.data)})"'
f"artifact_versions={self.artifact_versions}"
f"metadata={self.metadata}"
)

def __repr__(self) -> str:
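To make the new default concrete, here is a standalone sketch of the same logic outside the dataclass (the helper name `_ensure_source_tag` is illustrative, not part of the commit):

```python
from numalogic._constants import NUMALOGIC_METRICS


def _ensure_source_tag(metadata: dict) -> dict:
    # Mirrors StreamPayload.__post_init__: default the opex source tag
    # when the nested key is absent.
    try:
        _ = metadata["numalogic_opex_tags"]["source"]
    except KeyError:
        metadata["numalogic_opex_tags"] = {"source": NUMALOGIC_METRICS}
    return metadata


print(_ensure_source_tag({}))
# {'numalogic_opex_tags': {'source': 'numalogic_metrics'}}
```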
4 changes: 3 additions & 1 deletion numalogic/udfs/inference.py
@@ -24,7 +24,7 @@
_increment_counter,
)
from numalogic.udfs.entities import StreamPayload, Header, Status
from numalogic.udfs.tools import _load_artifact
from numalogic.udfs.tools import _load_artifact, _update_info_metric

_LOGGER = logging.getLogger(__name__)

@@ -104,6 +104,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
# Construct payload object
payload = StreamPayload(**orjson.loads(datum.value))
_metric_label_values = (
payload.metadata["numalogic_opex_tags"]["source"],
self._vtx,
":".join(payload.composite_keys),
payload.config_id,
@@ -158,6 +159,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
# Perform inference
try:
x_inferred = self.compute(artifact_data.artifact, payload.get_data())
_update_info_metric(x_inferred, payload.metrics, _metric_label_values)
except RuntimeError:
_increment_counter(counter=RUNTIME_ERROR_COUNTER, labels=_metric_label_values)
_LOGGER.exception(
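`_update_info_metric` lives in numalogic/udfs/tools.py, which is not expanded in this view, so its body is not shown here. Based on its call sites and the RECORDED_DATA_GAUGE label set, a plausible sketch (an assumption, not the commit's actual implementation) is:

```python
from collections.abc import Sequence

import numpy as np
import numpy.typing as npt

from numalogic.udfs._metrics import RECORDED_DATA_GAUGE, _set_gauge


def _update_info_metric(
    x: npt.NDArray[np.float64], metric_names: Sequence[str], labels: Sequence[str]
) -> None:
    # Record the mean of each metric's column in the window, appending
    # the metric name as the final (metric_name) label value.
    for idx, metric_name in enumerate(metric_names):
        _set_gauge(
            gauge=RECORDED_DATA_GAUGE,
            labels=(*labels, metric_name),
            data=float(np.mean(x[:, idx])),
        )
```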
1 change: 1 addition & 0 deletions numalogic/udfs/postprocess.py
@@ -84,6 +84,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
# Construct payload object
payload = StreamPayload(**orjson.loads(datum.value))
_metric_label_values = (
payload.metadata["numalogic_opex_tags"]["source"],
self._vtx,
":".join(payload.composite_keys),
payload.config_id,
15 changes: 12 additions & 3 deletions numalogic/udfs/preprocess.py
@@ -9,6 +9,7 @@
from pynumaflow.mapper import Datum, Messages, Message
from sklearn.pipeline import make_pipeline

from numalogic._constants import NUMALOGIC_METRICS
from numalogic.config import PreprocessFactory, RegistryFactory
from numalogic.udfs._metrics import (
DATASHAPE_ERROR_COUNTER,
@@ -26,7 +27,7 @@
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf
from numalogic.udfs.entities import Status, Header
from numalogic.udfs.tools import make_stream_payload, get_df, _load_artifact
from numalogic.udfs.tools import make_stream_payload, get_df, _load_artifact, _update_info_metric

# TODO: move to config
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
@@ -100,8 +101,15 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
_conf = _stream_conf.ml_pipelines[data_payload["pipeline_id"]]
raw_df, timestamps = get_df(data_payload=data_payload, stream_conf=_stream_conf)

# TODO: Add pipeline id to the ml metrics
source = NUMALOGIC_METRICS
if (
"numalogic_opex_tags" in data_payload["metadata"]
and "source" in data_payload["metadata"]["numalogic_opex_tags"]
):
source = data_payload["metadata"]["numalogic_opex_tags"]["source"]

_metric_label_values = (
source,
self._vtx,
":".join(keys),
data_payload["config_id"],
@@ -166,6 +174,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
payload = replace(payload, status=Status.ARTIFACT_FOUND)
try:
x_scaled = self.compute(model=preproc_clf, input_=payload.get_data())
_update_info_metric(x_scaled, payload.metrics, _metric_label_values)
payload = replace(
payload,
data=x_scaled,
@@ -185,7 +194,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
labels=_metric_label_values,
)
_LOGGER.exception(
"%s - Runtime inference error! Keys: %s, Metric: %s",
"%s - Runtime preprocess error! Keys: %s, Metric: %s",
payload.uuid,
payload.composite_keys,
payload.metrics,
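The nested membership check above guards against payloads produced before this commit, which lack the new metadata field. An equivalent, more compact alternative using dict.get (shown only as a sketch; it is not what the commit uses):

```python
source = (
    data_payload.get("metadata", {})
    .get("numalogic_opex_tags", {})
    .get("source", NUMALOGIC_METRICS)
)
```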