Refactor logging in UDFs (#356)
Explain what this PR does.

---------

Signed-off-by: Gulshan Bhatia <[email protected]>
Co-authored-by: Gulshan Bhatia <[email protected]>
gulshan02 and Gulshan Bhatia committed Apr 15, 2024
1 parent be29f02 commit c3991bb
Showing 14 changed files with 279 additions and 239 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/coverage.yml
@@ -39,8 +39,9 @@ jobs:
           poetry run pytest --cov-report=xml --cov=numalogic --cov-config .coveragerc tests/ -sq
       - name: Upload Coverage
-        uses: codecov/codecov-action@v3
+        uses: codecov/codecov-action@v4
         with:
+          token: ${{ secrets.CODECOV_TOKEN }}
           files: ./coverage.xml
           fail_ci_if_error: true
           verbose: true
32 changes: 32 additions & 0 deletions numalogic/udfs/_logger.py
@@ -0,0 +1,32 @@
+import structlog
+from structlog import processors, stdlib
+
+
+def configure_logger():
+    """Configure struct logger for the UDFs."""
+    structlog.configure(
+        processors=[
+            stdlib.filter_by_level,
+            stdlib.add_log_level,
+            stdlib.PositionalArgumentsFormatter(),
+            processors.TimeStamper(fmt="iso"),
+            processors.StackInfoRenderer(),
+            # processors.format_exc_info,
+            processors.UnicodeDecoder(),
+            processors.KeyValueRenderer(key_order=["uuid", "event"]),
+            stdlib.ProcessorFormatter.wrap_for_formatter,
+        ],
+        logger_factory=stdlib.LoggerFactory(),
+        wrapper_class=stdlib.BoundLogger,
+        cache_logger_on_first_use=True,
+    )
+    return structlog.getLogger(__name__)
+
+
+def log_data_payload_values(log, data_payload):
+    return log.bind(
+        uuid=data_payload["uuid"],
+        config_id=data_payload["config_id"],
+        pipeline_id=data_payload.get("pipeline_id", "default"),
+        metadata=data_payload.get("metadata", {}),
+    )
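For reference, a minimal usage sketch of the two helpers above (assuming the module is importable as numalogic.udfs._logger; the udf_vertex and payload values are illustrative):

from numalogic.udfs._logger import configure_logger, log_data_payload_values

_struct_log = configure_logger()

# Bind static context once; bind() returns a new logger, so reassign.
log = _struct_log.bind(udf_vertex="inference")

# "uuid" and "config_id" are required keys; the rest fall back to defaults.
data_payload = {"uuid": "abc-123", "config_id": "my-config"}
log = log_data_payload_values(log, data_payload)

# Event-specific fields are passed as keyword arguments.
log.info("Received message", composite_keys=["ns", "app"])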
48 changes: 18 additions & 30 deletions numalogic/udfs/inference.py
@@ -1,4 +1,3 @@
-import logging
 import os
 import time
 from dataclasses import replace
@@ -15,6 +14,7 @@
 from numalogic.tools.types import artifact_t, redis_client_t
 from numalogic.udfs._base import NumalogicUDF
 from numalogic.udfs._config import PipelineConf
+from numalogic.udfs._logger import configure_logger, log_data_payload_values
 from numalogic.udfs._metrics import (
     RUNTIME_ERROR_COUNTER,
     MSG_PROCESSED_COUNTER,
@@ -30,7 +30,7 @@
     get_static_thresh_message,
 )
 
-_LOGGER = logging.getLogger(__name__)
+_struct_log = configure_logger()
 
 # TODO: move to config
 LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
@@ -104,9 +104,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             Messages instance
         """
         _start_time = time.perf_counter()
+        log = _struct_log.bind(udf_vertex=self._vtx)
 
         # Construct payload object
-        payload = StreamPayload(**orjson.loads(datum.value))
+        json_data_payload = orjson.loads(datum.value)
+        payload = StreamPayload(**json_data_payload)
         _metric_label_values = (
             payload.metadata["numalogic_opex_tags"]["source"],
             self._vtx,
@@ -115,16 +117,12 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             payload.pipeline_id,
         )
         _increment_counter(counter=MSG_IN_COUNTER, labels=_metric_label_values)
-        _LOGGER.debug(
-            "%s - Received Msg: { CompositeKeys: %s, Metrics: %s }",
-            payload.uuid,
-            payload.composite_keys,
-            payload.metrics,
-        )
 
         _stream_conf = self.get_stream_conf(payload.config_id)
         _conf = _stream_conf.ml_pipelines[payload.pipeline_id]
 
+        log = log_data_payload_values(log, json_data_payload)
+
         artifact_data, payload = _load_artifact(
             skeys=[_ckey for _, _ckey in zip(_stream_conf.composite_keys, payload.composite_keys)],
             dkeys=[payload.pipeline_id, _conf.numalogic_conf.model.name],
@@ -139,6 +137,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
             if _conf.numalogic_conf.score.adjust:
                 msgs.append(get_static_thresh_message(keys, payload))
+            log.exception("Artifact model not loaded!")
             return msgs
 
         # Perform inference
@@ -147,11 +146,10 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             _update_info_metric(x_inferred, payload.metrics, _metric_label_values)
         except RuntimeError:
             _increment_counter(counter=RUNTIME_ERROR_COUNTER, labels=_metric_label_values)
-            _LOGGER.exception(
-                "%s - Runtime inference error! Keys: %s, Metric: %s",
-                payload.uuid,
-                payload.composite_keys,
-                payload.metrics,
+            log.exception(
+                "Runtime inference error!",
+                keys=payload.composite_keys,
+                metrics=payload.metrics,
             )
             # Send training request if inference fails
             msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
@@ -176,21 +174,17 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         )
         # Send trainer message if artifact is stale
         if status == Status.ARTIFACT_STALE:
+            log.info("Inference artifact found is stale")
             msgs.append(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))
 
-        _LOGGER.info(
-            "%s - Successfully inferred: { CompositeKeys: %s, Metrics: %s }",
-            payload.uuid,
-            payload.composite_keys,
-            payload.metrics,
-        )
         _increment_counter(counter=MSG_PROCESSED_COUNTER, labels=_metric_label_values)
         msgs.append(Message(keys=keys, value=payload.to_json(), tags=["postprocess"]))
 
-        _LOGGER.debug(
-            "%s - Time taken in inference: %.4f sec",
-            payload.uuid,
-            time.perf_counter() - _start_time,
+        log.info(
+            "Successfully inferred!",
+            keys=payload.composite_keys,
+            metrics=payload.metrics,
+            execution_time_ms=round((time.perf_counter() - _start_time) * 1000, 4),
         )
         return msgs
 
@@ -214,11 +208,5 @@ def is_model_stale(self, artifact_data: ArtifactData, payload: StreamPayload) -> bool:
         ) == "registry" and self.model_registry.is_artifact_stale(
             artifact_data, _conf.numalogic_conf.trainer.retrain_freq_hr
         ):
-            _LOGGER.info(
-                "%s - Inference artifact found is stale, Keys: %s, Metric: %s",
-                payload.uuid,
-                payload.composite_keys,
-                payload.metrics,
-            )
             return True
         return False
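The pattern throughout this file: bind context once at the top of exec(), enrich it as more of the payload is parsed, and let every later event (stale-artifact, error, success) carry those fields automatically. A standalone sketch of that accumulation, using structlog's default configuration rather than the UDF wiring above (all values illustrative):

import structlog

log = structlog.get_logger().bind(udf_vertex="inference")
log = log.bind(uuid="abc-123", config_id="my-config", pipeline_id="default")

# Both events below carry udf_vertex/uuid/config_id/pipeline_id without
# repeating them at each call site, unlike the removed %-style _LOGGER calls.
log.info("Inference artifact found is stale")
log.info("Successfully inferred!", execution_time_ms=12.3456)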
16 changes: 9 additions & 7 deletions numalogic/udfs/payloadtx.py
@@ -1,4 +1,3 @@
-import logging
 import time
 from typing import Optional
 
@@ -9,8 +8,9 @@
 from numalogic.tools.types import artifact_t
 from numalogic.udfs import NumalogicUDF
 from numalogic.udfs._config import PipelineConf
+from numalogic.udfs._logger import configure_logger, log_data_payload_values
 
-_LOGGER = logging.getLogger(__name__)
+_struct_log = configure_logger()
 
 
 class PayloadTransformer(NumalogicUDF):
@@ -45,13 +45,13 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         """
         _start_time = time.perf_counter()
+        log = _struct_log.bind(udf_vertex=self._vtx)
 
         # check message sanity
         try:
             data_payload = orjson.loads(datum.value)
-            _LOGGER.info("%s - Data payload: %s", data_payload["uuid"], data_payload)
         except (orjson.JSONDecodeError, KeyError):  # catch json decode error only
-            _LOGGER.exception("Error while decoding input json")
+            log.exception("Error while decoding input json")
             return Messages(Message.to_drop())
 
         _stream_conf = self.get_stream_conf(data_payload["config_id"])
@@ -62,8 +62,10 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             data_payload["pipeline_id"] = pipeline
             messages.append(Message(keys=keys, value=orjson.dumps(data_payload)))
 
-        _LOGGER.debug(
-            "Time taken to execute Pipeline: %.4f sec",
-            time.perf_counter() - _start_time,
+        log = log_data_payload_values(log, data_payload)
+        log.info(
+            "Appended pipeline id to the payload",
+            keys=keys,
+            execution_time_ms=round((time.perf_counter() - _start_time) * 1000, 4),
         )
         return messages
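For clarity, the fan-out this exec() performs: the incoming payload is re-serialized once per configured pipeline, each copy stamped with its own pipeline_id. A self-contained sketch with hypothetical pipeline ids:

import orjson

data_payload = {"uuid": "abc-123", "config_id": "my-config"}
pipelines = ["pipeline-a", "pipeline-b"]  # stands in for _stream_conf.ml_pipelines

messages = []
for pipeline in pipelines:
    data_payload["pipeline_id"] = pipeline
    # dumps() snapshots the dict now, so mutating it next iteration is safe
    messages.append(orjson.dumps(data_payload))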
58 changes: 29 additions & 29 deletions numalogic/udfs/postprocess.py
@@ -1,4 +1,3 @@
-import logging
 import os
 import time
 from dataclasses import replace
@@ -22,6 +21,7 @@
 from numalogic.tools.types import redis_client_t, artifact_t
 from numalogic.udfs import NumalogicUDF
 from numalogic.udfs._config import PipelineConf, MLPipelineConf
+from numalogic.udfs._logger import configure_logger, log_data_payload_values
 from numalogic.udfs._metrics import (
     RUNTIME_ERROR_COUNTER,
     MSG_PROCESSED_COUNTER,
@@ -38,7 +38,7 @@
 LOAD_LATEST = os.getenv("LOAD_LATEST", "false").lower() == "true"
 SCORE_PREFIX = os.getenv("SCORE_PREFIX", "unified")
 
-_LOGGER = logging.getLogger(__name__)
+_struct_log = configure_logger()
 
 
 class PostprocessUDF(NumalogicUDF):
@@ -87,9 +87,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         """
         _start_time = time.perf_counter()
+        log = _struct_log.bind(udf_vertex=self._vtx)
 
         # Construct payload object
-        payload = StreamPayload(**orjson.loads(datum.value))
+        json_payload = orjson.loads(datum.value)
+        payload = StreamPayload(**json_payload)
         _metric_label_values = (
             payload.composite_keys,
             self._vtx,
@@ -109,6 +111,8 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         thresh_cfg = _conf.numalogic_conf.threshold
         postprocess_cfg = _conf.numalogic_conf.postprocess
 
+        log = log_data_payload_values(log, json_payload)
+
         # load artifact
         thresh_artifact, payload = _load_artifact(
             skeys=[_ckey for _, _ckey in zip(_stream_conf.composite_keys, payload.composite_keys)],
@@ -131,7 +135,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             return msgs
 
         if payload.header == Header.STATIC_INFERENCE:
-            _LOGGER.warning("Static inference not supported in postprocess yet")
+            log.warning("Static inference not supported in postprocess yet")
 
         # Postprocess payload
         try:
@@ -153,11 +157,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
 
         except RuntimeError:
             _increment_counter(RUNTIME_ERROR_COUNTER, _metric_label_values)
-            _LOGGER.exception(
-                "%s - Runtime postprocess error! Keys: %s, Metric: %s",
-                payload.uuid,
-                payload.composite_keys,
-                payload.metrics,
+            log.exception(
+                "Runtime postprocess error!",
+                uuid=payload.uuid,
+                composite_keys=payload.composite_keys,
+                payload_metrics=payload.metrics,
             )
             # Send training request if postprocess fails
             msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
@@ -189,26 +193,23 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             data=out_data,
             metadata=payload.metadata,
         )
-        _LOGGER.info(
-            "%s - Successfully post-processed, Keys: %s, Score: %s, "
-            "Model Score: %s, Static Score: %s, Feature Scores: %s",
-            out_payload.uuid,
-            out_payload.composite_keys,
-            out_payload.unified_anomaly,
-            a_unified,
-            y_unified,
-            a_features.tolist(),
-        )
 
         _increment_counter(
             MSG_PROCESSED_COUNTER,
             labels=_metric_label_values,
         )
-        _LOGGER.debug(
-            "%s - Time taken in postprocess: %.4f sec",
-            payload.uuid,
-            time.perf_counter() - _start_time,
+
+        log.info(
+            "Successfully post-processed!",
+            composite_keys=out_payload.composite_keys,
+            unified_anomaly=out_payload.unified_anomaly,
+            a_unified=a_unified,
+            y_features=y_features,
+            y_unified=y_unified,
+            a_features=a_features.tolist(),
+            execution_time_secs=round(time.perf_counter() - _start_time, 4),
         )
 
         return Messages(Message(keys=keys, value=out_payload.to_json(), tags=["output"]))
 
     def _adjust_score(
@@ -245,7 +246,6 @@ def _adjust_score(
 
             # Compute adjusted unified score
             a_adjusted = self.compute_adjusted_score(a_unified, y_unified)
-            _LOGGER.debug("y_unified: %s, y_features: %s", y_unified, y_features)
             return a_adjusted, y_unified, y_features
         return a_unified, None, None
 
@@ -286,10 +286,10 @@ def _additional_scores(
     @staticmethod
     def _per_feature_score(feat_names: list[str], scores: NDArray[float]) -> dict[str, float]:
         if (scores_len := len(scores)) != len(feat_names):
-            _LOGGER.debug(
-                "Scores length: %s does not match feat_names: %s",
-                scores_len,
-                feat_names,
+            _struct_log.debug(
+                "Scores length does not match feat_names",
+                scores_len=scores_len,
+                feat_names=feat_names,
             )
             return {}
         return dict(zip(feat_names, scores))
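The guard above is equivalent to this behavior sketch (feature names and scores are hypothetical):

import numpy as np

feat_names = ["error_rate", "latency"]
scores = np.array([0.7, 0.2])

if len(scores) != len(feat_names):
    per_feature = {}  # mismatch: emit a debug event and return empty
else:
    per_feature = dict(zip(feat_names, scores))
# per_feature == {"error_rate": 0.7, "latency": 0.2}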
@@ -322,7 +322,7 @@ def compute(
             RuntimeError: If threshold model or postproc function fails
         """
         if score_conf is None:
-            _LOGGER.warning("Score config not provided, using default values")
+            _struct_log.warning("Score config not provided, using default values")
             score_conf = ScoreConf()
 
         scores = cls.compute_threshold(model, input_)  # (seqlen x nfeat)