feat: add PercentileScaler
Signed-off-by: Avik Basu <[email protected]>
ab93 committed Apr 19, 2024
1 parent 6975394 commit f7bd30b
Showing 8 changed files with 540 additions and 651 deletions.
13 changes: 11 additions & 2 deletions numalogic/backtest/_prom.py
@@ -30,7 +30,9 @@
PreprocessFactory,
PostprocessFactory,
ThresholdFactory,
AggregatorConf,
)
from numalogic.config._config import AggMethod
from numalogic.connectors import ConnectorType
from numalogic.connectors.prometheus import PrometheusFetcher
from numalogic.tools.data import StreamingDataset, inverse_window
@@ -238,6 +240,8 @@ def generate_scores(

x_recon = np.zeros((len(ds), self.seq_len, n_feat), dtype=np.float32)
raw_scores = np.zeros((len(ds), self.seq_len, n_feat), dtype=np.float32)
unified_raw_scores = np.zeros((len(ds), 1), dtype=np.float32)

feature_scores = np.zeros((len(ds), n_feat), dtype=np.float32)
unified_scores = np.zeros((len(ds), 1), dtype=np.float32)

@@ -253,12 +257,17 @@

winscores = postproc_udf.compute_feature_scores(
raw_scores[idx], self.nlconf.score.window_agg
) # (nfeat,)

unified_raw_scores[idx] = postproc_udf.compute_unified_score(
winscores,
AggregatorConf(method=AggMethod.MEAN),
)

feature_scores[idx] = postproc_udf.compute_postprocess(postproc_func, winscores)

-            unified_scores[idx] = postproc_udf.compute_unified_score(
-                feature_scores[idx], self.nlconf.score.feature_agg
+            unified_scores[idx] = postproc_udf.compute_postprocess(
+                postproc_func, unified_raw_scores[idx]
             )

x_recon = self.window_inverse(x_recon)
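A note on the scoring change above: the backtester now derives a unified raw score from the per-feature window scores (mean aggregation via AggregatorConf(method=AggMethod.MEAN)) and applies the postprocess transform to that unified raw score, instead of unifying the already-postprocessed feature scores. A minimal numpy sketch of the new ordering, with np.tanh as a hypothetical stand-in for postproc_func:

import numpy as np

def postproc(x):
    # hypothetical stand-in for the configured postproc_func
    return np.tanh(x)

winscores = np.array([0.2, 0.8, 1.4])   # per-feature window scores, shape (nfeat,)

unified_raw = winscores.mean()          # AggregatorConf(method=AggMethod.MEAN)
unified_score = postproc(unified_raw)   # unified score is postprocessed last
feature_scores = postproc(winscores)    # per-feature scores, as before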
2 changes: 2 additions & 0 deletions numalogic/config/factory.py
@@ -51,6 +51,7 @@ class PreprocessFactory(_ObjectFactory):
GaussianNoiseAdder,
DifferenceTransform,
FlattenVector,
PercentileScaler,
)

_CLS_MAP: ClassVar[dict] = {
@@ -65,6 +66,7 @@ class PreprocessFactory(_ObjectFactory):
"GaussianNoiseAdder": GaussianNoiseAdder,
"DifferenceTransform": DifferenceTransform,
"FlattenVector": FlattenVector,
"PercentileScaler": PercentileScaler,
}

def get_pipeline_instance(self, objs_info: list[ModelInfo]):
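With PercentileScaler registered in the class map above, the scaler becomes constructible from config like any other preprocess step. A sketch, assuming the factory's usual get_instance(ModelInfo) interface and that constructor keyword arguments are passed via the conf dict:

from numalogic.config import ModelInfo
from numalogic.config.factory import PreprocessFactory

factory = PreprocessFactory()
scaler = factory.get_instance(
    ModelInfo(name="PercentileScaler", conf={"max_percentile": 95})
)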
3 changes: 2 additions & 1 deletion numalogic/transforms/__init__.py
@@ -14,7 +14,7 @@
feature engineering and postprocessing.
"""

-from numalogic.transforms._scaler import TanhScaler
+from numalogic.transforms._scaler import TanhScaler, PercentileScaler
from numalogic.transforms._stateless import (
LogTransformer,
StaticPowerTransformer,
@@ -38,4 +38,5 @@
"GaussianNoiseAdder",
"DifferenceTransform",
"FlattenVector",
"PercentileScaler",
]
39 changes: 39 additions & 0 deletions numalogic/transforms/_scaler.py
@@ -11,12 +11,15 @@


import logging
from typing import Optional

import numpy as np
import numpy.typing as npt
from sklearn.preprocessing import MinMaxScaler
from typing_extensions import Self

from numalogic.base import BaseTransformer
from numalogic.transforms._stateless import DataClipper

LOGGER = logging.getLogger(__name__)

@@ -69,3 +72,39 @@ def fit_transform(self, x: npt.NDArray[float], y=None, **_) -> npt.NDArray[float]:
def _check_if_constant(self, x: npt.NDArray[float]) -> None:
delta = np.max(x, axis=0) - np.min(x, axis=0)
self._std[delta < self._eps] = 1.0


class PercentileScaler(BaseTransformer):
    """
    Scales the data based on the percentiles of the data.

    Args:
    -----
        max_percentile: float, optional
            The upper percentile at which to clip the data.
            Default is 95.
        min_percentile: float, optional
            The lower percentile at which to clip the data.
            If None, the minimum value of the data is used.
            Default is None.
    """

def __init__(self, max_percentile: float = 95, min_percentile: Optional[float] = None):
self._max_px = max_percentile
self._min_px = min_percentile
self.tx = MinMaxScaler()

def fit(self, x: npt.NDArray[float]) -> Self:
data_max_px = np.percentile(x, self._max_px, axis=0)

if self._min_px is None:
            data_min_px = np.min(x, axis=0)
else:
data_min_px = np.percentile(x, self._min_px, axis=0)
x_clipped = DataClipper(lower=data_min_px, upper=data_max_px).transform(x)
        self.tx.fit(x_clipped)
        return self

def fit_transform(self, x: npt.NDArray[float], y=None, **_):
        return self.fit(x).transform(x)

def transform(self, x: npt.NDArray[float]) -> npt.NDArray[float]:
        return self.tx.transform(x)
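A usage sketch for the new scaler, assuming the class lands as written above: fit() clips each column at its 95th percentile (flooring at the column minimum when min_percentile is None) and fits a MinMaxScaler on the clipped data, so typical values scale into [0, 1]:

import numpy as np
from numalogic.transforms import PercentileScaler

rng = np.random.default_rng(42)
x = rng.exponential(scale=1.0, size=(1000, 2)).astype(np.float32)

scaler = PercentileScaler(max_percentile=95)
x_scaled = scaler.fit_transform(x)

# transform() itself does not clip, so values above the fitted
# 95th-percentile cap scale to slightly above 1.0
print(x_scaled.min(axis=0), x_scaled.max(axis=0))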
33 changes: 19 additions & 14 deletions numalogic/udfs/postprocess.py
@@ -139,19 +139,14 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:

# Postprocess payload
try:
-            # Compute anomaly scores per feature
-            a_features = self.compute(
+            # Compute anomaly scores
+            a_unified, a_features = self.compute(
model=thresh_artifact.artifact,
input_=payload.get_data(),
score_conf=_conf.numalogic_conf.score,
postproc_tx=postproc_tx,
) # (nfeat,)

-            # Compute unified score
-            a_unified = self.compute_unified_score(
-                a_features, feat_agg_conf=_conf.numalogic_conf.score.feature_agg
-            )

# Calculate adjusted unified score
a_adjusted, y_unified, y_features = self._adjust_score(_conf, a_unified, payload)

Expand Down Expand Up @@ -302,7 +297,7 @@ def compute(
postproc_tx=None,
score_conf: Optional[ScoreConf] = None,
**_,
-    ) -> NDArray[float]:
+    ) -> tuple[float, NDArray[float]]:
"""
Compute thresholding, window aggregation followed by postprocess.
@@ -315,7 +310,7 @@
Returns
-------
-        Output data of shape (n_features, )
+        Tuple of combined/unified score (float), and feature scores of shape (n_features,)
Raises
------
@@ -325,13 +320,23 @@
_struct_log.warning("Score config not provided, using default values")
score_conf = ScoreConf()

-        scores = cls.compute_threshold(model, input_)  # (seqlen x nfeat)
-        win_scores = cls.compute_feature_scores(
-            scores, win_agg_conf=score_conf.window_agg
+        thresh_scores = cls.compute_threshold(model, input_)  # (seqlen x nfeat)
+
+        # Aggregate over the sequence length
+        raw_scores = cls.compute_feature_scores(
+            thresh_scores, win_agg_conf=score_conf.window_agg
         )  # (nfeat,)
+
+        # Aggregate over the features
+        unified_raw_score = cls.compute_unified_score(raw_scores, score_conf.feature_agg)  # float
+
         if postproc_tx:
-            win_scores = cls.compute_postprocess(postproc_tx, win_scores)  # (nfeat,)
-            return win_scores  # (nfeat, )
+            # Postprocess the raw scores
+            feature_scores = cls.compute_postprocess(postproc_tx, raw_scores)  # (nfeat,)
+            unified_score = cls.compute_postprocess(postproc_tx, unified_raw_score)  # float
+            return unified_score, feature_scores
+
+        return unified_raw_score, raw_scores

@classmethod
def compute_threshold(cls, model: artifact_t, input_: NDArray[float]) -> NDArray[float]:
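The net effect of the compute() change, as a shape-level sketch with np.mean standing in for the configured window and feature aggregators and np.tanh for the postprocess transform:

import numpy as np

def compute_sketch(thresh_scores: np.ndarray, postproc=np.tanh):
    # thresh_scores: (seqlen, nfeat) scores from the threshold model
    raw_scores = thresh_scores.mean(axis=0)   # window agg  -> (nfeat,)
    unified_raw = raw_scores.mean()           # feature agg -> float
    if postproc is not None:
        # both the per-feature and the unified raw scores are postprocessed
        return postproc(unified_raw), postproc(raw_scores)
    return unified_raw, raw_scores

unified, per_feature = compute_sketch(np.abs(np.random.randn(12, 3)))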