Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Feb 28, 2024
1 parent 8f8a532 commit c0225a9
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 127 deletions.
46 changes: 6 additions & 40 deletions numalogic/udfs/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@
from numalogic.registry import LocalLRUCache, ArtifactData
from numalogic.tools.types import artifact_t, redis_client_t
from numalogic.udfs._base import NumalogicUDF
from numalogic.udfs._config import PipelineConf, StreamConf
from numalogic.udfs._config import PipelineConf
from numalogic.udfs._metrics import (
MODEL_STATUS_COUNTER,
RUNTIME_ERROR_COUNTER,
MSG_PROCESSED_COUNTER,
MSG_IN_COUNTER,
UDF_TIME,
_increment_counter,
)
from numalogic.udfs.entities import StreamPayload, Status, TrainerPayload
from numalogic.udfs.tools import _load_artifact, _update_info_metric
from numalogic.udfs.entities import StreamPayload, Status
from numalogic.udfs.tools import _load_artifact, _update_info_metric, get_trainer_message

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -86,33 +85,6 @@ def compute(cls, model: artifact_t, input_: npt.NDArray[float], **_) -> npt.NDAr
raise RuntimeError("Model forward pass failed!") from err
return np.ascontiguousarray(recon_err).squeeze(0)

@staticmethod
def _get_trainer_message(
    keys: list[str],
    stream_conf: StreamConf,
    payload: StreamPayload,
    *metric_values: str,
) -> Message:
    """Construct a Mapper Message that routes a training request downstream."""
    # Truncate the payload's composite keys to the length configured on the stream.
    composite = [ck for _, ck in zip(stream_conf.composite_keys, payload.composite_keys)]
    request = TrainerPayload(
        uuid=payload.uuid,
        composite_keys=composite,
        metrics=payload.metrics,
        config_id=payload.config_id,
        pipeline_id=payload.pipeline_id,
    )
    # Bump the model-status counter only when label values were supplied.
    if metric_values:
        _increment_counter(
            counter=MODEL_STATUS_COUNTER,
            labels=(payload.status, *metric_values),
        )
    _LOGGER.info(
        "%s - Sending training request for: %s",
        request.uuid,
        request.composite_keys,
    )
    return Message(keys=keys, value=request.to_json(), tags=["train"])

