Skip to content

Commit

Permalink
Metrics (#322)
Browse files Browse the repository at this point in the history
MAJOR:
Introduce metrics to the udfs.

MINOR:
1. Fix logs for insufficient data
2. Add logs for the scenario where the preproc model is absent and the
message is forwarded directly to the trainer.

---------

Signed-off-by: s0nicboOm <[email protected]>
  • Loading branch information
s0nicboOm committed Dec 6, 2023
1 parent dfc383a commit 28fa28f
Show file tree
Hide file tree
Showing 21 changed files with 2,044 additions and 1,333 deletions.
1 change: 1 addition & 0 deletions numalogic/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@

DEFAULT_BASE_CONF_PATH = os.path.join(BASE_CONF_DIR, "default-configs", "config.yaml")
DEFAULT_APP_CONF_PATH = os.path.join(BASE_CONF_DIR, "app-configs", "config.yaml")
DEFAULT_METRICS_PORT = 8490
29 changes: 29 additions & 0 deletions numalogic/monitoring/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2022 The Numaproj Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging

from prometheus_client import start_http_server

_LOGGER = logging.getLogger(__name__)
_LOGGER.addHandler(logging.NullHandler())


def start_metrics_server(port: int) -> None:
    """
    Start an HTTP endpoint that exposes Prometheus metrics.

    Args:
        port: Port number to serve the metrics endpoint on
    """
    _LOGGER.info("Starting Prometheus monitoring server on port: %s", port)
    start_http_server(port)
94 changes: 94 additions & 0 deletions numalogic/monitoring/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import logging
from typing import Optional

from prometheus_client import Counter, Info, Summary

_LOGGER = logging.getLogger(__name__)


class _BaseMetric:
__slots__ = ("name", "description", "label_keys")

"""
Base class for metrics.
Args:
name: Name of the metric
description: Description of the metric
label_keys: List of labels
"""

def __init__(self, name: str, description: str, label_keys: Optional[list[str]]) -> None:
self.name = name
self.description = description
self.label_keys = label_keys


class PromCounterMetric(_BaseMetric):
    """
    Wrapper around a labelled prometheus_client Counter.

    Args:
        name: Name of the metric
        description: Description of the metric
        label_keys: List of label names the counter is partitioned by
    """

    # Tuple form is the conventional (and less error-prone) __slots__ spelling;
    # the previous bare string happened to work only because it named one slot.
    __slots__ = ("counter",)

    def __init__(self, name: str, description: str, label_keys: list[str]) -> None:
        super().__init__(name, description, label_keys)
        self.counter = Counter(name, description, label_keys)

    def increment_counter(self, *label_values: str, amount: int = 1) -> None:
        """
        Increment the counter for the timeseries identified by label_values.

        Args:
            *label_values: Label values, one per label key in the definition
            amount: Amount to increment the counter by

        Raises:
            ValueError: If the number of label values does not match the
                number of label keys declared for this metric.
        """
        if len(label_values) != len(self.label_keys):
            raise ValueError(f"Labels mismatch with the definition: {self.label_keys}")
        self.counter.labels(*label_values).inc(amount=amount)


class PromInfoMetric(_BaseMetric):
    """
    Wrapper around a labelled prometheus_client Info metric.

    Args:
        name: Name of the metric
        description: Description of the metric
        label_keys: Optional list of label names the info is partitioned by
    """

    # Tuple form is the conventional (and less error-prone) __slots__ spelling.
    __slots__ = ("info",)

    def __init__(self, name: str, description: str, label_keys: Optional[list[str]]) -> None:
        super().__init__(name, description, label_keys)
        self.info = Info(name, description, label_keys)

    def add_info(
        self,
        *label_values,
        data: dict,
    ) -> None:
        """
        Set the info payload for the timeseries identified by label_values.

        Args:
            *label_values: Label values, one per label key in the definition
            data: Dictionary of key/value pairs to expose

        Raises:
            ValueError: If the number of label values does not match the
                number of label keys declared for this metric.
        """
        if len(label_values) != len(self.label_keys):
            raise ValueError(f"Labels mismatch with the definition: {self.label_keys}")
        self.info.labels(*label_values).info(data)


class PromSummaryMetric(_BaseMetric):
    """Wrapper around a labelled prometheus_client Summary (not a histogram)."""

    # Tuple form is the conventional (and less error-prone) __slots__ spelling.
    __slots__ = ("summary",)

    def __init__(self, name: str, description: str, label_keys: Optional[list[str]]) -> None:
        super().__init__(name, description, label_keys)
        self.summary = Summary(name, description, label_keys)

    def add_observation(self, *label_values, value: float) -> None:
        """
        Record a single observation on the summary for the given label values.

        Args:
            *label_values: Label values, one per label key in the definition
            value: Value to be observed

        Raises:
            ValueError: If the number of label values does not match the
                number of label keys declared for this metric.
        """
        if len(label_values) != len(self.label_keys):
            raise ValueError(f"Labels mismatch with the definition: {self.label_keys}")
        self.summary.labels(*label_values).observe(amount=value)
19 changes: 19 additions & 0 deletions numalogic/udfs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
| Metric Name | Type | Labels | Description |
|:-------------------------:|:---------:|:----------------------------------------:|:-----------------------------------------------:|
| MSG_IN_COUNTER | Counter | vertex, composite_key, config_id | Count msgs flowing in |
| MSG_PROCESSED_COUNTER | Counter | vertex, composite_key, config_id | Count msgs processed |
| SOURCE_COUNTER            | Counter   | source, vertex, composite_key, config_id | Count artifact source (registry or cache) calls |
| INSUFFICIENT_DATA_COUNTER | Counter | composite_key, config_id | Count insufficient data while Training |
| MODEL_STATUS_COUNTER | Counter | status, vertex, composite_key, config_id | Count status of the model |
| DATASHAPE_ERROR_COUNTER | Counter | composite_key, config_id | Count datashape errors in preprocess |
| MSG_DROPPED_COUNTER | Counter | vertex, composite_key, config_id | Count dropped msgs |
| REDIS_ERROR_COUNTER | Counter | vertex, composite_key, config_id | Count redis errors |
| EXCEPTION_COUNTER | Counter | vertex, composite_key, config_id | Count exceptions |
| RUNTIME_ERROR_COUNTER | Counter | vertex, composite_key, config_id | Count runtime errors |
| FETCH_EXCEPTION_COUNTER | Counter | composite_key, config_id | Count exceptions during train data fetch calls |
| DATAFRAME_SHAPE_SUMMARY | Summary | composite_key, config_id | len of dataframe for training |
| NAN_SUMMARY | Summary | composite_key, config_id | Count nan's in train data |
| INF_SUMMARY | Summary | composite_key, config_id | Count inf's in train data |
| FETCH_TIME_SUMMARY | Summary | composite_key, config_id | Train Data Fetch time |
| MODEL_INFO | Info | composite_key, config_id | Model info |
| UDF_TIME | Histogram | composite_key, config_id | Histogram for udf processing time |
1 change: 1 addition & 0 deletions numalogic/udfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from logging import config as logconf
import os


from numalogic._constants import BASE_DIR
from numalogic.udfs._base import NumalogicUDF
from numalogic.udfs._config import StreamConf, PipelineConf, load_pipeline_conf
Expand Down
7 changes: 6 additions & 1 deletion numalogic/udfs/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
import sys
from typing import Final

from numalogic._constants import DEFAULT_BASE_CONF_PATH, DEFAULT_APP_CONF_PATH
from numalogic._constants import DEFAULT_BASE_CONF_PATH, DEFAULT_APP_CONF_PATH, DEFAULT_METRICS_PORT
from numalogic.connectors.redis import get_redis_client_from_conf
from numalogic.monitoring import start_metrics_server
from numalogic.udfs import load_pipeline_conf, UDFFactory, ServerFactory, set_logger

LOGGER = logging.getLogger(__name__)

BASE_CONF_FILE_PATH: Final[str] = os.getenv("BASE_CONF_PATH", default=DEFAULT_BASE_CONF_PATH)
APP_CONF_FILE_PATH: Final[str] = os.getenv("APP_CONF_PATH", default=DEFAULT_APP_CONF_PATH)
METRICS_PORT: Final[int] = int(os.getenv("METRICS_PORT", default=DEFAULT_METRICS_PORT))


def init_server(step: str, server_type: str):
Expand All @@ -37,6 +39,9 @@ def start_server() -> None:

LOGGER.info("Running %s on %s server", step, server_type)

# Start the metrics server at port METRICS_PORT = 8490
start_metrics_server(METRICS_PORT)

server = init_server(step, server_type)
server.start()

Expand Down
15 changes: 10 additions & 5 deletions numalogic/udfs/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,25 @@ class NumalogicUDF(metaclass=ABCMeta):
Args:
is_async: If True, the UDF is executed in an asynchronous manner.
pl_conf: PipelineConf object
_vtx: Vertex/UDF name
"""

__slots__ = ("is_async", "pl_conf")
__slots__ = ("is_async", "pl_conf", "_vtx")

def __init__(self, is_async: bool = False, pl_conf: Optional[PipelineConf] = None):
def __init__(
self,
is_async: bool = False,
pl_conf: Optional[PipelineConf] = None,
_vtx: Optional[str] = "numalogic-udf",
):
self._vtx = _vtx
self.is_async = is_async
self.pl_conf = pl_conf or PipelineConf()

def __call__(
self, keys: list[str], datum: Datum
) -> Union[Coroutine[None, None, Messages], Messages]:
if self.is_async:
return self.aexec(keys, datum)
return self.exec(keys, datum)
return self.aexec(keys, datum) if self.is_async else self.exec(keys, datum)

# TODO: remove, and have an update config method
def register_conf(self, config_id: str, conf: StreamConf) -> None:
Expand Down
144 changes: 144 additions & 0 deletions numalogic/udfs/_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
from collections.abc import Sequence

from prometheus_client import Histogram

from numalogic.monitoring.metrics import PromCounterMetric, PromInfoMetric, PromSummaryMetric

# Define metrics
#
# NOTE: each definition below registers a collector with the prometheus_client
# default registry at import time, so this module must be imported only once
# per process (a second registration of the same name raises a duplicate
# timeseries error).

# COUNTERS
SOURCE_COUNTER = PromCounterMetric(
    "numalogic_artifact_source_counter",
    "Count artifact source calls",
    ["source", "vertex", "composite_key", "config_id"],
)

INSUFFICIENT_DATA_COUNTER = PromCounterMetric(
    "numalogic_insufficient_data_counter",
    "Count insufficient data while Training",
    ["composite_key", "config_id"],
)
# NOTE(review): metric name says "new_model" but the description and the
# "status" label suggest it counts every model status — confirm intent.
MODEL_STATUS_COUNTER = PromCounterMetric(
    "numalogic_new_model_counter",
    "Count status of the model",
    ["status", "vertex", "composite_key", "config_id"],
)

DATASHAPE_ERROR_COUNTER = PromCounterMetric(
    "numalogic_datashape_error_counter",
    "Count datashape errors in preprocess",
    ["composite_key", "config_id"],
)
MSG_DROPPED_COUNTER = PromCounterMetric(
    "numalogic_msg_dropped_counter", "Count dropped msgs", ["vertex", "composite_key", "config_id"]
)

REDIS_ERROR_COUNTER = PromCounterMetric(
    "numalogic_redis_error_counter", "Count redis errors", ["vertex", "composite_key", "config_id"]
)
EXCEPTION_COUNTER = PromCounterMetric(
    "numalogic_exception_counter", "Count exceptions", ["vertex", "composite_key", "config_id"]
)
RUNTIME_ERROR_COUNTER = PromCounterMetric(
    "numalogic_runtime_error_counter",
    "Count runtime errors",
    ["vertex", "composite_key", "config_id"],
)

FETCH_EXCEPTION_COUNTER = PromCounterMetric(
    "numalogic_fetch_exception_counter",
    "count exceptions during fetch call",
    ["composite_key", "config_id"],
)

MSG_IN_COUNTER = PromCounterMetric(
    "numalogic_msg_in_counter", "Count msgs flowing in", ["vertex", "composite_key", "config_id"]
)
MSG_PROCESSED_COUNTER = PromCounterMetric(
    "numalogic_msg_processed_counter",
    "Count msgs processed",
    ["vertex", "composite_key", "config_id"],
)

# SUMMARY
DATAFRAME_SHAPE_SUMMARY = PromSummaryMetric(
    "numalogic_dataframe_shape_summary",
    "len of dataframe for training",
    ["composite_key", "config_id"],
)
# NOTE(review): the next two are Summary metrics whose names end in
# "_counter" — renaming would break existing dashboards, so left as-is.
NAN_SUMMARY = PromSummaryMetric(
    "numalogic_nan_counter", "Count nan's in train data", ["composite_key", "config_id"]
)
INF_SUMMARY = PromSummaryMetric(
    "numalogic_inf_counter", "Count inf's in train data", ["composite_key", "config_id"]
)
FETCH_TIME_SUMMARY = PromSummaryMetric(
    "numalogic_fetch_time_summary", "Train data fetch time", ["composite_key", "config_id"]
)

# Info
MODEL_INFO = PromInfoMetric("numalogic_model_info", "Model info", ["composite_key", "config_id"])

# HISTOGRAM
# Latency buckets in seconds: fine-grained 1ms-10ms, then coarser up to 250ms.
buckets = (
    0.001,
    0.002,
    0.003,
    0.004,
    0.005,
    0.006,
    0.007,
    0.008,
    0.009,
    0.01,
    0.025,
    0.05,
    0.075,
    0.1,
    0.25,
)

# NOTE(review): unlike the wrapped metrics above, UDF_TIME uses the raw
# prometheus_client Histogram directly and declares no labels.
UDF_TIME = Histogram(
    "numalogic_udf_time_histogram",
    "Histogram for udf processing time",
    buckets=buckets,
)


# helper functions


def _increment_counter(counter: PromCounterMetric, labels: Sequence[str], amount: int = 1) -> None:
    """
    Increment ``counter`` for the timeseries identified by ``labels``.

    Args:
        counter: PromCounterMetric wrapper to increment
        labels: Sequence of label values, one per declared label key
        amount: Increment size (defaults to 1)
    """
    counter.increment_counter(*labels, amount=amount)


def _add_info(info: PromInfoMetric, labels: Sequence[str], data: dict) -> None:
    """
    Set the info payload on ``info`` for the timeseries identified by ``labels``.

    Args:
        info: PromInfoMetric wrapper to update
        labels: Sequence of label values, one per declared label key
        data: Key/value payload to expose
    """
    info.add_info(*labels, data=data)


def _add_summary(summary: PromSummaryMetric, labels: Sequence[str], data: float) -> None:
    """
    Record a single observation on ``summary`` for the given ``labels``.

    Args:
        summary: PromSummaryMetric wrapper to update
        labels: Sequence of label values, one per declared label key
        data: Observed value
    """
    summary.add_observation(*labels, value=data)
Loading

0 comments on commit 28fa28f

Please sign in to comment.