From b36356c656285981ac8dc12f96586182d9404906 Mon Sep 17 00:00:00 2001 From: Avik Basu <3485425+ab93@users.noreply.github.com> Date: Sun, 21 Apr 2024 14:20:20 -0700 Subject: [PATCH] fix: percentile scaler, exp mov avg, sigmoid norm (#369) Signed-off-by: Avik Basu --- numalogic/config/factory.py | 6 ++-- numalogic/transforms/__init__.py | 3 +- numalogic/transforms/_movavg.py | 44 +++++----------------------- numalogic/transforms/_postprocess.py | 10 +++++++ numalogic/transforms/_scaler.py | 32 ++++++++++++++++++-- 5 files changed, 53 insertions(+), 42 deletions(-) diff --git a/numalogic/config/factory.py b/numalogic/config/factory.py index be2e0777..cd16e603 100644 --- a/numalogic/config/factory.py +++ b/numalogic/config/factory.py @@ -52,6 +52,7 @@ class PreprocessFactory(_ObjectFactory): DifferenceTransform, FlattenVector, PercentileScaler, + ExpMovingAverage ) _CLS_MAP: ClassVar[dict] = { @@ -67,6 +68,7 @@ class PreprocessFactory(_ObjectFactory): "DifferenceTransform": DifferenceTransform, "FlattenVector": FlattenVector, "PercentileScaler": PercentileScaler, + "ExpMovingAverage": ExpMovingAverage } def get_pipeline_instance(self, objs_info: list[ModelInfo]): @@ -84,9 +86,9 @@ def get_pipeline_instance(self, objs_info: list[ModelInfo]): class PostprocessFactory(_ObjectFactory): """Factory class to create postprocess instances.""" - from numalogic.transforms import TanhNorm, ExpMovingAverage + from numalogic.transforms import TanhNorm, ExpMovingAverage, SigmoidNorm - _CLS_MAP: ClassVar[dict] = {"TanhNorm": TanhNorm, "ExpMovingAverage": ExpMovingAverage} + _CLS_MAP: ClassVar[dict] = {"TanhNorm": TanhNorm, "ExpMovingAverage": ExpMovingAverage, "SigmoidNorm": SigmoidNorm} class ThresholdFactory(_ObjectFactory): diff --git a/numalogic/transforms/__init__.py b/numalogic/transforms/__init__.py index 38d2e11f..f237435d 100644 --- a/numalogic/transforms/__init__.py +++ b/numalogic/transforms/__init__.py @@ -24,7 +24,7 @@ FlattenVector, ) from numalogic.transforms._movavg import ExpMovingAverage, expmov_avg_aggregator -from numalogic.transforms._postprocess import TanhNorm, tanh_norm +from numalogic.transforms._postprocess import TanhNorm, tanh_norm, SigmoidNorm __all__ = [ "TanhScaler", @@ -39,4 +39,5 @@ "DifferenceTransform", "FlattenVector", "PercentileScaler", + "SigmoidNorm" ] diff --git a/numalogic/transforms/_movavg.py b/numalogic/transforms/_movavg.py index 67608e08..c431d61c 100644 --- a/numalogic/transforms/_movavg.py +++ b/numalogic/transforms/_movavg.py @@ -10,6 +10,7 @@ # limitations under the License. import numpy as np +import pandas as pd from numalogic.base import StatelessTransformer from numalogic.tools.exceptions import InvalidDataShapeError @@ -86,24 +87,20 @@ class ExpMovingAverage(StatelessTransformer): Args: ---- beta: how much weight to give to the previous weighted average - bias_correction: flag to perform bias correction (default: true) - - Note: this only supports single feature input array. Raises ------ ValueError: if beta is not between 0 and 1 """ - __slots__ = ("beta", "bias_correction") + __slots__ = ("alpha",) - def __init__(self, beta: float, bias_correction: bool = True): + def __init__(self, beta: float = 0.5): if beta <= 0.0 or beta >= 1.0: raise ValueError("beta only accepts values between 0 and 1 (not inclusive)") - self.beta = beta - self.bias_correction = bias_correction + self.alpha = 1.0 - beta - def transform(self, input_: npt.NDArray[float], **__): + def transform(self, input_: npt.NDArray[float], **__) -> npt.NDArray[float]: r"""Returns transformed output. Args: @@ -114,32 +111,5 @@ def transform(self, input_: npt.NDArray[float], **__): ------ InvalidDataShapeError: if input array is not single featured """ - _allow_only_single_feature(input_) - - # alpha is the weight given to the latest element - alpha = 1.0 - self.beta - n = len(input_) - - theta = input_.reshape(-1, 1) - theta_tril = np.multiply(theta.T, np.tril(np.ones((n, n)))) - powers = np.arange(1, n + 1).reshape(-1, 1) - - # Calculate increasing powers of beta of the form, - # [beta, beta**2, .., beta**n] - beta_powers = np.power(self.beta, powers) - - # Calculate the array of reciprocals of beta powers of form, - # [beta**(-1), beta**(-2), .., beta**(-n)] - beta_arr_inv = np.reciprocal(beta_powers) - - # Calculate the summation of the ratio between (theta(i) / beta**i), - # [ theta(1)/beta, sum(theta(1)/beta, theta(2)/beta**2), .., ] - theta_beta_ratio = theta_tril @ beta_arr_inv - - # Elemental multiply with beta powers - exp_avg = alpha * np.multiply(beta_powers, theta_beta_ratio) - if not self.bias_correction: - return exp_avg - - # Calculate array of 1 / (1 - beta**i) values - return np.divide(exp_avg, 1.0 - beta_powers) + x_df = pd.DataFrame(input_) + return x_df.ewm(alpha=self.alpha).mean().to_numpy(dtype=np.float32) diff --git a/numalogic/transforms/_postprocess.py b/numalogic/transforms/_postprocess.py index 1e4ad149..f921889c 100644 --- a/numalogic/transforms/_postprocess.py +++ b/numalogic/transforms/_postprocess.py @@ -48,3 +48,13 @@ def __init__(self, scale_factor=10, smooth_factor=10): def transform(self, input_: npt.NDArray[float], **__) -> npt.NDArray[float]: return tanh_norm(input_, scale_factor=self.scale_factor, smooth_factor=self.smooth_factor) + + +class SigmoidNorm(StatelessTransformer): + def __init__(self, scale_factor: float = 10., smooth_factor: float = 0.5): + super().__init__() + self.scale_factor = scale_factor + self.smooth_factor = smooth_factor + + def transform(self, x: npt.NDArray[float], **__) -> npt.NDArray[float]: + return self.scale_factor / (1.0 + np.exp(5 - (self.smooth_factor * x))) \ No newline at end of file diff --git a/numalogic/transforms/_scaler.py b/numalogic/transforms/_scaler.py index 4cb7ed2d..355013a6 100644 --- a/numalogic/transforms/_scaler.py +++ b/numalogic/transforms/_scaler.py @@ -82,24 +82,52 @@ class PercentileScaler(BaseTransformer): ----- max_percentile: float, optional The upper percentile to clip the data. - Default is 90. + Default is 99. min_percentile: float, optional The lower percentile to clip the data. If None, minimum value of the data is used. Default is None. """ - def __init__(self, max_percentile: float = 95, min_percentile: Optional[float] = None): + def __init__(self, max_percentile: float = 99, min_percentile: Optional[float] = None, eps: float = 1e-2): self._max_px = max_percentile self._min_px = min_percentile self.tx = MinMaxScaler() + self._data_pth_max = None + self._data_pth_min = None + self._eps = eps + + @property + def data_pth_max(self) -> float: + return self._data_pth_max + + @property + def data_pth_min(self) -> float: + return self._data_pth_min + def fit(self, x: npt.NDArray[float]) -> Self: data_max_px = np.percentile(x, self._max_px, axis=0) + data_max = np.max(x, axis=0) + if self._min_px is None: data_min_px = np.min(x, axis=0) else: data_min_px = np.percentile(x, self._min_px, axis=0) + + p_ranges = data_max_px - data_min_px + + for idx, _range in enumerate(p_ranges): + if _range <= self._eps: + LOGGER.warning( + "Max and Min percentile difference is less than " + "epsilon: %s for column %s", self._eps, idx + ) + data_max_px[idx] = data_max[idx] + + self._data_pth_max = data_max_px + self._data_pth_min = data_min_px + x_clipped = DataClipper(lower=data_min_px, upper=data_max_px).transform(x) return self.tx.fit(x_clipped)