Skip to content

Commit

Permalink
fix: update production key to latest key (#221)
Browse files Browse the repository at this point in the history
1. Fix lint issues
2. Update redis registry to point to "latest" instead of "production"

---------

Signed-off-by: s0nicboOm <[email protected]>
  • Loading branch information
s0nicboOm committed Jun 27, 2023
1 parent 72d2ae7 commit 7778ae8
Show file tree
Hide file tree
Showing 16 changed files with 647 additions and 424 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: chartboost/ruff-action@v1
with:
version: "0.0.275"
6 changes: 5 additions & 1 deletion benchmarks/plots.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from matplotlib import pyplot as plt
from sklearn.metrics import RocCurveDisplay
import numpy.typing as npt
from typing import Optional


def plot_reconerr_comparision(reconerr, input_, labels, start=0, end=None, title=None):
Expand All @@ -20,7 +21,10 @@ def plot_reconerr_comparision(reconerr, input_, labels, start=0, end=None, title


def plot_roc_curve(
y_true: npt.NDArray[float], y_pred: npt.NDArray[float], model_name: str, title: str = None
y_true: npt.NDArray[float],
y_pred: npt.NDArray[float],
model_name: str,
title: Optional[str] = None,
):
"""
Plots the ROC curve for the given true and predicted labels.
Expand Down
4 changes: 3 additions & 1 deletion examples/multi_udf/src/factory.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from typing import ClassVar

from src.udf import Preprocess, Inference, Postprocess, Trainer, Threshold
from numalogic.numaflow import NumalogicUDF


class UDFFactory:
"""Factory class to return the handler for the given step."""

_UDF_MAP = {
_UDF_MAP: ClassVar[dict] = {
"preprocess": Preprocess,
"inference": Inference,
"postprocess": Postprocess,
Expand Down
3 changes: 2 additions & 1 deletion examples/multi_udf/src/udf/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pynumaflow.function import Datum, Messages, Message

from src.utils import Payload, TRAIN_DATA_PATH
from typing import Optional

LOGGER = logging.getLogger(__name__)
WIN_SIZE = int(os.getenv("WIN_SIZE"))
Expand All @@ -33,7 +34,7 @@ def __init__(self):
self.model_key = "ae::model"

def _save_artifact(
self, model, skeys: list[str], dkeys: list[str], _: AutoencoderTrainer = None
self, model, skeys: list[str], dkeys: list[str], _: Optional[AutoencoderTrainer] = None
) -> None:
"""Saves the model in the registry."""
self.registry.save(skeys=skeys, dkeys=dkeys, artifact=model)
Expand Down
3 changes: 2 additions & 1 deletion numalogic/blocks/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from numalogic.blocks._transform import Block
from numalogic.registry import ArtifactManager
from numalogic.tools.types import artifact_t
from typing import Optional


class BlockPipeline(Sequence[Block]):
Expand All @@ -35,7 +36,7 @@ class BlockPipeline(Sequence[Block]):

__slots__ = ("_blocks", "_registry")

def __init__(self, *blocks: Block, registry: ArtifactManager = None):
def __init__(self, *blocks: Block, registry: Optional[ArtifactManager] = None):
self._blocks = blocks
self._registry = registry

Expand Down
14 changes: 7 additions & 7 deletions numalogic/config/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union
from typing import Union, ClassVar

from numalogic.config._config import ModelInfo, RegistryInfo
from numalogic.tools.exceptions import UnknownConfigArgsError


class _ObjectFactory:
_CLS_MAP = {}
_CLS_MAP: ClassVar[dict] = {}

def get_instance(self, object_info: Union[ModelInfo, RegistryInfo]):
try:
Expand All @@ -41,7 +41,7 @@ class PreprocessFactory(_ObjectFactory):
from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler
from numalogic.transforms import LogTransformer, StaticPowerTransformer, TanhScaler

_CLS_MAP = {
_CLS_MAP: ClassVar[dict] = {
"StandardScaler": StandardScaler,
"MinMaxScaler": MinMaxScaler,
"MaxAbsScaler": MaxAbsScaler,
Expand All @@ -57,15 +57,15 @@ class PostprocessFactory(_ObjectFactory):

from numalogic.transforms import TanhNorm, ExpMovingAverage

_CLS_MAP = {"TanhNorm": TanhNorm, "ExpMovingAverage": ExpMovingAverage}
_CLS_MAP: ClassVar[dict] = {"TanhNorm": TanhNorm, "ExpMovingAverage": ExpMovingAverage}


class ThresholdFactory(_ObjectFactory):
"""Factory class to create threshold instances."""

from numalogic.models.threshold import StdDevThreshold, StaticThreshold, SigmoidThreshold

_CLS_MAP = {
_CLS_MAP: ClassVar[dict] = {
"StdDevThreshold": StdDevThreshold,
"StaticThreshold": StaticThreshold,
"SigmoidThreshold": SigmoidThreshold,
Expand All @@ -86,7 +86,7 @@ class ModelFactory(_ObjectFactory):
SparseTransformerAE,
)

_CLS_MAP = {
_CLS_MAP: ClassVar[dict] = {
"VanillaAE": VanillaAE,
"SparseVanillaAE": SparseVanillaAE,
"Conv1dAE": Conv1dAE,
Expand All @@ -101,7 +101,7 @@ class ModelFactory(_ObjectFactory):
class RegistryFactory(_ObjectFactory):
"""Factory class to create registry instances."""

_CLS_SET = {"RedisRegistry", "MLflowRegistry"}
_CLS_SET: ClassVar[frozenset] = {"RedisRegistry", "MLflowRegistry"}

def get_instance(self, object_info: Union[ModelInfo, RegistryInfo]):
import numalogic.registry as reg
Expand Down
3 changes: 2 additions & 1 deletion numalogic/models/autoencoder/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from numalogic.tools.callbacks import ProgressDetails
from numalogic.tools.data import inverse_window
from typing import Optional

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -68,7 +69,7 @@ def __init__(
**trainer_kw
)

def predict(self, model: pl.LightningModule = None, unbatch=True, **kwargs) -> Tensor:
def predict(self, model: Optional[pl.LightningModule] = None, unbatch=True, **kwargs) -> Tensor:
r"""Predicts the output of the model.
Args:
Expand Down
4 changes: 2 additions & 2 deletions numalogic/models/autoencoder/variants/conv.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


import logging
from typing import Union
from typing import Union, Optional
from collections.abc import Sequence

import torch
Expand Down Expand Up @@ -212,7 +212,7 @@ def __init__(
enc_channels: Sequence[int] = (16, 8),
enc_kernel_sizes: Union[int, Sequence[int]] = 3,
pool_kernel_size: int = 2,
dec_activation: str = None,
dec_activation: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
Expand Down
4 changes: 2 additions & 2 deletions numalogic/registry/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


from dataclasses import dataclass
from typing import Any, Generic, TypeVar, Union
from typing import Any, Generic, TypeVar, Union, Optional

from numalogic.tools.types import artifact_t, KEYS, META_T, META_VT, EXTRA_T, state_dict_t

Expand Down Expand Up @@ -54,7 +54,7 @@ def __init__(self, uri: str):
self.uri = uri

def load(
self, skeys: KEYS, dkeys: KEYS, latest: bool = True, version: str = None
self, skeys: KEYS, dkeys: KEYS, latest: bool = True, version: Optional[str] = None
) -> ArtifactData:
"""Loads the desired artifact from mlflow registry and returns it.
Expand Down
8 changes: 4 additions & 4 deletions numalogic/registry/mlflow_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __new__(
tracking_uri: Optional[str],
models_to_retain: int = 5,
model_stage: ModelStage = ModelStage.PRODUCTION,
cache_registry: ArtifactCache = None,
cache_registry: Optional[ArtifactCache] = None,
*args,
**kwargs,
):
Expand All @@ -91,7 +91,7 @@ def __init__(
tracking_uri: str,
models_to_retain: int = 5,
model_stage: str = ModelStage.PRODUCTION,
cache_registry: ArtifactCache = None,
cache_registry: Optional[ArtifactCache] = None,
):
super().__init__(tracking_uri)
mlflow.set_tracking_uri(tracking_uri)
Expand Down Expand Up @@ -138,7 +138,7 @@ def load(
skeys: KEYS,
dkeys: KEYS,
latest: bool = True,
version: str = None,
version: Optional[str] = None,
artifact_type: str = "pytorch",
) -> Optional[ArtifactData]:
"""Load the artifact from the registry. The artifact is loaded from the cache if available.
Expand Down Expand Up @@ -206,7 +206,7 @@ def save(
skeys: KEYS,
dkeys: KEYS,
artifact: artifact_t,
run_id: str = None,
run_id: Optional[str] = None,
**metadata: META_VT,
) -> Optional[ModelVersion]:
"""Saves the artifact into mlflow registry and updates version.
Expand Down
38 changes: 17 additions & 21 deletions numalogic/registry/redis_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(
self,
client: redis_client_t,
ttl: int = 604800,
cache_registry: ArtifactCache = None,
cache_registry: Optional[ArtifactCache] = None,
):
super().__init__("")
self.client = client
Expand All @@ -68,8 +68,8 @@ def construct_key(skeys: KEYS, dkeys: KEYS) -> str:
return "::".join([_static_key, _dynamic_key])

@staticmethod
def __construct_production_key(key: str):
return RedisRegistry.construct_key(skeys=[key], dkeys=["PROD"])
def __construct_latest_key(key: str):
return RedisRegistry.construct_key(skeys=[key], dkeys=["LATEST"])

@staticmethod
def __construct_version_key(key: str, version: str):
Expand Down Expand Up @@ -131,13 +131,11 @@ def __load_latest_artifact(self, key: str) -> ArtifactData:
if cached_artifact:
_LOGGER.debug("Found cached artifact for key: %s", key)
return cached_artifact
production_key = self.__construct_production_key(key)
if not self.client.exists(production_key):
raise ModelKeyNotFound(
f"Production key: {production_key}, Not Found !!!\n Exiting....."
)
model_key = self.client.get(production_key)
_LOGGER.info("Production key, %s, is pointing to the key : %s", production_key, model_key)
latest_key = self.__construct_latest_key(key)
if not self.client.exists(latest_key):
raise ModelKeyNotFound(f"latest key: {latest_key}, Not Found !!!")
model_key = self.client.get(latest_key)
_LOGGER.info("latest key, %s, is pointing to the key : %s", latest_key, model_key)
return self.__load_version_artifact(version=self.get_version(model_key.decode()), key=key)

def __load_version_artifact(self, version: str, key: str) -> ArtifactData:
Expand All @@ -152,11 +150,9 @@ def __save_artifact(
self, pipe, artifact: artifact_t, metadata: META_T, key: KEYS, version: str
) -> str:
new_version_key = self.__construct_version_key(key, version)
production_key = self.__construct_production_key(key)
pipe.set(name=production_key, value=new_version_key)
_LOGGER.info(
"Setting Production key : %s ,to this new key = %s", production_key, new_version_key
)
latest_key = self.__construct_latest_key(key)
pipe.set(name=latest_key, value=new_version_key)
_LOGGER.info("Setting latest key : %s ,to this new key = %s", latest_key, new_version_key)
serialized_metadata = ""
if metadata:
serialized_metadata = dumps(deserialized_object=metadata)
Expand All @@ -177,7 +173,7 @@ def load(
skeys: KEYS,
dkeys: KEYS,
latest: bool = True,
version: str = None,
version: Optional[str] = None,
) -> Optional[ArtifactData]:
"""Loads the artifact from redis registry. Either latest or version (one of the arguments)
is needed to load the respective artifact.
Expand All @@ -186,7 +182,7 @@ def load(
----
skeys: static key fields as list/tuple of strings
dkeys: dynamic key fields as list/tuple of strings
latest: load the model in production stage
latest: load the model in latest stage
version: version to load.
Returns
Expand Down Expand Up @@ -228,12 +224,12 @@ def save(
model version
"""
key = self.construct_key(skeys, dkeys)
production_key = self.__construct_production_key(key)
latest_key = self.__construct_latest_key(key)
version = 0
try:
if self.client.exists(production_key):
_LOGGER.debug("Production key exists for the model")
version_key = self.client.get(name=production_key)
if self.client.exists(latest_key):
_LOGGER.debug("latest key exists for the model")
version_key = self.client.get(name=latest_key)
version = int(self.get_version(version_key.decode())) + 1
with self.client.pipeline() as pipe:
new_version_key = self.__save_artifact(pipe, artifact, metadata, key, str(version))
Expand Down
12 changes: 9 additions & 3 deletions numalogic/synthetic/anomalies.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from typing import Optional, ClassVar


class AnomalyGenerator:
Expand All @@ -41,7 +42,12 @@ class AnomalyGenerator:
random_seed: seed for random number generator.
"""

__MIN_COLUMNS = {"global": 1, "contextual": 1, "causal": 2, "collective": 2}
__MIN_COLUMNS: ClassVar[dict[str, int]] = {
"global": 1,
"contextual": 1,
"causal": 2,
"collective": 2,
}

def __init__(
self,
Expand Down Expand Up @@ -80,7 +86,7 @@ def add_impact_sign(self) -> int:
raise ValueError(f"Invalid anomaly sign provided: {self.anomaly_sign}")

def inject_anomalies(
self, target_df: pd.DataFrame, cols: Sequence[str] = None, **kwargs
self, target_df: pd.DataFrame, cols: Optional[Sequence[str]] = None, **kwargs
) -> pd.DataFrame:
"""@param target_df: Target DataFrame where anomalies will be injected
@param cols: Columns to inject anomalies
Expand All @@ -97,7 +103,7 @@ def inject_anomalies(
raise AttributeError(f"Invalid anomaly type provided: {self.anomaly_type}")

def _inject_global_anomalies(
self, target_df: pd.DataFrame, cols: Sequence[str] = None, impact=3
self, target_df: pd.DataFrame, cols: Optional[Sequence[str]] = None, impact=3
) -> pd.DataFrame:
target_df = self._init_target_df(target_df, cols)
anomaly_df = pd.DataFrame(index=target_df.index)
Expand Down
3 changes: 2 additions & 1 deletion numalogic/synthetic/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datetime import date

from numpy.typing import NDArray
from typing import Optional


class SyntheticTSGenerator:
Expand Down Expand Up @@ -51,7 +52,7 @@ def __init__(
amplitude_range=(10, 40),
cosine_ratio_range=(0.5, 0.9),
noise_range=(5, 15),
phase_shift_range: tuple[int, int] = None,
phase_shift_range: Optional[tuple[int, int]] = None,
random_seed: int = 42,
):
self.seq_len = seq_len
Expand Down
4 changes: 2 additions & 2 deletions numalogic/tools/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


from collections.abc import Sequence
from typing import Union, TypeVar
from typing import Union, TypeVar, ClassVar

from sklearn.base import BaseEstimator
from torch import Tensor
Expand Down Expand Up @@ -43,7 +43,7 @@
class Singleton(type):
r"""Helper metaclass to use as a Singleton class."""

_instances = {}
_instances: ClassVar[dict] = {}

def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
Expand Down
Loading

0 comments on commit 7778ae8

Please sign in to comment.