Skip to content

Commit

Permalink
[tune] Add TuneController [no_early_kickoff] (ray-project#33499)
Browse files Browse the repository at this point in the history
This PR adds the `TuneController` class as the new execution backend for the Ray Tune flow. It will replace the trial runner + executor combination in favor of a tune-centric control flow (TuneController) + generic Ray core primitive managers (RayActorManager, ResourceManager, RayEventManager).

With this PR, we aim to run some existing Unit tests + examples using the new execution backend. Per default, we continue to use the old execution backend, but expose a flag to be able to dogfood the new one.

Signed-off-by: Kai Fricke <[email protected]>
Signed-off-by: Kai Fricke <[email protected]>
  • Loading branch information
krfricke committed Mar 25, 2023
1 parent c848ca9 commit 968195c
Show file tree
Hide file tree
Showing 25 changed files with 1,739 additions and 74 deletions.
63 changes: 63 additions & 0 deletions .buildkite/pipeline.ml.yml
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,69 @@
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/tests/horovod/...
- bazel test --config=ci $(./ci/run/bazel_export_options) python/ray/tests/ray_lightning/...

### NEW EXECUTION PATH


- label: ":octopus: :sunny: New execution path: Tune tests and examples (small)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: small
parallelism: 3
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- ./ci/run/run_bazel_test_with_sharding.sh
--config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_tag_filters=-medium_instance,-py37,-soft_imports,-gpu_only,-rllib,-multinode,-exclude_new_execution
python/ray/tune/...

- label: ":octopus: :sunny: New execution path: Tune tests and examples (medium)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: medium
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_tag_filters=medium_instance,-py37,-soft_imports,-gpu_only,-rllib,-multinode,-exclude_new_execution
python/ray/tune/...

- label: ":octopus: :brain: :sunny: New execution path: Tune tests and examples {using RLlib}"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED", "RAY_CI_RLLIB_AFFECTED"]
instance_size: large
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_tag_filters=-gpu_only,rllib,-exclude_new_execution python/ray/tune/...

- label: ":octopus: :sunny: New execution path: Tune tests and examples. Python 3.7"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: small
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 INSTALL_HOROVOD=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_tag_filters=py37,-client python/ray/tune/...

- label: ":octopus: :sunny: New execution path: ML library integrations tests and examples. Python 3.7"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: small
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 INSTALL_HOROVOD=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_env=TUNE_NEW_EXECUTION=1 python/ray/tests/xgboost/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_env=TUNE_NEW_EXECUTION=1 python/ray/tests/horovod/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --test_env=TUNE_NEW_EXECUTION=1 python/ray/tests/ray_lightning/...


# TODO(amogkam): Re-enable Ludwig tests after Ludwig supports Ray 2.0
#- label: ":octopus: Ludwig tests and examples. Python 3.7"
# conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
Expand Down
131 changes: 107 additions & 24 deletions python/ray/air/execution/_internal/actor_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import time
import uuid
from collections import defaultdict, Counter
from functools import lru_cache
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union

import ray
Expand All @@ -18,7 +17,6 @@
from ray.air.execution._internal.tracked_actor_task import TrackedActorTask
from ray.exceptions import RayTaskError, RayActorError


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -150,12 +148,16 @@ def __init__(self, resource_manager: ResourceManager):
self._live_actors_to_ray_actors_resources: Dict[
TrackedActor, Tuple[ray.actor.ActorHandle, AcquiredResources]
] = {}
self._live_resource_cache: Optional[Dict[str, Any]] = None

# This dict contains all actors that should be killed (after calling
# `remove_actor()`). Kill requests will be handled in wait().
self._live_actors_to_kill: Set[TrackedActor] = set()

def next(self, timeout: Optional[Union[int, float]] = None) -> None:
# Track failed actors
self._failed_actor_ids: Set[int] = set()

def next(self, timeout: Optional[Union[int, float]] = None) -> bool:
"""Yield control to event manager to await the next event and invoke callbacks.
Calling this method will wait for up to ``timeout`` seconds for the next
Expand All @@ -178,6 +180,9 @@ def next(self, timeout: Optional[Union[int, float]] = None) -> None:
Args:
timeout: Timeout in seconds to wait for next event.
Returns:
True if at least one event was processed.
"""
# First issue any pending forceful actor kills
actor_killed = self._try_kill_actor()
Expand All @@ -187,7 +192,7 @@ def next(self, timeout: Optional[Union[int, float]] = None) -> None:

# If an actor was killed, this was our event, and we return.
if actor_killed:
return
return True

# Otherwise, collect all futures and await the next.
resource_futures = self._resource_manager.get_resource_futures()
Expand All @@ -209,7 +214,7 @@ def next(self, timeout: Optional[Union[int, float]] = None) -> None:
ready, _ = ray.wait(all_futures, num_returns=1, timeout=timeout)

if not ready:
return
return False

[future] = ready

Expand All @@ -228,6 +233,7 @@ def next(self, timeout: Optional[Union[int, float]] = None) -> None:
)

self._try_start_actors()
return True

