[ci][tune][py312] Remove rllib dependencies from Tune unit tests (ray-project#46905)

Many Tune tests currently depend on RLlib's default trainables
(`"PPO"`, `"__fake"`). RLlib in turn depends on TensorFlow, which is not
easy to upgrade for py312. The current policy is to skip TensorFlow
tests, but many Tune unit tests pull in RLlib + TF even though they
exercise generic, core Tune functionality.

The fix is to decouple RLlib from the Tune tests, so that the core Tune
tests do not need to be skipped for the py312 effort.

---------

Signed-off-by: Justin Yu <[email protected]>
justinvyu committed Aug 2, 2024
1 parent ab27fe8 commit fc359e5
Showing 23 changed files with 328 additions and 472 deletions.
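
Throughout the diffs below, the `"__fake"` trainable is replaced by a dependency-free mock registered under `MOCK_TRAINABLE_NAME` (imported from `ray.tune.utils.mock_trainable`). A minimal sketch of the idea follows; the class body, metric, and name value are illustrative assumptions, not the actual helper:

from ray.tune import register_trainable
from ray.tune.trainable import Trainable

# Assumed value; the real constant lives in ray.tune.utils.mock_trainable.
MOCK_TRAINABLE_NAME = "mock_trainable"


class _MockTrainable(Trainable):
    def step(self):
        # Report a constant result; no rllib or tensorflow import required.
        return {"mean_accuracy": 1.0}

    def save_checkpoint(self, checkpoint_dir):
        return checkpoint_dir

    def load_checkpoint(self, checkpoint):
        pass


def register_mock_trainable():
    register_trainable(MOCK_TRAINABLE_NAME, _MockTrainable)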
8 changes: 8 additions & 0 deletions python/ray/tune/BUILD
@@ -469,6 +469,14 @@ py_test(
     tags = ["team:ml", "exclusive"]
 )
 
+py_test(
+    name = "test_controller_resume_integration",
+    size = "large",
+    srcs = ["tests/execution/test_controller_resume_integration.py"],
+    deps = [":tune_lib"],
+    tags = ["team:ml", "exclusive"]
+)
+
 py_test(
     name = "test_controller_search_alg_integration",
     size = "large",
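
Assuming a checked-out Ray workspace, the new target should run like its neighbors, e.g. `bazel test //python/ray/tune:test_controller_resume_integration` (label inferred from this BUILD file's location).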
16 changes: 11 additions & 5 deletions python/ray/tune/registry.py
@@ -48,14 +48,20 @@ def get_trainable_cls(trainable_name):
 
 
 @DeveloperAPI
-def validate_trainable(trainable_name):
-    if not _has_trainable(trainable_name):
+def validate_trainable(trainable_name: str):
+    if not _has_trainable(trainable_name) and not _has_rllib_trainable(trainable_name):
+        raise TuneError(f"Unknown trainable: {trainable_name}")
+
+
+def _has_rllib_trainable(trainable_name: str) -> bool:
+    try:
         # Make sure everything rllib-related is registered.
         from ray.rllib import _register_all
+    except (ImportError, ModuleNotFoundError):
+        return False
 
-        _register_all()
-        if not _has_trainable(trainable_name):
-            raise TuneError("Unknown trainable: " + trainable_name)
+    _register_all()
+    return _has_trainable(trainable_name)
 
 
 @DeveloperAPI
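
To make the new control flow concrete, a hypothetical session assuming rllib is not installed (behavior inferred from the diff above; `TuneError` lives in `ray.tune.error`):

from ray.tune.error import TuneError
from ray.tune.registry import validate_trainable

try:
    validate_trainable("PPO")  # an rllib-provided trainable name
except TuneError as err:
    # With rllib absent, _has_rllib_trainable() returns False on the failed
    # import, so validation fails fast instead of surfacing an ImportError.
    print(err)  # Unknown trainable: PPO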
@@ -9,6 +9,7 @@
 from ray.tune import Callback, ResumeConfig
 from ray.tune.execution.tune_controller import TuneController
 from ray.tune.experiment import Trial
+from ray.tune.utils.mock_trainable import MOCK_TRAINABLE_NAME, register_mock_trainable
 
 
 @pytest.fixture(scope="function")
@@ -18,6 +19,11 @@ def ray_start_4_cpus_2_gpus_extra():
     ray.shutdown()
 
 
+@pytest.fixture(autouse=True)
+def register_test_trainable():
+    register_mock_trainable()
+
+
 class StatefulCallback(Callback):
     CKPT_FILE_TMPL = "test-callback-state-{}.json"
 
@@ -46,7 +52,7 @@ def test_callback_save_restore(
     """
     storage = mock_storage_context()
     runner = TuneController(callbacks=[StatefulCallback()], storage=storage)
-    runner.add_trial(Trial("__fake", stub=True, storage=storage))
+    runner.add_trial(Trial(MOCK_TRAINABLE_NAME, stub=True, storage=storage))
     for i in range(3):
         runner._callbacks.on_trial_result(
             iteration=i, trials=None, trial=None, result=None
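
A note on the fixture pattern above: `@pytest.fixture(autouse=True)` makes pytest run the fixture before every test in the module, so each test sees the mock trainable without requesting the fixture by name. A self-contained sketch of the mechanics (names and the registry set are illustrative):

import pytest

_registered = set()


@pytest.fixture(autouse=True)
def register_test_trainable():
    # Runs automatically before each test in this module.
    _registered.add("mock_trainable")


def test_mock_trainable_is_registered():
    assert "mock_trainable" in _registered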
@@ -22,10 +22,16 @@
 from ray.tune.schedulers import FIFOScheduler
 from ray.tune.search import BasicVariantGenerator
 from ray.tune.tests.tune_test_util import TrialResultObserver
+from ray.tune.utils.mock_trainable import MOCK_TRAINABLE_NAME, register_mock_trainable
 
 STORAGE = mock_storage_context()
 
 
+@pytest.fixture(autouse=True)
+def register_test_trainable():
+    register_mock_trainable()
+
+
 @pytest.fixture(scope="function")
 def ray_start_4_cpus_2_gpus_extra():
     address_info = ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
@@ -83,17 +89,14 @@ def test_checkpoint_save_restore(
         "checkpoint_config": CheckpointConfig(checkpoint_frequency=1),
         "storage": STORAGE,
     }
-    runner.add_trial(Trial("__fake", **kwargs))
+    runner.add_trial(Trial(MOCK_TRAINABLE_NAME, **kwargs))
     trials = runner.get_trials()
 
     runner.step()  # Start trial
 
     while trials[0].status != Trial.RUNNING:
         runner.step()
 
-    # Set some state that will be saved in the checkpoint
-    assert ray.get(trials[0].temporary_state.ray_actor.set_info.remote(1)) == 1
-
     while trials[0].status != Trial.TERMINATED:
         runner.step()
 
@@ -103,7 +106,7 @@
 
     # Prepare new trial
     kwargs["restore_path"] = trials[0].checkpoint.path
-    new_trial = Trial("__fake", **kwargs)
+    new_trial = Trial(MOCK_TRAINABLE_NAME, **kwargs)
     runner.add_trial(new_trial)
     trials = runner.get_trials()
 
@@ -116,8 +119,6 @@
     # Restore
     runner.step()
 
-    assert ray.get(trials[1].temporary_state.ray_actor.get_info.remote()) == 1
-
     # Run to termination
     while trials[1].status != Trial.TERMINATED:
         runner.step()
@@ -146,7 +147,7 @@ def test_checkpoint_at_end(ray_start_4_cpus_2_gpus_extra, resource_manager_cls,
         "placement_group_factory": PlacementGroupFactory([{"CPU": 1, "GPU": 1}]),
         "storage": STORAGE,
     }
-    runner.add_trial(Trial("__fake", **kwargs))
+    runner.add_trial(Trial(MOCK_TRAINABLE_NAME, **kwargs))
     trials = runner.get_trials()
 
     while not runner.is_finished():
@@ -176,15 +177,12 @@ def test_pause_resume_trial(
         "checkpoint_config": CheckpointConfig(checkpoint_frequency=1),
         "storage": STORAGE,
     }
-    runner.add_trial(Trial("__fake", **kwargs))
+    runner.add_trial(Trial(MOCK_TRAINABLE_NAME, **kwargs))
     trials = runner.get_trials()
 
     while trials[0].status != Trial.RUNNING:
         runner.step()
 
-    assert ray.get(trials[0].temporary_state.ray_actor.get_info.remote()) is None
-    assert ray.get(trials[0].temporary_state.ray_actor.set_info.remote(1)) == 1
-
     runner._schedule_trial_pause(trials[0], should_checkpoint=True)
 
     while trials[0].status != Trial.PAUSED:
@@ -199,8 +197,6 @@
     while trials[0].status != Trial.RUNNING:
         runner.step()
 
-    assert ray.get(trials[0].temporary_state.ray_actor.get_info.remote()) == 1
-
     while trials[0].status != Trial.TERMINATED:
         runner.step()
 
@@ -223,7 +219,9 @@
     Legacy test: test_trial_runner_2.py::TrialRunnerTest::testPauseResumeCheckpointCount
     """
     trial = Trial(
-        "__fake", checkpoint_config=CheckpointConfig(num_to_keep=2), storage=STORAGE
+        MOCK_TRAINABLE_NAME,
+        checkpoint_config=CheckpointConfig(num_to_keep=2),
+        storage=STORAGE,
     )
     trial.init_local_path()
 
@@ -319,7 +317,7 @@ def test_checkpoint_freq_buffered(
         {"TUNE_RESULT_BUFFER_LENGTH": "7", "TUNE_RESULT_BUFFER_MIN_TIME_S": "1"},
     ):
         trial = Trial(
-            "__fake",
+            MOCK_TRAINABLE_NAME,
             checkpoint_config=CheckpointConfig(checkpoint_frequency=3),
             storage=STORAGE,
         )
@@ -364,7 +362,7 @@ def test_checkpoint_at_end_not_buffered(
         {"TUNE_RESULT_BUFFER_LENGTH": "7", "TUNE_RESULT_BUFFER_MIN_TIME_S": "0.5"},
     ):
         trial = Trial(
-            "__fake",
+            MOCK_TRAINABLE_NAME,
             checkpoint_config=CheckpointConfig(
                 checkpoint_at_end=True,
             ),
@@ -409,118 +407,6 @@ def test_checkpoint_at_end_not_buffered(
     assert num_checkpoints(trial) == 1
 
 
-@pytest.mark.parametrize(
-    "resource_manager_cls", [FixedResourceManager, PlacementGroupResourceManager]
-)
-def test_checkpoint_user_checkpoint(
-    ray_start_4_cpus_2_gpus_extra, resource_manager_cls, tmp_path
-):
-    """Test that user checkpoint freq is respected.
-    Legacy test: test_trial_runner_3.py::TrialRunnerTest::testUserCheckpoint
-    """
-    with mock.patch.dict(
-        os.environ,
-        {"TUNE_RESULT_BUFFER_LENGTH": "1", "TUNE_MAX_PENDING_TRIALS_PG": "1"},
-    ):
-        runner = TuneController(
-            resource_manager_factory=lambda: resource_manager_cls(), storage=STORAGE
-        )
-        runner.add_trial(
-            Trial("__fake", config={"user_checkpoint_freq": 2}, storage=STORAGE)
-        )
-        trials = runner.get_trials()
-
-        while not trials[0].status == Trial.RUNNING:
-            runner.step()
-        assert ray.get(trials[0].temporary_state.ray_actor.set_info.remote(1)) == 1
-
-        while trials[0].last_result.get(TRAINING_ITERATION, 0) < 1:
-            runner.step()  # Process result
-        assert not trials[0].has_checkpoint()
-        while trials[0].last_result.get(TRAINING_ITERATION, 99) < 2:
-            runner.step()  # Process result
-        assert not trials[0].has_checkpoint()
-
-        while trials[0].last_result.get(TRAINING_ITERATION, 99) < 3:
-            runner.step()  # Process result
-        runner.step()
-
-        assert trials[0].has_checkpoint()
-        runner.checkpoint(force=True, wait=True)
-
-        runner2 = TuneController(
-            resource_manager_factory=lambda: resource_manager_cls(),
-            storage=STORAGE,
-            resume_config=ResumeConfig(),
-        )
-        trials2 = runner2.get_trials()
-        while not trials2[0].status == Trial.RUNNING:
-            runner2.step()
-        assert ray.get(trials2[0].temporary_state.ray_actor.get_info.remote()) == 1
-
-
-@pytest.mark.parametrize(
-    "resource_manager_cls", [FixedResourceManager, PlacementGroupResourceManager]
-)
-def test_checkpoint_user_checkpoint_buffered(
-    ray_start_4_cpus_2_gpus_extra, resource_manager_cls, tmp_path
-):
-    """Test that user checkpoint freq is respected with buffered training.
-    Legacy test: test_trial_runner_3.py::TrialRunnerTest::testUserCheckpointBuffered
-    """
-
-    with mock.patch.dict(
-        os.environ,
-        {"TUNE_RESULT_BUFFER_LENGTH": "8", "TUNE_RESULT_BUFFER_MIN_TIME_S": "1"},
-    ):
-        runner = TuneController(
-            resource_manager_factory=lambda: resource_manager_cls(),
-            storage=STORAGE,
-            checkpoint_period=0,
-        )
-        runner.add_trial(
-            Trial("__fake", config={"user_checkpoint_freq": 10}, storage=STORAGE)
-        )
-        trials = runner.get_trials()
-
-        while trials[0].status != Trial.RUNNING:
-            runner.step()
-        assert ray.get(trials[0].temporary_state.ray_actor.set_info.remote(1)) == 1
-        assert num_checkpoints(trials[0]) == 0
-
-        while trials[0].last_result.get(TRAINING_ITERATION, 0) < 8:
-            runner.step()
-
-        assert not trials[0].has_checkpoint()
-        assert num_checkpoints(trials[0]) == 0
-
-        while trials[0].last_result.get(TRAINING_ITERATION) < 11:
-            runner.step()
-        runner.step()
-        assert trials[0].has_checkpoint()
-        assert num_checkpoints(trials[0]) == 1
-
-        while trials[0].last_result.get(TRAINING_ITERATION) < 19:
-            runner.step()
-        runner.step()
-        assert trials[0].has_checkpoint()
-        assert num_checkpoints(trials[0]) == 1
-
-        while trials[0].last_result.get(TRAINING_ITERATION) < 21:
-            runner.step()
-        runner.step()
-        assert trials[0].has_checkpoint()
-        assert num_checkpoints(trials[0]) == 2
-
-        while trials[0].last_result.get(TRAINING_ITERATION) < 29:
-            runner.step()
-        runner.step()
-        assert trials[0].has_checkpoint()
-        assert num_checkpoints(trials[0]) == 2
-
-
 @pytest.mark.parametrize(
     "resource_manager_cls", [FixedResourceManager, PlacementGroupResourceManager]
 )
@@ -544,11 +430,7 @@ def test_checkpoint_auto_period(
 
     with mock.patch.object(runner, "save_to_dir") as save_to_dir:
         save_to_dir.side_effect = lambda *a, **kw: time.sleep(2)
-
-        runner.add_trial(
-            Trial("__fake", config={"user_checkpoint_freq": 1}, storage=storage)
-        )
-
+        runner.add_trial(Trial(MOCK_TRAINABLE_NAME, storage=storage))
         runner.step()  # Run one step, this will trigger checkpointing
 
     assert runner._checkpoint_manager._checkpoint_period > 38.0
@@ -585,7 +467,7 @@ def get_json_state(self):
             return "", ""
 
     trial = CheckpointingTrial(
-        "__fake",
+        MOCK_TRAINABLE_NAME,
        checkpoint_config=checkpoint_config,
         stopping_criterion={"training_iteration": 10},
         storage=storage,
@@ -9,6 +9,7 @@
 from ray.tune import PlacementGroupFactory, register_trainable
 from ray.tune.execution.tune_controller import TuneController
 from ray.tune.experiment import Trial
+from ray.tune.utils.mock_trainable import MOCK_TRAINABLE_NAME, register_mock_trainable
 
 STORAGE = mock_storage_context()
 
@@ -28,6 +29,8 @@ def test_stop_trial(ray_start_4_cpus_2_gpus_extra, resource_manager_cls):
     Legacy test: test_trial_runner_3.py::TrialRunnerTest::testStopTrial
     """
 
+    register_mock_trainable()
+
     runner = TuneController(
         resource_manager_factory=lambda: resource_manager_cls(), storage=STORAGE
     )
@@ -38,10 +41,10 @@ def test_stop_trial(ray_start_4_cpus_2_gpus_extra, resource_manager_cls):
         "storage": STORAGE,
     }
     trials = [
-        Trial("__fake", **kwargs),
-        Trial("__fake", **kwargs),
-        Trial("__fake", **kwargs),
-        Trial("__fake", **kwargs),
+        Trial(MOCK_TRAINABLE_NAME, **kwargs),
+        Trial(MOCK_TRAINABLE_NAME, **kwargs),
+        Trial(MOCK_TRAINABLE_NAME, **kwargs),
+        Trial(MOCK_TRAINABLE_NAME, **kwargs),
     ]
     for t in trials:
         runner.add_trial(t)