Skip to content

Commit

Permalink
feat: Integrate mlflow in niceml (#79)
Browse files Browse the repository at this point in the history
## 📥 Pull Request Description

Integrated mlflow in the train and eval job.

## 👀 Affected Areas

Affects code and tests.

## 📝 Checklist

Please make sure you've completed the following tasks before submitting
this pull request:

- [x] Pre-commit hooks were executed
- [X] Changes have been reviewed by at least one other developer
- [ ] Tests have been added or updated to cover the changes (only
necessary if the changes affect the executable code)
- [x] All tests ran successfully
- [X] All merge conflicts are resolved
- [ ] Documentation has been updated to reflect the changes
- [ ] Any necessary migrations have been run

---------

Co-authored-by: Nils <[email protected]>
  • Loading branch information
dstalzjohn and aiakide authored Sep 21, 2023
1 parent de9605e commit 23aa7dc
Show file tree
Hide file tree
Showing 24 changed files with 814 additions and 293 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,4 @@ exp_cache
cache
image_cache
/tmp/
/mlflow-logs/
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ defaults:
- ops/[email protected]: exptests_default.yaml
# experiment locations
- shared/locations@globals: exp_locations.yaml
# ressources
- resources/[email protected]: res_mlflow_base.yaml
- _self_

hydra:
Expand Down
2 changes: 2 additions & 0 deletions configs/jobs/job_eval/job_eval_reg/job_eval_reg_number.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ defaults:
- ops/[email protected]: exptests_default.yaml
# experiment locations
- shared/locations@globals: exp_locations.yaml
# ressources
- resources/[email protected]: res_mlflow_base.yaml
- _self_

hydra:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ defaults:
- /ops/[email protected]: exptests_default.yaml
# experiment locations
- shared/locations@globals: exp_locations.yaml
# ressources
- resources/[email protected]: res_mlflow_base.yaml
- _self_

hydra:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ defaults:
- /ops/[email protected]: exptests_default.yaml
# experiment locations
- shared/locations@globals: exp_locations.yaml
# ressources
- resources/[email protected]: res_mlflow_base.yaml
- _self_

hydra:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ defaults:
- /ops/[email protected]: exptests_default.yaml
# experiment locations
- shared/locations@globals: exp_locations.yaml
# ressources
- resources/[email protected]: res_mlflow_base.yaml
- _self_

hydra:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ defaults:
# experiment tests
- /ops/[email protected]: exptests_default.yaml
- shared/locations@globals: exp_locations.yaml
# ressources
- resources/[email protected]: res_mlflow_base.yaml
- _self_

hydra:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ defaults:
- /ops/[email protected]: exptests_default.yaml
# experiment locations
- shared/locations@globals: exp_locations.yaml
# ressources
- resources/[email protected]: res_mlflow_base.yaml
- _self_

hydra:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ defaults:
- /ops/[email protected]: exptests_default.yaml
# experiment locations
- shared/locations@globals: exp_locations.yaml
# ressources
- resources/[email protected]: res_mlflow_base.yaml
- _self_

hydra:
Expand Down
2 changes: 2 additions & 0 deletions configs/resources/mlflow/res_mlflow_base.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mlflow_tracking_uri: ${oc.env:MLFLOW_TRACKING_URI,mlflow-logs}
experiment_name: ${globals.exp_name}
3 changes: 3 additions & 0 deletions niceml/config/defaultremoveconfigkeys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Contains the default keys to remove from the config file when saving."""

DEFAULT_REMOVE_CONFIG_KEYS = ["credentials"]
28 changes: 26 additions & 2 deletions niceml/config/writeopconfig.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,36 @@
"""module for writing config files"""
from os.path import join
from typing import List

import mlflow

from niceml.experiments.experimentcontext import ExperimentContext
from niceml.experiments.expfilenames import ExperimentFilenames


def write_op_config(op_conf: dict, exp_context: ExperimentContext, op_name: str):
def write_op_config(
op_conf: dict,
exp_context: ExperimentContext,
op_name: str,
remove_key_list: List[str],
):
"""Writes a dict as yamls. With one file per key"""
for key, values in op_conf.items():
removed_values = remove_key_recursive(values, remove_key_list)
outfile = join(ExperimentFilenames.CONFIGS_FOLDER, op_name, f"{key}.yaml")
exp_context.write_yaml(values, outfile)
mlflow.log_dict(removed_values, outfile)
exp_context.write_yaml(removed_values, outfile)


def remove_key_recursive(data, key_list: List[str]):
"""Removes keys from a dict recursively. This is used for credentials."""
if isinstance(data, list): # handle lists
return [remove_key_recursive(item, key_list) for item in data]
elif isinstance(data, dict): # handle dictionaries
return {
k: remove_key_recursive(v, key_list)
for k, v in data.items()
if k not in key_list
}
else:
return data
8 changes: 6 additions & 2 deletions niceml/dagster/jobs/jobs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Module containing all dagster jobs"""
from dagster_mlflow import mlflow_tracking, end_mlflow_on_run_finished

from niceml.config.hydra import hydra_conf_mapping_factory
from niceml.dagster.ops.analysis import analysis
from niceml.dagster.ops.copyexp import copy_exp
Expand Down Expand Up @@ -33,7 +35,8 @@ def job_data_generation():
df_normalization(current_data_location)


@job(config=hydra_conf_mapping_factory())
@end_mlflow_on_run_finished
@job(config=hydra_conf_mapping_factory(), resource_defs={"mlflow": mlflow_tracking})
def job_train():
"""Job for training an experiment"""
filelock_dict = acquire_locks() # pylint: disable=no-value-for-parameter
Expand All @@ -51,7 +54,8 @@ def job_train():
exptests(exp_context) # pylint: disable=no-value-for-parameter


@job(config=hydra_conf_mapping_factory())
@end_mlflow_on_run_finished
@job(config=hydra_conf_mapping_factory(), resource_defs={"mlflow": mlflow_tracking})
def job_eval():
"""Job for evaluating experiment"""
filelock_dict = acquire_locks() # pylint: disable=no-value-for-parameter
Expand Down
17 changes: 14 additions & 3 deletions niceml/dagster/ops/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from hydra.utils import ConvertMode, instantiate

from niceml.config.defaultremoveconfigkeys import DEFAULT_REMOVE_CONFIG_KEYS
from niceml.config.hydra import HydraInitField, instantiate_from_yaml
from niceml.config.writeopconfig import write_op_config
from niceml.data.datadescriptions.datadescription import DataDescription
Expand All @@ -13,15 +14,23 @@
from niceml.experiments.expfilenames import ExperimentFilenames, OpNames
from niceml.mlcomponents.resultanalyzers.analyzer import ResultAnalyzer
from niceml.utilities.fsspec.locationutils import open_location
from dagster import OpExecutionContext, op, Out
from dagster import OpExecutionContext, op, Out, Field

from niceml.utilities.readwritelock import FileLock


# pylint: disable=use-dict-literal
@op(
config_schema=dict(result_analyzer=HydraInitField(ResultAnalyzer)),
config_schema=dict(
result_analyzer=HydraInitField(ResultAnalyzer),
remove_key_list=Field(
list,
default_value=DEFAULT_REMOVE_CONFIG_KEYS,
description="These key are removed from any config recursively before it is saved.",
),
),
out={"expcontext": Out(), "filelock_dict": Out()},
required_resource_keys={"mlflow"},
)
def analysis(
context: OpExecutionContext,
Expand All @@ -30,7 +39,9 @@ def analysis(
) -> Tuple[ExperimentContext, Dict[str, FileLock]]:
"""This dagster op analysis the previous predictions applied by the model"""
op_config = json.loads(json.dumps(context.op_config))
write_op_config(op_config, exp_context, OpNames.OP_ANALYSIS.value)
write_op_config(
op_config, exp_context, OpNames.OP_ANALYSIS.value, op_config["remove_key_list"]
)
instantiated_op_config = instantiate(op_config, _convert_=ConvertMode.ALL)
data_description: DataDescription = (
exp_context.instantiate_datadescription_from_yaml()
Expand Down
9 changes: 5 additions & 4 deletions niceml/dagster/ops/experiment.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Module for experiment op"""
import json

from niceml.config.writeopconfig import write_op_config
import mlflow

from niceml.experiments.experimentcontext import ExperimentContext
from niceml.experiments.expfilenames import OpNames
from niceml.utilities.factoryutils import subs_path_and_create_folder
from niceml.utilities.fsspec.locationutils import join_location_w_path
from niceml.utilities.idutils import generate_short_id
Expand All @@ -25,7 +25,8 @@
description="Folder pattern of the experiment. "
"Can use $RUN_ID and $SHORT_ID to make the name unique",
),
)
),
required_resource_keys={"mlflow"},
)
def experiment(context: OpExecutionContext) -> ExperimentContext:
"""This Op creates the experiment params"""
Expand All @@ -39,7 +40,7 @@ def experiment(context: OpExecutionContext) -> ExperimentContext:
run_id=local_run_id,
short_id=local_short_id,
)
write_op_config(op_config, exp_context, OpNames.OP_EXPERIMENT.value)
mlflow.set_tags(dict(run_id=local_run_id, short_id=local_short_id))

return exp_context

Expand Down
19 changes: 16 additions & 3 deletions niceml/dagster/ops/exptests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,36 @@

from hydra.utils import ConvertMode, instantiate

from niceml.config.defaultremoveconfigkeys import DEFAULT_REMOVE_CONFIG_KEYS
from niceml.config.hydra import HydraInitField
from niceml.config.writeopconfig import write_op_config
from niceml.experiments.experimentcontext import ExperimentContext
from niceml.experiments.experimenttests.testinitializer import ExpTestProcess
from niceml.experiments.expfilenames import OpNames
from niceml.utilities.fsspec.locationutils import open_location
from dagster import OpExecutionContext, op
from dagster import OpExecutionContext, op, Field


# pylint: disable=use-dict-literal
@op(config_schema=dict(tests=HydraInitField(ExpTestProcess)))
@op(
config_schema=dict(
tests=HydraInitField(ExpTestProcess),
remove_key_list=Field(
list,
default_value=DEFAULT_REMOVE_CONFIG_KEYS,
description="These key are removed from any config recursively before it is saved.",
),
),
required_resource_keys={"mlflow"},
)
def exptests(
context: OpExecutionContext, exp_context: ExperimentContext
) -> ExperimentContext:
"""op to run the experiment tests"""
op_config = json.loads(json.dumps(context.op_config))
write_op_config(op_config, exp_context, OpNames.OP_EXPTESTS.value)
write_op_config(
op_config, exp_context, OpNames.OP_EXPTESTS.value, op_config["remove_key_list"]
)
instantiated_op_config = instantiate(op_config, _convert_=ConvertMode.ALL)
exp_test_process: ExpTestProcess = instantiated_op_config["tests"]
with open_location(exp_context.fs_config) as (file_system, root_path):
Expand Down
16 changes: 14 additions & 2 deletions niceml/dagster/ops/prediction.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tqdm
from hydra.utils import ConvertMode, instantiate

from niceml.config.defaultremoveconfigkeys import DEFAULT_REMOVE_CONFIG_KEYS
from niceml.config.hydra import HydraInitField, HydraMapField, instantiate_from_yaml
from niceml.config.writeopconfig import write_op_config
from niceml.data.datadescriptions.datadescription import DataDescription
Expand Down Expand Up @@ -40,8 +41,14 @@
),
model_loader=HydraInitField(ModelLoader),
prediction_function=HydraInitField(PredictionFunction),
remove_key_list=Field(
list,
default_value=DEFAULT_REMOVE_CONFIG_KEYS,
description="These key are removed from any config recursively before it is saved.",
),
),
out={"expcontext": Out(), "filelock_dict": Out()},
required_resource_keys={"mlflow"},
)
def prediction(
context: OpExecutionContext,
Expand All @@ -50,7 +57,12 @@ def prediction(
) -> Tuple[ExperimentContext, Dict[str, FileLock]]:
"""Dagster op to predict the stored model with the given datasets"""
op_config = json.loads(json.dumps(context.op_config))
write_op_config(op_config, exp_context, OpNames.OP_PREDICTION.value)
write_op_config(
op_config,
exp_context,
OpNames.OP_PREDICTION.value,
op_config["remove_key_list"],
)
instantiated_op_config = instantiate(op_config, _convert_=ConvertMode.ALL)
data_description: DataDescription = (
exp_context.instantiate_datadescription_from_yaml()
Expand Down Expand Up @@ -95,7 +107,7 @@ def prediction(
return exp_context, filelock_dict


def predict_dataset( # pylint: disable=too-many-arguments
def predict_dataset( # noqa: PLR0913
data_description: DataDescription,
model,
prediction_handler: PredictionHandler,
Expand Down
18 changes: 15 additions & 3 deletions niceml/dagster/ops/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from hydra.utils import ConvertMode, instantiate

from niceml.config.defaultremoveconfigkeys import DEFAULT_REMOVE_CONFIG_KEYS
from niceml.config.hydra import HydraInitField
from niceml.config.trainparams import TrainParams
from niceml.config.writeopconfig import write_op_config
Expand All @@ -20,7 +21,7 @@
ModelCustomLoadObjects,
)
from niceml.mlcomponents.models.modelfactory import ModelFactory
from dagster import OpExecutionContext, op, Out
from dagster import OpExecutionContext, op, Out, Field

from niceml.utilities.readwritelock import FileLock

Expand All @@ -34,18 +35,29 @@
callbacks=HydraInitField(CallbackInitializer),
learner=HydraInitField(Learner),
exp_initializer=HydraInitField(ExpOutInitializer),
remove_key_list=Field(
list,
default_value=DEFAULT_REMOVE_CONFIG_KEYS,
description="These key are removed from any config recursively before it is saved.",
),
)


@op(config_schema=train_config, out={"expcontext": Out(), "filelock_dict": Out()})
@op(
config_schema=train_config,
out={"expcontext": Out(), "filelock_dict": Out()},
required_resource_keys={"mlflow"},
)
def train(
context: OpExecutionContext,
exp_context: ExperimentContext,
filelock_dict: Dict[str, FileLock],
) -> Tuple[ExperimentContext, Dict[str, FileLock]]:
"""DagsterOp that trains the model"""
op_config = json.loads(json.dumps(context.op_config))
write_op_config(op_config, exp_context, OpNames.OP_TRAIN.value)
write_op_config(
op_config, exp_context, OpNames.OP_TRAIN.value, op_config["remove_key_list"]
)
instantiated_op_config = instantiate(op_config, _convert_=ConvertMode.ALL)

data_train = instantiated_op_config["data_train"]
Expand Down
3 changes: 3 additions & 0 deletions niceml/dlframeworks/tensorflow/learners/defaultlearner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Module for default learner"""
import mlflow.keras
import tensorflow as tf

# pylint: disable=import-error, no-name-in-module
Expand Down Expand Up @@ -36,6 +37,8 @@ def run_training(
custom_load_objects: ModelCustomLoadObjects,
callbacks: list,
):
"""runs the training"""
mlflow.keras.autolog()
model_bundle: ModelBundle = self.model_compiler.compile(
model_factory, data_description
)
Expand Down
Loading

0 comments on commit 23aa7dc

Please sign in to comment.