Skip to content

Commit

Permalink
feat: score adjuster using static threshold
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Dec 11, 2023
1 parent dfc383a commit d0dcb47
Show file tree
Hide file tree
Showing 11 changed files with 1,718 additions and 1,461 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ lint: format

# install all dependencies
setup:
poetry install --with dev,torch --all-extras
poetry install --with dev,torch,jupyter --all-extras

# test your application (tests in the tests/ directory)
test:
Expand Down
2 changes: 2 additions & 0 deletions numalogic/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
LightningTrainerConf,
RegistryInfo,
TrainerConf,
ScoreAdjustConf,
)
from numalogic.config.factory import (
ModelFactory,
Expand All @@ -37,4 +38,5 @@
"ThresholdFactory",
"RegistryFactory",
"TrainerConf",
"ScoreAdjustConf",
]
10 changes: 9 additions & 1 deletion numalogic/config/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


from dataclasses import dataclass, field
from typing import Any
from typing import Any, Optional

from omegaconf import MISSING

Expand Down Expand Up @@ -96,6 +96,13 @@ class TrainerConf:
pltrainer_conf: LightningTrainerConf = field(default_factory=LightningTrainerConf)


@dataclass
class ScoreAdjustConf:
weight: float
metric_weights: Optional[list[float]]
upper_limits: list[float]


@dataclass
class NumalogicConf:
"""Top level config schema for numalogic."""
Expand All @@ -107,6 +114,7 @@ class NumalogicConf:
postprocess: ModelInfo = field(
default_factory=lambda: ModelInfo(name="TanhNorm", stateful=False)
)
adjust: Optional[ScoreAdjustConf] = None


@dataclass
Expand Down
8 changes: 5 additions & 3 deletions numalogic/models/autoencoder/variants/conv.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from torch.nn.init import calculate_gain

