Commit

feat: Add support for storing preproc artifacts (secondary artifact) i… (#11)


Signed-off-by: s0nicboOm <[email protected]>
s0nicboOm committed Jul 29, 2022
1 parent 2784ffc commit a5ef072
Showing 6 changed files with 1,347 additions and 254 deletions.
44 changes: 25 additions & 19 deletions numalogic/registry/artifact.py
@@ -19,36 +19,42 @@ def load(
self, skeys: Sequence[str], dkeys: Sequence[str], latest: bool = True, version: str = None
) -> Artifact:
"""
Loads the desired artifact from registry/model-backend and returns it.
:param skeys: static key fields as list/tuple of strings
:param dkeys: dynamic key fields as list/tuple of strings
:param latest: boolean field to determine if latest version is desired or not
:param version: explicit artifact version
Loads the desired artifact from mlflow registry and returns it.
Args:
skeys: static key fields as list/tuple of strings
dkeys: dynamic key fields as list/tuple of strings
latest: boolean field to determine if latest version is desired or not
version: explicit artifact version
"""
pass

@abstractmethod
def save(
self, skeys: Sequence[str], dkeys: Sequence[str], artifact: Artifact, **metadata
self,
skeys: Sequence[str],
dkeys: Sequence[str],
primary_artifact: Artifact,
secondary_artifact: Artifact = None,
**metadata
) -> Any:
"""
Saves the artifact into registry/model-backend and updates version if supported.
:param skeys: static key fields as list/tuple of strings
:param dkeys: dynamic key fields as list/tuple of strings
:param artifact: artifact to be saved
:param metadata: additional metadata surrounding the artifact that needs to be saved
r"""
Saves the artifact into mlflow registry and updates version.
Args:
skeys: static key fields as list/tuple of strings
dkeys: dynamic key fields as list/tuple of strings
primary_artifact: primary artifact to be saved
secondary_artifact: secondary artifact to be saved
metadata: additional metadata surrounding the artifact that needs to be saved
"""
pass

@abstractmethod
def delete(self, skeys: Sequence[str], dkeys: Sequence[str], version: str) -> None:
"""
Deletes the artifact with a specified version from registry/model-backend.
:param skeys: static key fields as list/tuple of strings
:param dkeys: dynamic key fields as list/tuple of strings
:param version: explicit artifact version
Deletes the artifact with a specified version from mlflow registry.
Args:
skeys: static key fields as list/tuple of strings
dkeys: dynamic key fields as list/tuple of strings
version: explicit artifact version
"""
pass
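
To make the widened abstract save contract concrete, here is a minimal, hypothetical sketch of a dict-backed registry that mirrors the new signature. It is not part of this commit and does not subclass ArtifactManager; the class name, composite-key scheme, and storage layout are illustrative assumptions only.

from typing import Any, Dict, Optional, Sequence

Artifact = Any  # stand-in for the library's Artifact type alias


class DictRegistry:
    """Illustrative in-memory registry mirroring the new save/load/delete contract."""

    def __init__(self) -> None:
        self._store: Dict[str, dict] = {}

    @staticmethod
    def _key(skeys: Sequence[str], dkeys: Sequence[str]) -> str:
        # Join static and dynamic key fields into one composite lookup key.
        return ":".join(list(skeys) + list(dkeys))

    def save(
        self,
        skeys: Sequence[str],
        dkeys: Sequence[str],
        primary_artifact: Artifact,
        secondary_artifact: Optional[Artifact] = None,
        **metadata,
    ) -> str:
        # Persist both artifacts and any metadata under a single key.
        key = self._key(skeys, dkeys)
        self._store[key] = {
            "primary_artifact": primary_artifact,
            "secondary_artifact": secondary_artifact,
            "metadata": metadata,
        }
        return key

    def load(self, skeys: Sequence[str], dkeys: Sequence[str]) -> Optional[dict]:
        return self._store.get(self._key(skeys, dkeys))

    def delete(self, skeys: Sequence[str], dkeys: Sequence[str], version: str = None) -> None:
        self._store.pop(self._key(skeys, dkeys), None)
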
67 changes: 52 additions & 15 deletions numalogic/registry/mlflow_registry.py
@@ -2,7 +2,7 @@
import logging
import pickle
from enum import Enum
from typing import Optional, Sequence
from typing import Optional, Sequence, Union, Dict

import mlflow.pyfunc
import mlflow.pytorch
@@ -17,7 +17,6 @@


class ModelStage(str, Enum):

"""
Defines different stages the model state can be in mlflow
"""
@@ -35,9 +34,24 @@ class MLflowRegistrar(ArtifactManager):
Args:
tracking_uri: the tracking server uri to use for mlflow
artifact_type: the type of artifact to use
artifact_type: the type of primary artifact to use
supported values include:
{"pytorch", "sklearn", "tensorflow", "pyfunc"}
Examples
--------
>>> from numalogic.models.autoencoder.variants.vanilla import VanillaAE
>>> from numalogic.preprocess.transformer import LogTransformer
>>> from numalogic.registry.mlflow_registry import MLflowRegistrar
>>> from sklearn.preprocessing import StandardScaler, Normalizer
>>> from sklearn.pipeline import make_pipeline
>>>
>>> data = [[0, 0], [0, 0], [1, 1], [1, 1]]
>>> scaler = StandardScaler().fit(data)
>>> ml = MLflowRegistrar(tracking_uri="localhost:8080", artifact_type="pytorch")
>>> ml.save(skeys=["model"], dkeys=["AE"], primary_artifact=VanillaAE(10),
...         secondary_artifacts={"preproc": make_pipeline(scaler)})
>>> data = ml.load(skeys=["model"],dkeys=["AE"])
"""

def __init__(self, tracking_uri: str, artifact_type: str = "pytorch"):
@@ -48,22 +62,29 @@ def __init__(self, tracking_uri: str, artifact_type: str = "pytorch"):

@staticmethod
def __as_dict(
artifact: Optional[Artifact],
primary_artifact: Optional[Artifact],
secondary_artifacts: Union[Sequence[Artifact], Dict[str, Artifact], None],
metadata: Optional[dict],
model_properties: Optional[ModelVersion],
) -> ArtifactDict:
"""
Returns a dictionary comprising information on model, metadata, model_properties
Args:
artifact: artifact to be saved
primary_artifact: main artifact to be saved
secondary_artifacts: secondary artifact to be saved
metadata: ML models metadata
model_properties: ML model properties (information like time "model_created",
"model_updated_time", "model_name", "tags" , "current stage",
"version" etc.)
Returns: ArtifactDict type object . A dictionary of artifact, metadata and model_properties
Returns: ArtifactDict type object
"""
return {"artifact": artifact, "metadata": metadata, "model_properties": model_properties}
return {
"primary_artifact": primary_artifact,
"secondary_artifacts": secondary_artifacts,
"metadata": metadata,
"model_properties": model_properties,
}

@staticmethod
def construct_key(skeys: Sequence[str], dkeys: Sequence[str]) -> str:
@@ -107,7 +128,8 @@ def load(
version: explicit artifact version
Returns:
A dictionary artifact, metadata and model_properties
A dictionary containing primary_artifact, secondary_artifacts, metadata and
model_properties
"""

model_key = self.construct_key(skeys, dkeys)
@@ -122,36 +144,52 @@
return {}
_LOGGER.info("Successfully loaded model %s from Mlflow", model_key)
metadata = None
secondary_artifacts = None
model_properties = self.client.get_latest_versions(model_key, stages=["Production"])[-1]
if model_properties.run_id:
run_id = model_properties.run_id
run_data = self.client.get_run(run_id).data.to_dictionary()
if run_data["params"]:
data = run_data["params"]
secondary_artifacts = pickle.loads(
codecs.decode(data["secondary_artifacts"].encode(), "base64")
)
_LOGGER.info("Successfully loaded secondary_artifacts from Mlflow")
metadata = pickle.loads(codecs.decode(data["metadata"].encode(), "base64"))
_LOGGER.info("Successfully loaded model metadata from Mlflow")
return self.__as_dict(model, metadata, model_properties)
return self.__as_dict(model, secondary_artifacts, metadata, model_properties)
except Exception as ex:
_LOGGER.exception("Error when loading a model with key: %s: %r", model_key, ex)
return {}

def save(
self, skeys: Sequence[str], dkeys: Sequence[str], artifact: Artifact, **metadata
self,
skeys: Sequence[str],
dkeys: Sequence[str],
primary_artifact: Artifact,
secondary_artifacts: Union[Sequence[Artifact], Dict[str, Artifact], None] = None,
**metadata,
) -> Optional[ModelVersion]:
"""
Saves the artifact into mlflow registry and updates version.
Args:
skeys: static key fields as list/tuple of strings
dkeys: dynamic key fields as list/tuple of strings
artifact: artifact to be saved
primary_artifact: primary artifact to be saved
secondary_artifacts: secondary artifact to be saved
metadata: additional metadata surrounding the artifact that needs to be saved
Returns:
mlflow ModelVersion instance
"""
model_key = self.construct_key(skeys, dkeys)
try:
self.handler.log_model(artifact, "model", registered_model_name=model_key)
self.handler.log_model(primary_artifact, "model", registered_model_name=model_key)
if secondary_artifacts:
secondary_artifacts_data = codecs.encode(
pickle.dumps(secondary_artifacts), "base64"
).decode()
mlflow.log_param(key="secondary_artifacts", value=secondary_artifacts_data)
if metadata:
data = codecs.encode(pickle.dumps(metadata), "base64").decode()
mlflow.log_param(key="metadata", value=data)
@@ -183,9 +221,8 @@ def delete(self, skeys: Sequence[str], dkeys: Sequence[str], version: str) -> None:

def transition_stage(self, model_name: str) -> Optional[ModelVersion]:
"""
Changes stage information for the given model.
Sets new model to "Production". The old production model is set
to "Staging" and the rest model versions are set to "Archived".
Changes stage information for the given model. Sets new model to "Production". The old
production model is set to "Staging" and the rest model versions are set to "Archived".
Args:
model_name: model name for which we are updating the stage information.

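Taken together, the read path introduced by this commit can be consumed roughly as follows. This is a hedged sketch assuming a model was previously registered under the same skeys/dkeys and that the secondary artifacts were saved as a dict with a "preproc" entry, as in the class docstring example above.

from numalogic.registry.mlflow_registry import MLflowRegistrar

ml = MLflowRegistrar(tracking_uri="localhost:8080", artifact_type="pytorch")
loaded = ml.load(skeys=["model"], dkeys=["AE"])

if loaded:  # load() returns {} when the model is missing or an error occurs
    model = loaded["primary_artifact"]                   # e.g. the saved VanillaAE instance
    preproc = loaded["secondary_artifacts"]["preproc"]   # the pickled-and-restored pipeline
    metadata = loaded["metadata"]
    version_info = loaded["model_properties"]            # mlflow ModelVersion details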