Commit
fix: add max value map for clipping the value (#339)
Add a max_value_map option to TrainerConf so feature values can be clipped per metric before training.

---------

Signed-off-by: s0nicboOm <[email protected]>
s0nicboOm committed Jan 18, 2024
1 parent 80ef431 commit dbb510f
Showing 5 changed files with 36 additions and 4 deletions.
4 changes: 3 additions & 1 deletion numalogic/config/_config.py
@@ -11,7 +11,7 @@


from dataclasses import dataclass, field
from typing import Any
from typing import Any, Optional

from omegaconf import MISSING

@@ -93,6 +93,8 @@ class TrainerConf:
retry_sec: int = 600 # 10 min
batch_size: int = 64
data_freq_sec: int = 60
# TODO: Support trainer based transform models
max_value_map: Optional[dict[str, float]] = None
pltrainer_conf: LightningTrainerConf = field(default_factory=LightningTrainerConf)


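For illustration only (not part of this commit), a minimal sketch of populating the new field when building a TrainerConf directly in Python; it assumes TrainerConf is re-exported from numalogic.config, where numalogic/config/_config.py is surfaced:

# Hypothetical usage sketch for the new max_value_map field.
from numalogic.config import TrainerConf  # assumed re-export

trainer_conf = TrainerConf(
    batch_size=64,
    data_freq_sec=60,
    # Per-metric upper bounds; metrics absent from the map are left unclipped.
    max_value_map={"failed": 0.0, "degraded": 0.0},
)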
1 change: 0 additions & 1 deletion numalogic/udfs/tools.py
@@ -317,7 +317,6 @@ def ack_read(
bool
"""
print(key)
_key = self.__construct_train_key(key)
metadata = self.__fetch_ts(key=_key)
_msg_read_ts, _msg_train_ts, _msg_train_records = (
18 changes: 16 additions & 2 deletions numalogic/udfs/trainer/_base.py
@@ -201,7 +201,9 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
_LOGGER.info("%s - Data fetched, shape: %s", payload.uuid, df.shape)

# Construct feature array
x_train, nan_counter, inf_counter = self.get_feature_arr(df, payload.metrics)
x_train, nan_counter, inf_counter = self.get_feature_arr(
df, payload.metrics, max_value_map=_conf.numalogic_conf.trainer.max_value_map
)
_add_summary(
summary=NAN_SUMMARY,
labels=_metric_label_values,
@@ -328,7 +330,10 @@ def _is_data_sufficient(self, payload: TrainerPayload, df: pd.DataFrame) -> bool
# TODO: Use a custom impute in transforms module
@staticmethod
def get_feature_arr(
raw_df: pd.DataFrame, metrics: list[str], fill_value: float = 0.0
raw_df: pd.DataFrame,
metrics: list[str],
fill_value: float = 0.0,
max_value_map: Optional[dict[str, float]] = None,
) -> tuple[npt.NDArray[float], float, float]:
"""
Get feature array from the raw dataframe.
@@ -349,7 +354,16 @@ def get_feature_arr(
if col not in raw_df.columns:
raw_df[col] = fill_value
nan_counter += len(raw_df)

feat_df = raw_df[metrics]
if max_value_map:
max_value_list = [max_value_map.get(col, np.nan) for col in metrics]
feat_df.clip(upper=max_value_list, inplace=True)
_LOGGER.info(
"Replaced %s with max_value_map from the map with value of %s.",
metrics,
max_value_list,
)
nan_counter += raw_df.isna().sum().all()
inf_counter = np.isinf(feat_df).sum().all()
feat_df = feat_df.fillna(fill_value).replace([np.inf, -np.inf], fill_value)
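A self-contained sketch of the clipping behaviour that get_feature_arr now applies, using only pandas and numpy; the column names and data are illustrative, and axis=1 is passed explicitly here for clarity:

import numpy as np
import pandas as pd

metrics = ["failed", "degraded", "latency"]
max_value_map = {"failed": 0.0, "degraded": 0.0}  # no cap configured for "latency"

raw_df = pd.DataFrame(
    {"failed": [0.0, 3.0], "degraded": [1.0, -2.0], "latency": [10.0, 250.0]}
)

# Columns missing from the map get an upper bound of NaN, which pandas
# treats as "no clipping" for that column.
max_value_list = [max_value_map.get(col, np.nan) for col in metrics]
feat_df = raw_df[metrics].clip(upper=max_value_list, axis=1)

print(feat_df)
#    failed  degraded  latency
# 0     0.0       0.0     10.0
# 1     0.0      -2.0    250.0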
8 changes: 8 additions & 0 deletions tests/udfs/resources/_config.yaml
@@ -23,6 +23,14 @@ stream_confs:
name: "StdDevThreshold"
conf:
min_threshold: 0.1
trainer:
train_hours: 3
min_train_size: 100
max_value_map:
"col1": 0
pltrainer_conf:
accelerator: cpu
max_epochs: 5
pipeline2:
pipeline_id: "pipeline2"
metrics: [ "col1" , "col2" ]
9 changes: 9 additions & 0 deletions tests/udfs/resources/_config2.yaml
@@ -25,6 +25,15 @@ stream_confs:
name: "MahalanobisThreshold"
conf:
max_outlier_prob: 0.08
trainer:
train_hours: 3
min_train_size: 100
max_value_map:
"failed": 0
"degraded": 0
pltrainer_conf:
accelerator: cpu
max_epochs: 5

redis_conf:
url: "isbsvc-redis-isbs-redis-svc.oss-analytics-numalogicosamfci-usw2-e2e.svc"
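As a rough sketch of how a trainer section like the ones above resolves against the schema (the merge flow is illustrative, not numalogic's actual config loader, and the TrainerConf import path is assumed):

from omegaconf import OmegaConf

from numalogic.config import TrainerConf  # assumed re-export of numalogic/config/_config.py

trainer_yaml = """
trainer:
  train_hours: 3
  min_train_size: 100
  max_value_map:
    "failed": 0
    "degraded": 0
  pltrainer_conf:
    accelerator: cpu
    max_epochs: 5
"""
raw_conf = OmegaConf.create(trainer_yaml)
schema = OmegaConf.structured(TrainerConf)
merged = OmegaConf.merge(schema, raw_conf.trainer)
print(merged.max_value_map)  # values coerced to float: {'failed': 0.0, 'degraded': 0.0}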
