Commit
feat: add transforms and robust thresholding
Signed-off-by: Avik Basu <[email protected]>
ab93 committed Dec 13, 2023
1 parent 19ddc8e commit 43c1ec8
Showing 6 changed files with 191 additions and 19 deletions.
12 changes: 11 additions & 1 deletion numalogic/config/factory.py
@@ -43,7 +43,13 @@ class PreprocessFactory(_ObjectFactory):
"""Factory class to create preprocess instances."""

from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler
from numalogic.transforms import LogTransformer, StaticPowerTransformer, TanhScaler
from numalogic.transforms import (
LogTransformer,
StaticPowerTransformer,
TanhScaler,
DataClipper,
GaussianNoiseAdder,
)

_CLS_MAP: ClassVar[dict] = {
"StandardScaler": StandardScaler,
@@ -53,6 +59,8 @@
"LogTransformer": LogTransformer,
"StaticPowerTransformer": StaticPowerTransformer,
"TanhScaler": TanhScaler,
"DataClipper": DataClipper,
"GaussianNoiseAdder": GaussianNoiseAdder,
}

def get_pipeline_instance(self, objs_info: list[ModelInfo]):
@@ -81,6 +89,7 @@ class ThresholdFactory(_ObjectFactory):
from numalogic.models.threshold import (
StdDevThreshold,
MahalanobisThreshold,
RobustMahalanobisThreshold,
StaticThreshold,
SigmoidThreshold,
)
@@ -90,6 +99,7 @@
"StaticThreshold": StaticThreshold,
"SigmoidThreshold": SigmoidThreshold,
"MahalanobisThreshold": MahalanobisThreshold,
"RobustMahalanobisThreshold": RobustMahalanobisThreshold,
}


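A minimal usage sketch, not part of this commit, assuming ModelInfo carries a class name plus a conf kwargs dict and that the factories expose a get_instance method (both inferred from the _CLS_MAP lookups above):

from numalogic.config import ModelInfo
from numalogic.config.factory import PreprocessFactory, ThresholdFactory

# Instantiate one of the newly registered transforms from config
noise_adder = PreprocessFactory().get_instance(
    ModelInfo(name="GaussianNoiseAdder", conf={"scale": 1e-6})
)

# Instantiate the new robust threshold estimator the same way
thresh_clf = ThresholdFactory().get_instance(
    ModelInfo(name="RobustMahalanobisThreshold", conf={"max_inlier_percentile": 95.0})
)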
10 changes: 8 additions & 2 deletions numalogic/models/threshold/__init__.py
@@ -1,5 +1,11 @@
from numalogic.models.threshold._std import StdDevThreshold
from numalogic.models.threshold._mahalanobis import MahalanobisThreshold
from numalogic.models.threshold._mahalanobis import MahalanobisThreshold, RobustMahalanobisThreshold
from numalogic.models.threshold._static import StaticThreshold, SigmoidThreshold

__all__ = ["StdDevThreshold", "StaticThreshold", "SigmoidThreshold", "MahalanobisThreshold"]
__all__ = [
"StdDevThreshold",
"StaticThreshold",
"SigmoidThreshold",
"MahalanobisThreshold",
"RobustMahalanobisThreshold",
]
62 changes: 61 additions & 1 deletion numalogic/models/threshold/_mahalanobis.py
@@ -9,10 +9,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Final
from typing import Final, Optional

import numpy as np
import numpy.typing as npt
from sklearn.covariance import MinCovDet

from numalogic.base import BaseThresholdModel
from typing_extensions import Self
@@ -182,3 +183,62 @@ def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]:
raise ModelInitializationError("Model not fitted yet.")
self._validate_input(x)
return self.mahalanobis(x) / self._md_thresh
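
For reference, score_samples returns the Mahalanobis distance normalized by the fitted threshold, so scores above 1.0 lie beyond it. The distance itself is the standard form

    d_M(x) = \sqrt{(x - \mu)^\top \Sigma^{-1} (x - \mu)}

where \mu is the distribution mean and \Sigma^{-1} is the precision (inverse covariance) matrix. The robust subclass below computes it via sklearn's MinCovDet, which estimates \mu and \Sigma from the most concentrated subset of the samples, so a few extreme training points do not inflate the threshold.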


class RobustMahalanobisThreshold(MahalanobisThreshold):
"""
Robust Multivariate threshold estimator using Mahalanobis distance.
Args:
----
max_inlier_percentile: maximum inlier percentile (default: 95)
Raises
------
ValueError: if max_inlier_percentile is not in range [75, 100)
"""

def __init__(
self,
max_outlier_prob: float = 0.1,
max_inlier_percentile: Optional[float] = None,
):
super().__init__(max_outlier_prob)
self._mcd = MinCovDet(store_precision=False)
if max_inlier_percentile is not None and not 75.0 <= max_inlier_percentile < 100.0:
    raise ValueError("max_inlier_percentile should be in range [75, 100)")
self._max_inlier_percentile = max_inlier_percentile

def mahalanobis(self, x: npt.NDArray[float]) -> npt.NDArray[float]:
return np.sqrt(self._mcd.mahalanobis(x))

def fit(self, x: npt.NDArray[float]) -> Self:
"""
Fit the estimator on the training set.
Args:
----
x: training data of shape (n_samples, n_features)
Returns
-------
self
Raises
------
InvalidDataShapeError: if the input matrix is not 2D
"""
self._validate_input(x)
self._distr_mean = np.mean(x, axis=0)

self._mcd.fit(x)
self._cov_inv = self._mcd.get_precision()

mahal_dist = self.mahalanobis(x)
if self._max_inlier_percentile is not None:
self._md_thresh = np.percentile(mahal_dist, self._max_inlier_percentile)
else:
self._md_thresh = np.mean(mahal_dist) + self._k * np.std(mahal_dist)

self._is_fitted = True
return self
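
A usage sketch for the new estimator on synthetic data (not from the diff): fit on mostly clean training data, then flag test points whose score from the inherited score_samples exceeds 1.0, i.e. whose distance exceeds the fitted threshold.

import numpy as np
from numalogic.models.threshold import RobustMahalanobisThreshold

rng = np.random.default_rng(0)
x_train = rng.normal(size=(500, 3))  # 2D input, as _validate_input requires

clf = RobustMahalanobisThreshold(max_inlier_percentile=95.0)
clf.fit(x_train)

# The two shifted rows should score well above 1.0
x_test = np.vstack([rng.normal(size=(5, 3)), rng.normal(loc=8.0, size=(2, 3))])
scores = clf.score_samples(x_test)
anomalies = scores > 1.0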
48 changes: 34 additions & 14 deletions numalogic/synthetic/anomalies.py
@@ -15,7 +15,7 @@
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from typing import Optional, ClassVar
from typing import Optional, ClassVar, Union


class AnomalyGenerator:
@@ -103,18 +103,23 @@ def inject_anomalies(
raise AttributeError(f"Invalid anomaly type provided: {self.anomaly_type}")

def _inject_global_anomalies(
self, target_df: pd.DataFrame, cols: Optional[Sequence[str]] = None, impact=3
self,
target_df: pd.DataFrame,
cols: Optional[Sequence[str]] = None,
impact=3,
anomaly_start_idx: Optional[Union[int, str]] = None,
) -> pd.DataFrame:
target_df = self._init_target_df(target_df, cols)
anomaly_df = pd.DataFrame(index=target_df.index)
anomaly_df["is_anomaly"] = 0

for col in self.__injected_cols:
tseries = target_df[col]
sample = tseries[: -self.block_size].sample(1)
idx_start = sample.index
idx_end = idx_start + (self.block_size * self.freq)
outlier_block = tseries[idx_start.values[0] : idx_end.values[0]]

idx_start = self._find_start_idx(anomaly_start_idx, target_df)
idx_end = idx_start + self.block_size

outlier_block = tseries[idx_start:idx_end]
factor = abs(self.ref_stats_df.loc["max", col] - outlier_block.mean())

# Add gaussian noise to the data
Expand All @@ -123,7 +128,7 @@ def _inject_global_anomalies(

# Add labels to the data
anomaly_col = anomaly_df["is_anomaly"]
anomaly_block = anomaly_col[idx_start.values[0] : idx_end.values[0]]
anomaly_block = anomaly_col[idx_start:idx_end]
anomaly_block += self.add_impact_sign()

return pd.DataFrame(
@@ -155,7 +160,6 @@ def _inject_contextual_anomalies(
dist_from_max = np.linalg.norm(
outlier_block.to_numpy() - self.ref_stats_df.loc["max", col]
)

if dist_from_min > dist_from_max:
factor = abs(self.ref_stats_df.loc["min", col] - outlier_block.mean())
outlier_block -= (
@@ -178,19 +182,22 @@
).merge(anomaly_df, left_index=True, right_index=True)

def _inject_collective_anomalies(
self, target_df: pd.DataFrame, cols: Sequence[str], impact=0.8
self,
target_df: pd.DataFrame,
cols: Sequence[str],
impact=0.8,
anomaly_start_idx: Optional[Union[int, str]] = None,
) -> pd.DataFrame:
target_df = self._init_target_df(target_df, cols)
anomaly_df = pd.DataFrame(index=target_df.index)
anomaly_df["is_anomaly"] = 0

sample = target_df[: -self.block_size].sample(1)
idx_start = sample.index
idx_end = idx_start + (self.block_size * self.freq)
idx_start = self._find_start_idx(anomaly_start_idx, target_df)
idx_end = idx_start + self.block_size

for col in self.__injected_cols:
tseries = target_df[col]
outlier_block = tseries[idx_start.values[0] : idx_end.values[0]]
outlier_block = tseries[idx_start:idx_end]

# Add gaussian noise to the data
noise = self._rnd_gen.normal(self.mu, self.sigma, outlier_block.shape)
@@ -212,7 +219,7 @@
noise + impact * factor * abs(outlier_block) * self.add_impact_sign()
)
anomaly_col = anomaly_df["is_anomaly"]
anomaly_block = anomaly_col[idx_start.values[0] : idx_end.values[0]]
anomaly_block = anomaly_col[idx_start:idx_end]
anomaly_block += self.add_impact_sign()

return pd.DataFrame(
@@ -221,6 +228,19 @@
columns=target_df.columns,
).merge(anomaly_df, left_index=True, right_index=True)

def _find_start_idx(self, anomaly_start_idx: Union[int, str], target_df: pd.DataFrame) -> int:
if anomaly_start_idx is None:
idx_start = self._rnd_gen.integers(0, target_df.shape[0] - self.block_size)
elif isinstance(anomaly_start_idx, str):
idx_start = target_df.index.get_slice_bound(anomaly_start_idx, side="left")
elif isinstance(anomaly_start_idx, int):
idx_start = anomaly_start_idx
else:
raise TypeError(
    f"Invalid value for anomaly_start_idx: {anomaly_start_idx}, expected int or str"
)
return idx_start
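
A small illustration with hypothetical data of how the three accepted forms of anomaly_start_idx resolve to a positional start index:

import pandas as pd

idx = pd.date_range("2023-01-01", periods=100, freq="min")
df = pd.DataFrame({"m1": range(100)}, index=idx)

# int  -> used directly as the positional offset
# str  -> resolved against the index via get_slice_bound, e.g.
pos = df.index.get_slice_bound("2023-01-01 00:30:00", side="left")  # -> 30
# None -> a random start drawn from [0, len(df) - block_size)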

def _inject_causal_anomalies(
self, target_df: pd.DataFrame, cols: Sequence[str], impact=2, gap_range=(5, 20)
) -> pd.DataFrame:
9 changes: 8 additions & 1 deletion numalogic/transforms/__init__.py
@@ -15,16 +15,23 @@
"""

from numalogic.transforms._scaler import TanhScaler
from numalogic.transforms._stateless import LogTransformer, StaticPowerTransformer
from numalogic.transforms._stateless import (
LogTransformer,
StaticPowerTransformer,
DataClipper,
GaussianNoiseAdder,
)
from numalogic.transforms._movavg import ExpMovingAverage, expmov_avg_aggregator
from numalogic.transforms._postprocess import TanhNorm, tanh_norm

__all__ = [
"TanhScaler",
"LogTransformer",
"StaticPowerTransformer",
"DataClipper",
"ExpMovingAverage",
"expmov_avg_aggregator",
"TanhNorm",
"tanh_norm",
"GaussianNoiseAdder",
]
69 changes: 69 additions & 0 deletions numalogic/transforms/_stateless.py
@@ -9,8 +9,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Sequence
from typing import Union, Optional

import numpy as np
import numpy.typing as npt
import pandas as pd

from numalogic.base import StatelessTransformer

@@ -53,3 +57,68 @@ def transform(self, X, **__):

def inverse_transform(self, X) -> npt.NDArray[float]:
return np.power(X, 1.0 / self.n) - self.add_factor


class DataClipper(StatelessTransformer):
"""
Applies column-wise ceiling transformation.
Args:
----
lower: lower bound for clipping.
upper: upper bound for clipping.
"""

__slots__ = ("lower", "upper")

def __init__(
self,
lower: Optional[Union[float, Sequence[float]]] = None,
upper: Optional[Union[float, Sequence[float]]] = None,
):
self._validate_args(lower, upper)
self.lower = lower
self.upper = upper

@staticmethod
def _validate_args(
lower: Union[float, Sequence[float]], upper: Union[float, Sequence[float]]
) -> None:
if lower is None and upper is None:
raise ValueError("At least one of lower or upper should be provided.")

if isinstance(lower, Sequence) and isinstance(upper, Sequence):
if len(lower) != len(upper):
raise ValueError("lower and upper should have the same length.")

def transform(self, x: npt.NDArray[float], **__) -> npt.NDArray[float]:
_df = pd.DataFrame(x, dtype=np.float32)
if (self.lower is not None) and (self.upper is not None):
return _df.clip(lower=self.lower, upper=self.upper, axis=1).to_numpy(dtype=np.float32)
if self.upper is not None:
return _df.clip(upper=self.upper, axis=1).to_numpy(dtype=np.float32)
return _df.clip(lower=self.lower, axis=1).to_numpy(dtype=np.float32)
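
A short sketch of the clipper on hypothetical values: pandas broadcasts sequence bounds per column via clip(..., axis=1), so each column gets its own limits.

import numpy as np
from numalogic.transforms import DataClipper

x = np.array([[0.5, 12.0], [-3.0, 4.0]], dtype=np.float32)
clipper = DataClipper(lower=[0.0, 0.0], upper=[1.0, 10.0])
clipper.transform(x)
# -> [[0.5, 10.0],
#     [0.0,  4.0]]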


class GaussianNoiseAdder(StatelessTransformer):
"""
Applies Gaussian noise to data.
Args:
----
scale: small float value to be used as the noise factor (default: 1e-8).
positive_only: bool value to indicate whether
to use absolute value of the noise (default: True).
seed: int value to be used as the random seed (default: 42).
"""

def __init__(self, scale: float = 1e-8, positive_only: bool = True, seed: int = 42):
self._rng = np.random.default_rng(seed)
self._is_abs = positive_only
self._scale = scale

def transform(self, x: npt.NDArray[float], **__) -> npt.NDArray[float]:
noise = self._rng.normal(loc=0.0, scale=self._scale, size=x.shape)
if self._is_abs:
noise = np.abs(noise)
return x + noise
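
A brief sketch of the noise adder on synthetic input: with positive_only=True the noise is folded to its absolute value, so non-negative inputs stay non-negative, which can help avoid exact zeros ahead of log-style transforms (that use case is an assumption, not stated in the commit).

import numpy as np
from numalogic.transforms import GaussianNoiseAdder

x = np.zeros((4, 2), dtype=np.float32)
noisy = GaussianNoiseAdder(scale=1e-6).transform(x)
assert (noisy >= 0).all() and not np.allclose(noisy, 0.0)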
