Skip to content

Commit

Permalink
feat: Score config in postprocess
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Jan 31, 2024
1 parent a913545 commit bc6d08c
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 86 deletions.
98 changes: 52 additions & 46 deletions numalogic/backtest/_prom.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
)
from numalogic.connectors import ConnectorType
from numalogic.connectors.prometheus import PrometheusFetcher
from numalogic.tools.aggregators import aggregate_window, aggregate_features
from numalogic.tools.data import StreamingDataset, inverse_window
from numalogic.tools.types import artifact_t
from numalogic.udfs import UDFFactory, StreamConf, MLPipelineConf
Expand Down Expand Up @@ -190,69 +189,50 @@ def generate_scores(
x_scaled = preproc_udf.compute(model=artifacts["preproc_clf"], input_=x_test)

ds = StreamingDataset(x_scaled, seq_len=self.conf.window_size)
x_recon = np.zeros((len(ds), self.conf.window_size, len(self.metrics)), dtype=np.float32)

# raw_scores = np.zeros_like(x_recon, dtype=np.float32)
x_recon = np.zeros((len(ds), self.conf.window_size, len(self.metrics)), dtype=np.float32)
raw_scores = np.zeros((len(ds), self.conf.window_size, len(self.metrics)), dtype=np.float32)
# raw_scores = np.zeros((len(ds), len(self.metrics)), dtype=np.float32)
feature_scores = np.zeros((len(ds), len(self.metrics)), dtype=np.float32)
unified_scores = np.zeros((len(ds), 1), dtype=np.float32)

final_scores = np.zeros((len(ds), len(self.metrics)), dtype=np.float32)
agg_final_scores = np.zeros((len(ds), 1), dtype=np.float32)
postproc_func = PostprocessFactory().get_instance(self.nlconf.postprocess)

# Model Inference
for idx, arr in enumerate(ds):
x_recon[idx] = nn_udf.compute(model=artifacts["model"], input_=arr)
# y, y_final = postproc_udf.compute(
# model=artifacts["threshold_clf"],
# input_=x_recon[idx],
# postproc_clf=postproc_func,
# )
# agg_final_scores[idx] = postproc_udf.aggregate_features(y_final)
# raw_scores[idx], final_scores[idx] = y, y_final

thresh_out = postproc_udf.compute_threshold(artifacts["threshold_clf"], x_recon[idx])
raw_scores[idx] = thresh_out
_y = aggregate_window(raw_scores[idx])

final_scores[idx] = postproc_udf.compute_postprocess(postproc_func, _y)
feature_scores[idx] = postproc_udf.compute_feature_scores(
raw_scores[idx], self.nlconf.score
)

_out = aggregate_features(final_scores[idx].reshape(1, -1)).reshape(-1)
agg_final_scores[idx] = _out
postproc_out = postproc_udf.compute_postprocess(postproc_func, feature_scores[idx])

print(x_recon.shape, raw_scores.shape, final_scores.shape, agg_final_scores.shape)
unified_scores[idx] = postproc_udf.compute_unified_score(
postproc_out, self.nlconf.score
)

x_recon = inverse_window(torch.from_numpy(x_recon), method="keep_first").numpy()
# raw_scores = inverse_window(
# torch.unsqueeze(torch.from_numpy(raw_scores), dim=2), method="keep_first"
# ).numpy()
raw_scores = inverse_window(torch.from_numpy(raw_scores), method="keep_first").numpy()
# final_scores = inverse_window(
# torch.unsqueeze(torch.from_numpy(final_scores), dim=2), method="keep_first"
# ).numpy()

print(x_recon.shape, raw_scores.shape, final_scores.shape, agg_final_scores.shape)

final_scores = np.vstack(
feature_scores = np.vstack(
[
np.full((self.conf.window_size - 1, len(self.metrics)), fill_value=np.nan),
final_scores,
feature_scores,
]
)

agg_final_scores = np.vstack(
[np.full((self.conf.window_size - 1, 1), fill_value=np.nan), agg_final_scores]
unified_scores = np.vstack(
[np.full((self.conf.window_size - 1, 1), fill_value=np.nan), unified_scores]
)

print(x_recon.shape, raw_scores.shape, final_scores.shape, agg_final_scores.shape)

return self._construct_output(
df_test,
preproc_out=x_scaled,
nn_out=x_recon,
thresh_out=raw_scores,
postproc_out=final_scores,
# postproc_out=agg_final_scores,
postproc_out=feature_scores,
unified_out=unified_scores,
)

@classmethod
Expand Down Expand Up @@ -319,8 +299,39 @@ def _construct_output(
nn_out: NDArray[float],
thresh_out: NDArray[float],
postproc_out: NDArray[float],
unified_out: NDArray[float],
) -> pd.DataFrame:
ts_idx = input_df.index

if thresh_out.shape[1] > 1:
thresh_df = pd.DataFrame(
thresh_out,
columns=self.metrics,
index=ts_idx,
)
else:
thresh_df = pd.DataFrame(
thresh_out,
columns=["unified"],
index=ts_idx,
)

if postproc_out.shape[1] > 1:
postproc_df = pd.DataFrame(
postproc_out,
# columns=["unified_score"],
columns=self.metrics,
index=ts_idx,
)
else:
postproc_df = (
pd.DataFrame(
postproc_out,
columns=["unified"],
index=ts_idx,
),
)

dfs = {
"input": input_df,
"preproc_out": pd.DataFrame(
Expand All @@ -333,16 +344,11 @@ def _construct_output(
columns=self.metrics,
index=ts_idx,
),
"thresh_out": pd.DataFrame(
thresh_out,
# columns=["unified_score"],
columns=self.metrics,
index=ts_idx,
),
"postproc_out": pd.DataFrame(
postproc_out,
# columns=["unified_score"],
columns=self.metrics,
"thresh_out": thresh_df,
"postproc_out": postproc_df,
"unified_out": pd.DataFrame(
unified_out,
columns=["unified"],
index=ts_idx,
),
}
Expand Down
4 changes: 4 additions & 0 deletions numalogic/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
LightningTrainerConf,
RegistryInfo,
TrainerConf,
ScoreConf,
)
from numalogic.config.factory import (
ModelFactory,
PreprocessFactory,
PostprocessFactory,
ThresholdFactory,
RegistryFactory,
AggregatorFactory,
)


Expand All @@ -37,4 +39,6 @@
"ThresholdFactory",
"RegistryFactory",
"TrainerConf",
"ScoreConf",
"AggregatorFactory",
]
30 changes: 30 additions & 0 deletions numalogic/config/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@


from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional

from omegaconf import MISSING
Expand Down Expand Up @@ -98,6 +99,34 @@ class TrainerConf:
pltrainer_conf: LightningTrainerConf = field(default_factory=LightningTrainerConf)


class AggMethod(str, Enum):
EXP = "exp_moving_average"
WEIGHTED_AVG = "weighted_average"
MEAN = "mean"
MAX = "max"
MIN = "min"

@classmethod
def get_all(cls) -> list[str]:
return [AggMethod.EXP, AggMethod.WEIGHTED_AVG, AggMethod.MEAN, AggMethod.MAX, AggMethod.MIN]


@dataclass
class AggregatorConf:
method: AggMethod
conf: dict[str, Any] = field(default_factory=dict)


@dataclass
class ScoreConf:
window_agg: AggregatorConf = field(
default_factory=lambda: AggregatorConf(method=AggMethod.EXP, conf=dict(beta=0.6))
)
feature_agg: AggregatorConf = field(
default_factory=lambda: AggregatorConf(method=AggMethod.MAX)
)


@dataclass
class NumalogicConf:
"""Top level config schema for numalogic."""
Expand All @@ -109,6 +138,7 @@ class NumalogicConf:
postprocess: ModelInfo = field(
default_factory=lambda: ModelInfo(name="TanhNorm", stateful=False)
)
score: ScoreConf = field(default_factory=lambda: ScoreConf())


@dataclass
Expand Down
32 changes: 30 additions & 2 deletions numalogic/config/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
# limitations under the License.
from typing import Union, ClassVar, TypeVar

import numpy as np
from sklearn.pipeline import make_pipeline

from numalogic.config._config import ModelInfo, RegistryInfo
from numalogic.config._config import ModelInfo, RegistryInfo, AggMethod
from numalogic.tools.exceptions import UnknownConfigArgsError


conf_t = TypeVar("conf_t", bound=Union[ModelInfo, RegistryInfo], covariant=True)


Expand Down Expand Up @@ -192,3 +192,31 @@ def get_cls(cls, name: str):
raise UnknownConfigArgsError(
f"Invalid name provided for ConnectorFactory: {name}"
) from None


class AggregatorFactory:
"""Factory class for aggregator functions."""

from numalogic.transforms import expmov_avg_aggregator

_FUNC_MAP: ClassVar[dict] = {
AggMethod.MAX: np.max,
AggMethod.MIN: np.min,
AggMethod.MEAN: np.mean,
AggMethod.WEIGHTED_AVG: np.average,
AggMethod.EXP: expmov_avg_aggregator,
}

@classmethod
def get_func(cls, name: str):
try:
return cls._FUNC_MAP[name]
except KeyError:
raise UnknownConfigArgsError(
f"Invalid agg method provided for AggregatorFactory: {name}"
) from None

@classmethod
def invoke_func(cls, name: str, *args, **kwargs):
func = cls.get_func(name)
return func(*args, **kwargs)
17 changes: 7 additions & 10 deletions numalogic/tools/aggregators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
from collections.abc import Sequence
from typing import Optional
from collections.abc import Callable

import numpy as np
import numpy.typing as npt
Expand All @@ -11,17 +10,15 @@
EXP_MOV_AVG_BETA = float(os.getenv("EXP_MOV_AVG_BETA", "0.6"))


def aggregate_window(y: npt.NDArray[float]) -> npt.NDArray[float]:
def aggregate_window(
y: npt.NDArray[float], agg_func: Callable = expmov_avg_aggregator, **agg_func_kw
) -> npt.NDArray[float]:
"""Aggregate over window/sequence length."""
return np.apply_along_axis(
func1d=expmov_avg_aggregator, axis=0, arr=y, beta=EXP_MOV_AVG_BETA
).reshape(-1)
return np.apply_along_axis(func1d=agg_func, axis=0, arr=y, **agg_func_kw).reshape(-1)


def aggregate_features(
y: npt.NDArray[float], weights: Optional[Sequence[float]] = None
y: npt.NDArray[float], agg_func: Callable = np.mean, **agg_func_kw
) -> npt.NDArray[float]:
"""Aggregate over features."""
if weights:
return np.average(y, weights=weights, axis=1, keepdims=True)
return np.mean(y, axis=1, keepdims=True)
return agg_func(y, axis=1, keepdims=True, **agg_func_kw)
2 changes: 1 addition & 1 deletion numalogic/udfs/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async def aexec(self, keys: list[str], datum: Datum) -> Messages:

@classmethod
@abstractmethod
def compute(cls, model: artifact_t, input_: npt.NDArray[float], **kwargs):
def compute(cls, model: artifact_t, input_: npt.NDArray[float], *args, **kwargs):
"""
Abstract method to be implemented by subclasses.
Expand Down
Loading

0 comments on commit bc6d08c

Please sign in to comment.