Skip to content

Commit

Permalink
feat: local memory artifact cache (#165)
Browse files Browse the repository at this point in the history
* feat: in-memory cache

Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed May 3, 2023
1 parent 59a5e32 commit 794ddc6
Show file tree
Hide file tree
Showing 24 changed files with 349 additions and 187 deletions.
4 changes: 2 additions & 2 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ coverage:
project:
default:
target: auto
threshold: 5%
threshold: 3%
patch:
default:
target: auto
threshold: 10%
threshold: 20%
28 changes: 5 additions & 23 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,12 @@ jobs:
black:
name: Black format
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.9"]

steps:
- uses: actions/checkout@v3

- name: Install poetry
run: pipx install poetry==1.4.2

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: 'poetry'

- name: Install dependencies
run: |
poetry env use ${{ matrix.python-version }}
poetry install --with dev
- name: Black format check
run: poetry run black --check .
- uses: actions/checkout@v3
- uses: psf/black@stable
with:
options: "--check --verbose"
version: "~= 23.3"

ruff:
runs-on: ubuntu-latest
Expand Down
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ repos:
rev: 23.3.0
hooks:
- id: black
args: [ --check ]
language_version: python3.9
args: [--config=pyproject.toml, --diff, --color ]
- repo: https://github.com/charliermarsh/ruff-pre-commit
# Ruff version.
rev: 'v0.0.264'
Expand Down
6 changes: 1 addition & 5 deletions examples/numalogic-simple-pipeline/src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ class Payload:
is_artifact_valid: bool = True


def save_artifact(
artifact,
skeys: Sequence[str],
dkeys: Sequence[str],
) -> None:
def save_artifact(artifact, skeys: Sequence[str], dkeys: Sequence[str]) -> None:
if isinstance(artifact, BaseAE):
ml_registry = MLflowRegistry(tracking_uri=TRACKING_URI, artifact_type="pytorch")
else:
Expand Down
5 changes: 1 addition & 4 deletions numalogic/config/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ class PreprocessFactory(_ObjectFactory):


class PostprocessFactory(_ObjectFactory):
_CLS_MAP = {
"TanhNorm": TanhNorm,
"ExpMovingAverage": ExpMovingAverage,
}
_CLS_MAP = {"TanhNorm": TanhNorm, "ExpMovingAverage": ExpMovingAverage}


class ThresholdFactory(_ObjectFactory):
Expand Down
5 changes: 2 additions & 3 deletions numalogic/models/autoencoder/variants/conv.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def _construct_layers(
layers.append(
nn.LazyConvTranspose1d(
out_channels=num_filters[-1], kernel_size=kernel_sizes[-1], padding=1
),
)
)
if final_activation:
layers.append(_get_activation_function(final_activation))
Expand Down Expand Up @@ -286,8 +286,7 @@ def predict_step(self, batch: Tensor, batch_idx: int, dataloader_idx: int = 0) -
"""Returns reconstruction for streaming input"""
recon = self.reconstruction(batch)
recon = recon.view(-1, self.seq_len, self.in_channels)
recon_err = self.criterion(batch, recon, reduction="none")
return recon_err
return self.criterion(batch, recon, reduction="none")


class SparseConv1dAE(Conv1dAE):
Expand Down
6 changes: 2 additions & 4 deletions numalogic/models/autoencoder/variants/lstm.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ def forward(self, x: Tensor) -> Tensor:
x = x.unsqueeze(1).repeat(1, self.seq_len, 1)
x, (_, __) = self.lstm(x)
x = x.reshape((-1, self.seq_len, self.hidden_size))
out = self.fc(x)
return out
return self.fc(x)


class LSTMAE(BaseAE):
Expand Down Expand Up @@ -152,8 +151,7 @@ def forward(self, x: Tensor) -> tuple[Tensor, Tensor]:
def predict_step(self, batch: Tensor, batch_idx: int, dataloader_idx: int = 0):
"""Returns reconstruction for streaming input"""
recon = self.reconstruction(batch)
recon_err = self.criterion(batch, recon, reduction="none")
return recon_err
return self.criterion(batch, recon, reduction="none")


class SparseLSTMAE(LSTMAE):
Expand Down
20 changes: 5 additions & 15 deletions numalogic/models/autoencoder/variants/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ def _scaled_dot_product(query: Tensor, key: Tensor, value: Tensor) -> Tensor:


def _positional_encoding(
feature: int,
seq_len: int,
device: torch.device = torch.device("cpu"),
feature: int, seq_len: int, device: torch.device = torch.device("cpu")
) -> Tensor:
r"""
Positional Encoding as described in
Expand Down Expand Up @@ -68,9 +66,7 @@ def _feed_forward(dim_input: int = 10, dim_feedforward: int = 2048) -> nn.Module
nn.Module type
"""
return nn.Sequential(
nn.Linear(dim_input, dim_feedforward),
nn.ReLU(),
nn.Linear(dim_feedforward, dim_input),
nn.Linear(dim_input, dim_feedforward), nn.ReLU(), nn.Linear(dim_feedforward, dim_input)
)


Expand Down Expand Up @@ -164,9 +160,7 @@ def __init__(
dropout=dropout,
)
self.feed_forward = _Residual(
_feed_forward(dim_model, dim_feedforward),
dimension=dim_model,
dropout=dropout,
_feed_forward(dim_model, dim_feedforward), dimension=dim_model, dropout=dropout
)

def forward(self, src: Tensor) -> Tensor:
Expand All @@ -193,7 +187,6 @@ def __init__(
dim_feedforward: int = 2048,
dropout: float = 0.1,
):

super().__init__()
self.layers = nn.ModuleList(
[
Expand Down Expand Up @@ -241,9 +234,7 @@ def __init__(
dropout=dropout,
)
self.feed_forward = _Residual(
_feed_forward(dim_model, dim_feedforward),
dimension=dim_model,
dropout=dropout,
_feed_forward(dim_model, dim_feedforward), dimension=dim_model, dropout=dropout
)

def forward(self, tgt: Tensor, memory: Tensor) -> Tensor:
Expand Down Expand Up @@ -362,8 +353,7 @@ def predict_step(self, batch: Tensor, batch_idx: int, dataloader_idx: int = 0):
"""Returns reconstruction for streaming input"""
recon = self.reconstruction(batch)
recon = recon.view(-1, self.seq_len, self.n_features)
recon_err = self.criterion(batch, recon, reduction="none")
return recon_err
return self.criterion(batch, recon, reduction="none")


class SparseTransformerAE(TransformerAE):
Expand Down
4 changes: 1 addition & 3 deletions numalogic/models/autoencoder/variants/vanilla.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ def __init__(
dropout_p: float = 0.25,
**kwargs,
):

super().__init__(**kwargs)
self.seq_len = seq_len
self.dropout_prob = dropout_p
Expand Down Expand Up @@ -202,8 +201,7 @@ def predict_step(self, batch: Tensor, batch_idx: int, dataloader_idx: int = 0):
"""Returns reconstruction for streaming input"""
recon = self.reconstruction(batch)
recon = recon.view(-1, self.seq_len, self.n_features)
recon_err = self.criterion(batch, recon, reduction="none")
return recon_err
return self.criterion(batch, recon, reduction="none")


class SparseVanillaAE(VanillaAE):
Expand Down
3 changes: 1 addition & 2 deletions numalogic/models/threshold/_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,4 @@ def score_samples(self, x: npt.NDArray[float]) -> npt.NDArray[float]:
with values being anomaly scores.
"""
x = x.copy()
y = 10 / (1 + np.exp(-self.coeff * (x - self.upper_limit)))
return y
return 10 / (1 + np.exp(-self.coeff * (x - self.upper_limit)))
3 changes: 1 addition & 2 deletions numalogic/models/threshold/_std.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,4 @@ def score_samples(self, x_test: NDArray[float]) -> NDArray[float]:
"""
Returns an anomaly score array with the same shape as input.
"""
y_scores = x_test / self.threshold
return y_scores
return x_test / self.threshold
17 changes: 11 additions & 6 deletions numalogic/registry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@
# limitations under the License.


from numalogic.registry.artifact import ArtifactManager
from numalogic.registry.artifact import ArtifactData
from numalogic.registry.artifact import ArtifactManager, ArtifactData, ArtifactCache
from numalogic.registry.localcache import LocalLRUCache

try:
from numalogic.registry.mlflow_registry import MLflowRegistry
except ImportError as err:
print("HERE", err)
__all__ = ["ArtifactManager", "ArtifactData"]
except ImportError:
__all__ = ["ArtifactManager", "ArtifactData", "ArtifactCache", "LocalLRUCache"]
else:
__all__ = ["ArtifactManager", "ArtifactData", "MLflowRegistry"]
__all__ = [
"ArtifactManager",
"ArtifactData",
"MLflowRegistry",
"ArtifactCache",
"LocalLRUCache",
]
90 changes: 78 additions & 12 deletions numalogic/registry/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,30 @@
# limitations under the License.


from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Any
from typing import Any, Generic, TypeVar, Union
from collections.abc import Sequence

from numalogic.tools.types import artifact_t
from numalogic.tools.types import artifact_t, S_KEYS, D_KEYS

META_T = TypeVar("META_T", bound=dict[str, Union[str, list, dict]])
EXTRA_T = TypeVar("EXTRA_T", bound=dict[str, Union[str, list, dict]])


@dataclass
class ArtifactData:
__slots__ = ("artifact", "metadata", "extras")

artifact: artifact_t
metadata: dict[str, Any]
extras: dict[str, Any]
metadata: META_T
extras: EXTRA_T


A_D = TypeVar("A_D", bound=ArtifactData, covariant=True)
M_K = TypeVar("M_K", bound=str)


class ArtifactManager(metaclass=ABCMeta):
class ArtifactManager(Generic[S_KEYS, D_KEYS, A_D]):
"""
Abstract base class for artifact save, load and delete.
Expand All @@ -37,7 +45,6 @@ class ArtifactManager(metaclass=ABCMeta):
def __init__(self, uri: str):
self.uri = uri

@abstractmethod
def load(
self, skeys: Sequence[str], dkeys: Sequence[str], latest: bool = True, version: str = None
) -> ArtifactData:
Expand All @@ -49,9 +56,8 @@ def load(
latest: boolean field to determine if latest version is desired or not
version: explicit artifact version
"""
pass
raise NotImplementedError("Please implement this method!")

@abstractmethod
def save(
self, skeys: Sequence[str], dkeys: Sequence[str], artifact: artifact_t, **metadata
) -> Any:
Expand All @@ -63,9 +69,8 @@ def save(
artifact: primary artifact to be saved
metadata: additional metadata surrounding the artifact that needs to be saved
"""
pass
raise NotImplementedError("Please implement this method!")

@abstractmethod
def delete(self, skeys: Sequence[str], dkeys: Sequence[str], version: str) -> None:
"""
Deletes the artifact with a specified version from mlflow registry.
Expand All @@ -74,4 +79,65 @@ def delete(self, skeys: Sequence[str], dkeys: Sequence[str], version: str) -> No
dkeys: dynamic key fields as list/tuple of strings
version: explicit artifact version
"""
pass
raise NotImplementedError("Please implement this method!")

@staticmethod
def construct_key(skeys: Sequence[str], dkeys: Sequence[str]) -> str:
"""
Returns a single key comprising static and dynamic key fields.
Override this method if customization is needed.
Args:
skeys: static key fields as list/tuple of strings
dkeys: dynamic key fields as list/tuple of strings
Returns:
key
"""
_static_key = ":".join(skeys)
_dynamic_key = ":".join(dkeys)
return "::".join([_static_key, _dynamic_key])


class ArtifactCache(Generic[M_K, A_D]):
r"""
Base class for all artifact caches.
Caches support saving, loading and deletion, but not artifact versioning.
Args:
cachesize: size of the cache
ttl: time to live for each item in the cache
"""
__slots__ = ("_cachesize", "_ttl")

def __init__(self, cachesize: int, ttl: int):
self._cachesize = cachesize
self._ttl = ttl

@property
def cachesize(self):
return self._cachesize

@property
def ttl(self):
return self._ttl

def load(self, key: str) -> ArtifactData:
r"""
Returns the stored ArtifactData object from the cache.
Implement this method for your custom cache.
"""
raise NotImplementedError("Please implement this method!")

def save(self, key: str, artifact: ArtifactData) -> None:
r"""
Saves the ArtifactData object into the cache.
Implement this method for your custom cache.
"""
raise NotImplementedError("Please implement this method!")

def delete(self, key: str) -> None:
r"""
Deletes the ArtifactData object from the cache.
Implement this method for your custom cache.
"""
raise NotImplementedError("Please implement this method!")
Loading

0 comments on commit 794ddc6

Please sign in to comment.