def _actor_start_resolved(self, tracked_actor: TrackedActor, future: ray.ObjectRef):
"""Callback to be invoked when actor started"""
Expand All @@ -245,6 +251,8 @@ def _actor_stop_resolved(self, tracked_actor: TrackedActor):

def _actor_start_failed(self, tracked_actor: TrackedActor, exception: Exception):
"""Callback to be invoked when actor start/stop failed"""
self._failed_actor_ids.add(tracked_actor.actor_id)

self._cleanup_actor(tracked_actor=tracked_actor)

if tracked_actor._on_error:
Expand All @@ -262,16 +270,19 @@ def _actor_task_failed(
tracked_actor = tracked_actor_task._tracked_actor

if isinstance(exception, RayActorError):
# Here the actual actor process died.
# First, clean up any references to the actor and its futures
self._failed_actor_ids.add(tracked_actor.actor_id)

# Clean up any references to the actor and its futures
self._cleanup_actor(tracked_actor=tracked_actor)

# Handle actor state callbacks
if tracked_actor._on_error:
tracked_actor._on_error(tracked_actor, exception)

# Then trigger actor task error callback
if tracked_actor_task._on_error:
tracked_actor_task._on_error(tracked_actor, exception)

elif isinstance(exception, RayTaskError):
# Otherwise only the task failed. Invoke callback
if tracked_actor_task._on_error:
Expand Down Expand Up @@ -385,7 +396,7 @@ def on_error(exception: Exception):
actor,
acquired_resources,
)
self.get_live_actors_resources.cache_clear()
self._live_resource_cache = None

self._enqueue_cached_actor_tasks(tracked_actor=tracked_actor)

Expand Down Expand Up @@ -422,27 +433,21 @@ def _try_kill_actor(self) -> bool:
# Hard kill if requested
ray.kill(ray_actor)

self._cleanup_actor_futures(tracked_actor)

self._actor_stop_resolved(tracked_actor)

return True

def _cleanup_actor(self, tracked_actor: TrackedActor):
# Remove all actor task futures
futures = self._tracked_actors_to_task_futures.pop(tracked_actor, [])
for future in futures:
self._actor_task_events.discard_future(future)

# Remove all actor state futures
futures = self._tracked_actors_to_state_futures.pop(tracked_actor, [])
for future in futures:
self._actor_state_events.discard_future(future)
self._cleanup_actor_futures(tracked_actor)

# Remove from tracked actors
(
ray_actor,
acquired_resources,
) = self._live_actors_to_ray_actors_resources.pop(tracked_actor)
self.get_live_actors_resources.cache_clear()
self._live_resource_cache = None

# Return resources
self._resource_manager.free_resources(acquired_resource=acquired_resources)
Expand Down Expand Up @@ -482,13 +487,16 @@ def num_actor_tasks(self):
"""Return number of pending tasks"""
return self._actor_task_events.num_futures

@lru_cache()
def get_live_actors_resources(self):
    """Return the combined resources currently occupied by live actors.

    Aggregates the acquired resource bundles of all tracked live actors
    and memoizes the result in ``self._live_resource_cache``. The cache
    is invalidated (set to ``None``) whenever an actor is added to or
    removed from the live set.

    Returns:
        Dict mapping resource name to the total occupied amount.
    """
    # Compare against None explicitly: an empty resource dict ({}) is a
    # valid cached value (no live actors) and must not force a recompute
    # on every call, which the previous truthiness check did.
    if self._live_resource_cache is not None:
        return self._live_resource_cache

    counter = Counter()
    for _, acquired in self._live_actors_to_ray_actors_resources.values():
        for bundle in acquired.resource_request.bundles:
            counter.update(bundle)
    self._live_resource_cache = dict(counter)
    return self._live_resource_cache

