Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIR] Fix ResourceChangingScheduler not working with AIR #26307

Merged
merged 22 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
WIP
  • Loading branch information
Yard1 committed Jun 29, 2022
commit 383d32ce5533c897d1a56a303ad2f26b5bd7e6ee
68 changes: 68 additions & 0 deletions python/ray/air/tests/test_resource_changing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from typing import Callable, Dict, Optional, Union
from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray.air.config import ScalingConfigDataClass
from ray.tune.tune_config import TuneConfig
from ray.tune.tuner import Tuner
from ray.train.data_parallel_trainer import DataParallelTrainer
from ray.train.base_trainer import BaseTrainer
from ray.train.gbdt_trainer import GBDTTrainer
import pytest
import ray
from ray import tune
from ray.tune.schedulers.resource_changing_scheduler import DistributeResources, ResourceChangingScheduler
from ray.tune.schedulers.async_hyperband import ASHAScheduler


@pytest.fixture
def ray_start_8_cpus():
address_info = ray.init(num_cpus=8)
yield address_info
# The code after the yield will run as teardown code.
ray.shutdown()


def train_fn(config):
start_epoch = 0

print(session.get_trial_resources())
checkpoint = session.get_checkpoint()
if checkpoint:
# assume that we have run the session.report() example
# and successfully save some model weights
checkpoint_dict = checkpoint.to_dict()
start_epoch = checkpoint_dict.get("epoch", -1) + 1

# wrap the model in DDP
for epoch in range(start_epoch, config["num_epochs"]):
checkpoint = Checkpoint.from_dict(dict(epoch=epoch))
session.report({"metric": config["metric"] * epoch, "epoch": epoch}, checkpoint=checkpoint)


class DummyDataParallelTrainer(DataParallelTrainer):
def training_loop(self) -> None:
scaling_config_dataclass = self._validate_and_get_scaling_config_data_class(
self.scaling_config
)
assert scaling_config_dataclass.as_placement_group_factory() == session.get_trial_resources()
return super().training_loop()


def test_data_parallel_trainer(ray_start_8_cpus):
trainer = DummyDataParallelTrainer(train_fn, scaling_config=dict(num_workers=2))
tuner = Tuner(
trainer,
param_space={
"train_loop_config": {
"num_epochs": 100,
"metric": tune.grid_search([1, 2, 3, 4, 5]),
}
},
tune_config=TuneConfig(
mode="max",
metric="metric",
scheduler=ResourceChangingScheduler(ASHAScheduler(), resources_allocation_function=DistributeResources(add_bundles=True)),
),
)
result_grid = tuner.fit()
assert not any(x.error for x in result_grid)
2 changes: 1 addition & 1 deletion python/ray/tune/trainable/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def trial_id(self) -> str:

@property
def trial_resources(self) -> Dict[str, float]:
Yard1 marked this conversation as resolved.
Show resolved Hide resolved
return self._status_reporter.trial_resources.required_resources
return self._status_reporter.trial_resources


@PublicAPI
Expand Down