from numalogic.models.autoencoder.base import BaseAE
from numalogic.tools.exceptions import ModelInitializationError

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -223,9 +224,10 @@ def __init__(
enc_kernel_sizes = [enc_kernel_sizes for _ in range(len(enc_channels))]

elif isinstance(enc_kernel_sizes, Sequence):
assert len(enc_channels) == len(
enc_kernel_sizes
), "enc_channels and enc_kernel_sizes should be of the same length"
if len(enc_channels) != len(enc_kernel_sizes):
raise ModelInitializationError(
"enc_channels and enc_kernel_sizes should be of the same length"
)
else:
raise TypeError(f"Invalid enc_kernel_sizes type provided: {type(enc_kernel_sizes)}")

Expand Down
8 changes: 6 additions & 2 deletions numalogic/models/threshold/_mahalanobis.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def predict(self, x: npt.NDArray[float]) -> npt.NDArray[int]:
y_hat[md >= self._md_thresh] = _OUTLIER
return y_hat

def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]:
def score_samples(self, x: npt.NDArray[float], keepdims=True) -> npt.NDArray[float]:
"""
Returns the outlier score for each sample.
Expand All @@ -168,6 +168,7 @@ def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]:
Args:
----
x: input data of shape (n_samples, n_features)
keepdims: if True then return as a column vector
Returns
-------
Expand All @@ -181,4 +182,7 @@ def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]:
if not self._is_fitted:
raise ModelInitializationError("Model not fitted yet.")
self._validate_input(x)
return self.mahalanobis(x) / self._md_thresh
scores = self.mahalanobis(x) / self._md_thresh
if keepdims:
return scores.reshape(-1, 1)
return scores
42 changes: 27 additions & 15 deletions numalogic/models/threshold/_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ def __init__(self, upper_limit: float, outlier_score: float = 10.0, inlier_score
self.outlier_score = float(outlier_score)
self.inlier_score = float(inlier_score)

assert (
self.outlier_score > self.inlier_score
), "Outlier score needs to be greater than inlier score"
if self.outlier_score > self.inlier_score:
raise ValueError("Outlier score needs to be greater than inlier score")

def fit(self, _: npt.NDArray[float]) -> Self:
"""Does not do anything. Only for API compatibility."""
Expand All @@ -51,7 +50,7 @@ def predict(self, x: npt.NDArray[float]) -> npt.NDArray[int]:
"""Returns an integer array of same shape as input.
1 denotes anomaly.
"""
y = x.copy()
y = np.zeros_like(x, dtype=int)
y[x < self.upper_limit] = 0
y[x >= self.upper_limit] = 1
return y
Expand All @@ -68,45 +67,58 @@ def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]:

class SigmoidThreshold(BaseThresholdModel):
r"""Smooth and stateless static thesholding using sigmoid function as an estimator.
The values produced.
Score is given by:
score = score_limit * 1/ exp(-coeff * (x - upper_limit))
score = score_limit * 1/ exp(-coeff * (x - upper_limits))
Args:
----
upper_limit: is the desired threshold limit of x
upper_limits: is the desired threshold limit of x;
can be a float or a list of floats
list of floats represents the upper limits for each feature;
a single float represents the upper limit used for all features;
slope_factor: determines the slope of the curve
score_limit: is the scaler multiplier for the score
e.g. a value of 10 means that the output score
will be between 0 and 10.
Raises
------
ValueError: If the input data shape does not match the provided upper_limits
"""

__slots__ = ("upper_limit", "coeff", "score_limit")
__slots__ = ("upper_limits", "coeff", "score_limit")

def __init__(self, upper_limit: float, slope_factor: int = 5, score_limit: int = 10):
self.upper_limit = float(upper_limit)
def __init__(self, *upper_limits: float, slope_factor: int = 5, score_limit: int = 10):
self.upper_limits = np.asarray(upper_limits, dtype=np.float32)
self.coeff = slope_factor * np.pi
self.score_limit = score_limit

def fit(self, _: npt.NDArray[float]) -> Self:
"""Does not do anything. Only for API compatibility."""
return self

def _validate_input(self, x: npt.NDArray[float]) -> None:
if len(self.upper_limits) == 1:
return
if x.shape[1] != len(self.upper_limits):
raise ValueError("Input data shape does not match provided upper_limits")

def predict(self, x: npt.NDArray[float]) -> npt.NDArray[int]:
"""Returns an integer array of same shape as input.
1 denotes anomaly.
This is calculated as a hard threshold at upper limit.
"""
y = x.copy()
y[x < self.upper_limit] = 0
y[x >= self.upper_limit] = 1
self._validate_input(x)
y = np.zeros_like(x, dtype=int)
y[x < self.upper_limits] = 0
y[x >= self.upper_limits] = 1
return y

def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]:
"""Returns an array of same shape as input
with values being anomaly scores.
"""
x = x.copy()
return 10 / (1 + np.exp(-self.coeff * (x - self.upper_limit)))
self._validate_input(x)
return self.score_limit / (1.0 + np.exp(-self.coeff * (x.copy() - self.upper_limits)))
81 changes: 81 additions & 0 deletions numalogic/tools/adjust.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2022 The Numaproj Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http:https://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Self
import numpy as np
import numpy.typing as npt

from numalogic.config import ScoreAdjustConf
from numalogic.models.threshold import SigmoidThreshold


class ScoreAdjuster:
"""
Adjusts the model output based on the metric input.
Args:
----
adjust_weight: weight given to static thresholding output
(between 0 and 1)
metric_weights: weights given to each kpi/metric
upper_limits: upper limits for each metric
kwargs: kwargs for SigmoidThreshold
Raises
------
ValueError: if adjust_weight is not between 0 and 1
"""

__slots__ = ("_adjust_wt", "_kpi_wts", "_thresholder")

def __init__(
self, adjust_weight: float, metric_weights: list[float], upper_limits: list[float], **kwargs
):
if adjust_weight <= 0.0 or adjust_weight >= 1:
raise ValueError("adjust_weight needs to be between 0 and 1")
self._adjust_wt = adjust_weight
self._kpi_wts = np.asarray(metric_weights, dtype=np.float32).reshape(-1, 1)
self._thresholder = SigmoidThreshold(*upper_limits, **kwargs)

def adjust(
self, metric_in: npt.NDArray[float], model_scores: npt.NDArray[float]
) -> npt.NDArray[float]:
"""
Adjusts the model output based on the metric input.
Args:
----
metric_in: metric input to the model
model_scores: model output scores
Returns
-------
adjusted_scores: adjusted scores
"""
model_scores = np.reshape(-1, 1)
feature_scores = self._thresholder.score_samples(metric_in)
weighted_scores = np.dot(feature_scores, self._kpi_wts)
return (self._adjust_wt * weighted_scores) + ((1 - self._adjust_wt) * model_scores)

@classmethod
def from_conf(cls, conf: ScoreAdjustConf) -> Self:
"""
Creates an instance of ScoreAdjuster from ScoreAdjustConf.
Args:
----
conf: ScoreAdjustConf
Returns
-------
ScoreAdjuster instance
"""
return cls(conf.weight, conf.metric_weights, conf.upper_limits)
59 changes: 47 additions & 12 deletions numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from numalogic.config import PostprocessFactory, RegistryFactory
from numalogic.registry import LocalLRUCache
from numalogic.tools.adjust import ScoreAdjuster
from numalogic.tools.types import redis_client_t, artifact_t
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf
Expand Down Expand Up @@ -79,6 +80,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
_conf = self.get_conf(payload.config_id)
thresh_cfg = _conf.numalogic_conf.threshold
postprocess_cfg = _conf.numalogic_conf.postprocess
adjust_cfg = _conf.numalogic_conf.adjust

# load artifact
thresh_artifact, payload = _load_artifact(
Expand All @@ -102,6 +104,8 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
model=thresh_artifact.artifact,
input_=payload.get_data(),
postproc_clf=postproc_clf,
score_adjust_conf=adjust_cfg,
raw_input=payload.get_data(original=True),
)
except RuntimeError:
_LOGGER.exception(
Expand Down Expand Up @@ -162,32 +166,63 @@ def _per_feature_score(feat_names: list[str], scores: NDArray[float]) -> dict[st

@classmethod
def compute(
cls, model: artifact_t, input_: NDArray[float], postproc_clf=None, **_
cls,
model: artifact_t,
input_: NDArray[float],
postproc_clf=None,
score_adjust_conf=None,
raw_input: Optional[NDArray[float]] = None,
**_
) -> NDArray[float]:
"""
Compute the postprocess function.
Args:
-------
model: Model instance
input_: Input data
kwargs: Additional arguments
model: Threshold model instance
input_: Input data of shape (n_samples, n_features)
postproc_clf: Postprocess classifier instance
score_adjust_conf: Score adjuster config
Returns
-------
Output data
"""
_start_time = time.perf_counter()
# Threshold model run

print("input", input_, input_.shape)

try:
y_score = model.score_samples(input_).astype(np.float32)
scores = model.score_samples(input_).astype(np.float32)
except Exception as err:
raise RuntimeError("Threshold model scoring failed") from err

print("thresh", scores, scores.shape)

if not postproc_clf:
return np.mean(scores, axis=0, keepdims=True).reshape(-1)

# Postprocessing transform
try:
win_score = np.mean(y_score, axis=0, keepdims=True)
score = postproc_clf.transform(win_score)
scores = postproc_clf.transform(scores) # (10, 1)
except Exception as err:
raise RuntimeError("Postprocess failed") from err
_LOGGER.debug(
"Time taken in postprocess compute: %.4f sec", time.perf_counter() - _start_time
)
return score.reshape(-1)

print("postproc", scores, scores.shape)

if not score_adjust_conf:
return np.mean(scores, axis=0, keepdims=True).reshape(-1)

# Score adjustment
if raw_input is None:
_LOGGER.warning("Raw input is None. Skipping score adjustment")
return np.mean(scores, axis=0, keepdims=True).reshape(-1)

print("raw_input", raw_input, raw_input.shape)

score_adjuster = ScoreAdjuster.from_conf(score_adjust_conf)
scores = score_adjuster.adjust(metric_in=raw_input, model_scores=scores)

print("adjusted", scores, scores.shape)

return np.mean(scores, axis=0, keepdims=True).reshape(-1)
Loading

0 comments on commit d0dcb47

Please sign in to comment.