Skip to content

Commit

Permalink
examples: update with new numalogic and pynumaflow (#202)
Browse files Browse the repository at this point in the history
- update examples for numalogic version and pynumaflow versions
- base numalogic udf
- pynumaflow as an optional dependency
- class-based udfs
- remove protobuf 3.20 requirement

---------

Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Jun 2, 2023
1 parent fd169cf commit ce93191
Show file tree
Hide file tree
Showing 17 changed files with 715 additions and 446 deletions.
10 changes: 5 additions & 5 deletions examples/numalogic-simple-pipeline/numa-pl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
min: 1
udf:
container:
image: quay.io/numaio/numalogic/example-udf:v0.3.0
image: quay.io/numaio/numalogic/example-udf:v0.4
env:
- name: WIN_SIZE
value: "12"
Expand All @@ -31,7 +31,7 @@ spec:
min: 1
udf:
container:
image: quay.io/numaio/numalogic/example-udf:v0.3.0
image: quay.io/numaio/numalogic/example-udf:v0.4
env:
- name: WIN_SIZE
value: "12"
Expand All @@ -44,7 +44,7 @@ spec:
min: 1
udf:
container:
image: quay.io/numaio/numalogic/example-udf:v0.3.0
image: quay.io/numaio/numalogic/example-udf:v0.4
env:
- name: WIN_SIZE
value: "12"
Expand All @@ -57,7 +57,7 @@ spec:
min: 1
udf:
container:
image: quay.io/numaio/numalogic/example-udf:v0.3.0
image: quay.io/numaio/numalogic/example-udf:v0.4
env:
- name: WIN_SIZE
value: "12"
Expand All @@ -78,7 +78,7 @@ spec:
env:
- name: WIN_SIZE
value: "12"
image: quay.io/numaio/numalogic/example-udf:v0.3.0
image: quay.io/numaio/numalogic/example-udf:v0.4
args:
- python
- starter.py
Expand Down
11 changes: 5 additions & 6 deletions examples/numalogic-simple-pipeline/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
cachetools==5.2.0
dataclasses-json==0.5.7
numalogic[mlflow]==0.3.0
pytorch-lightning==1.8.6
protobuf==3.20 # need this to avoid errors with tensorboard
pynumaflow==0.2.6
cachetools>5.2,<6.0
numalogic[mlflow,numaflow] @ git+https://github.com/numaproj/numalogic.git@main
# ../../../numalogic[mlflow,numaflow] # for local testing
pytorch-lightning>2.0,< 3.0
pynumaflow>0.4,<0.5
20 changes: 20 additions & 0 deletions examples/numalogic-simple-pipeline/src/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from src.udf import Preprocess, Inference, Postprocess, Trainer, Threshold
from numalogic.numaflow import NumalogicUDF


class UDFFactory:
    """Factory that maps a pipeline step name to its UDF handler.

    The supported step names are the keys of ``_UDF_MAP``.
    """

    # Step name -> UDF class; a new instance is built on every lookup.
    _UDF_MAP = {
        "preprocess": Preprocess,
        "inference": Inference,
        "postprocess": Postprocess,
        "train": Trainer,
        "threshold": Threshold,
    }

    @classmethod
    def get_handler(cls, step: str) -> NumalogicUDF:
        """Return a new handler instance for the given step.

        Args:
            step: Name of the pipeline step; must be a key of ``_UDF_MAP``.

        Returns:
            A freshly constructed instance of the UDF class registered for ``step``.

        Raises:
            KeyError: If ``step`` is not a registered step name. The message
                lists the valid step names.
        """
        try:
            udf_cls = cls._UDF_MAP[step]
        except KeyError:
            # Still a KeyError (callers catching it keep working), but with a
            # message that says what was expected instead of just the bad key.
            raise KeyError(
                f"Unknown step {step!r}; expected one of {sorted(cls._UDF_MAP)}"
            ) from None
        return udf_cls()
12 changes: 6 additions & 6 deletions examples/numalogic-simple-pipeline/src/udf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from src.udf.inference import inference
from src.udf.postprocess import postprocess
from src.udf.preprocess import preprocess
from src.udf.train import train
from src.udf.threshold import threshold
from src.udf.inference import Inference
from src.udf.postprocess import Postprocess
from src.udf.preprocess import Preprocess
from src.udf.train import Trainer
from src.udf.threshold import Threshold


__all__ = ["preprocess", "inference", "postprocess", "train", "threshold"]
__all__ = ["Preprocess", "Inference", "Postprocess", "Trainer", "Threshold"]
72 changes: 44 additions & 28 deletions examples/numalogic-simple-pipeline/src/udf/inference.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,68 @@
import logging
import os

import numpy as np
import numpy.typing as npt
from numalogic.models.autoencoder import AutoencoderTrainer
from numalogic.numaflow import NumalogicUDF
from numalogic.registry import MLflowRegistry, ArtifactData
from numalogic.tools.data import StreamingDataset
from pynumaflow.function import Messages, Message, Datum
from torch.utils.data import DataLoader

from src.utils import Payload, load_artifact
from src.utils import Payload

LOGGER = logging.getLogger(__name__)
WIN_SIZE = int(os.getenv("WIN_SIZE"))
# Tracking URI passed to MLflowRegistry; must be a well-formed URL with a single scheme.
TRACKING_URI = "http://mlflow-service.default.svc.cluster.local:5000"


def inference(_: str, datum: Datum) -> Messages:
r"""Here inference is done on the data, given, the ML model is present
in the registry. If a model does not exist, the payload is flagged for training.
It then passes to the threshold vertex.
For more information about the arguments, refer:
https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/function/_dtypes.py
class Inference(NumalogicUDF):
"""
The inference function here performs inference on the streaming data and sends
the payload to threshold vertex.
"""
# Load data and convert bytes to Payload
payload = Payload.from_json(datum.value.decode("utf-8"))
messages = Messages()

artifact_data = load_artifact(skeys=["ae"], dkeys=["model"], type_="pytorch")
stream_data = np.asarray(payload.ts_data).reshape(-1, 1)
def __init__(self):
super().__init__()
self.registry = MLflowRegistry(tracking_uri=TRACKING_URI)

# Check if model exists for inference
if artifact_data:
LOGGER.info("%s - Model found!", payload.uuid)
def load_model(self) -> ArtifactData:
"""Loads the model from the registry."""
return self.registry.load(skeys=["ae"], dkeys=["model"])

# Load model from registry
@staticmethod
def _infer(artifact_data: ArtifactData, stream_data: npt.NDArray[float]) -> list[float]:
"""Performs inference on the streaming data."""
main_model = artifact_data.artifact
streamloader = DataLoader(StreamingDataset(stream_data, WIN_SIZE))

trainer = AutoencoderTrainer()
recon_err = trainer.predict(main_model, dataloaders=streamloader)
reconerr = trainer.predict(main_model, dataloaders=streamloader)
return reconerr.tolist()

def exec(self, keys: list[str], datum: Datum) -> Messages:
"""
Here inference is done on the data, given, the ML model is present
in the registry. If a model does not exist, the payload is flagged for training.
It then passes to the threshold vertex.
For more information about the arguments, refer:
https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/function/_dtypes.py
"""
# Load data and convert bytes to Payload
payload = Payload.from_json(datum.value)

payload.ts_data = recon_err.tolist()
LOGGER.info("%s - Inference complete", payload.uuid)
artifact_data = self.load_model()
stream_data = payload.get_array().reshape(-1, 1)

else:
# If model not found, set status as not found
LOGGER.warning("%s - Model not found", payload.uuid)
payload.is_artifact_valid = False
# Check if model exists for inference
if artifact_data:
payload.set_array(self._infer(artifact_data, stream_data))
LOGGER.info("%s - Inference complete", payload.uuid)
else:
# If model not found, set status as not found
LOGGER.warning("%s - Model not found", payload.uuid)
payload.is_artifact_valid = False

# Convert Payload back to bytes and conditional forward to threshold vertex
messages.append(Message.to_vtx(key="threshold", value=payload.to_json().encode("utf-8")))
return messages
# Convert Payload back to bytes and conditional forward to threshold vertex
return Messages(Message(value=payload.to_json()))
41 changes: 23 additions & 18 deletions examples/numalogic-simple-pipeline/src/udf/postprocess.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

import numpy as np
from numalogic.numaflow import NumalogicUDF
from numalogic.transforms import TanhNorm
from pynumaflow.function import Messages, Message, Datum

Expand All @@ -9,26 +10,30 @@
LOGGER = logging.getLogger(__name__)


def postprocess(key: str, datum: Datum) -> Messages:
r"""The postprocess transforms the inferred data into anomaly score between [0,10]
and sends it to log sink.
class Postprocess(NumalogicUDF):
"""UDF to postprocess the anomaly score, and scale it between [0,10]."""

For more information about the arguments, refer:
https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/function/_dtypes.py
"""
# Load json data
payload = Payload.from_json(datum.value.decode("utf-8"))
def __init__(self):
super().__init__()

# Postprocess step
data = np.asarray(payload.ts_data)
def exec(self, _: list[str], datum: Datum) -> Messages:
"""The postprocess transforms the inferred data into anomaly score between [0,10]
and sends it to log sink.
# Taking mean of the anomaly scores
normalizer = TanhNorm()
payload.anomaly_score = normalizer.fit_transform(np.mean(data))
For more information about the arguments, refer:
https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/function/_dtypes.py
"""
# Load json data
payload = Payload.from_json(datum.value)

LOGGER.info("%s - The anomaly score is: %s", payload.uuid, payload.anomaly_score)
# Postprocess step
data = payload.get_array()

# Convert Payload back to bytes
messages = Messages()
messages.append(Message.to_all(payload.to_json().encode("utf-8")))
return messages
# Taking mean of the anomaly scores
normalizer = TanhNorm()
payload.anomaly_score = normalizer.fit_transform(np.mean(data))

LOGGER.info("%s - The anomaly score is: %s", payload.uuid, payload.anomaly_score)

# Convert Payload back to bytes
return Messages(Message(value=payload.to_json()))
42 changes: 24 additions & 18 deletions examples/numalogic-simple-pipeline/src/udf/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,38 @@
import logging
import uuid

import numpy as np
from numalogic.transforms import LogTransformer
from numalogic.numaflow import NumalogicUDF
from pynumaflow.function import Messages, Message, Datum

from src.utils import Payload

LOGGER = logging.getLogger(__name__)


def preprocess(_: str, datum: Datum) -> Messages:
r"""The preprocess function here transforms the input data for ML inference and sends
the payload to inference vertex.
class Preprocess(NumalogicUDF):
"""UDF to preprocess the input data for ML inference."""

For more information about the arguments, refer:
https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/function/_dtypes.py
"""
# Load json data
json_data = datum.value
ts_array = json.loads(json_data)["data"]
payload = Payload(ts_data=ts_array, uuid=str(uuid.uuid4()))
def __init__(self):
super().__init__()

# preprocess step
data = np.asarray(payload.ts_data)
clf = LogTransformer()
payload.ts_data = clf.transform(data).tolist()
LOGGER.info("%s - Preprocess complete for data: %s", payload.uuid, payload.ts_data)
def exec(self, _: list[str], datum: Datum) -> Messages:
"""The preprocess function here transforms the input data for ML inference and sends
the payload to inference vertex.
# Convert Payload back to bytes
return Messages(Message.to_all(payload.to_json().encode("utf-8")))
For more information about the arguments, refer:
https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/function/_dtypes.py
"""
# Load json data
series = json.loads(datum.value)["data"]
payload = Payload(uuid=str(uuid.uuid4()), arr=series)

# preprocess step
data = payload.get_array()
clf = LogTransformer()
out = clf.fit_transform(data)
payload.set_array(out.tolist())
LOGGER.info("%s - Preprocess complete for data: %s", payload.uuid, payload.arr)

# Return as a Messages object
return Messages(Message(value=payload.to_json()))
66 changes: 40 additions & 26 deletions examples/numalogic-simple-pipeline/src/udf/threshold.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,58 @@
import logging

import numpy as np
from numalogic.numaflow import NumalogicUDF
from numalogic.registry import MLflowRegistry
from pynumaflow.function import Messages, Message, Datum

from src.utils import Payload, load_artifact
from src.utils import Payload

LOGGER = logging.getLogger(__name__)
# Tracking URI passed to MLflowRegistry; must be a well-formed URL with a single scheme.
TRACKING_URI = "http://mlflow-service.default.svc.cluster.local:5000"


def threshold(_: str, datum: Datum) -> Messages:
r"""UDF that applies thresholding to the reconstruction error returned by the autoencoder.
class Threshold(NumalogicUDF):
"""UDF to apply thresholding to the reconstruction error returned by the autoencoder."""

For more information about the arguments, refer:
https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/function/_dtypes.py
"""
# Load data and convert bytes to Payload
payload = Payload.from_json(datum.value.decode("utf-8"))
messages = Messages()
def __init__(self):
super().__init__()
self.registry = MLflowRegistry(tracking_uri=TRACKING_URI)

# Load the threshold model from registry
thresh_clf_artifact = load_artifact(skeys=["thresh_clf"], dkeys=["model"])
recon_err = np.asarray(payload.ts_data).reshape(-1, 1)

# Check if model exists for inference
if (not thresh_clf_artifact) or (not payload.is_artifact_valid):
# If model not found, send it to trainer for training
@staticmethod
def _handle_not_found(payload: Payload) -> Messages:
"""
Handles the case when the model is not found.
If model not found, send it to trainer for training.
"""
LOGGER.warning("%s - Model not found. Training the model.", payload.uuid)

# Convert Payload back to bytes and conditional forward to train vertex
payload.is_artifact_valid = False
messages.append(Message.to_vtx(key="train", value=payload.to_json().encode("utf-8")))
return messages
return Messages(Message(keys=["train"], value=payload.to_json()))

def exec(self, _: list[str], datum: Datum) -> Messages:
"""
UDF that applies thresholding to the reconstruction error returned by the autoencoder.
For more information about the arguments, refer:
https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/function/_dtypes.py
"""
# Load data and convert bytes to Payload
payload = Payload.from_json(datum.value)

# Load the threshold model from registry
thresh_clf_artifact = self.registry.load(
skeys=["thresh_clf"], dkeys=["model"], artifact_type="sklearn"
)
recon_err = payload.get_array().reshape(-1, 1)

LOGGER.debug("%s - Threshold Model found!", payload.uuid)
# Check if model exists for inference
if (not thresh_clf_artifact) or (not payload.is_artifact_valid):
return self._handle_not_found(payload)

thresh_clf = thresh_clf_artifact.artifact
payload.ts_data = thresh_clf.predict(recon_err).tolist()
thresh_clf = thresh_clf_artifact.artifact
payload.set_array(thresh_clf.predict(recon_err).tolist())

LOGGER.info("%s - Thresholding complete", payload.uuid)
LOGGER.info("%s - Thresholding complete", payload.uuid)

# Convert Payload back to bytes and conditional forward to postprocess vertex
messages.append(Message.to_vtx(key="postprocess", value=payload.to_json().encode("utf-8")))
return messages
# Convert Payload back to bytes and conditional forward to postprocess vertex
return Messages(Message(keys=["postprocess"], value=payload.to_json()))
Loading

0 comments on commit ce93191

Please sign in to comment.