Skip to content

Commit

Permalink
tmp: testing per feature score
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Jan 30, 2024
1 parent 8902e6f commit cc681a3
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 43 deletions.
69 changes: 51 additions & 18 deletions numalogic/backtest/_prom.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
)
from numalogic.connectors import ConnectorType
from numalogic.connectors.prometheus import PrometheusFetcher
from numalogic.tools.aggregators import aggregate_window, aggregate_features
from numalogic.tools.data import StreamingDataset, inverse_window
from numalogic.tools.types import artifact_t
from numalogic.udfs import UDFFactory, StreamConf, MLPipelineConf
Expand Down Expand Up @@ -189,39 +190,69 @@ def generate_scores(
x_scaled = preproc_udf.compute(model=artifacts["preproc_clf"], input_=x_test)

ds = StreamingDataset(x_scaled, seq_len=self.conf.window_size)
raw_scores = np.zeros((len(ds), self.conf.window_size), dtype=np.float32)
final_scores = np.zeros((len(ds), 1), dtype=np.float32)
postproc_func = PostprocessFactory().get_instance(self.nlconf.postprocess)

x_recon = np.zeros((len(ds), self.conf.window_size, len(self.metrics)), dtype=np.float32)

# raw_scores = np.zeros_like(x_recon, dtype=np.float32)
raw_scores = np.zeros((len(ds), self.conf.window_size, len(self.metrics)), dtype=np.float32)
# raw_scores = np.zeros((len(ds), len(self.metrics)), dtype=np.float32)

final_scores = np.zeros((len(ds), len(self.metrics)), dtype=np.float32)
agg_final_scores = np.zeros((len(ds), 1), dtype=np.float32)
postproc_func = PostprocessFactory().get_instance(self.nlconf.postprocess)

# Model Inference
for idx, arr in enumerate(ds):
x_recon[idx] = nn_udf.compute(model=artifacts["model"], input_=arr)
raw_scores[idx], final_scores[idx] = postproc_udf.compute(
model=artifacts["threshold_clf"],
input_=x_recon[idx],
postproc_clf=postproc_func,
)
# y, y_final = postproc_udf.compute(
# model=artifacts["threshold_clf"],
# input_=x_recon[idx],
# postproc_clf=postproc_func,
# )
# agg_final_scores[idx] = postproc_udf.aggregate_features(y_final)
# raw_scores[idx], final_scores[idx] = y, y_final

thresh_out = postproc_udf.compute_threshold(artifacts["threshold_clf"], x_recon[idx])
raw_scores[idx] = thresh_out
_y = aggregate_window(raw_scores[idx])

final_scores[idx] = postproc_udf.compute_postprocess(postproc_func, _y)

_out = aggregate_features(final_scores[idx].reshape(1, -1)).reshape(-1)
agg_final_scores[idx] = _out

print(x_recon.shape, raw_scores.shape, final_scores.shape, agg_final_scores.shape)

x_recon = inverse_window(torch.from_numpy(x_recon), method="keep_first").numpy()
raw_scores = inverse_window(
torch.unsqueeze(torch.from_numpy(raw_scores), dim=2), method="keep_first"
).numpy()
final_scores = inverse_window(
torch.unsqueeze(torch.from_numpy(final_scores), dim=2), method="keep_first"
).numpy()
# raw_scores = inverse_window(
# torch.unsqueeze(torch.from_numpy(raw_scores), dim=2), method="keep_first"
# ).numpy()
raw_scores = inverse_window(torch.from_numpy(raw_scores), method="keep_first").numpy()
# final_scores = inverse_window(
# torch.unsqueeze(torch.from_numpy(final_scores), dim=2), method="keep_first"
# ).numpy()

print(x_recon.shape, raw_scores.shape, final_scores.shape, agg_final_scores.shape)

final_scores = np.vstack(
[np.full((self.conf.window_size - 1, 1), fill_value=np.nan), final_scores]
[
np.full((self.conf.window_size - 1, len(self.metrics)), fill_value=np.nan),
final_scores,
]
)

agg_final_scores = np.vstack(
[np.full((self.conf.window_size - 1, 1), fill_value=np.nan), agg_final_scores]
)

print(x_recon.shape, raw_scores.shape, final_scores.shape, agg_final_scores.shape)

return self._construct_output(
df_test,
preproc_out=x_scaled,
nn_out=x_recon,
thresh_out=raw_scores,
postproc_out=final_scores,
# postproc_out=agg_final_scores,
)

@classmethod
Expand Down Expand Up @@ -304,12 +335,14 @@ def _construct_output(
),
"thresh_out": pd.DataFrame(
thresh_out,
columns=["unified_score"],
# columns=["unified_score"],
columns=self.metrics,
index=ts_idx,
),
"postproc_out": pd.DataFrame(
postproc_out,
columns=["unified_score"],
# columns=["unified_score"],
columns=self.metrics,
index=ts_idx,
),
}
Expand Down
8 changes: 4 additions & 4 deletions numalogic/models/threshold/_median.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]:
raise ModelInitializationError("Model not fitted yet.")

self._validate_input(x)
scores = x / self._thresh
return x / self._thresh