def add_actor(
self,
Expand Down Expand Up @@ -535,6 +543,7 @@ def remove_actor(
self,
tracked_actor: TrackedActor,
kill: bool = False,
stop_future: Optional[ray.ObjectRef] = None,
) -> None:
"""Remove a tracked actor.
Expand All @@ -546,7 +555,6 @@ def remove_actor(
If the actor has only been requested, but not started, yet, this will cancel
the actor request. This will not trigger any callback.
If ``kill=True``, this will use ``ray.kill()`` to forcefully terminate the
actor. Otherwise, graceful actor deconstruction will be scheduled after
all currently tracked futures are resolved.
Expand All @@ -555,23 +563,51 @@ def remove_actor(
tracked_actor: Tracked actor to be removed.
kill: If set, will forcefully terminate the actor instead of gracefully
scheduling termination.
stop_future: If set, use this future to track actor termination.
Otherwise, schedule a ``__ray_terminate__`` future.
"""
if tracked_actor in self._live_actors_to_ray_actors_resources:
if tracked_actor.actor_id in self._failed_actor_ids:
logger.debug(
f"Tracked actor already failed, no need to remove: {tracked_actor}"
)
elif tracked_actor in self._live_actors_to_ray_actors_resources:
# Ray actor is running.

if not kill:
# Schedule __ray_terminate__ future
ray_actor, _ = self._live_actors_to_ray_actors_resources[tracked_actor]

# Clear state futures here to avoid resolving __ray_ready__ futures
for future in list(
self._tracked_actors_to_state_futures[tracked_actor]
):
self._actor_state_events.discard_future(future)
self._tracked_actors_to_state_futures[tracked_actor].remove(future)

# If the __ray_ready__ future hasn't resolved yet, but we already
# scheduled the actor via Actor.remote(), we just want to stop
# it but not trigger any callbacks. This is in accordance with
# the contract defined in the docstring.
tracked_actor._on_start = None
tracked_actor._on_stop = None
tracked_actor._on_error = None

def on_actor_stop(*args, **kwargs):
self._actor_stop_resolved(tracked_actor=tracked_actor)

stop_future = ray_actor.__ray_terminate__.remote()
if stop_future:
# If the stop future was schedule via the actor manager,
# discard (track it as state future instead).
self._actor_task_events.discard_future(stop_future)
else:
stop_future = ray_actor.__ray_terminate__.remote()

self._actor_state_events.track_future(
future=stop_future,
on_result=on_actor_stop,
on_error=on_actor_stop,
)

self._tracked_actors_to_state_futures[tracked_actor].add(stop_future)

else:
Expand All @@ -581,6 +617,9 @@ def on_actor_stop(*args, **kwargs):
elif tracked_actor in self._pending_actors_to_attrs:
# Actor is pending, stop
_, _, resource_request = self._pending_actors_to_attrs.pop(tracked_actor)
self._resource_request_to_pending_actors[resource_request].remove(
tracked_actor
)
self._resource_manager.cancel_resource_request(
resource_request=resource_request
)
Expand All @@ -593,7 +632,13 @@ def is_actor_started(self, tracked_actor: TrackedActor) -> bool:
Args:
tracked_actor: Tracked actor object.
"""
return tracked_actor in self._live_actors_to_ray_actors_resources
return (
tracked_actor in self._live_actors_to_ray_actors_resources
and tracked_actor.actor_id not in self._failed_actor_ids
)

def is_actor_failed(self, tracked_actor: TrackedActor) -> bool:
    """Return whether ``tracked_actor`` previously failed.

    Args:
        tracked_actor: Tracked actor object to query.
    """
    known_failures = self._failed_actor_ids
    return tracked_actor.actor_id in known_failures

def get_actor_resources(
self, tracked_actor: TrackedActor
Expand Down Expand Up @@ -675,6 +720,7 @@ def schedule_actor_task(
method_name=method_name,
args=args,
kwargs=kwargs,
_return_future=_return_future,
)
if _return_future:
return res[1]
Expand Down Expand Up @@ -794,3 +840,40 @@ def schedule_actor_tasks(
on_result=on_result,
on_error=on_error,
)

def clear_actor_task_futures(self, tracked_actor: TrackedActor):
    """Discard all actor task futures from a tracked actor.

    Args:
        tracked_actor: Tracked actor whose pending task futures should
            be dropped from the task event manager.
    """
    # Pop the whole entry so the actor no longer owns any task futures.
    task_futures = self._tracked_actors_to_task_futures.pop(tracked_actor, ())
    discard = self._actor_task_events.discard_future
    for task_future in task_futures:
        discard(task_future)

def _cleanup_actor_futures(self, tracked_actor: TrackedActor):
# Remove all actor task futures
self.clear_actor_task_futures(tracked_actor=tracked_actor)

# Remove all actor state futures
futures = self._tracked_actors_to_state_futures.pop(tracked_actor, [])
for future in futures:
self._actor_state_events.discard_future(future)

def cleanup(self):
    """Forcefully terminate all live actors and release their resources.

    Kills every live Ray actor, frees its acquired resources, cancels
    one outstanding resource request per pending actor, clears the
    resource manager, and finally resets this manager's tracking state.
    """
    # Hard-kill live actors and hand their resources back.
    for actor, acquired_resources in self._live_actors_to_ray_actors_resources.values():
        ray.kill(actor)
        self._resource_manager.free_resources(acquired_resources)

    # Each pending actor holds exactly one outstanding resource request,
    # so cancel the request once per pending actor. (The index of the
    # previous ``range(len(...))`` loop was unused.)
    for resource_request, pending_actors in self._resource_request_to_pending_actors.items():
        for _ in pending_actors:
            self._resource_manager.cancel_resource_request(resource_request)

    self._resource_manager.clear()

    # Reset all tracking state by re-running __init__ with the same
    # resource manager.
    self.__init__(resource_manager=self._resource_manager)

def __del__(self):
    # NOTE(review): running cleanup() from __del__ is best-effort only —
    # during interpreter shutdown, module globals (e.g. `ray`) may already
    # be torn down, so the kill/free calls can fail silently. Confirm this
    # is acceptable for the GC path.
    self.cleanup()
8 changes: 6 additions & 2 deletions python/ray/air/execution/_internal/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,13 @@ def resolve_future(self, future: ray.ObjectRef):
try:
result = ray.get(future)
except Exception as e:
on_error(e)
if on_error:
on_error(e)
else:
raise e
else:
on_result(result)
if on_result:
on_result(result)

def wait(
self,
Expand Down
Loading

0 comments on commit 968195c

Please sign in to comment.