Commit
logging fix (#367)
---------

Signed-off-by: Gulshan Bhatia <[email protected]>
gulshan02 committed Apr 18, 2024
1 parent c3991bb commit 6975394
Showing 9 changed files with 62 additions and 60 deletions.
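The change is mechanical but consistent across the touched files: each UDF binds a structlog logger per invocation, the bound variable is renamed from log to logger, and the remaining printf-style calls are converted to structured key-value fields. A minimal sketch of the pattern the diffs below follow, assuming structlog is configured elsewhere in the package (handle and the payload dict are illustrative, not from this commit):

    import structlog

    _struct_log = structlog.get_logger()

    def handle(vtx: str, payload: dict) -> None:
        # bind() returns a new immutable bound logger, so the result must
        # be reassigned; it never mutates _struct_log in place.
        logger = _struct_log.bind(udf_vertex=vtx)
        # Values travel as structured fields, not %-interpolated text.
        logger.info("Successfully inferred!", keys=payload.get("keys"))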
12 changes: 6 additions & 6 deletions numalogic/udfs/inference.py
@@ -104,7 +104,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            Messages instance
        """
        _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)

        # Construct payload object
        json_data_payload = orjson.loads(datum.value)
@@ -121,7 +121,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
        _stream_conf = self.get_stream_conf(payload.config_id)
        _conf = _stream_conf.ml_pipelines[payload.pipeline_id]

-        log = log_data_payload_values(log, json_data_payload)
+        logger = log_data_payload_values(logger, json_data_payload)

        artifact_data, payload = _load_artifact(
            skeys=[_ckey for _, _ckey in zip(_stream_conf.composite_keys, payload.composite_keys)],
@@ -137,7 +137,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
            if _conf.numalogic_conf.score.adjust:
                msgs.append(get_static_thresh_message(keys, payload))
-            log.exception("Artifact model not loaded!")
+            logger.exception("Artifact model not loaded!")
            return msgs

        # Perform inference
@@ -146,7 +146,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            _update_info_metric(x_inferred, payload.metrics, _metric_label_values)
        except RuntimeError:
            _increment_counter(counter=RUNTIME_ERROR_COUNTER, labels=_metric_label_values)
-            log.exception(
+            logger.exception(
                "Runtime inference error!",
                keys=payload.composite_keys,
                metrics=payload.metrics,
@@ -174,13 +174,13 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
        )
        # Send trainer message if artifact is stale
        if status == Status.ARTIFACT_STALE:
-            log.info("Inference artifact found is stale")
+            logger.info("Inference artifact found is stale")
            msgs.append(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))

        _increment_counter(counter=MSG_PROCESSED_COUNTER, labels=_metric_label_values)
        msgs.append(Message(keys=keys, value=payload.to_json(), tags=["postprocess"]))

-        log.info(
+        logger.info(
            "Successfully inferred!",
            keys=payload.composite_keys,
            metrics=payload.metrics,
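log_data_payload_values itself is not part of this diff; from its call sites it evidently binds selected payload fields and returns the enriched logger, which is why every caller reassigns the result. A hypothetical sketch (the bound field names are assumptions):

    def log_data_payload_values(logger, json_data_payload: dict):
        # Hypothetical reconstruction: bind common payload fields so that
        # every subsequent line on this logger carries them automatically.
        return logger.bind(
            uuid=json_data_payload.get("uuid"),
            config_id=json_data_payload.get("config_id"),
            pipeline_id=json_data_payload.get("pipeline_id"),
        )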
8 changes: 4 additions & 4 deletions numalogic/udfs/payloadtx.py
@@ -45,13 +45,13 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
        """
        _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)

        # check message sanity
        try:
            data_payload = orjson.loads(datum.value)
        except (orjson.JSONDecodeError, KeyError):  # catch json decode error only
-            log.exception("Error while decoding input json")
+            logger.exception("Error while decoding input json")
            return Messages(Message.to_drop())

        _stream_conf = self.get_stream_conf(data_payload["config_id"])
@@ -62,8 +62,8 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            data_payload["pipeline_id"] = pipeline
            messages.append(Message(keys=keys, value=orjson.dumps(data_payload)))

-        log = log_data_payload_values(log, data_payload)
-        log.info(
+        logger = log_data_payload_values(logger, data_payload)
+        logger.info(
            "Appended pipeline id to the payload",
            keys=keys,
            execution_time_ms=round((time.perf_counter() - _start_time) * 1000, 4),
10 changes: 5 additions & 5 deletions numalogic/udfs/postprocess.py
@@ -87,7 +87,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
        """
        _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)

        # Construct payload object
        json_payload = orjson.loads(datum.value)
@@ -111,7 +111,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
        thresh_cfg = _conf.numalogic_conf.threshold
        postprocess_cfg = _conf.numalogic_conf.postprocess

-        log = log_data_payload_values(log, json_payload)
+        logger = log_data_payload_values(logger, json_payload)

        # load artifact
        thresh_artifact, payload = _load_artifact(
@@ -135,7 +135,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            return msgs

        if payload.header == Header.STATIC_INFERENCE:
-            log.warning("Static inference not supported in postprocess yet")
+            logger.warning("Static inference not supported in postprocess yet")

        # Postprocess payload
        try:
@@ -157,7 +157,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:

        except RuntimeError:
            _increment_counter(RUNTIME_ERROR_COUNTER, _metric_label_values)
-            log.exception(
+            logger.exception(
                "Runtime postprocess error!",
                uuid=payload.uuid,
                composite_keys=payload.composite_keys,
@@ -199,7 +199,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            labels=_metric_label_values,
        )

-        log.info(
+        logger.info(
            "Successfully post-processed!",
            composite_keys=out_payload.composite_keys,
            unified_anomaly=out_payload.unified_anomaly,
20 changes: 9 additions & 11 deletions numalogic/udfs/preprocess.py
@@ -101,20 +101,20 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
        """
        _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)

        # check message sanity
        try:
            data_payload = orjson.loads(datum.value)
        except (orjson.JSONDecodeError, KeyError):  # catch json decode error only
-            log.exception("Error while decoding input json")
+            logger.exception("Error while decoding input json")
            return Messages(Message.to_drop())

        _stream_conf = self.get_stream_conf(data_payload["config_id"])
        _conf = _stream_conf.ml_pipelines[data_payload.get("pipeline_id", "default")]
        raw_df, timestamps = get_df(data_payload=data_payload, stream_conf=_stream_conf)

-        log = log_data_payload_values(log, data_payload)
+        logger = log_data_payload_values(logger, data_payload)

        source = NUMALOGIC_METRICS
        if (
@@ -134,9 +134,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
        _increment_counter(counter=MSG_IN_COUNTER, labels=_metric_label_values)
        # Drop message if dataframe shape conditions are not met
        if raw_df.shape[0] < _stream_conf.window_size or raw_df.shape[1] != len(_conf.metrics):
-            log.critical(
-                "Dataframe shape: (%f, %f) conditions not met ", raw_df.shape[0], raw_df.shape[1]
-            )
+            logger.critical("Dataframe shape conditions not met ", raw_df_shape=raw_df.shape)
            _increment_counter(
                counter=DATASHAPE_ERROR_COUNTER,
                labels=_metric_label_values,
@@ -168,20 +166,20 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            if preproc_artifact:
                preproc_clf = preproc_artifact.artifact
                payload = replace(payload, status=Status.ARTIFACT_FOUND)
-                log = log.bind(artifact_source=preproc_artifact.extras.get("source"))
+                logger = logger.bind(artifact_source=preproc_artifact.extras.get("source"))
            else:
                msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
                if _conf.numalogic_conf.score.adjust:
                    msgs.append(get_static_thresh_message(keys, payload))
-                log.exception("Artifact model not loaded!")
+                logger.exception("Artifact model not loaded!")
                return msgs
        # Model will not be in registry
        else:
            # Load configuration for the config_id
            _increment_counter(SOURCE_COUNTER, labels=("config", *_metric_label_values))
            preproc_clf = self._load_model_from_config(_conf.numalogic_conf.preprocess)
            payload = replace(payload, status=Status.ARTIFACT_FOUND)
-            log = log.bind(model_from_config=preproc_clf)
+            logger = logger.bind(model_from_config=preproc_clf)
        try:
            x_scaled = self.compute(model=preproc_clf, input_=payload.get_data())

@@ -197,7 +195,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
                status=Status.ARTIFACT_FOUND,
                header=Header.MODEL_INFERENCE,
            )
-            log.info(
+            logger.info(
                "Successfully preprocessed!",
                keys=keys,
                payload_metrics=payload.metrics,
@@ -209,7 +207,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
                counter=RUNTIME_ERROR_COUNTER,
                labels=_metric_label_values,
            )
-            log.exception(
+            logger.exception(
                "Runtime preprocess error!",
                status=Status.RUNTIME_ERROR,
                payload_metrics=payload.metrics,
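The shape-check change in this file is more than a rename: the old call flattened the DataFrame dimensions into the message with %f placeholders (float formatting for integer dimensions), while the new call attaches the whole shape tuple as a structured field that log processors can filter on. A small illustration with structlog's JSON renderer (the configure call and values are illustrative, not from this commit):

    import structlog

    structlog.configure(processors=[structlog.processors.JSONRenderer()])
    logger = structlog.get_logger()

    # The shape tuple becomes machine-readable event data instead of text:
    logger.critical("Dataframe shape conditions not met ", raw_df_shape=(12, 3))
    # -> {"raw_df_shape": [12, 3], "event": "Dataframe shape conditions not met "}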
10 changes: 5 additions & 5 deletions numalogic/udfs/staticthresh.py
@@ -46,11 +46,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
        conf = self.get_ml_pipeline_conf(payload.config_id, payload.pipeline_id)
        adjust_conf = conf.numalogic_conf.score.adjust

-        log = _struct_log.bind(udf_vertex=self._vtx)
-        log = log_data_payload_values(log, json_data_payload)
+        logger = _struct_log.bind(udf_vertex=self._vtx)
+        logger = log_data_payload_values(logger, json_data_payload)

        if not adjust_conf:
-            log.warning("No score adjust config found")
+            logger.warning("No score adjust config found")
            return Messages(Message.to_drop())

        try:
@@ -60,7 +60,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            )
            y_unified = self.compute_unified_score(y_features, adjust_conf.feature_agg)
        except RuntimeError:
-            log.exception("Error occurred while computing static anomaly scores")
+            logger.exception("Error occurred while computing static anomaly scores")
            return Messages(Message.to_drop())

        out_payload = OutputPayload(
@@ -73,7 +73,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            data=self._additional_scores(adjust_conf, y_features, y_unified),
            metadata=payload.metadata,
        )
-        log.info(
+        logger.info(
            "Sending output payload",
            keys=out_payload.composite_keys,
            y_unified=y_unified,
24 changes: 12 additions & 12 deletions numalogic/udfs/tools.py
@@ -167,7 +167,7 @@ def _load_artifact(
        payload.pipeline_id,
    )

-    log = _struct_log.bind(
+    logger = _struct_log.bind(
        uuid=payload.uuid, skeys=skeys, dkeys=dkeys, payload_metrics=payload.metrics
    )

@@ -177,11 +177,11 @@ def _load_artifact(
        key = ":".join(dkeys)
        if key in artifact_version:
            version_to_load = artifact_version[key]
-            log.debug("Found version info for keys")
+            logger.debug("Found version info for keys")
        else:
-            log.debug("Could not find what version of model to load")
+            logger.debug("Could not find what version of model to load")
    else:
-        log.debug(
+        logger.debug(
            "No version info passed on! Loading latest artifact version, "
            "if one present in the registry"
        )
@@ -195,19 +195,19 @@ def _load_artifact(
        )
    except RedisRegistryError:
        _increment_counter(REDIS_ERROR_COUNTER, labels=_metric_label_values)
-        log.warning("Error while fetching artifact")
+        logger.warning("Error while fetching artifact")
        return None, payload

    except Exception:
        _increment_counter(EXCEPTION_COUNTER, labels=_metric_label_values)
-        log.exception("Unhandled exception while fetching artifact")
+        logger.exception("Unhandled exception while fetching artifact")
        return None, payload
    else:
-        log = log.bind(
+        logger = logger.bind(
            artifact_source=artifact_data.extras.get("source"),
            artifact_version=artifact_data.extras.get("version"),
        )
-        log.debug("Loaded Model!")
+        logger.debug("Loaded Model!")
        _increment_counter(
            counter=SOURCE_COUNTER,
            labels=(artifact_data.extras.get("source"), *_metric_label_values),
@@ -339,10 +339,10 @@ def ack_read(
    ):
        _struct_log.debug(
            "There was insufficient data for the key in the past. Retrying fetching"
-            " and training after %s secs",
-            uuid,
-            key,
-            ((min_train_records - int(_msg_train_records)) * data_freq)
-            - _curr_time
-            + float(_msg_read_ts),
+            " and training after secs",
+            uuid=uuid,
+            key=key,
+            secs=((min_train_records - int(_msg_train_records)) * data_freq)
+            - _curr_time
+            + float(_msg_read_ts),
        )
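The ack_read change above fixes a genuine bug as well as style: the old call supplied three positional arguments (uuid, key, and the computed delay) against a single %s placeholder, so the rendered message could not account for all of them. The structured form names every value. A sketch of the corrected call as it sits inside ack_read, using that function's own local variables, with the inline arithmetic pulled into a variable for readability (retry_after_secs is an illustrative name):

    retry_after_secs = (
        (min_train_records - int(_msg_train_records)) * data_freq
        - _curr_time
        + float(_msg_read_ts)
    )
    _struct_log.debug(
        "There was insufficient data for the key in the past. Retrying fetching"
        " and training after secs",
        uuid=uuid,
        key=key,
        secs=retry_after_secs,
    )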
26 changes: 15 additions & 11 deletions numalogic/udfs/trainer/_base.py
@@ -151,7 +151,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            Messages instance (no forwarding)
        """
        _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)

        # Construct payload object
        json_payload = orjson.loads(datum.value)
@@ -171,7 +171,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            labels=[self._vtx, *_metric_label_values],
        )

-        log = log_data_payload_values(log, json_payload)
+        logger = log_data_payload_values(logger, json_payload)

        # set the retry and retrain_freq
        retrain_freq_ts = _conf.numalogic_conf.trainer.retrain_freq_hr
@@ -200,7 +200,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
                counter=MSG_DROPPED_COUNTER,
                labels=(self._vtx, *_metric_label_values),
            )
-            log.warning(
+            logger.warning(
                "Caught exception/error while fetching from source",
                uuid=payload.uuid,
                keys=payload.composite_keys,
@@ -210,7 +210,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:

        # Check if data is sufficient
        if not self._is_data_sufficient(payload, df):
-            log.warning(
+            logger.warning(
                "Insufficient data found",
                uuid=payload.uuid,
                keys=payload.composite_keys,
@@ -226,7 +226,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            )
            return Messages(Message.to_drop())

-        log.debug("Data fetched", uuid=payload.uuid, shape=df.shape)
+        logger.debug("Data fetched", uuid=payload.uuid, shape=df.shape)

        # Construct feature array
        x_train, nan_counter, inf_counter = self.get_feature_arr(df, _conf.metrics)
@@ -264,14 +264,14 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
            model_registry=self.model_registry,
            payload=payload,
            vertex_name=self._vtx,
-            log=log,
+            logger=logger,
        )
        if self.train_msg_deduplicator.ack_train(
            key=[*payload.composite_keys, payload.pipeline_id], uuid=payload.uuid
        ):
-            log.info("Model trained and saved successfully", uuid=payload.uuid)
+            logger.info("Model trained and saved successfully", uuid=payload.uuid)

-        log.debug(
+        logger.debug(
            "Time taken in trainer", execution_time_secs=round(time.perf_counter() - _start_time, 4)
        )
        _increment_counter(
@@ -303,7 +303,7 @@ def artifacts_to_save(
        model_registry,
        payload: TrainerPayload,
        vertex_name: str,
-        log,
+        logger,
    ) -> None:
        """
        Save artifacts.
@@ -335,10 +335,14 @@ def artifacts_to_save(
                counter=REDIS_ERROR_COUNTER,
                labels=(vertex_name, ":".join(payload.composite_keys), payload.config_id),
            )
-            log.exception("Error while saving artifact with skeys", uuid=payload.uuid, skeys=skeys)
+            logger.exception(
+                "Error while saving artifact with skeys", uuid=payload.uuid, skeys=skeys
+            )

        else:
-            log.info("Artifact saved with with versions", uuid=payload.uuid, version_dict=ver_dict)
+            logger.info(
+                "Artifact saved with with versions", uuid=payload.uuid, version_dict=ver_dict
+            )

    def _is_data_sufficient(self, payload: TrainerPayload, df: pd.DataFrame) -> bool:
        _conf = self.get_ml_pipeline_conf(
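Beyond the rename, the trainer now threads the bound logger into artifacts_to_save as a parameter (log=log becomes logger=logger), so helper-level log lines keep the udf_vertex and payload context bound in exec instead of starting from a bare module-level logger. A condensed, self-contained sketch of that pattern (TrainerPayload is trimmed to one field; unrelated parameters are omitted):

    from dataclasses import dataclass

    import structlog

    _struct_log = structlog.get_logger()

    @dataclass
    class TrainerPayload:  # trimmed stand-in for the real payload type
        uuid: str

    def artifacts_to_save(payload: TrainerPayload, logger) -> None:
        # Context bound by the caller (udf_vertex, ...) rides along on `logger`.
        logger.info("Artifact saved", uuid=payload.uuid)

    def run(vtx: str, payload: TrainerPayload) -> None:
        logger = _struct_log.bind(udf_vertex=vtx)
        artifacts_to_save(payload=payload, logger=logger)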