if self._agg:
return self.agg_score_samples(scores, weights=self._weights)
return scores
# if self._agg:
# return self.agg_score_samples(scores, weights=self._weights)
# return scores

@staticmethod
def agg_score_samples(
Expand Down
27 changes: 27 additions & 0 deletions numalogic/tools/aggregators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os
from collections.abc import Sequence
from typing import Optional

import numpy as np
import numpy.typing as npt

from numalogic.transforms import expmov_avg_aggregator


EXP_MOV_AVG_BETA = float(os.getenv("EXP_MOV_AVG_BETA", "0.6"))


def aggregate_window(y: npt.NDArray[float]) -> npt.NDArray[float]:
"""Aggregate over window/sequence length."""
return np.apply_along_axis(
func1d=expmov_avg_aggregator, axis=0, arr=y, beta=EXP_MOV_AVG_BETA
).reshape(-1)


def aggregate_features(
y: npt.NDArray[float], weights: Optional[Sequence[float]] = None
) -> npt.NDArray[float]:
"""Aggregate over features."""
if weights:
return np.average(y, weights=weights, axis=1, keepdims=True)
return np.mean(y, axis=1, keepdims=True)
56 changes: 35 additions & 21 deletions numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pynumaflow.mapper import Messages, Datum, Message

from numalogic.config import PostprocessFactory, RegistryFactory
from numalogic.transforms import expmov_avg_aggregator
from numalogic.tools.aggregators import aggregate_window, aggregate_features
from numalogic.registry import LocalLRUCache
from numalogic.tools.types import redis_client_t, artifact_t
from numalogic.udfs import NumalogicUDF
Expand Down Expand Up @@ -127,11 +127,16 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
# Postprocess payload
if payload.status in (Status.ARTIFACT_FOUND, Status.ARTIFACT_STALE) and thresh_artifact:
try:
raw_scores, anomaly_scores = self.compute(
model=thresh_artifact.artifact,
input_=payload.get_data(),
postproc_clf=postproc_clf,
)
raw_scores = self.compute_threshold(
thresh_artifact.artifact, payload.get_data()
) # (seqlen x nfeat)
raw_win_scores = aggregate_window(raw_scores) # (nfeat,)
anomaly_scores = self.compute_postprocess(postproc_clf, raw_win_scores) # (nfeat,)
anomaly_score = aggregate_features(
anomaly_scores.reshape(1, -1),
weights=_conf.numalogic_conf.threshold.conf.get("feature_weights"),
).reshape(-1) # (1,)

except RuntimeError:
_increment_counter(RUNTIME_ERROR_COUNTER, _metric_label_values)
_LOGGER.exception(
Expand All @@ -153,7 +158,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
pipeline_id=payload.pipeline_id,
composite_keys=payload.composite_keys,
timestamp=payload.end_ts,
unified_anomaly=np.max(anomaly_scores),
unified_anomaly=anomaly_score,
data=self._per_feature_score(payload.metrics, anomaly_scores),
metadata=payload.metadata,
)
Expand Down Expand Up @@ -190,9 +195,6 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:

@staticmethod
def _per_feature_score(feat_names: list[str], scores: NDArray[float]) -> dict[str, float]:
if scores.shape[0] == 1:
# TODO improve this to incorporate per feature anomaly insights
return {_name: scores.item() for _name in feat_names}
return dict(zip(feat_names, scores))

@classmethod
Expand All @@ -205,26 +207,38 @@ def compute(
Args:
-------
model: Model instance
input_: Input data
input_: Input data (Shape: seq_len x n_features)
kwargs: Additional arguments
Returns
-------
Output data
Output data tuple of (threshold_output, postprocess_output)
Raises
------
RuntimeError: If threshold model or postproc function fails
"""
_start_time = time.perf_counter()
raw_scores = cls.compute_threshold(model, input_)
raw_win_scores = aggregate_window(raw_scores)
final_scores = cls.compute_postprocess(postproc_clf, raw_win_scores)
_LOGGER.debug(
"Time taken in postprocess compute: %.4f sec", time.perf_counter() - _start_time
)
return raw_scores, final_scores

@classmethod
def compute_threshold(cls, model: artifact_t, input_: NDArray[float]) -> NDArray[float]:
try:
y_score = model.score_samples(input_).astype(np.float32)
scores = model.score_samples(input_).astype(np.float32)
except Exception as err:
raise RuntimeError("Threshold model scoring failed") from err
return scores

@classmethod
def compute_postprocess(cls, tx: artifact_t, input_: NDArray[float]):
try:
win_score = np.apply_along_axis(
func1d=expmov_avg_aggregator, axis=0, arr=y_score, beta=EXP_MOV_AVG_BETA
).reshape(-1)
score = postproc_clf.transform(win_score)
score = tx.transform(input_)
except Exception as err:
raise RuntimeError("Postprocess failed") from err
_LOGGER.debug(
"Time taken in postprocess compute: %.4f sec", time.perf_counter() - _start_time
)
return y_score.reshape(-1), score.reshape(-1)
return score

0 comments on commit cc681a3

Please sign in to comment.