Skip to content

Commit

Permalink
fix: fix pipeline for 0.3 release (#106)
Browse files Browse the repository at this point in the history
Signed-off-by: s0nicboOm <[email protected]>
  • Loading branch information
s0nicboOm committed Dec 8, 2022
1 parent 709553f commit 78cf5b4
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 42 deletions.
20 changes: 12 additions & 8 deletions examples/numalogic-simple-pipeline/src/udf/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from numalogic.models.autoencoder.variants import Conv1dAE
from pynumaflow.function import Messages, Message, Datum

from src.utils import Payload, load_model
from src.utils import Payload, load_artifact

LOGGER = logging.getLogger(__name__)
WIN_SIZE = int(os.getenv("WIN_SIZE"))
Expand All @@ -27,20 +27,24 @@ def inference(key: str, datum: Datum) -> Messages:
payload = Payload.from_json(datum.value.decode("utf-8"))
messages = Messages()

#
artifact_data = load_model(skeys=["ae"], dkeys=["model"])
artifact_data = load_artifact(skeys=["ae"], dkeys=["model"], type="pytorch")
thresh_clf_data = load_artifact(skeys=["thresh_clf"], dkeys=["model"])

# Check if model exists for inference
if artifact_data:
# load model from registry
if artifact_data and thresh_clf_data:
LOGGER.info("%s - Model found!", payload.uuid)

# Load model from registry
pl = AutoencoderPipeline(model=Conv1dAE(in_channels=1, enc_channels=12), seq_len=WIN_SIZE)
pl.load(model=artifact_data.artifact, **artifact_data.metadata)
pl.load(model=artifact_data.artifact)

LOGGER.info("%s - Model found!", payload.uuid)
# Load the threshold model from registry
thresh_clf = thresh_clf_data.artifact

# Infer using the loaded model
infer_data = np.asarray(payload.ts_data).reshape(-1, 1)
payload.ts_data = pl.score(infer_data).tolist()
score_data = pl.score(infer_data)
payload.ts_data = thresh_clf.predict(score_data).tolist()

LOGGER.info("%s - Inference complete", payload.uuid)

Expand Down
10 changes: 8 additions & 2 deletions examples/numalogic-simple-pipeline/src/udf/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import pandas as pd
from numalogic.models.autoencoder import AutoencoderPipeline
from numalogic.models.autoencoder.variants import Conv1dAE
from numalogic.models.threshold._std import StdDevThreshold
from numalogic.preprocess.transformer import LogTransformer
from pynumaflow.function import Datum, Messages, Message

from src.utils import Payload, save_model, TRAIN_DATA_PATH
from src.utils import Payload, save_artifact, TRAIN_DATA_PATH

LOGGER = logging.getLogger(__name__)
WIN_SIZE = int(os.getenv("WIN_SIZE"))
Expand Down Expand Up @@ -50,13 +51,18 @@ def train(key: str, datum: Datum):
clf = LogTransformer()
train_data = clf.fit_transform(data)

# Define Threshold method
thresh_clf = StdDevThreshold(std_factor=1.2)
thresh_clf.fit(train_data.to_numpy().reshape(-1, 1))

# Train step
pl = AutoencoderPipeline(model=Conv1dAE(in_channels=1, enc_channels=12), seq_len=WIN_SIZE)
pl.fit(train_data.to_numpy())
LOGGER.info("%s - Training complete", payload.uuid)

# Save to registry
save_model(pl, skeys=["ae"], dkeys=["model"])
save_artifact(pl.model, skeys=["ae"], dkeys=["model"])
save_artifact(thresh_clf, skeys=["thresh_clf"], dkeys=["model"])
LOGGER.info("%s - Model Saving complete", payload.uuid)

# Train is the last vertex in the graph
Expand Down
26 changes: 19 additions & 7 deletions examples/numalogic-simple-pipeline/src/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import logging
import os
from dataclasses import dataclass
from typing import Sequence
from typing import Sequence, Union

from dataclasses_json import dataclass_json
from numalogic.models.autoencoder import AutoencoderPipeline
from numalogic.models.autoencoder.base import TorchAE
from numalogic.models.threshold._std import StdDevThreshold
from numalogic.registry import MLflowRegistrar
from numalogic.tools.types import ArtifactDict
from numpy.typing import ArrayLike
Expand All @@ -24,16 +26,26 @@ class Payload:
uuid: str = None


def save_model(pl: AutoencoderPipeline, skeys: Sequence[str], dkeys: Sequence[str]) -> None:
ml_registry = MLflowRegistrar(tracking_uri=TRACKING_URI, artifact_type="pytorch")
ml_registry.save(skeys=skeys, dkeys=dkeys, artifact=pl.model, **pl.model_properties)
def save_artifact(
pl: Union[AutoencoderPipeline, StdDevThreshold],
skeys: Sequence[str],
dkeys: Sequence[str],
) -> None:
if isinstance(pl, TorchAE):
ml_registry = MLflowRegistrar(tracking_uri=TRACKING_URI, artifact_type="pytorch")
else:
ml_registry = MLflowRegistrar(tracking_uri=TRACKING_URI, artifact_type="sklearn")
ml_registry.save(skeys=skeys, dkeys=dkeys, artifact=pl)


def load_model(skeys: Sequence[str], dkeys: Sequence[str]) -> ArtifactDict:
def load_artifact(skeys: Sequence[str], dkeys: Sequence[str], type: str = None) -> ArtifactDict:
try:
ml_registry = MLflowRegistrar(tracking_uri=TRACKING_URI)
if type == "pytorch":
ml_registry = MLflowRegistrar(tracking_uri=TRACKING_URI, artifact_type="pytorch")
else:
ml_registry = MLflowRegistrar(tracking_uri=TRACKING_URI, artifact_type="sklearn")
artifact_dict = ml_registry.load(skeys=skeys, dkeys=dkeys)
return artifact_dict
except Exception as ex:
LOGGER.exception("Error while loading model from MLFlow database: %s", ex)
LOGGER.exception("Error while loading artifact from MLFlow database: %s", ex)
return None
4 changes: 2 additions & 2 deletions numalogic/models/autoencoder/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(
lr: float = 0.001,
batch_size: int = 256,
num_epochs: int = 100,
resume_train: bool = False
resume_train: bool = False,
):
if not (model and seq_len):
raise ValueError("No model and seq len provided!")
Expand All @@ -75,7 +75,7 @@ def model_properties(self):
model_properties_dict = {
"batch_size": self.batch_size,
"num_epochs": self.num_epochs,
"epochs_elapsed": self._epochs_elapsed
"epochs_elapsed": self._epochs_elapsed,
}
if self.resume_train:
model_properties_dict["optimizer_state_dict"] = self.optimizer.state_dict()
Expand Down
6 changes: 1 addition & 5 deletions numalogic/models/threshold/_std.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@


class StdDevThreshold(BaseEstimator):
def __init__(
self,
std_factor: float = 3.0,
min_threshold: float = 0.1
):
def __init__(self, std_factor: float = 3.0, min_threshold: float = 0.1):
self.std_factor = std_factor
self.min_threshold = min_threshold

Expand Down
6 changes: 1 addition & 5 deletions numalogic/registry/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ def load(

@abstractmethod
def save(
self,
skeys: Sequence[str],
dkeys: Sequence[str],
artifact: Artifact,
**metadata
self, skeys: Sequence[str], dkeys: Sequence[str], artifact: Artifact, **metadata
) -> Any:
r"""
Saves the artifact into mlflow registry and updates version.
Expand Down
18 changes: 6 additions & 12 deletions numalogic/registry/mlflow_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ModelStage(str, Enum):
"""
Defines different stages the model state can be in mlflow
"""

STAGE = "Staging"
ARCHIVE = "Archived"
PRODUCTION = "Production"
Expand Down Expand Up @@ -48,6 +49,7 @@ class MLflowRegistrar(ArtifactManager):
>>> registry.save(skeys=["model"], dkeys=["AE"], artifact=VanillaAE(10))
>>> artifact_data = registry.load(skeys=["model"], dkeys=["AE"])
"""

_TRACKING_URI = None

def __new__(
Expand Down Expand Up @@ -119,9 +121,7 @@ def load(
model_key, stages=[ModelStage.PRODUCTION]
)[-1]
elif version is not None:
model = self.handler.load_model(
model_uri=f"models:/{model_key}/{version}"
)
model = self.handler.load_model(model_uri=f"models:/{model_key}/{version}")
version_info = self.client.get_model_version(model_key, version)
else:
raise ValueError("One of 'latest' or 'version' needed in load method call")
Expand All @@ -133,9 +133,7 @@ def load(

return ArtifactData(artifact=model, metadata=metadata, extras=dict(version_info))
except Exception as ex:
_LOGGER.exception(
"Error when loading a model with key: %s: %r", model_key, ex
)
_LOGGER.exception("Error when loading a model with key: %s: %r", model_key, ex)
return None

def save(
Expand All @@ -159,18 +157,14 @@ def save(
model_key = self.construct_key(skeys, dkeys)
try:
mlflow.start_run()
self.handler.log_model(
artifact, "model", registered_model_name=model_key
)
self.handler.log_model(artifact, "model", registered_model_name=model_key)
if metadata:
mlflow.log_params(metadata)
model_version = self.transition_stage(skeys=skeys, dkeys=dkeys)
_LOGGER.info("Successfully inserted model %s to Mlflow", model_key)
return model_version
except Exception as ex:
_LOGGER.exception(
"Error when saving a model with key: %s: %r", model_key, ex
)
_LOGGER.exception("Error when saving a model with key: %s: %r", model_key, ex)
return None
finally:
mlflow.end_run()
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "numalogic"
version = "0.2.6"
version = "0.3.0a0"
description = "Collection of operational Machine Learning models and tools."
authors = ["Numalogic Developers"]
packages = [{ include = "numalogic" }]
Expand Down

0 comments on commit 78cf5b4

Please sign in to comment.