Add pl conf (#336)
Add pl conf

---------

Signed-off-by: Nandita Koppisetty <[email protected]>
Signed-off-by: s0nicboOm <[email protected]>
Co-authored-by: Nandita Koppisetty <[email protected]>
s0nicboOm and nkoppisetty committed Dec 21, 2023
1 parent 28fa28f commit b292553
Showing 29 changed files with 1,708 additions and 1,247 deletions.
29 changes: 19 additions & 10 deletions numalogic/backtest/_prom.py
@@ -37,7 +37,7 @@
from numalogic.connectors.prometheus import PrometheusFetcher
from numalogic.tools.data import StreamingDataset, inverse_window
from numalogic.tools.types import artifact_t
from numalogic.udfs import UDFFactory, StreamConf
from numalogic.udfs import UDFFactory, StreamConf, MLPipelineConf

DEFAULT_OUTPUT_DIR = os.path.join(BASE_DIR, ".btoutput")
LOGGER = logging.getLogger(__name__)
@@ -54,8 +54,12 @@ def _init_default_streamconf(metrics: list[str]) -> StreamConf:
return StreamConf(
source=ConnectorType.prometheus,
window_size=DEFAULT_SEQUENCE_LEN,
metrics=metrics,
numalogic_conf=numalogic_cfg,
ml_pipelines={
"default": MLPipelineConf(
metrics=metrics,
numalogic_conf=numalogic_cfg,
)
},
)


@@ -73,6 +77,7 @@ class PromUnivarBacktester:
output_dir: Output directory
test_ratio: Ratio of test data to total data
stream_conf: Stream configuration
pipeline_id: ID of the ML pipeline to use from stream_conf
"""

def __init__(
@@ -86,18 +91,20 @@ def __init__(
output_dir: Union[str, Path] = DEFAULT_OUTPUT_DIR,
test_ratio: float = 0.25,
stream_conf: Optional[StreamConf] = None,
pipeline_id: str = "default",
):
self._url = url
self.namespace = namespace
self.appname = appname
self.metric = metric
self.conf = stream_conf or _init_default_streamconf([metric])
self.ml_pl_conf = self.conf.ml_pipelines[pipeline_id]
self.test_ratio = test_ratio
self.lookback_days = lookback_days
self.return_labels = return_labels

self._seq_len = self.conf.window_size
self._n_features = len(self.conf.metrics)
self._n_features = len(self.ml_pl_conf.metrics)

self.out_dir = self.get_outdir(appname, metric, outdir=output_dir)
self._datapath = os.path.join(self.out_dir, "data.csv")
@@ -170,13 +177,13 @@ def train_models(
LOGGER.info("Training data shape: %s", x_train.shape)

artifacts = UDFFactory.get_udf_cls("trainer").compute(
model=ModelFactory().get_instance(self.conf.numalogic_conf.model),
model=ModelFactory().get_instance(self.ml_pl_conf.numalogic_conf.model),
input_=x_train,
preproc_clf=PreprocessFactory().get_pipeline_instance(
self.conf.numalogic_conf.preprocess
self.ml_pl_conf.numalogic_conf.preprocess
),
threshold_clf=ThresholdFactory().get_instance(self.conf.numalogic_conf.threshold),
numalogic_cfg=self.conf.numalogic_conf,
threshold_clf=ThresholdFactory().get_instance(self.ml_pl_conf.numalogic_conf.threshold),
numalogic_cfg=self.ml_pl_conf.numalogic_conf,
)
artifacts_dict = {
"model": artifacts["inference"].artifact,
@@ -233,10 +240,12 @@ def generate_scores(

ds = StreamingDataset(x_scaled, seq_len=self.conf.window_size)
anomaly_scores = np.zeros(
(len(ds), self.conf.window_size, len(self.conf.metrics)), dtype=np.float32
(len(ds), self.conf.window_size, len(self.ml_pl_conf.metrics)), dtype=np.float32
)
x_recon = np.zeros_like(anomaly_scores, dtype=np.float32)
postproc_func = PostprocessFactory().get_instance(self.conf.numalogic_conf.postprocess)
postproc_func = PostprocessFactory().get_instance(
self.ml_pl_conf.numalogic_conf.postprocess
)

for idx, arr in enumerate(ds):
x_recon[idx] = nn_udf.compute(model=artifacts["model"], input_=arr)
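The backtester now resolves `metrics` and `numalogic_conf` through `stream_conf.ml_pipelines[pipeline_id]` instead of reading them off the `StreamConf` directly. A minimal construction sketch, assuming the public import path and the keyword names visible in this diff (part of the `__init__` signature is hidden here, and all values are placeholders):

```python
# Sketch: driving the backtester against a single ML pipeline.
# Assumptions: import path, keyword names, and that hidden parameters
# (e.g. lookback_days) have usable defaults.
from numalogic.backtest import PromUnivarBacktester

backtester = PromUnivarBacktester(
    url="http://localhost:9090",   # placeholder Prometheus endpoint
    namespace="my-namespace",      # placeholder
    appname="my-app",              # placeholder
    metric="request_error_rate",   # placeholder
    pipeline_id="default",         # resolves conf.ml_pipelines["default"]
)
```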
38 changes: 19 additions & 19 deletions numalogic/udfs/README.md
@@ -1,19 +1,19 @@
| Metric Name | Type | Labels | Description |
|:-------------------------:|:---------:|:----------------------------------------:|:-----------------------------------------------:|
| MSG_IN_COUNTER | Counter | vertex, composite_key, config_id | Count msgs flowing in |
| MSG_PROCESSED_COUNTER | Counter | vertex, composite_key, config_id | Count msgs processed |
| SOURCE_COUNTER | Counter | source, composite_key, config_id | Count artifact source (registry or cache) calls |
| INSUFFICIENT_DATA_COUNTER | Counter | composite_key, config_id | Count insufficient data while Training |
| MODEL_STATUS_COUNTER | Counter | status, vertex, composite_key, config_id | Count status of the model |
| DATASHAPE_ERROR_COUNTER | Counter | composite_key, config_id | Count datashape errors in preprocess |
| MSG_DROPPED_COUNTER | Counter | vertex, composite_key, config_id | Count dropped msgs |
| REDIS_ERROR_COUNTER | Counter | vertex, composite_key, config_id | Count redis errors |
| EXCEPTION_COUNTER | Counter | vertex, composite_key, config_id | Count exceptions |
| RUNTIME_ERROR_COUNTER | Counter | vertex, composite_key, config_id | Count runtime errors |
| FETCH_EXCEPTION_COUNTER | Counter | composite_key, config_id | Count exceptions during train data fetch calls |
| DATAFRAME_SHAPE_SUMMARY | Summary | composite_key, config_id | len of dataframe for training |
| NAN_SUMMARY | Summary | composite_key, config_id | Count nan's in train data |
| INF_SUMMARY | Summary | composite_key, config_id | Count inf's in train data |
| FETCH_TIME_SUMMARY | Summary | composite_key, config_id | Train Data Fetch time |
| MODEL_INFO | Info | composite_key, config_id | Model info |
| UDF_TIME | Histogram | composite_key, config_id | Histogram for udf processing time |
| Metric Name | Type | Labels | Description |
|:-------------------------:|:---------:|:-----------------------------------------------------:|:-----------------------------------------------:|
| MSG_IN_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count msgs flowing in |
| MSG_PROCESSED_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count msgs processed |
| SOURCE_COUNTER | Counter | source, composite_key, config_id, pipeline_id | Count artifact source (registry or cache) calls |
| INSUFFICIENT_DATA_COUNTER | Counter | composite_key, config_id, pipeline_id | Count insufficient data while Training |
| MODEL_STATUS_COUNTER | Counter | status, vertex, composite_key, config_id, pipeline_id | Count status of the model |
| DATASHAPE_ERROR_COUNTER | Counter | composite_key, config_id, pipeline_id | Count datashape errors in preprocess |
| MSG_DROPPED_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count dropped msgs |
| REDIS_ERROR_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count redis errors |
| EXCEPTION_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count exceptions |
| RUNTIME_ERROR_COUNTER | Counter | vertex, composite_key, config_id, pipeline_id | Count runtime errors |
| FETCH_EXCEPTION_COUNTER | Counter | composite_key, config_id, pipeline_id | Count exceptions during train data fetch calls |
| DATAFRAME_SHAPE_SUMMARY | Summary | composite_key, config_id, pipeline_id | len of dataframe for training |
| NAN_SUMMARY | Summary | composite_key, config_id, pipeline_id | Count nan's in train data |
| INF_SUMMARY | Summary | composite_key, config_id, pipeline_id | Count inf's in train data |
| FETCH_TIME_SUMMARY | Summary | composite_key, config_id, pipeline_id | Train Data Fetch time |
| MODEL_INFO | Info | composite_key, config_id, pipeline_id | Model info |
| UDF_TIME | Histogram | composite_key, config_id, pipeline_id | Histogram for udf processing time |
5 changes: 4 additions & 1 deletion numalogic/udfs/__init__.py
@@ -5,8 +5,9 @@

from numalogic._constants import BASE_DIR
from numalogic.udfs._base import NumalogicUDF
from numalogic.udfs._config import StreamConf, PipelineConf, load_pipeline_conf
from numalogic.udfs._config import StreamConf, PipelineConf, MLPipelineConf, load_pipeline_conf
from numalogic.udfs.factory import UDFFactory, ServerFactory
from numalogic.udfs.payloadtx import PayloadTransformer
from numalogic.udfs.inference import InferenceUDF
from numalogic.udfs.postprocess import PostprocessUDF
from numalogic.udfs.preprocess import PreprocessUDF
@@ -25,6 +26,7 @@ def set_logger() -> None:

__all__ = [
"NumalogicUDF",
"PayloadTransformer",
"PreprocessUDF",
"InferenceUDF",
"TrainerUDF",
@@ -34,6 +36,7 @@ def set_logger() -> None:
"UDFFactory",
"StreamConf",
"PipelineConf",
"MLPipelineConf",
"load_pipeline_conf",
"ServerFactory",
"set_logger",
8 changes: 6 additions & 2 deletions numalogic/udfs/__main__.py
@@ -21,8 +21,12 @@ def init_server(step: str, server_type: str):
pipeline_conf = load_pipeline_conf(BASE_CONF_FILE_PATH, APP_CONF_FILE_PATH)
LOGGER.info("Pipeline config: %s", pipeline_conf)

redis_client = get_redis_client_from_conf(pipeline_conf.redis_conf)
udf = UDFFactory.get_udf_instance(step, r_client=redis_client, pl_conf=pipeline_conf)
LOGGER.info("Starting vertex with step: %s, server_type %s", step, server_type)
if step == "mlpipeline":
udf = UDFFactory.get_udf_instance(step, pl_conf=pipeline_conf)
else:
redis_client = get_redis_client_from_conf(pipeline_conf.redis_conf)
udf = UDFFactory.get_udf_instance(step, r_client=redis_client, pl_conf=pipeline_conf)

return ServerFactory.get_server_instance(server_type, handler=udf)

43 changes: 37 additions & 6 deletions numalogic/udfs/_base.py
@@ -17,8 +17,7 @@

from numalogic.tools.exceptions import ConfigNotFoundError
from numalogic.tools.types import artifact_t
from numalogic.udfs._config import PipelineConf, StreamConf

from numalogic.udfs._config import StreamConf, PipelineConf, MLPipelineConf

_DEFAULT_CONF_ID = "default"

@@ -61,17 +60,28 @@ def register_conf(self, config_id: str, conf: StreamConf) -> None:
"""
self.pl_conf.stream_confs[config_id] = conf

def _get_default_conf(self, config_id) -> StreamConf:
def _get_default_stream_conf(self, config_id) -> StreamConf:
"""Get the default config."""
try:
return self.pl_conf.stream_confs[_DEFAULT_CONF_ID]
except KeyError:
err_msg = f"Config with ID {config_id} or {_DEFAULT_CONF_ID} not found!"
raise ConfigNotFoundError(err_msg) from None

def get_conf(self, config_id: str) -> StreamConf:
def _get_default_ml_pipeline_conf(self, config_id, pipeline_id) -> MLPipelineConf:
"""Get the default pipeline config."""
try:
return self.pl_conf.stream_confs[_DEFAULT_CONF_ID].ml_pipelines[_DEFAULT_CONF_ID]
except KeyError:
err_msg = (
f"Pipeline with ID {pipeline_id} or {_DEFAULT_CONF_ID} "
f"not found for config ID {config_id}!"
)
raise ConfigNotFoundError(err_msg) from None

def get_stream_conf(self, config_id: str) -> StreamConf:
"""
Get config with the given ID.
Get stream config with the given ID.
If not found, return the default config.
Args:
@@ -88,7 +98,28 @@ def get_conf(self, config_id: str) -> StreamConf:
try:
return self.pl_conf.stream_confs[config_id]
except KeyError:
return self._get_default_conf(config_id)
return self._get_default_stream_conf(config_id)

def get_ml_pipeline_conf(self, config_id: str, pipeline_id: str) -> MLPipelineConf:
"""
Get the stream's ML pipeline config with the given ID.
If not found, return the default config.
Args:
config_id: Config ID
pipeline_id: Pipeline ID
Returns
-------
MLPipelineConf object
Raises
------
ConfigNotFoundError: If config with the given ID is not found
"""
try:
return self.pl_conf.stream_confs[config_id].ml_pipelines[pipeline_id]
except KeyError:
return self._get_default_ml_pipeline_conf(config_id, pipeline_id)

def exec(self, keys: list[str], datum: Datum) -> Messages:
"""
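Together, `get_stream_conf` and `get_ml_pipeline_conf` give two-level resolution with a fallback to the `default` ID at each level. A sketch of the lookup order, assuming a concrete `NumalogicUDF` instance `udf` (the IDs are illustrative):

```python
# Stream-level lookup: exact config_id, else stream_confs["default"].
stream_conf = udf.get_stream_conf("my-config")

# Pipeline-level lookup: exact (config_id, pipeline_id), else
# stream_confs["default"].ml_pipelines["default"].
ml_conf = udf.get_ml_pipeline_conf("my-config", "pipeline-1")

seq_len = stream_conf.window_size  # stream-scoped setting
metrics = ml_conf.metrics          # pipeline-scoped setting
```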
10 changes: 8 additions & 2 deletions numalogic/udfs/_config.py
@@ -17,14 +17,20 @@
_logger = logging.getLogger(__name__)


@dataclass
class MLPipelineConf:
pipeline_id: str = "default"
metrics: list[str] = field(default_factory=list)
numalogic_conf: NumalogicConf = field(default_factory=lambda: NumalogicConf())


@dataclass
class StreamConf:
config_id: str = "default"
source: ConnectorType = ConnectorType.druid # TODO: do not allow redis connector here
window_size: int = 12
composite_keys: list[str] = field(default_factory=list)
metrics: list[str] = field(default_factory=list)
numalogic_conf: NumalogicConf = field(default_factory=lambda: NumalogicConf())
ml_pipelines: dict[str, MLPipelineConf] = field(default_factory=dict)


@dataclass
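This is the schema change in one place: `metrics` and `numalogic_conf` move off `StreamConf` onto `MLPipelineConf`, and a stream now carries a dict of pipelines keyed by pipeline ID. A construction sketch using the fields above (IDs and metric names are placeholders; the `NumalogicConf` import path is an assumption):

```python
from numalogic.config import NumalogicConf  # assumed import path
from numalogic.udfs import MLPipelineConf, StreamConf

stream_conf = StreamConf(
    config_id="my-config",                # placeholder
    window_size=12,
    composite_keys=["namespace", "app"],  # placeholders
    ml_pipelines={
        "pipeline-1": MLPipelineConf(
            pipeline_id="pipeline-1",
            metrics=["error_rate", "error_count"],  # placeholders
            numalogic_conf=NumalogicConf(),
        ),
    },
)
```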
48 changes: 32 additions & 16 deletions numalogic/udfs/_metrics.py
@@ -10,74 +10,90 @@
SOURCE_COUNTER = PromCounterMetric(
"numalogic_artifact_source_counter",
"Count artifact source calls",
["source", "vertex", "composite_key", "config_id"],
["source", "vertex", "composite_key", "config_id", "pipeline_id"],
)

INSUFFICIENT_DATA_COUNTER = PromCounterMetric(
"numalogic_insufficient_data_counter",
"Count insufficient data while Training",
["composite_key", "config_id"],
["composite_key", "config_id", "pipeline_id"],
)
MODEL_STATUS_COUNTER = PromCounterMetric(
"numalogic_new_model_counter",
"Count status of the model",
["status", "vertex", "composite_key", "config_id"],
["status", "vertex", "composite_key", "config_id", "pipeline_id"],
)

DATASHAPE_ERROR_COUNTER = PromCounterMetric(
"numalogic_datashape_error_counter",
"Count datashape errors in preprocess",
["composite_key", "config_id"],
["composite_key", "config_id", "pipeline_id"],
)
MSG_DROPPED_COUNTER = PromCounterMetric(
"numalogic_msg_dropped_counter", "Count dropped msgs", ["vertex", "composite_key", "config_id"]
"numalogic_msg_dropped_counter",
"Count dropped msgs",
["vertex", "composite_key", "config_id", "pipeline_id"],
)

REDIS_ERROR_COUNTER = PromCounterMetric(
"numalogic_redis_error_counter", "Count redis errors", ["vertex", "composite_key", "config_id"]
"numalogic_redis_error_counter",
"Count redis errors",
["vertex", "composite_key", "config_id", "pipeline_id"],
)
EXCEPTION_COUNTER = PromCounterMetric(
"numalogic_exception_counter", "Count exceptions", ["vertex", "composite_key", "config_id"]
"numalogic_exception_counter",
"Count exceptions",
["vertex", "composite_key", "config_id", "pipeline_id"],
)
RUNTIME_ERROR_COUNTER = PromCounterMetric(
"numalogic_runtime_error_counter",
"Count runtime errors",
["vertex", "composite_key", "config_id"],
["vertex", "composite_key", "config_id", "pipeline_id"],
)

FETCH_EXCEPTION_COUNTER = PromCounterMetric(
"numalogic_fetch_exception_counter",
"count exceptions during fetch call",
["composite_key", "config_id"],
["composite_key", "config_id", "pipeline_id"],
)

MSG_IN_COUNTER = PromCounterMetric(
"numalogic_msg_in_counter", "Count msgs flowing in", ["vertex", "composite_key", "config_id"]
"numalogic_msg_in_counter",
"Count msgs flowing in",
["vertex", "composite_key", "config_id", "pipeline_id"],
)
MSG_PROCESSED_COUNTER = PromCounterMetric(
"numalogic_msg_processed_counter",
"Count msgs processed",
["vertex", "composite_key", "config_id"],
["vertex", "composite_key", "config_id", "pipeline_id"],
)

# SUMMARY
DATAFRAME_SHAPE_SUMMARY = PromSummaryMetric(
"numalogic_dataframe_shape_summary",
"len of dataframe for training",
["composite_key", "config_id"],
["composite_key", "config_id", "pipeline_id"],
)
NAN_SUMMARY = PromSummaryMetric(
"numalogic_nan_counter", "Count nan's in train data", ["composite_key", "config_id"]
"numalogic_nan_summary",
"Count nan's in train data",
["composite_key", "config_id", "pipeline_id"],
)
INF_SUMMARY = PromSummaryMetric(
"numalogic_inf_counter", "Count inf's in train data", ["composite_key", "config_id"]
"numalogic_inf_summary",
"Count inf's in train data",
["composite_key", "config_id", "pipeline_id"],
)
FETCH_TIME_SUMMARY = PromSummaryMetric(
"numalogic_fetch_time_summary", "Train data fetch time", ["composite_key", "config_id"]
"numalogic_fetch_time_summary",
"Train data fetch time",
["composite_key", "config_id", "pipeline_id"],
)

# Info
MODEL_INFO = PromInfoMetric("numalogic_model_info", "Model info", ["composite_key", "config_id"])
MODEL_INFO = PromInfoMetric(
"numalogic_model_info", "Model info", ["composite_key", "config_id", "pipeline_id"]
)

# HISTOGRAM
buckets = (
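Every metric here now carries a `pipeline_id` label (and the nan/inf summaries are renamed from `*_counter` to `*_summary`), so each emit site must supply one more label value. Illustrated below with `prometheus_client` directly, on the assumption that these `Prom*Metric` wrappers mirror its labeling semantics (label values are examples):

```python
from prometheus_client import Counter

MSG_IN_COUNTER = Counter(
    "numalogic_msg_in_counter",
    "Count msgs flowing in",
    ["vertex", "composite_key", "config_id", "pipeline_id"],
)

# One extra label value per observation now that pipeline_id is in the schema:
MSG_IN_COUNTER.labels(
    vertex="preprocess",     # example values
    composite_key="ns:app",
    config_id="default",
    pipeline_id="default",
).inc()
```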
3 changes: 3 additions & 0 deletions numalogic/udfs/entities.py
@@ -33,6 +33,7 @@ class Header(str, Enum):
class _BasePayload:
uuid: str
config_id: str
pipeline_id: str
composite_keys: list[str]


@@ -60,6 +61,7 @@ class StreamPayload(_BasePayload):
timestamps: list[int]
status: Optional[Status] = None
header: Header = Header.MODEL_INFERENCE
artifact_versions: dict[str, dict] = field(default_factory=dict)
metadata: dict[str, Any] = field(default_factory=dict)

@property
@@ -85,6 +87,7 @@ def __str__(self) -> str:
return (
f'"StreamPayload(header={self.header}, status={self.status}, '
f"composite_keys={self.composite_keys}, data={list(self.data)}, "
f'artifact_versions={self.artifact_versions})"'
)

def __repr__(self) -> str:
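Consumers of the payload get two additions: `pipeline_id` on every payload, and `artifact_versions` on `StreamPayload`. A small access sketch, assuming `payload` is a populated `StreamPayload` (the dict key is an assumption):

```python
# `payload` is assumed to be a populated StreamPayload instance.
tracking_key = (payload.uuid, payload.config_id, payload.pipeline_id)

# Which registry artifact versions produced this payload; the "inference"
# key mirrors the trainer output earlier in this diff, but is an assumption here.
inference_version = payload.artifact_versions.get("inference", {})
```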