Skip to content

Commit

Permalink
feat!: numalogic udfs (#271)
Browse files Browse the repository at this point in the history
- add numalogic udf module
- preproc, inference, postproc and trainer udfs
- config updates and factory classes
- dockerfile
- image release github action

---------

Signed-off-by: Avik Basu <[email protected]>
Co-authored-by: Kushal Batra <[email protected]>
  • Loading branch information
ab93 and s0nicboOm committed Aug 31, 2023
1 parent c62c902 commit de8930a
Show file tree
Hide file tree
Showing 55 changed files with 17,724 additions and 666 deletions.
16 changes: 16 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
tests
.github
.gitignore
.hack
.ruff_cache
docs
examples
.coveragerc
.pre-commit-config.yaml
CHANGELOG.md
CODE_OF_CONDUCT.md
CONTRIBUTING.md
LICENSE
mkdocs.yml
README.md
USERS.md
45 changes: 45 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: Docker Image Push

on:
push:
tags:
- v*
workflow_dispatch:

jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11"]

steps:
- uses: actions/checkout@v3

- name: Docker Login
uses: docker/[email protected]
with:
registry: quay.io
username: ${{ secrets.QUAYIO_USERNAME }}
password: ${{ secrets.QUAYIO_PASSWORD }}

# Build the UDF image with buildx and push it to the quay.io registry.
- name: Docker Build
  env:
    QUAYIO_ORG: quay.io/numaio
    PLATFORM: linux/x86_64
    TARGET: numalogic/udf
  run: |
    # GITHUB_REF has the form refs/<type>/<name>; the parent path component
    # is the ref type ("tags" or "heads") and the last component is its name.
    type=$(basename $(dirname $GITHUB_REF))
    tag=$(basename $GITHUB_REF)
    if [[ $type == "heads" ]]; then
      # NOTE(review): env.version is not defined in this step's env block,
      # so the suffix after "v" may expand to an empty string -- confirm.
      tag="$(basename $GITHUB_REF)v${{ env.version }}"
    fi
    image_name="${QUAYIO_ORG}/${TARGET}:${tag}"
    # Every option line must end with a backslash; otherwise the shell ends
    # the docker command early and runs "--tag ..." as a separate command.
    docker buildx build \
      --output "type=image,push=true" \
      --platform="${PLATFORM}" \
      --build-arg INSTALL_EXTRAS='redis druid' \
      --tag "${image_name}" .
54 changes: 54 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
####################################################################################################
# builder: install needed dependencies
####################################################################################################

# ARGs declared before the first FROM are only visible to FROM instructions.
ARG PYTHON_VERSION=3.11
ARG POETRY_VERSION=1.6
FROM python:${PYTHON_VERSION}-slim-bookworm AS builder

# Re-declare the ARG inside the stage so that its default value is in scope
# for the ENV instruction below (otherwise POETRY_VERSION expands to empty).
ARG POETRY_VERSION

ENV PYTHONFAULTHANDLER=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONHASHSEED=random \
    PIP_NO_CACHE_DIR=off \
    PIP_DISABLE_PIP_VERSION_CHECK=on \
    PIP_DEFAULT_TIMEOUT=100 \
    POETRY_VERSION=${POETRY_VERSION} \
    POETRY_HOME="/opt/poetry" \
    POETRY_VIRTUALENVS_IN_PROJECT=true \
    POETRY_NO_INTERACTION=1 \
    PYSETUP_PATH="/opt/pysetup" \
    VENV_PATH="/opt/pysetup/.venv"

# Put both the poetry binary and the project virtualenv on PATH.
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"

# Install OS build tooling and dumb-init, drop the apt lists to keep the
# layer small, then upgrade pip and install poetry via its official installer.
RUN apt-get update \
    && apt-get install --no-install-recommends -y \
        curl \
        build-essential \
        dumb-init \
    && apt-get clean && rm -rf /var/lib/apt/lists/* \
    && pip install --no-cache --upgrade pip \
    && curl -sSL https://install.python-poetry.org | python3 -

####################################################################################################
# udf: used for running the udf vertices
####################################################################################################
FROM builder AS udf

# Space-separated list of poetry extras to install, supplied via --build-arg.
ARG INSTALL_EXTRAS

# Copy only the dependency manifests first so the install layer can be cached
# independently of source-code changes.
WORKDIR $PYSETUP_PATH
COPY ./pyproject.toml ./poetry.lock ./

# TODO install cpu/gpu based on args/arch
# torch is installed from the CPU-only wheel index; the version specifiers
# use a comma between clauses (">=2.0,<3.0") as required by PEP 440.
RUN poetry install --without dev --no-cache --no-root -E numaflow --extras "${INSTALL_EXTRAS}" && \
    poetry run pip install --no-cache "torch>=2.0,<3.0" --index-url https://download.pytorch.org/whl/cpu && \
    poetry run pip install --no-cache "pytorch-lightning>=2.0,<3.0" && \
    rm -rf ~/.cache/pypoetry/

COPY . /app
WORKDIR /app

# dumb-init runs as the entrypoint process and wraps whatever command is given.
ENTRYPOINT ["/usr/bin/dumb-init", "--"]

EXPOSE 5000
6 changes: 3 additions & 3 deletions examples/block_pipeline/src/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from numalogic.models.autoencoder.variants import SparseVanillaAE
from numalogic.models.threshold import StdDevThreshold
from numalogic.numaflow import NumalogicUDF
from numalogic.udfs import NumalogicUDF
from numalogic.registry import RedisRegistry
from numalogic.tools.exceptions import RedisRegistryError
from numalogic.transforms import TanhNorm
Expand Down Expand Up @@ -56,8 +56,8 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
# Run inference
try:
output = block_pl(np.asarray(series).reshape(-1, self.n_features))
except Exception as err:
_LOGGER.error("Error running block pipeline: %r", err)
except Exception:
_LOGGER.exception("Error running block pipeline")
return Messages(Message.to_drop())

anomaly_score = np.mean(output)
Expand Down
2 changes: 1 addition & 1 deletion examples/block_pipeline/src/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from numalogic.models.autoencoder.variants import SparseVanillaAE
from numalogic.models.threshold import StdDevThreshold
from numalogic.numaflow import NumalogicUDF
from numalogic.udfs import NumalogicUDF
from numalogic.registry import RedisRegistry
from numalogic.transforms import TanhNorm
from pynumaflow.function import Datum, Messages, Message
Expand Down
2 changes: 1 addition & 1 deletion examples/multi_udf/src/factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import ClassVar

from src.udf import Preprocess, Inference, Postprocess, Trainer, Threshold
from numalogic.numaflow import NumalogicUDF
from numalogic.udfs import NumalogicUDF


class UDFFactory:
Expand Down
2 changes: 1 addition & 1 deletion examples/multi_udf/src/udf/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import numpy.typing as npt
from numalogic.models.autoencoder import AutoencoderTrainer
from numalogic.numaflow import NumalogicUDF
from numalogic.udfs import NumalogicUDF
from numalogic.registry import MLflowRegistry, ArtifactData
from numalogic.tools.data import StreamingDataset
from pynumaflow.function import Messages, Message, Datum
Expand Down
2 changes: 1 addition & 1 deletion examples/multi_udf/src/udf/postprocess.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging

import numpy as np
from numalogic.numaflow import NumalogicUDF
from numalogic.udfs import NumalogicUDF
from numalogic.transforms import TanhNorm
from pynumaflow.function import Messages, Message, Datum

Expand Down
2 changes: 1 addition & 1 deletion examples/multi_udf/src/udf/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import uuid

from numalogic.transforms import LogTransformer
from numalogic.numaflow import NumalogicUDF
from numalogic.udfs import NumalogicUDF
from pynumaflow.function import Messages, Message, Datum

from src.utils import Payload
Expand Down
2 changes: 1 addition & 1 deletion examples/multi_udf/src/udf/threshold.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from numalogic.numaflow import NumalogicUDF
from numalogic.udfs import NumalogicUDF
from numalogic.registry import MLflowRegistry
from pynumaflow.function import Messages, Message, Datum

Expand Down
2 changes: 1 addition & 1 deletion examples/multi_udf/src/udf/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from numalogic.models.autoencoder import AutoencoderTrainer
from numalogic.models.autoencoder.variants import Conv1dAE
from numalogic.models.threshold import StdDevThreshold
from numalogic.numaflow import NumalogicUDF
from numalogic.udfs import NumalogicUDF
from numalogic.registry import MLflowRegistry
from numalogic.tools.data import TimeseriesDataModule
from numalogic.transforms import LogTransformer
Expand Down
27 changes: 27 additions & 0 deletions log.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Logging configuration in the Python logging "fileConfig" INI format.
# Declares two loggers (root and pytorch_lightning) sharing one console handler.
[loggers]
keys=root, pllogger

[handlers]
keys=consoleHandler

[formatters]
keys=consoleFormatter

# Root logger: INFO and above go to the console handler.
[logger_root]
level=INFO
handlers=consoleHandler

# pytorch_lightning logger: only ERROR and above, and propagate=0 keeps its
# records from also being passed up to the root logger.
[logger_pllogger]
level=ERROR
handlers=consoleHandler
qualname=pytorch_lightning
propagate=0

# The handler itself accepts DEBUG and above; effective filtering is done by
# the logger levels configured above.
[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=consoleFormatter

# Each record shows timestamp, thread id, level name and message.
[formatter_consoleFormatter]
format=%(asctime)s - %(thread)d - %(levelname)s - %(message)s
class=logging.Formatter
1 change: 1 addition & 0 deletions numalogic/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
NUMALOGIC_DIR = os.path.dirname(__file__)
BASE_DIR = os.path.split(NUMALOGIC_DIR)[0]
TESTS_DIR = os.path.join(NUMALOGIC_DIR, "../tests")
BASE_CONF_DIR = os.path.join(BASE_DIR, "config")
9 changes: 8 additions & 1 deletion numalogic/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
# limitations under the License.


from numalogic.config._config import NumalogicConf, ModelInfo, LightningTrainerConf, RegistryInfo
from numalogic.config._config import (
NumalogicConf,
ModelInfo,
LightningTrainerConf,
RegistryInfo,
TrainerConf,
)
from numalogic.config.factory import (
ModelFactory,
PreprocessFactory,
Expand All @@ -30,4 +36,5 @@
"PostprocessFactory",
"ThresholdFactory",
"RegistryFactory",
"TrainerConf",
]
28 changes: 23 additions & 5 deletions numalogic/config/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ModelInfo:
stateful: bool = True


# TODO add this in the right config
@dataclass
class RegistryInfo:
"""Registry config base class.
Expand All @@ -56,7 +57,7 @@ class LightningTrainerConf:
"""

accelerator: str = "auto"
max_epochs: int = 100
max_epochs: int = 50
logger: bool = False
check_val_every_n_epoch: int = 5
log_every_n_steps: int = 20
Expand All @@ -67,13 +68,30 @@ class LightningTrainerConf:
callbacks: Optional[Any] = None


@dataclass
class TrainerConf:
    """Training settings, including the nested Lightning trainer config.

    All fields have defaults, so ``TrainerConf()`` yields a usable config.
    The ``*_hours`` / ``*_hr`` / ``*_sec`` suffixes give the unit of each
    duration field.
    """

    train_hours: int = 24 * 8  # 8 days worth of data
    min_train_size: int = 2000
    retrain_freq_hr: int = 24
    model_expiry_sec: int = 86400  # 24 hrs  # TODO: revisit this
    dedup_expiry_sec: int = 1800  # 30 mins  # TODO: revisit this
    batch_size: int = 32
    # A fresh LightningTrainerConf is built per instance (no shared default).
    pltrainer_conf: LightningTrainerConf = field(default_factory=LightningTrainerConf)


@dataclass
class NumalogicConf:
"""Top level config schema for numalogic."""

model: ModelInfo = field(default_factory=ModelInfo)
trainer: LightningTrainerConf = field(default_factory=LightningTrainerConf)
registry: RegistryInfo = field(default_factory=RegistryInfo)
trainer: TrainerConf = field(default_factory=TrainerConf)
preprocess: list[ModelInfo] = field(default_factory=list)
threshold: ModelInfo = field(default_factory=ModelInfo)
postprocess: ModelInfo = field(default_factory=ModelInfo)
threshold: ModelInfo = field(default_factory=lambda: ModelInfo(name="StdDevThreshold"))
postprocess: ModelInfo = field(
default_factory=lambda: ModelInfo(name="TanhNorm", stateful=False)
)


@dataclass
class DataConnectorConf:
source: str
17 changes: 17 additions & 0 deletions numalogic/connectors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from numalogic.connectors._config import (
RedisConf,
PrometheusConf,
ConnectorConf,
DruidConf,
DruidFetcherConf,
ConnectorType,
)

__all__ = [
"RedisConf",
"PrometheusConf",
"ConnectorConf",
"DruidConf",
"DruidFetcherConf",
"ConnectorType",
]
57 changes: 57 additions & 0 deletions numalogic/connectors/_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from dataclasses import dataclass, field
from enum import IntEnum

from omegaconf import MISSING


class ConnectorType(IntEnum):
    """Integer codes identifying a connector kind (redis, prometheus, druid)."""

    redis = 0
    prometheus = 1
    druid = 2


@dataclass
class ConnectorConf:
    """Base config for connectors; carries the single required ``url`` field."""

    url: str


@dataclass
class PrometheusConf(ConnectorConf):
    """Prometheus connector config: ``url`` plus a required ``pushgateway``."""

    pushgateway: str
    # presumably seconds -- the unit is not shown here; confirm against usage
    scrape_interval: int = 30


@dataclass
class RedisConf(ConnectorConf):
    """Redis connector config: ``url`` plus a required ``port``."""

    port: int
    # NOTE(review): unit of expiry is not shown here (likely seconds) -- confirm
    expiry: int = 300
    # presumably the Redis Sentinel master group name -- verify against caller
    master_name: str = "mymaster"


@dataclass
class Pivot:
    """Pivot settings: an index name, column names and value names.

    Defaults to index ``"timestamp"``, no columns and the single value
    ``"count"``. List defaults are created per instance.
    """

    index: str = "timestamp"
    columns: list[str] = field(default_factory=list)
    value: list[str] = field(default_factory=lambda: ["count"])


@dataclass
class DruidFetcherConf:
    """Fetch settings for a Druid datasource.

    Only ``datasource`` is required. When ``aggregations`` is left empty, it
    is filled in after init with a ``doublesum`` over ``"count"``.
    """

    datasource: str
    dimensions: list[str] = field(default_factory=list)
    aggregations: dict = field(default_factory=dict)
    group_by: list[str] = field(default_factory=list)
    # The class itself is the factory; no wrapping lambda is needed.
    pivot: Pivot = field(default_factory=Pivot)
    granularity: str = "minute"

    def __post_init__(self):
        """Populate the default aggregation when none was supplied."""
        # Imported here rather than at module level, so pydruid is only
        # required when a DruidFetcherConf is actually instantiated.
        from pydruid.utils.aggregators import doublesum

        if not self.aggregations:
            self.aggregations = {"count": doublesum("count")}


@dataclass
class DruidConf(ConnectorConf):
    """Druid connector config: ``url``, a required ``endpoint`` and fetcher settings."""

    endpoint: str
    # OmegaConf's MISSING sentinel: no default is provided for the fetcher,
    # so a value is expected to be supplied through the config.
    fetcher: DruidFetcherConf = MISSING
Loading

0 comments on commit de8930a

Please sign in to comment.