Skip to content

Commit

Permalink
[Tune] TrialRunner checkpointing shouldn't fail if ray.data.Dataset
Browse files Browse the repository at this point in the history
… w/o lineage captured in trial config (ray-project#33565)

* Fix dataset serialization if no lineage

Signed-off-by: Justin Yu <[email protected]>

* add a unit test

Signed-off-by: Justin Yu <[email protected]>

* Add data dependencies for the test

Signed-off-by: Justin Yu <[email protected]>

---------

Signed-off-by: Justin Yu <[email protected]>
  • Loading branch information
justinvyu committed Mar 23, 2023
1 parent 24a89a6 commit d4ee925
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .buildkite/pipeline.ml.yml
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@
instance_size: medium
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 ./ci/env/install-dependencies.sh
- TUNE_TESTING=1 DATA_PROCESSING_TESTING=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_tag_filters=medium_instance,-py37,-soft_imports,-gpu_only,-rllib,-multinode
Expand Down
5 changes: 5 additions & 0 deletions python/ray/tune/execution/experiment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ def checkpoint(
# Checkpoint
checkpoint_time_start = time.monotonic()

# NOTE: This context manager is for Ray Datasets captured in a trial config.
# This is the case when *tuning over datasets*.
# If the datasets have already been full executed, then serializing
# block refs means that this checkpoint is not usable in a new Ray cluster.
# This context will serialize the dataset execution plan instead, if available.
with out_of_band_serialize_dataset():
save_fn()

Expand Down
2 changes: 1 addition & 1 deletion python/ray/tune/impl/out_of_band_serialize_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def _reduce(ds: ray.data.Dataset):
if "serialize_lineage" in tb:
_already_in_out_of_band_serialization = True
break
if not _already_in_out_of_band_serialization:
if not _already_in_out_of_band_serialization and ds.has_serializable_lineage():
return _deserialize_and_fully_execute_if_needed, (ds.serialize_lineage(),)
else:
return ds.__reduce__()
Expand Down
70 changes: 70 additions & 0 deletions python/ray/tune/tests/test_trial_runner_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from collections import Counter
import logging
import os
import pandas as pd
import pickle
import shutil
import sys
Expand All @@ -14,6 +15,7 @@
import ray
from ray.air import CheckpointConfig
from ray.air.execution import PlacementGroupResourceManager, FixedResourceManager
from ray.exceptions import OwnerDiedError
from ray.rllib import _register_all
from ray.rllib.algorithms.callbacks import DefaultCallbacks

Expand Down Expand Up @@ -1155,6 +1157,74 @@ def testPeriodicCloudCheckpointSyncTimeout(self):
assert any("did not finish running within the timeout" in x for x in buffer)
assert syncer.sync_up_counter == 2

def testExperimentCheckpointWithDatasets(self):
"""Test trial runner checkpointing where trials contain Ray Datasets.
When possible, a dataset plan should be saved (for read_* APIs).
See `Dataset.serialize_lineage` for more information.
If a dataset cannot be serialized, an experiment checkpoint
should still be created. Users can pass in the dataset again by
re-specifying the `param_space`.
"""
ray.init(num_cpus=2)
# Save some test data to load
data_filepath = os.path.join(self.tmpdir, "test.csv")
pd.DataFrame({"x": list(range(10))}).to_csv(data_filepath)

def create_trial_config():
return {
"datasets": {
"with_lineage": ray.data.read_csv(data_filepath),
"no_lineage": ray.data.from_items([{"x": i} for i in range(10)]),
}
}

resolvers = create_resolvers_map()
config_with_placeholders = inject_placeholders(create_trial_config(), resolvers)
trial = Trial(
"__fake",
experiment_path=self.tmpdir,
config=config_with_placeholders,
)
trial.init_local_path()
runner = TrialRunner(
experiment_path=self.tmpdir, placeholder_resolvers=resolvers
)
runner.add_trial(trial)
# Req: TrialRunner checkpointing shouldn't error
runner.checkpoint(force=True)

# Manually clear all block refs that may have been created
ray.shutdown()
ray.init(num_cpus=2)

new_runner = TrialRunner(experiment_path=self.tmpdir)
new_runner.resume()
[loaded_trial] = new_runner.get_trials()
loaded_datasets = loaded_trial.config["datasets"]

# Req: The deserialized dataset (w/ lineage) should be usable.
assert [el["x"] for el in loaded_datasets["with_lineage"].take()] == list(
range(10)
)
# Req: The deserialized dataset (w/o lineage) should NOT be usable.
with self.assertRaises(OwnerDiedError):
loaded_datasets["no_lineage"].take()

replaced_resolvers = create_resolvers_map()
inject_placeholders(create_trial_config(), replaced_resolvers)

respecified_config_runner = TrialRunner(
placeholder_resolvers=replaced_resolvers,
local_checkpoint_dir=self.tmpdir,
)
respecified_config_runner.resume()
[loaded_trial] = respecified_config_runner.get_trials()
ray_ds_no_lineage = loaded_trial.config["datasets"]["no_lineage"]

# Req: The dataset (w/o lineage) can be re-specified and is usable after.
assert [el["x"] for el in ray_ds_no_lineage.take()] == list(range(10))


class FixedResourceTrialRunnerTest3(TrialRunnerTest3):
def _resourceManager(self):
Expand Down

0 comments on commit d4ee925

Please sign in to comment.