feat!: disentangle threshold selection from the main model (#89)
* threshold estimators as separate models
* remove threshold estimation from autoencoders
* simplify mlflow model saving
* mlflow registry now saves one artifact per call
* registry load function now returns a dataclass instead of a dict
* replace mlflow with mlflow-skinny to reduce unwanted dependencies
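
A minimal sketch of the decoupled workflow these changes enable, based on the diffs below; the `numalogic.models.threshold._std` import path is taken from the new module, so treat the exact public import path as an assumption:

```python
import numpy as np

from numalogic.models.autoencoder import AutoencoderPipeline
from numalogic.models.autoencoder.variants import Conv1dAE
# Import path assumed from the new numalogic/models/threshold/_std.py module
from numalogic.models.threshold._std import StdDevThreshold

X_train = np.random.randn(100, 1)
X_test = np.random.randn(20, 1)

# The autoencoder pipeline now only reconstructs; it no longer owns a threshold.
model = AutoencoderPipeline(
    model=Conv1dAE(in_channels=1, enc_channels=4), seq_len=8, num_epochs=30
)
model.fit(X_train)

# score() now returns raw reconstruction errors rather than thresholded scores.
train_err = model.score(X_train)

# Threshold selection is a separate estimator, fit on the training errors.
thresh_clf = StdDevThreshold(std_factor=3.0)
thresh_clf.fit(train_err)

# Anomaly scores are test reconstruction errors scaled by the learned threshold.
test_scores = thresh_clf.predict(model.score(X_test))
```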


Signed-off-by: Avik Basu <[email protected]>
ab93 committed Nov 10, 2022
1 parent 32d3d05 commit 701812e
Showing 17 changed files with 528 additions and 1,110 deletions.
3 changes: 2 additions & 1 deletion docs/post-processing.md
@@ -3,6 +3,7 @@
The post-processing step is again optional; here we normalize the anomaly scores to a 0-10 range, mostly to make them easier to interpret.

```python
-from numalogic.scores import tanh_norm
+from numalogic.postprocess import tanh_norm

test_anomaly_score_norm = tanh_norm(test_anomaly_score)
```
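
For intuition, a tanh-based normalizer like this can be sketched as follows; this is an illustrative stand-in, not the actual numalogic implementation, and the parameter names are assumptions:

```python
import numpy as np

def tanh_norm_sketch(scores, scale_factor=10.0, smooth_factor=10.0):
    # tanh squashes non-negative reconstruction errors into [0, 1);
    # scaling then maps them onto a 0-10 range.
    return scale_factor * np.tanh(scores / smooth_factor)

# e.g. an error of 2.0 maps to ~1.97, while an error of 50 saturates near 10
normalized = tanh_norm_sketch(np.array([0.5, 2.0, 50.0]))
```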
6 changes: 3 additions & 3 deletions docs/quick-start.md
@@ -18,13 +18,13 @@ In this example, the train data set has numbers ranging from 1-10. Whereas in th
import numpy as np
from numalogic.models.autoencoder import AutoencoderPipeline
from numalogic.models.autoencoder.variants import Conv1dAE
-from numalogic.scores import tanh_norm
+from numalogic.postprocess import tanh_norm

X_train = np.array([1, 3, 5, 2, 5, 1, 4, 5, 1, 4, 5, 8, 9, 1, 2, 4, 5, 1, 3]).reshape(-1, 1)
-X_test = np.array([-20, 3, 5, 40, 5, 10, 4, 5, 100]).reshape(-1,1)
+X_test = np.array([-20, 3, 5, 40, 5, 10, 4, 5, 100]).reshape(-1, 1)

model = AutoencoderPipeline(
-model=Conv1dAE(in_channels=1, enc_channels=4), seq_len=8, num_epochs=30
+model=Conv1dAE(in_channels=1, enc_channels=4), seq_len=8, num_epochs=30
)
# fit method trains the model on train data set
model.fit(X_train)
2 changes: 1 addition & 1 deletion examples/numalogic-simple-pipeline/src/udf/postprocess.py
@@ -1,7 +1,7 @@
import logging

import numpy as np
-from numalogic.scores import tanh_norm
+from numalogic.postprocess import tanh_norm
from pynumaflow.function import Messages, Message, Datum

from src.utils import Payload
110 changes: 19 additions & 91 deletions numalogic/models/autoencoder/pipeline.py
@@ -1,12 +1,12 @@
import io
import logging
from copy import copy
-from typing import Optional, Dict, Tuple, BinaryIO, Union, Callable
+from typing import Optional, BinaryIO, Union

import numpy as np
import torch
from numpy.typing import NDArray
-from sklearn.base import OutlierMixin
+from sklearn.base import TransformerMixin, BaseEstimator
from torch import nn, optim, Tensor
from torch.utils.data import DataLoader

@@ -15,7 +15,7 @@
_LOGGER = logging.getLogger(__name__)


-class AutoencoderPipeline(OutlierMixin):
+class AutoencoderPipeline(TransformerMixin, BaseEstimator):
r"""
Class to simplify training, inference, loading and saving of time-series autoencoders.
@@ -25,17 +25,12 @@ class AutoencoderPipeline(OutlierMixin):
model: model instance
seq_len: sequence length
loss_fn: loss function used for training
-supported values include {"huber", "l1", "mse"}
+supported values include {"huber", "l1", "mse"}
optimizer: optimizer to used for training.
supported values include {"adam", "adagrad", "rmsprop"}
lr: learning rate
batch_size: batch size for training
num_epochs: number of epochs for training
-std_tolerance: determines how many times the standard deviation to be used for threshold
-reconerr_method: method used to calculate the distance
-between the original and the reconstucted data
-supported values include {"absolute", "squared"}
-threshold_min: the minimum threshold to use;
-can be used when the threshold calculated is too low
resume_train: parameter to decide if resume training is needed. Also,
based on this parameter the optimizer state dict
@@ -59,10 +54,7 @@ def __init__(
lr: float = 0.001,
batch_size: int = 256,
num_epochs: int = 100,
-std_tolerance: float = 3.0,
-reconerr_method: str = "absolute",
-threshold_min: float = None,
-resume_train: bool = False,
+resume_train: bool = False
):
if not (model and seq_len):
raise ValueError("No model and seq len provided!")
@@ -75,19 +67,15 @@ def __init__(
self.optimizer = self.init_optimizer(optimizer, lr)
self.batch_size = batch_size
self.num_epochs = num_epochs

-self._thresholds = None
-self._stats: Dict[str, Optional[float]] = dict(mean=None, std=None)
-self.stdtol = std_tolerance
-self.reconerr_func = self.get_reconerr_func(reconerr_method)
-self.threshold_min = threshold_min
self.resume_train = resume_train
+self._epochs_elapsed = 0

@property
def model_properties(self):
model_properties_dict = {
"thresholds": self._thresholds,
"err_stats": self._stats,
"batch_size": self.batch_size,
"num_epochs": self.num_epochs,
"epochs_elapsed": self._epochs_elapsed
}
if self.resume_train:
model_properties_dict["optimizer_state_dict"] = self.optimizer.state_dict()
@@ -97,22 +85,6 @@ def model_properties(self):
def model(self) -> AutoencoderModel:
return self._model

-@property
-def thresholds(self) -> Optional[NDArray[float]]:
-return self._thresholds
-
-@property
-def err_stats(self) -> Dict[str, Optional[NDArray[float]]]:
-return self._stats
-
-@staticmethod
-def get_reconerr_func(method: str) -> Callable:
-if method == "squared":
-return np.square
-if method == "absolute":
-return np.abs
-raise ValueError(f"Unrecognized reconstruction error method specified: {method}")

@staticmethod
def init_criterion(loss_fn: str):
if loss_fn == "huber":
@@ -161,11 +133,7 @@ def fit(self, X: NDArray[float], y=None, log_freq: int = 5) -> "AutoencoderPipel
if epoch % log_freq == 0:
_LOGGER.info(f"epoch : {epoch}, loss_mean : {np.mean(losses):.7f}")
losses = []

-self._thresholds, _mean, _std = self.find_thresholds(X)
-self._stats["mean"] = _mean
-self._stats["std"] = _std
-
+self._epochs_elapsed += 1
return self

def predict(self, X: NDArray[float], seq_len: int = None) -> NDArray[float]:
@@ -187,59 +155,22 @@ def predict(self, X: NDArray[float], seq_len: int = None) -> NDArray[float]:
_, pred = self._model(dataset.data)
return dataset.recover_shape(pred)

-def score(self, X: NDArray[float], seq_len: int = None) -> NDArray[float]:
-r"""
-Return anomaly score using the calculated threshold
-Args:
-X: training dataset
-seq_len: sequence length / window length
-Returns:
-numpy array with anomaly scores
-"""
-if self._thresholds is None:
-raise RuntimeError("Thresholds not present!!!")
-thresh = self._thresholds.reshape(1, -1)
-if not seq_len:
-seq_len = self.seq_len or len(X)
-recon_err = self.recon_err(X, seq_len=seq_len)
-anomaly_scores = recon_err / thresh
-return anomaly_scores
-
-def recon_err(self, X: NDArray[float], seq_len: int) -> NDArray:
+def score(self, X: NDArray[float]) -> NDArray:
r"""
Returns the reconstruction error.
Args:
-X: training dataset
-seq_len: sequence length / window length
+X: data
Returns:
numpy array with anomaly scores
"""
-x_recon = self.predict(X, seq_len=seq_len)
-recon_err = self.reconerr_func(X - x_recon)
+x_recon = self.predict(X, seq_len=self.seq_len)
+recon_err = np.abs(X - x_recon)
return recon_err

-def find_thresholds(
-self, X: NDArray[float]
-) -> Tuple[NDArray[float], NDArray[float], NDArray[float]]:
-r"""
-Calculate threshold for the anomaly model
-Args:
-X: training dataset
-Returns:
-Tuple consisting of thresholds, reconstruction error mean, reconstruction error std
-"""
-recon_err = self.recon_err(X, seq_len=self.seq_len)
-recon_err_mean = np.mean(recon_err, axis=0)
-recon_err_std = np.std(recon_err, axis=0)
-thresholds = recon_err_mean + (self.stdtol * recon_err_std)
-if self.threshold_min:
-thresholds[thresholds < self.threshold_min] = self.threshold_min
-return thresholds, recon_err_mean, recon_err_std
+def transform(self, X: NDArray[float]) -> NDArray:
+return self.score(X)

def save(self, path: Optional[str] = None) -> Optional[BinaryIO]:
r"""
@@ -263,8 +194,9 @@ def save(self, path: Optional[str] = None) -> Optional[BinaryIO]:
def __load_metadata(self, **metadata) -> None:
if self.resume_train:
self.optimizer.load_state_dict(metadata["optimizer_state_dict"])
-self._thresholds = metadata["thresholds"]
-self._stats = metadata["err_stats"]
+self._epochs_elapsed = metadata["epochs_elapsed"]
+self.num_epochs = metadata["num_epochs"]
+self.batch_size = metadata["batch_size"]

def load(self, path: Union[str, BinaryIO] = None, model=None, **metadata) -> None:
r"""
@@ -466,7 +398,3 @@ def fit(self, X: NDArray[float], y=None, log_freq: int = 5) -> None:

if epoch % log_freq == 0:
_LOGGER.info(f"epoch : {epoch}, penalty: {penalty} loss_mean : {loss.item():.7f}")

-self._thresholds, _mean, _std = self.find_thresholds(X)
-self._stats["mean"] = _mean
-self._stats["std"] = _std
2 changes: 1 addition & 1 deletion numalogic/models/forecast/variants/naive.py
@@ -6,7 +6,7 @@
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, FunctionTransformer

-from numalogic.scores import tanh_norm
+from numalogic.postprocess import tanh_norm


class BaselineForecaster:
41 changes: 41 additions & 0 deletions numalogic/models/threshold/_std.py
@@ -0,0 +1,41 @@
+import numpy as np
+from numpy.typing import NDArray
+from sklearn.base import BaseEstimator
+
+
+class StdDevThreshold(BaseEstimator):
+    def __init__(
+        self,
+        std_factor: float = 3.0,
+        min_threshold: float = 0.1
+    ):
+        self.std_factor = std_factor
+        self.min_threshold = min_threshold
+
+        self._std = None
+        self._mean = None
+        self._threshold = None
+
+    @property
+    def mean(self):
+        return self._mean
+
+    @property
+    def std(self):
+        return self._std
+
+    @property
+    def threshold(self):
+        return self._threshold
+
+    def fit(self, X, y=None):
+        self._std = np.std(X, axis=0)
+        self._mean = np.mean(X, axis=0)
+        self._threshold = self._mean + (self.std_factor * self._std)
+        self._threshold[self._threshold < self.min_threshold] = self.min_threshold
+
+        return self
+
+    def predict(self, X: NDArray[float]) -> NDArray[float]:
+        anomaly_scores = X / self.threshold
+        return anomaly_scores
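
Because `AutoencoderPipeline` is now a `TransformerMixin` (its `transform()` returns reconstruction errors) and `StdDevThreshold` is a plain sklearn estimator, the two can in principle be chained with scikit-learn tooling. A hedged sketch, assuming only the fit/transform/predict contracts visible in this diff:

```python
import numpy as np
from sklearn.pipeline import make_pipeline

from numalogic.models.autoencoder import AutoencoderPipeline
from numalogic.models.autoencoder.variants import Conv1dAE
from numalogic.models.threshold._std import StdDevThreshold  # import path assumed

# fit() trains the autoencoder, transform() emits reconstruction errors,
# and StdDevThreshold turns those errors into anomaly scores.
pipeline = make_pipeline(
    AutoencoderPipeline(model=Conv1dAE(in_channels=1, enc_channels=4), seq_len=8),
    StdDevThreshold(std_factor=3.0, min_threshold=0.1),
)
pipeline.fit(np.random.randn(100, 1))
anomaly_scores = pipeline.predict(np.random.randn(20, 1))
```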
File renamed without changes.
5 changes: 3 additions & 2 deletions numalogic/registry/__init__.py
@@ -1,8 +1,9 @@
from numalogic.registry.artifact import ArtifactManager
+from numalogic.registry.artifact import ArtifactData

try:
from numalogic.registry.mlflow_registry import MLflowRegistrar
except ImportError:
__all__ = ["ArtifactManager"]
__all__ = ["ArtifactManager", "ArtifactData"]
else:
__all__ = ["ArtifactManager", "MLflowRegistrar"]
__all__ = ["ArtifactManager", "ArtifactData", "MLflowRegistrar"]
16 changes: 11 additions & 5 deletions numalogic/registry/artifact.py
@@ -1,9 +1,17 @@
from abc import ABCMeta, abstractmethod
+from dataclasses import dataclass
from typing import Sequence, Any, Union, Dict

from numalogic.tools.types import Artifact


+@dataclass
+class ArtifactData:
+    artifact: Artifact
+    metadata: Dict[str, Any]
+    extras: Dict[str, Any]


class ArtifactManager(metaclass=ABCMeta):
"""
Abstract base class for artifact save, load and delete.
@@ -17,7 +25,7 @@ def __init__(self, uri: str):
@abstractmethod
def load(
self, skeys: Sequence[str], dkeys: Sequence[str], latest: bool = True, version: str = None
-) -> Artifact:
+) -> ArtifactData:
"""
Loads the desired artifact from mlflow registry and returns it.
Args:
@@ -33,17 +41,15 @@ def save(
self,
skeys: Sequence[str],
dkeys: Sequence[str],
-primary_artifact: Artifact,
-secondary_artifacts: Union[Sequence[Artifact], Dict[str, Artifact], None] = None,
+artifact: Artifact,
**metadata
) -> Any:
r"""
Saves the artifact into mlflow registry and updates version.
Args:
skeys: static key fields as list/tuple of strings
dkeys: dynamic key fields as list/tuple of strings
-primary_artifact: primary artifact to be saved
-secondary_artifacts: secondary artifact to be saved
+artifact: primary artifact to be saved
metadata: additional metadata surrounding the artifact that needs to be saved
"""
pass
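
Putting the registry changes together: `save()` now takes a single `artifact` with free-form `**metadata`, and `load()` returns an `ArtifactData` dataclass rather than a dict. A hedged sketch of the intended usage, reusing `model` and `thresh_clf` from the sketch near the top of this commit; the `MLflowRegistrar` constructor arguments are assumptions not shown in this diff:

```python
from numalogic.registry import MLflowRegistrar

# Constructor arguments assumed for illustration only.
registry = MLflowRegistrar(tracking_uri="http://localhost:5000")

# One artifact per save() call; the model and its threshold estimator
# are now registered as separate artifacts.
registry.save(skeys=["my-service"], dkeys=["ae"], artifact=model, seq_len=8)
registry.save(skeys=["my-service"], dkeys=["thresh"], artifact=thresh_clf)

# load() returns an ArtifactData dataclass instead of a dict.
artifact_data = registry.load(skeys=["my-service"], dkeys=["ae"])
loaded_model = artifact_data.artifact    # the saved object itself
meta = artifact_data.metadata            # metadata saved alongside it
```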