Skip to content

Commit

Permalink
fix: add FlattenVector transformer (#344)
Browse files Browse the repository at this point in the history
1. Fix saving of stateless transformer
2. Introduce FlattenVector

---------

Signed-off-by: s0nicboOm <[email protected]>
  • Loading branch information
s0nicboOm committed Feb 1, 2024
1 parent 146ec00 commit 824a55d
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 36 deletions.
8 changes: 7 additions & 1 deletion numalogic/config/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ class PreprocessFactory(_ObjectFactory):
"""Factory class to create preprocess instances."""

from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler
from numalogic.transforms import LogTransformer, StaticPowerTransformer, TanhScaler
from numalogic.transforms import (
LogTransformer,
StaticPowerTransformer,
TanhScaler,
FlattenVector,
)

_CLS_MAP: ClassVar[dict] = {
"StandardScaler": StandardScaler,
Expand All @@ -53,6 +58,7 @@ class PreprocessFactory(_ObjectFactory):
"LogTransformer": LogTransformer,
"StaticPowerTransformer": StaticPowerTransformer,
"TanhScaler": TanhScaler,
"FlattenVector": FlattenVector,
}

def get_pipeline_instance(self, objs_info: list[ModelInfo]):
Expand Down
1 change: 1 addition & 0 deletions numalogic/tools/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class KeyedArtifact(NamedTuple):

dkeys: KEYS
artifact: artifact_t
stateful: bool = True


class Singleton(type):
Expand Down
3 changes: 2 additions & 1 deletion numalogic/transforms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""

from numalogic.transforms._scaler import TanhScaler
from numalogic.transforms._stateless import LogTransformer, StaticPowerTransformer
from numalogic.transforms._stateless import LogTransformer, StaticPowerTransformer, FlattenVector
from numalogic.transforms._movavg import ExpMovingAverage, expmov_avg_aggregator
from numalogic.transforms._postprocess import TanhNorm, tanh_norm

Expand All @@ -27,4 +27,5 @@
"expmov_avg_aggregator",
"TanhNorm",
"tanh_norm",
"FlattenVector",
]
19 changes: 19 additions & 0 deletions numalogic/transforms/_stateless.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,22 @@ def transform(self, X, **__):

def inverse_transform(self, X) -> npt.NDArray[float]:
return np.power(X, 1.0 / self.n) - self.add_factor


class FlattenVector(StatelessTransformer):
"""A stateless transformer that flattens a vector.
Args:
____
n_features: number of features
"""

def __init__(self, n_features: int):
self.n_features = n_features

def transform(self, X: npt.NDArray[float], **__) -> npt.NDArray[float]:
return X.flatten().reshape(-1, 1)

def inverse_transform(self, X: npt.NDArray[float]) -> npt.NDArray[float]:
return X.reshape(-1, self.n_features)
17 changes: 17 additions & 0 deletions numalogic/udfs/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@
_LOGGER = logging.getLogger(__name__)


def _get_updated_metrics(uuid: str, metrics: list, shape: tuple) -> list[str]:
if shape[1] != len(metrics) and shape[1] == 1:
metrics = ["-".join(metrics)]
_LOGGER.debug(
"%s - Metrics used: %s",
uuid,
metrics,
)
return metrics


class PreprocessUDF(NumalogicUDF):
"""
Preprocess UDF for Numalogic.
Expand Down Expand Up @@ -174,6 +185,12 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
payload = replace(payload, status=Status.ARTIFACT_FOUND)
try:
x_scaled = self.compute(model=preproc_clf, input_=payload.get_data())

# make metrics list matching same shape as data
payload = replace(
payload, metrics=_get_updated_metrics(payload.uuid, payload.metrics, x_scaled.shape)
)

_update_info_metric(x_scaled, payload.metrics, _metric_label_values)
payload = replace(
payload,
Expand Down
17 changes: 9 additions & 8 deletions numalogic/udfs/trainer/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from sklearn.pipeline import make_pipeline
from torch.utils.data import DataLoader

from numalogic.base import StatelessTransformer
from numalogic.config import PreprocessFactory, ModelFactory, ThresholdFactory, RegistryFactory
from numalogic.config._config import NumalogicConf
from numalogic.models.autoencoder import TimeseriesTrainer
Expand Down Expand Up @@ -105,7 +104,9 @@ def compute(
if preproc_clf:
input_ = preproc_clf.fit_transform(input_)
dict_artifacts["preproc_clf"] = KeyedArtifact(
dkeys=[_conf.name for _conf in numalogic_cfg.preprocess], artifact=preproc_clf
dkeys=[_conf.name for _conf in numalogic_cfg.preprocess],
artifact=preproc_clf,
stateful=any(_conf.stateful for _conf in numalogic_cfg.preprocess),
)

train_ds = StreamingDataset(input_, model.seq_len)
Expand All @@ -117,13 +118,15 @@ def compute(
model, dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size)
).numpy()
dict_artifacts["inference"] = KeyedArtifact(
dkeys=[numalogic_cfg.model.name], artifact=model
dkeys=[numalogic_cfg.model.name], artifact=model, stateful=numalogic_cfg.model.stateful
)

if threshold_clf:
threshold_clf.fit(train_reconerr)
dict_artifacts["threshold_clf"] = KeyedArtifact(
dkeys=[numalogic_cfg.threshold.name], artifact=threshold_clf
dkeys=[numalogic_cfg.threshold.name],
artifact=threshold_clf,
stateful=numalogic_cfg.threshold.stateful,
)

return dict_artifacts
Expand Down Expand Up @@ -242,7 +245,6 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
threshold_clf=thresh_clf,
numalogic_cfg=_conf.numalogic_conf,
)

# Save artifacts

self.artifacts_to_save(
Expand Down Expand Up @@ -306,11 +308,10 @@ def artifacts_to_save(
"""
dict_artifacts = {
k: KeyedArtifact([payload.pipeline_id, *v.dkeys], v.artifact)
k: KeyedArtifact([payload.pipeline_id, *v.dkeys], v.artifact, v.stateful)
for k, v in dict_artifacts.items()
if not isinstance(v.artifact, StatelessTransformer)
if v.stateful
}

