
feat: exponential moving average postprocessing (#156)
Signed-off-by: Avik Basu <[email protected]>
ab93 committed Apr 7, 2023
1 parent 9de8e4c commit 3160c2b
Showing 12 changed files with 772 additions and 137 deletions.
28 changes: 27 additions & 1 deletion docs/post-processing.md
@@ -1,6 +1,9 @@
# Post Processing
After the raw scores have been generated, we might need to do some additional postprocessing,
for example to normalize the scores or to smooth them over time.

### Tanh Score Normalization
Tanh normalization is an optional step that maps the anomaly scores onto a 0-10 range, mainly to make the scores easier to interpret.

```python
import numpy as np
from numalogic.postprocess import TanhNorm

raw_score = np.random.randn(10, 2)

norm = TanhNorm(scale_factor=10, smooth_factor=10)
norm_score = norm.fit_transform(raw_score)
```
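For reference, the normalization itself is the one-liner implemented by `tanh_norm` in `numalogic/postprocess.py` further down in this diff. This quick check (not part of the commit) continues the snippet above and verifies the equivalence:

```python
import numpy as np

# tanh_norm computes: scale_factor * tanh(scores / smooth_factor)
manual = 10 * np.tanh(raw_score / 10)
assert np.allclose(norm_score, manual)
```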

### Exponentially Weighted Moving Average
The Exponentially Weighted Moving Average (EWMA) serves as an effective smoothing function,
emphasizing the importance of more recent anomaly scores over those of previous elements within a sliding window.

This approach proves particularly beneficial in streaming inference scenarios, as it allows for
earlier increases in anomaly scores when a new outlier data point is encountered.
Consequently, the EWMA enables a more responsive and dynamic assessment of streaming data,
facilitating timely detection and response to potential anomalies.

```python
import numpy as np
from numalogic.postprocess import ExpMovingAverage

raw_score = np.array(
[1.0, 1.5, 1.2, 3.5, 2.7, 5.6, 7.1, 6.9, 4.2, 1.1]
).reshape(-1, 1)

postproc_clf = ExpMovingAverage(beta=0.5)
out = postproc_clf.transform(raw_score)

# out: [[1.3], [1.433], [1.333], [2.473], [2.591], [4.119], [5.621], [6.263], [5.229], [3.163]]
```
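For streaming pipelines, this commit also adds a windowed aggregator, `expmov_avg_aggregator` (defined in `numalogic/postprocess.py` below), which collapses a window of scores into a single smoothed float. A minimal sketch of calling it (not part of the commit's docs):

```python
import numpy as np
from numalogic.postprocess import expmov_avg_aggregator

# A sliding window of recent anomaly scores, as a single-feature column vector.
window = np.array([1.0, 1.5, 1.2, 3.5, 2.7]).reshape(-1, 1)

# One float summarizing the window, weighted towards the most recent scores.
smoothed = expmov_avg_aggregator(window, beta=0.5)
```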
575 changes: 466 additions & 109 deletions examples/quick-start.ipynb

Large diffs are not rendered by default.

17 changes: 11 additions & 6 deletions numalogic/config/factory.py
@@ -8,8 +8,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler

from numalogic.config._config import ModelInfo
@@ -23,8 +21,8 @@
TransformerAE,
SparseTransformerAE,
)
from numalogic.models.threshold import StdDevThreshold, StaticThreshold
from numalogic.postprocess import TanhNorm
from numalogic.models.threshold import StdDevThreshold, StaticThreshold, SigmoidThreshold
from numalogic.postprocess import TanhNorm, ExpMovingAverage
from numalogic.preprocess import LogTransformer, StaticPowerTransformer, TanhScaler
from numalogic.tools.exceptions import UnknownConfigArgsError

@@ -59,11 +57,18 @@ class PreprocessFactory(_ObjectFactory):


class PostprocessFactory(_ObjectFactory):
_CLS_MAP = {"TanhNorm": TanhNorm}
_CLS_MAP = {
"TanhNorm": TanhNorm,
"ExpMovingAverage": ExpMovingAverage,
}


class ThresholdFactory(_ObjectFactory):
_CLS_MAP = {"StdDevThreshold": StdDevThreshold, "StaticThreshold": StaticThreshold}
_CLS_MAP = {
"StdDevThreshold": StdDevThreshold,
"StaticThreshold": StaticThreshold,
"SigmoidThreshold": SigmoidThreshold,
}


class ModelFactory(_ObjectFactory):
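With the factory maps above, the new postprocessor and threshold can be selected by name from a config. A minimal sketch of the lookup (not part of the diff; it uses only the `_CLS_MAP` dicts shown here, since the accessor method on `_ObjectFactory` is not visible in this hunk):

```python
from numalogic.postprocess import ExpMovingAverage, TanhNorm

# Mirrors PostprocessFactory._CLS_MAP from the diff above.
cls_map = {"TanhNorm": TanhNorm, "ExpMovingAverage": ExpMovingAverage}

# Resolve a config string to a class, then instantiate it.
postproc_clf = cls_map["ExpMovingAverage"](beta=0.5)
```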
5 changes: 5 additions & 0 deletions numalogic/models/autoencoder/trainer.py
@@ -11,6 +11,8 @@


import logging
import sys
import warnings

import pytorch_lightning as pl
import torch
@@ -38,6 +40,9 @@ def __init__(
if (not callbacks) and enable_progress_bar:
callbacks = ProgressDetails()

if not sys.warnoptions:
warnings.simplefilter("ignore", category=UserWarning)

super().__init__(
logger=logger,
max_epochs=max_epochs,
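The `sys.warnoptions` check added above follows the stdlib-recommended pattern: suppress noisy UserWarnings by default, but back off when the user has passed `-W` (or set `PYTHONWARNINGS`), since those populate `sys.warnoptions`. A standalone illustration (not part of the diff):

```python
import subprocess
import sys

code = (
    "import sys, warnings\n"
    "if not sys.warnoptions:\n"
    "    warnings.simplefilter('ignore', category=UserWarning)\n"
    "warnings.warn('visible only with -W', UserWarning)\n"
)

# Without -W, sys.warnoptions is empty and the warning is silenced.
subprocess.run([sys.executable, "-c", code])

# With -W always, the guard is skipped and the warning prints to stderr.
subprocess.run([sys.executable, "-W", "always", "-c", code])
```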
152 changes: 145 additions & 7 deletions numalogic/postprocess.py
@@ -9,24 +9,162 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import numpy as np
from numpy.typing import ArrayLike
import numpy.typing as npt

from numalogic.tools import DataIndependentTransformers
from numalogic.tools.exceptions import InvalidDataShapeError


def _allow_only_single_feature(data: npt.NDArray[float]) -> None:
if data.ndim > 2:
raise InvalidDataShapeError(
f"Input data can only be 2 dimensions or less, input size: {data.shape}"
)
if data.ndim > 1 and data.shape[1] > 1:
raise InvalidDataShapeError(
f"Input data can only have 1 feature column, input shape: {data.shape}"
)


def tanh_norm(scores: ArrayLike, scale_factor=10, smooth_factor=10) -> ArrayLike:
def tanh_norm(scores: npt.NDArray[float], scale_factor=10, smooth_factor=10) -> npt.NDArray[float]:
return scale_factor * np.tanh(scores / smooth_factor)


def expmov_avg_aggregator(
arr: npt.NDArray[float], beta: float, bias_correction: bool = True
) -> float:
"""
Aggregate a window of data into an exponentially weighted moving average value.
V(n) = (1 - beta) * beta**n * sum(x(i)/beta**i) [for i = 1 to i = n]
"1.0 - beta" denotes the weight given to the latest element.
Args:
arr: single feature numpy array
beta: how much weight to give to the previous weighted average (n-1)th value
bias_correction: flag to perform bias correction (default: true)
Raises:
ValueError: if beta is not between 0 and 1
InvalidDataShapeError: if input array is not single featured
"""
if beta <= 0.0 or beta >= 1.0:
raise ValueError("beta only accepts values between 0 and 1 (not inclusive)")
_allow_only_single_feature(arr)

# alpha is the weight given to the latest element
alpha = 1.0 - beta
n = len(arr)
theta = arr.reshape(-1, 1)
powers = np.arange(n - 1, -1, -1)

# Calculate decreasing powers of beta of the form
# [beta**(n-1), beta**(n-2), .., beta**0]
beta_powers = np.power(beta, powers).reshape(1, -1)

exp_avg = alpha * (beta_powers @ theta)
if not bias_correction:
return exp_avg.item()

# Perform bias correction
corrected_exp_avg = exp_avg / (1.0 - np.power(beta, n))
return corrected_exp_avg.item()
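The closed form in the docstring is the usual EWMA recurrence V(n) = beta * V(n-1) + (1 - beta) * x(n), unrolled. A standalone sanity check of that equivalence (not part of the diff):

```python
import numpy as np

beta = 0.7
x = np.random.randn(50)
n = len(x)

# Recurrence form, with bias correction applied at the end.
v = 0.0
for xi in x:
    v = beta * v + (1 - beta) * xi
recur = v / (1.0 - beta**n)

# Closed form used by expmov_avg_aggregator above.
powers = np.arange(n - 1, -1, -1)
closed = (1 - beta) * np.sum(np.power(beta, powers) * x) / (1.0 - beta**n)

assert np.isclose(recur, closed)
```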


class TanhNorm(DataIndependentTransformers):
__slots__ = ("scale_factor", "smooth_factor")

def __init__(self, scale_factor=10, smooth_factor=10):
self.scale_factor = scale_factor
self.smooth_factor = smooth_factor

def fit_transform(self, input_: npt.NDArray[float], _=None, **__) -> npt.NDArray[float]:
return self.transform(input_)

def transform(self, input_: npt.NDArray[float], _=None, **__) -> npt.NDArray[float]:
return tanh_norm(input_, scale_factor=self.scale_factor, smooth_factor=self.smooth_factor)


class ExpMovingAverage(DataIndependentTransformers):
r"""
Calculate the exponential moving averages for a vector.
This transformation returns an array where each element "n"
is given by the expression:
V(n) = (1 - beta) * beta**n * sum(x(i)/beta**i) [for i = 1 to i = n]
"1.0 - beta" denotes the weight given to the latest element.
Without bias correction, early values can tend more towards zero, since V(0) = 0
Bias correction helps inhibit this issue by dividing with (1 - beta**i)
Args:
beta: how much weight to give to the previous weighted average
bias_correction: flag to perform bias correction (default: true)
Note: this only supports single-feature input arrays.
Raises:
ValueError: if beta is not between 0 and 1
"""
__slots__ = ("beta", "bias_correction")

def __init__(self, beta: float, bias_correction: bool = True):
if beta <= 0.0 or beta >= 1.0:
raise ValueError("beta only accepts values between 0 and 1 (not inclusive)")
self.beta = beta
self.bias_correction = bias_correction

def transform(self, input_: npt.NDArray[float], **__):
r"""
Returns transformed output.
Args:
input_: input column vector
Raises:
InvalidDataShapeError: if input array is not single featured
"""
_allow_only_single_feature(input_)

# alpha is the weight given to the latest element
alpha = 1.0 - self.beta
n = len(input_)

theta = input_.reshape(-1, 1)
theta_tril = np.multiply(theta.T, np.tril(np.ones((n, n))))
powers = np.arange(1, n + 1).reshape(-1, 1)

# Calculate increasing powers of beta of the form,
# [beta, beta**2, .., beta**n]
beta_powers = np.power(self.beta, powers)

# Calculate the array of reciprocals of beta powers of form,
# [beta**(-1), beta**(-2), .., beta**(-n)]
beta_arr_inv = np.reciprocal(beta_powers)

# Calculate the summation of the ratio between (theta(i) / beta**i),
# [ theta(1)/beta, sum(theta(1)/beta, theta(2)/beta**2), .., ]
theta_beta_ratio = theta_tril @ beta_arr_inv

# Elemental multiply with beta powers
exp_avg = alpha * np.multiply(beta_powers, theta_beta_ratio)
if not self.bias_correction:
return exp_avg

# Calculate array of 1 / (1 - beta**i) values
return np.divide(exp_avg, 1.0 - beta_powers)

def fit_transform(self, input_: npt.NDArray[float], **__):
r"""
Returns transformed output.
Args:
input_: input column vector
Raises:
InvalidDataShapeError: if input array is not single featured
"""
return self.transform(input_)
9 changes: 7 additions & 2 deletions numalogic/tools/data.py
@@ -70,9 +70,14 @@ def __iter__(self) -> Iterator[npt.NDArray[float]]:
r"""
Returns an iterator for the StreamingDataset object.
# TODO implement multi worker iter
Raises:
NotImplementedError: If multiple worker input is provided
"""
worker_info = torch.utils.data.get_worker_info()
if not worker_info or worker_info.num_workers == 1:
return self.create_seq(self._data)

raise NotImplementedError("Multiple workers are not supported yet for Streaming Dataset")

def __len__(self) -> int:
r"""
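The guard above makes the single-process path explicit. A sketch of the intended usage (assumption: `StreamingDataset(data, seq_len)` is the constructor shape, which is not shown in this hunk):

```python
import numpy as np
import torch
from numalogic.tools.data import StreamingDataset

data = np.random.randn(100, 1)
ds = StreamingDataset(data, 12)  # constructor signature assumed, not shown in this diff

# num_workers defaults to 0 (single process), so iteration works;
# num_workers > 1 would raise the NotImplementedError added above.
loader = torch.utils.data.DataLoader(ds, batch_size=16)
for batch in loader:
    break
```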
16 changes: 14 additions & 2 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -51,6 +51,7 @@ pytest = "^7.1"
pytest-cov = "^4.0"
pylint = "^2.14.2"
flake8 = "^5.0"
torchinfo = "^1.7.2"

[tool.poetry.group.jupyter]
optional = true
7 changes: 7 additions & 0 deletions tests/__init__.py
@@ -0,0 +1,7 @@
import sys
import os
import warnings

if not sys.warnoptions:
warnings.simplefilter("default", category=UserWarning)
os.environ["PYTHONWARNINGS"] = "default"
18 changes: 10 additions & 8 deletions tests/preprocess/test_transformer.py
@@ -1,4 +1,5 @@
import unittest
import warnings

import numpy as np
from numpy.testing import assert_almost_equal, assert_array_less
@@ -9,7 +10,7 @@

class TestTransformers(unittest.TestCase):
def test_logtransformer(self):
x = 3 + np.random.randn(5, 3)
transformer = LogTransformer(add_factor=1)
x_prime = transformer.transform(x)

@@ -18,13 +19,13 @@ def test_logtransformer(self):
assert_almost_equal(transformer.inverse_transform(x_prime), np.expm1(x_prime))

def test_staticpowertransformer(self):
x = 3 + np.random.randn(5, 3)
transformer = StaticPowerTransformer(3, add_factor=4)
x_prime = transformer.transform(x)

assert_almost_equal(np.power(4 + x, 3), x_prime)
assert_almost_equal(transformer.fit_transform(x), x_prime)
assert_almost_equal(transformer.inverse_transform(x_prime), x, decimal=3)

def test_tanh_scaler_1(self):
x = 1 + np.random.randn(5, 3)
@@ -35,7 +36,7 @@ def test_tanh_scaler_1(self):
assert_array_less(np.zeros_like(x_scaled), x_scaled)

def test_tanh_scaler_2(self):
x = 3 + np.random.randn(5, 3)
pl = make_pipeline(LogTransformer(), TanhScaler())

x_scaled = pl.fit_transform(x)
@@ -58,8 +59,9 @@ def test_tanh_scaler_nan(self):
x[:, 1] = np.zeros(5)

scaler = TanhScaler(eps=0.0)

with warnings.catch_warnings():
warnings.simplefilter("ignore")
x_scaled = scaler.fit_transform(x)
self.assertTrue(np.isnan(x_scaled[:, 1]).all())