@UDF_TIME.time()
def exec(self, keys: list[str], datum: Datum) -> Messages:
"""
Expand Down Expand Up @@ -159,9 +131,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:

# Send training request if artifact loading is not successful
if not artifact_data:
return Messages(
self._get_trainer_message(keys, _stream_conf, payload, *_metric_label_values)
)
return Messages(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))

# Perform inference
try:
Expand All @@ -175,9 +145,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
payload.composite_keys,
payload.metrics,
)
return Messages(
self._get_trainer_message(keys, _stream_conf, payload, *_metric_label_values)
)
return Messages(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))

msgs = Messages()
status = (
Expand All @@ -195,9 +163,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
},
)
if status == Status.ARTIFACT_STALE:
msgs.append(
self._get_trainer_message(keys, _stream_conf, payload, *_metric_label_values)
)
msgs.append(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))

_LOGGER.info(
"%s - Successfully inferred: { CompositeKeys: %s, Metrics: %s }",
Expand Down
87 changes: 36 additions & 51 deletions numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@
from numalogic.tools.aggregators import aggregate_window, aggregate_features
from numalogic.tools.types import redis_client_t, artifact_t
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf, MLPipelineConf, StreamConf
from numalogic.udfs._config import PipelineConf, MLPipelineConf
from numalogic.udfs._metrics import (
MODEL_STATUS_COUNTER,
RUNTIME_ERROR_COUNTER,
MSG_PROCESSED_COUNTER,
MSG_IN_COUNTER,
UDF_TIME,
_increment_counter,
)
from numalogic.udfs.entities import StreamPayload, Header, Status, TrainerPayload, OutputPayload
from numalogic.udfs.tools import _load_artifact
from numalogic.udfs.entities import StreamPayload, Header, Status, OutputPayload
from numalogic.udfs.tools import _load_artifact, get_trainer_message

# TODO: move to config
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
Expand Down Expand Up @@ -124,9 +123,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
payload = replace(
payload, status=Status.ARTIFACT_NOT_FOUND, header=Header.TRAIN_REQUEST
)
return Messages(
self._get_trainer_message(keys, _stream_conf, payload, *_metric_label_values)
)
return Messages(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))

if payload.header == Header.STATIC_INFERENCE:
_LOGGER.warning("Static inference not supported in postprocess yet")
Expand Down Expand Up @@ -157,9 +154,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
payload.composite_keys,
payload.metrics,
)
return Messages(
self._get_trainer_message(keys, _stream_conf, payload, *_metric_label_values)
)
return Messages(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))

payload = replace(
payload,
Expand Down Expand Up @@ -205,6 +200,22 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
def _adjust_score(
self, mlpl_conf: MLPipelineConf, a_unified: float, payload: StreamPayload
) -> tuple[float, Optional[float], Optional[NDArray[float]]]:
"""
Adjust the unified score using static threshold scores.
Args:
-------
mlpl_conf: MLPipelineConf object
a_unified: Unified anomaly score
payload: StreamPayload object
Returns
-------
A tuple consisting of
Adjusted unified score,
Unified static threshold score,
Static threshold scores per feature
"""
adjust_conf = mlpl_conf.numalogic_conf.score.adjust
if adjust_conf:
# Compute static threshold scores
Expand All @@ -231,6 +242,21 @@ def _additional_scores(
y_features: Optional[NDArray[float]] = None,
y_unified: Optional[float] = None,
) -> dict[str, float]:
"""
Get additional scores.
Args:
-------
feat_names: Feature names
a_features: ML model anomaly scores per feature
a_unified: ML model unified anomaly score
y_features: Static threshold scores per feature
y_unified: Static threshold unified score
Returns
-------
Dictionary with additional output scores
"""
data = self._per_feature_score(feat_names, a_features)
data["namespace_app_rollouts_ML"] = a_unified
if y_unified is not None:
Expand All @@ -250,47 +276,6 @@ def _per_feature_score(feat_names: list[str], scores: NDArray[float]) -> dict[st
return {}
return dict(zip(feat_names, scores))

@staticmethod
def _get_trainer_message(
    keys: list[str],
    stream_conf: StreamConf,
    payload: StreamPayload,
    *metric_values: str,
) -> Message:
    """
    Build a training-request message for the trainer vertex.

    Args:
    -------
        keys: List of keys
        stream_conf: StreamConf instance
        payload: StreamPayload object
        metric_values: Optional metric values for monitoring

    Returns
    -------
        Mapper Message instance
    """
    # Pair configured composite keys with the payload's, keeping only as many
    # as the stream configuration declares.
    ckey_pairs = zip(stream_conf.composite_keys, payload.composite_keys)
    train_payload = TrainerPayload(
        uuid=payload.uuid,
        composite_keys=[pk for _, pk in ckey_pairs],
        metrics=payload.metrics,
        config_id=payload.config_id,
        pipeline_id=payload.pipeline_id,
    )
    if metric_values:
        _increment_counter(
            counter=MODEL_STATUS_COUNTER,
            labels=(payload.status, *metric_values),
        )
    _LOGGER.info(
        "%s - Sending training request for: %s",
        train_payload.uuid,
        train_payload.composite_keys,
    )
    return Message(keys=keys, value=train_payload.to_json(), tags=["train"])

@classmethod
def compute(
cls,
Expand Down
46 changes: 11 additions & 35 deletions numalogic/udfs/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@
MSG_PROCESSED_COUNTER,
MSG_IN_COUNTER,
RUNTIME_ERROR_COUNTER,
MODEL_STATUS_COUNTER,
UDF_TIME,
_increment_counter,
)
from numalogic.registry import LocalLRUCache
from numalogic.tools.types import redis_client_t, artifact_t
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf, StreamConf
from numalogic.udfs.entities import Status, Header, TrainerPayload, StreamPayload
from numalogic.udfs.tools import make_stream_payload, get_df, _load_artifact, _update_info_metric
from numalogic.udfs._config import PipelineConf
from numalogic.udfs.entities import Status, Header
from numalogic.udfs.tools import (
make_stream_payload,
get_df,
_load_artifact,
_update_info_metric,
get_trainer_message,
)

# TODO: move to config
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
Expand Down Expand Up @@ -169,7 +174,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
)
payload = replace(payload, status=Status.ARTIFACT_FOUND)
else:
return Messages(self._get_trainer_message(keys, _stream_conf, payload))
return Messages(get_trainer_message(keys, _stream_conf, payload))
# Model will not be in registry
else:
# Load configuration for the config_id
Expand Down Expand Up @@ -215,9 +220,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
payload,
status=Status.RUNTIME_ERROR,
)
return Messages(
self._get_trainer_message(keys, _stream_conf, payload, *_metric_label_values)
)
return Messages(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))
_increment_counter(
counter=MSG_PROCESSED_COUNTER,
labels=_metric_label_values,
Expand All @@ -229,33 +232,6 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
)
return Messages(Message(keys=keys, value=payload.to_json(), tags=["inference"]))

@staticmethod
def _get_trainer_message(
    keys: list[str],
    stream_conf: StreamConf,
    payload: StreamPayload,
    *metric_values: str,
) -> Message:
    """Create the Mapper Message used to request (re)training for this payload."""
    matched_keys = []
    # Only keep payload composite keys up to the number configured for the stream.
    for _, payload_key in zip(stream_conf.composite_keys, payload.composite_keys):
        matched_keys.append(payload_key)
    train_payload = TrainerPayload(
        uuid=payload.uuid,
        composite_keys=matched_keys,
        metrics=payload.metrics,
        config_id=payload.config_id,
        pipeline_id=payload.pipeline_id,
    )
    if metric_values:
        # NOTE(review): this copy labels with payload.status.value, while the
        # inference/postprocess copies use payload.status — confirm intended.
        _increment_counter(
            counter=MODEL_STATUS_COUNTER,
            labels=(payload.status.value, *metric_values),
        )
    _LOGGER.info(
        "%s - Sending training request for: %s",
        train_payload.uuid,
        train_payload.composite_keys,
    )
    return Message(keys=keys, value=train_payload.to_json(), tags=["train"])

@classmethod
def compute(
cls, model: artifact_t, input_: Optional[NDArray[float]] = None, **_
Expand Down
45 changes: 44 additions & 1 deletion numalogic/udfs/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
import numpy as np
import pandas as pd
from pandas import DataFrame
from pynumaflow.mapper import Message

from numalogic.registry import ArtifactManager, ArtifactData
from numalogic.tools.exceptions import RedisRegistryError
from numalogic.tools.types import KEYS, redis_client_t
from numalogic.udfs._config import StreamConf
from numalogic.udfs.entities import StreamPayload
from numalogic.udfs.entities import StreamPayload, TrainerPayload
from numalogic.udfs._metrics import (
SOURCE_COUNTER,
MODEL_INFO,
Expand All @@ -22,6 +23,7 @@
_add_info,
RECORDED_DATA_GAUGE,
_set_gauge,
MODEL_STATUS_COUNTER,
)

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -411,3 +413,44 @@ def ack_train(self, key: KEYS, uuid: str) -> bool:
return False
_LOGGER.info("%s - Acknowledging model saving complete for the key: %s", uuid, key)
return True


def get_trainer_message(
    keys: list[str],
    stream_conf: StreamConf,
    payload: StreamPayload,
    *metric_values: str,
) -> Message:
    """
    Get message for training request.

    Args:
    -------
        keys: List of keys
        stream_conf: StreamConf instance
        payload: StreamPayload object
        metric_values: Optional metric values for monitoring

    Returns
    -------
        Mapper Message instance
    """
    # zip truncates to the configured composite-key count for this stream.
    selected = [payload_key for _, payload_key in zip(stream_conf.composite_keys, payload.composite_keys)]
    train_req = TrainerPayload(
        uuid=payload.uuid,
        composite_keys=selected,
        metrics=payload.metrics,
        config_id=payload.config_id,
        pipeline_id=payload.pipeline_id,
    )
    if metric_values:
        # NOTE(review): labels carry payload.status directly; an earlier copy in
        # preprocess used payload.status.value — confirm the label type expected.
        _increment_counter(
            counter=MODEL_STATUS_COUNTER,
            labels=(payload.status, *metric_values),
        )
    _LOGGER.info(
        "%s - Sending training request for: %s",
        train_req.uuid,
        train_req.composite_keys,
    )
    return Message(keys=keys, value=train_req.to_json(), tags=["train"])

0 comments on commit c0225a9

Please sign in to comment.