try:
ver_dict = model_registry.save_multiple(
skeys=skeys,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "numalogic"
version = "0.6.1"
version = "0.6.2"
description = "Collection of operational Machine Learning models and tools."
authors = ["Numalogic Developers"]
packages = [{ include = "numalogic" }]
Expand Down
10 changes: 8 additions & 2 deletions tests/transforms/test_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
from sklearn.pipeline import make_pipeline

from numalogic.base import StatelessTransformer
from numalogic.transforms import LogTransformer, StaticPowerTransformer, TanhScaler

from numalogic.transforms import LogTransformer, StaticPowerTransformer, TanhScaler, FlattenVector

RNG = np.random.default_rng(42)

Expand Down Expand Up @@ -77,6 +76,13 @@ def test_base_transform(self):
self.assertRaises(NotImplementedError, trfr.fit_transform, x)
self.assertEqual(trfr.fit(x), trfr)

def test_FlattenVector(self):
x = RNG.random((5, 2))
clf = FlattenVector(n_features=2)
data = clf.transform(x)
self.assertEqual(data.shape[1], 1)
self.assertEqual(clf.inverse_transform(data).shape[1], 2)


if __name__ == "__main__":
unittest.main()
10 changes: 7 additions & 3 deletions tests/udfs/resources/_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ stream_confs:
config_id: "druid-config"
source: "druid"
composite_keys: [ 'service-mesh', '1', '2' ]
window_size: 10
window_size: 20
ml_pipelines:
pipeline1:
pipeline_id: "pipeline1"
Expand All @@ -14,9 +14,13 @@ stream_confs:
model:
name: "VanillaAE"
conf:
seq_len: 10
n_features: 2
seq_len: 20
n_features: 1
preprocess:
- name: "FlattenVector"
stateful: false
conf:
n_features: 2
- name: "LogTransformer"
stateful: false
conf:
Expand Down
20 changes: 1 addition & 19 deletions tests/udfs/test_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,30 +105,12 @@ def test_trainer_01(self):

@patch.object(DruidFetcher, "fetch", Mock(return_value=mock_druid_fetch_data()))
def test_trainer_02(self):
self.udf1.register_conf(
"druid-config",
StreamConf(
ml_pipelines={
"pipeline1": MLPipelineConf(
pipeline_id="pipeline1",
numalogic_conf=NumalogicConf(
model=ModelInfo(
name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
),
preprocess=[ModelInfo(name="StandardScaler", conf={})],
trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
),
)
}
),
)
self.udf1(self.keys, self.datum)
self.assertEqual(
3,
2,
REDIS_CLIENT.exists(
b"5984175597303660107::pipeline1:VanillaAE::LATEST",
b"5984175597303660107::pipeline1:StdDevThreshold::LATEST",
b"5984175597303660107::pipeline1:StandardScaler::LATEST",
),
)

Expand Down
5 changes: 4 additions & 1 deletion tests/udfs/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,16 @@ def store_in_redis(pl_conf, registry):
registry.save_multiple(
skeys=[*pl_conf.stream_confs["druid-config"].composite_keys],
dict_artifacts={
"inference": KeyedArtifact(dkeys=[_pipeline_id, "AE"], artifact=VanillaAE(10)),
"inference": KeyedArtifact(
dkeys=[_pipeline_id, "AE"], artifact=VanillaAE(10), stateful=True
),
"preproc": KeyedArtifact(
dkeys=[
_pipeline_id,
*[_conf.name for _conf in _ml_conf.numalogic_conf.preprocess],
],
artifact=preproc_clf,
stateful=True,
),
},
)

0 comments on commit 824a55d

Please sign in to comment.