Skip to content

Commit

Permalink
feat: add percentile thresholding
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Jan 24, 2024
1 parent fbf171f commit a075761
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 0 deletions.
2 changes: 2 additions & 0 deletions numalogic/config/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class ThresholdFactory(_ObjectFactory):
RobustMahalanobisThreshold,
StaticThreshold,
SigmoidThreshold,
MaxPercentileThreshold,
)

_CLS_MAP: ClassVar[dict] = {
Expand All @@ -104,6 +105,7 @@ class ThresholdFactory(_ObjectFactory):
"SigmoidThreshold": SigmoidThreshold,
"MahalanobisThreshold": MahalanobisThreshold,
"RobustMahalanobisThreshold": RobustMahalanobisThreshold,
"MaxPercentileThreshold": MaxPercentileThreshold,
}


Expand Down
2 changes: 2 additions & 0 deletions numalogic/models/threshold/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from numalogic.models.threshold._std import StdDevThreshold, AggStdDevThreshold
from numalogic.models.threshold._mahalanobis import MahalanobisThreshold, RobustMahalanobisThreshold
from numalogic.models.threshold._static import StaticThreshold, SigmoidThreshold
from numalogic.models.threshold._median import MaxPercentileThreshold

__all__ = [
"StdDevThreshold",
Expand All @@ -9,4 +10,5 @@
"SigmoidThreshold",
"MahalanobisThreshold",
"RobustMahalanobisThreshold",
"MaxPercentileThreshold",
]
69 changes: 69 additions & 0 deletions numalogic/models/threshold/_median.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from collections.abc import Sequence
from typing import Optional

import numpy as np
import numpy.typing as npt
from typing import Self, Final

from numalogic.base import BaseThresholdModel
from numalogic.tools.exceptions import InvalidDataShapeError, ModelInitializationError

# Label value for inliers (going by the name; paired with _OUTLIER below).
_INLIER: Final[int] = 0
# Label value for outliers (going by the name; paired with _INLIER above).
_OUTLIER: Final[int] = 1
# Required number of array dimensions for input matrices; compared against
# ``x.ndim`` when validating input.
_INPUT_DIMS: Final[int] = 2


class MaxPercentileThreshold(BaseThresholdModel):
    """Percentile based thresholding model.

    During ``fit`` a per-feature (per-column) threshold is computed as the
    ``max_inlier_percentile``-th percentile of the training matrix, clamped
    from below by ``min_threshold``. Scores are the element-wise ratio of the
    input to that threshold, optionally aggregated across features.

    Args:
        max_inlier_percentile: Percentile (passed to ``np.percentile``) used
            as the per-feature threshold.
        min_threshold: Lower bound applied to every computed threshold.
        aggregate: If True, ``score_samples`` collapses the feature axis into
            a single score per row.
        feature_weights: Optional per-feature weights used when aggregating;
            when omitted (or empty) a plain mean is used.
    """

    def __init__(
        self,
        max_inlier_percentile: float = 96.0,
        min_threshold: float = 1e-3,
        aggregate: bool = False,
        feature_weights: Optional[Sequence[float]] = None,
    ):
        super().__init__()
        self._max_percentile = max_inlier_percentile
        self._min_thresh = min_threshold
        self._thresh = None
        self._agg = aggregate
        self._weights = feature_weights
        self._is_fitted = False

    @property
    def threshold(self):
        """Per-feature threshold array, or None before ``fit`` is called."""
        return self._thresh

    @staticmethod
    def _validate_input(x: npt.NDArray[float]) -> None:
        """Validate the input matrix shape.

        Raises:
            InvalidDataShapeError: if ``x`` is not 2-dimensional.
        """
        if x.ndim != _INPUT_DIMS:
            raise InvalidDataShapeError(f"Input matrix should have 2 dims, given shape: {x.shape}.")

    def fit(self, x: npt.NDArray[float]) -> Self:
        """Compute the per-feature threshold from the training matrix.

        Args:
            x: 2-D training matrix; percentiles are taken along axis 0.

        Returns:
            The fitted instance.

        Raises:
            InvalidDataShapeError: if ``x`` is not 2-dimensional.
        """
        self._validate_input(x)
        percentiles = np.percentile(x, self._max_percentile, axis=0)
        # Clamp from below so that near-zero thresholds do not blow up the
        # ratio computed in score_samples.
        self._thresh = np.maximum(percentiles, self._min_thresh)
        self._is_fitted = True
        return self

    def predict(self, x: Optional[npt.NDArray[float]] = None) -> Optional[npt.NDArray[int]]:
        """Label every element of ``x`` as inlier or outlier.

        An element is labelled ``_OUTLIER`` when it is strictly greater than
        the fitted threshold of its column, otherwise ``_INLIER``.

        Args:
            x: 2-D input matrix. May be omitted for backward compatibility
                with the earlier zero-argument signature, in which case
                nothing is computed and None is returned.

        Returns:
            Integer label array with the same shape as ``x``, or None when
            ``x`` is not provided.

        Raises:
            ModelInitializationError: if the model has not been fitted.
            InvalidDataShapeError: if ``x`` is not 2-dimensional.
        """
        if x is None:
            # Legacy call form: the method used to be a no-op returning None.
            return None
        if not self._is_fitted:
            raise ModelInitializationError("Model not fitted yet.")

        self._validate_input(x)
        return np.where(x > self._thresh, _OUTLIER, _INLIER)

    def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]:
        """Score the input as its ratio to the fitted threshold.

        Args:
            x: 2-D input matrix.

        Returns:
            ``x / threshold`` element-wise; if aggregation is enabled, the
            (optionally weighted) mean of those ratios along axis 1.

        Raises:
            ModelInitializationError: if the model has not been fitted.
            InvalidDataShapeError: if ``x`` is not 2-dimensional.
        """
        if not self._is_fitted:
            raise ModelInitializationError("Model not fitted yet.")

        self._validate_input(x)
        scores = x / self._thresh

        if self._agg:
            return self.agg_score_samples(scores, weights=self._weights)
        return scores

    @staticmethod
    def agg_score_samples(
        y: npt.NDArray[float], weights: Optional[Sequence[float]] = None
    ) -> npt.NDArray[float]:
        """Aggregate feature-wise scores into one score per row.

        Args:
            y: 2-D score matrix.
            weights: Optional per-feature weights. None or an empty sequence
                selects an unweighted mean.

        Returns:
            Array of row-wise (weighted) averages taken along axis 1.
        """
        # Explicit None/length check instead of truthiness: a NumPy array of
        # weights would otherwise raise "truth value of an array is ambiguous".
        if weights is not None and len(weights) > 0:
            return np.average(y, weights=weights, axis=1)
        return np.mean(y, axis=1)

0 comments on commit a075761

Please sign in to comment.