Skip to content

Commit

Permalink
Numalogic metrics (#393)
Browse files Browse the repository at this point in the history
Update metrics for udfs

---------

Signed-off-by: Kushal Batra <[email protected]>
  • Loading branch information
s0nicboOm committed Jun 17, 2024
1 parent 9d4ea32 commit 7fa698c
Show file tree
Hide file tree
Showing 31 changed files with 735 additions and 700 deletions.
3 changes: 3 additions & 0 deletions numalogic/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
BASE_CONF_DIR = os.path.join(BASE_DIR, "config")

DEFAULT_BASE_CONF_PATH = os.path.join(BASE_CONF_DIR, "default-configs", "config.yaml")
DEFAULT_METRICS_CONF_PATH = os.path.join(
BASE_CONF_DIR, "default-configs", "numalogic_udf_metrics.yaml"
)
DEFAULT_APP_CONF_PATH = os.path.join(BASE_CONF_DIR, "app-configs", "config.yaml")
DEFAULT_METRICS_PORT = 8490
NUMALOGIC_METRICS = "numalogic_metrics"
29 changes: 0 additions & 29 deletions numalogic/monitoring/__init__.py

This file was deleted.

120 changes: 0 additions & 120 deletions numalogic/monitoring/metrics.py

This file was deleted.

6 changes: 6 additions & 0 deletions numalogic/tools/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ class RedisRegistryError(Exception):
pass


class MetricConfigError(Exception):
"""Raised when a numalogic udf metric config is not valid."""

pass


class DynamoDBRegistryError(Exception):
"""Base class for all exceptions raised by the DynamoDBRegistry class."""

Expand Down
6 changes: 6 additions & 0 deletions numalogic/transforms/_stateless.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ def _validate_args(
if len(lower) != len(upper):
raise ValueError("lower and upper should have the same length.")
lower, upper = np.asarray(lower, dtype=np.float32), np.asarray(upper, dtype=np.float32)
elif lower is not None and upper is not None:
if type(lower) is not type(upper):
if isinstance(lower, Sequence):
upper = np.asarray(upper, dtype=np.float32)
else:
lower = np.asarray(lower, dtype=np.float32)
if upper is not None and lower is not None and np.any(lower > upper):
raise ValueError("lower value should be less than or equal to upper value")
return lower, upper
Expand Down
8 changes: 8 additions & 0 deletions numalogic/udfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from numalogic._constants import BASE_DIR
from numalogic.udfs._base import NumalogicUDF
from numalogic.udfs._config import StreamConf, PipelineConf, MLPipelineConf, load_pipeline_conf
from numalogic.udfs._metrics_utility import MetricsLoader
from numalogic.udfs.factory import UDFFactory, ServerFactory
from numalogic.udfs.payloadtx import PayloadTransformer
from numalogic.udfs.inference import InferenceUDF
Expand All @@ -23,6 +24,11 @@ def set_logger() -> None:
logging.getLogger("root").setLevel(logging.DEBUG)


def set_metrics(conf_file: str) -> None:
"""Sets the metrics for the UDFs."""
MetricsLoader().load_metrics(config_file_path=conf_file)


__all__ = [
"NumalogicUDF",
"PayloadTransformer",
Expand All @@ -40,4 +46,6 @@ def set_logger() -> None:
"load_pipeline_conf",
"ServerFactory",
"set_logger",
"set_metrics",
"MetricsLoader",
]
19 changes: 14 additions & 5 deletions numalogic/udfs/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@
import sys
from typing import Final

from numalogic._constants import DEFAULT_BASE_CONF_PATH, DEFAULT_APP_CONF_PATH, DEFAULT_METRICS_PORT
from numaprom.monitoring import start_metrics_server

from numalogic._constants import (
DEFAULT_BASE_CONF_PATH,
DEFAULT_APP_CONF_PATH,
DEFAULT_METRICS_PORT,
DEFAULT_METRICS_CONF_PATH,
)
from numalogic.connectors.redis import get_redis_client_from_conf
from numalogic.monitoring import start_metrics_server
from numalogic.udfs import load_pipeline_conf, UDFFactory, ServerFactory, set_logger
from numalogic.udfs import load_pipeline_conf, UDFFactory, ServerFactory, set_logger, set_metrics

LOGGER = logging.getLogger(__name__)

BASE_CONF_FILE_PATH: Final[str] = os.getenv("BASE_CONF_PATH", default=DEFAULT_BASE_CONF_PATH)
APP_CONF_FILE_PATH: Final[str] = os.getenv("APP_CONF_PATH", default=DEFAULT_APP_CONF_PATH)
METRICS_PORT: Final[int] = int(os.getenv("METRICS_PORT", default=DEFAULT_METRICS_PORT))
METRICS_ENABLED: Final[bool] = bool(int(os.getenv("METRICS_ENABLED", default="1")))
METRICS_CONF_PATH: Final[str] = os.getenv("METRICS_CONF_PATH", default=DEFAULT_METRICS_CONF_PATH)


def init_server(step: str, server_type: str):
Expand Down Expand Up @@ -43,8 +51,9 @@ def start_server() -> None:

LOGGER.info("Running %s on %s server", step, server_type)

# Start the metrics server at port METRICS_PORT = 8490
start_metrics_server(METRICS_PORT)
if METRICS_ENABLED:
set_metrics(conf_file=METRICS_CONF_PATH)
start_metrics_server(METRICS_PORT)

server = init_server(step, server_type)
server.start()
Expand Down
Loading

0 comments on commit 7fa698c

Please sign in to comment.