From 208e4526265acfeaad7cf312a5168b8a3925173a Mon Sep 17 00:00:00 2001 From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Date: Fri, 8 Dec 2023 15:45:53 -0800 Subject: [PATCH] [core] retryable exceptions for method (#41194) Implements retryable exceptions for methods. Also plumbs direct_actor_task_submitter quite a bit. Behavior: Added new method-level annotation max_retries that overrides actor's max_task_retries if any. If user_exceptions=True | List[exception class types] and ((max_retries or max_task_retries) > 0) we may retry the method by issuing another task invocation to the actor. Both exception-retry and actor-death-retry counts toward max_retries. For example for a max_retries=3 call, and there are 2 actor deaths and 1 exception and 1 return, we can return the value. For a streaming generator call, if it yielded 4 values then raises an exception, we retry by calling the method again and ignoring the first 4 values and start yielding the 5th value in the second run. Java and CPP: they still have max_task_retries on actor deaths, but I did not add max_retries or retry_exceptions. --- .../runtime/task/local_mode_task_submitter.cc | 9 +- .../ray/runtime/task/native_task_submitter.cc | 7 + .../ray-core/fault_tolerance/actors.rst | 26 ++ .../ray/runtime/task/NativeTaskSubmitter.java | 3 + python/ray/_private/ray_option_utils.py | 6 - python/ray/_raylet.pyx | 69 +++- python/ray/actor.py | 100 ++++- python/ray/includes/function_descriptor.pxi | 16 + python/ray/includes/libcoreworker.pxd | 4 + python/ray/serve/tests/test_proxy_state.py | 2 +- python/ray/tests/BUILD | 1 + python/ray/tests/test_actor_retry.py | 364 ++++++++++++++++++ python/ray/tests/test_chaos.py | 2 +- .../workloads/xgboost_benchmark.py | 1 + src/ray/common/task/task_spec.cc | 10 +- src/ray/common/task/task_util.h | 14 +- src/ray/core_worker/actor_handle.cc | 14 +- src/ray/core_worker/actor_handle.h | 6 +- src/ray/core_worker/actor_manager.cc | 2 +- src/ray/core_worker/actor_manager.h | 2 +- src/ray/core_worker/core_worker.cc | 48 ++- src/ray/core_worker/core_worker.h | 15 +- ...io_ray_runtime_task_NativeTaskSubmitter.cc | 18 +- src/ray/core_worker/task_manager.cc | 19 +- src/ray/core_worker/task_manager.h | 4 +- src/ray/core_worker/test/core_worker_test.cc | 6 +- src/ray/core_worker/test/task_manager_test.cc | 5 +- .../transport/actor_submit_queue.h | 6 +- .../transport/direct_actor_task_submitter.cc | 41 +- .../transport/direct_actor_task_submitter.h | 7 +- .../out_of_order_actor_submit_queue.cc | 4 +- .../out_of_order_actor_submit_queue.h | 2 +- .../sequential_actor_submit_queue.cc | 4 +- .../transport/sequential_actor_submit_queue.h | 9 +- 34 files changed, 748 insertions(+), 98 deletions(-) create mode 100644 python/ray/tests/test_actor_retry.py diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index faf56663b5e8d..97610e4f6606f 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -85,8 +85,13 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation, TaskID::ForActorCreationTask(invocation.actor_id); const ObjectID actor_creation_dummy_object_id = ObjectID::FromIndex(actor_creation_task_id, 1); - builder.SetActorTaskSpec( - invocation.actor_id, actor_creation_dummy_object_id, invocation.actor_counter); + // NOTE: Ray CPP doesn't support retries and retry_exceptions. 
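For reference, a minimal Python-level sketch of the behavior described in the commit message above. The `_max_retries` method option is the internal per-method override added by this PR; the class, method, and exception names are illustrative, and running the commented call requires an initialized Ray cluster.

    import random

    import ray

    class MyError(Exception):
        pass

    @ray.remote(max_task_retries=3)          # default retry budget for every method
    class Worker:
        # Per-method override: up to 5 retries, only on MyError. Exception retries
        # and actor-death retries draw from the same budget of 5.
        @ray.method(_max_retries=5, retry_exceptions=[MyError])
        def flaky(self):
            if random.random() < 0.5:
                raise MyError()
            return "ok"

    # w = Worker.remote()
    # print(ray.get(w.flaky.remote()))   # may execute the method up to 6 times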
+ builder.SetActorTaskSpec(invocation.actor_id, + actor_creation_dummy_object_id, + /*max_retries=*/0, + /*retry_exceptions=*/false, + /*serialized_retry_exception_allowlist=*/"", + invocation.actor_counter); } else { throw RayException("unknown task type"); } diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc index a088c3f035672..bd1a544ca23a6 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.cc +++ b/cpp/src/ray/runtime/task/native_task_submitter.cc @@ -69,10 +69,17 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation, options.generator_backpressure_num_objects = -1; std::vector return_refs; if (invocation.task_type == TaskType::ACTOR_TASK) { + // NOTE: Ray CPP doesn't support per-method max_retries and retry_exceptions + const auto native_actor_handle = core_worker.GetActorHandle(invocation.actor_id); + int max_retries = native_actor_handle->MaxTaskRetries(); + auto status = core_worker.SubmitActorTask(invocation.actor_id, BuildRayFunction(invocation), invocation.args, options, + max_retries, + /*retry_exceptions=*/false, + /*serialized_retry_exception_allowlist=*/"", return_refs); if (!status.ok()) { return ObjectID::Nil(); diff --git a/doc/source/ray-core/fault_tolerance/actors.rst b/doc/source/ray-core/fault_tolerance/actors.rst index a254fa5fb562a..fdac85b1ffe6e 100644 --- a/doc/source/ray-core/fault_tolerance/actors.rst +++ b/doc/source/ray-core/fault_tolerance/actors.rst @@ -134,3 +134,29 @@ terminating ` the actor. You can do this by calling original handle to the actor. If ``max_restarts`` is set, you can also allow Ray to automatically restart the actor by passing ``no_restart=False`` to ``ray.kill``. + + +Actor method exceptions +----------------------- + +Sometime you want to retry when an actor method raises exceptions. Use ``max_task_retries`` with ``retry_exceptions`` to retry. + +Note that by default, retrying on user raised exceptions is disabled. To enable it, make sure the method is **idempotent**, that is, invoking it multiple times should be equivalent to invoking it only once. + +You can set ``retry_exceptions`` in the `@ray.method(retry_exceptions=...)` decorator, or in the `.options(retry_exceptions=...)` in the method call. + +Retry behavior depends on the value you set ``retry_exceptions`` to: +- ``retry_exceptions == False`` (default): No retries for user exceptions. +- ``retry_exceptions == True``: Ray retries a method on user exception up to ``max_retries`` times. +- ``retry_exceptions`` is a list of exceptions: Ray retries a method on user exception up to ``max_retries`` times, only if the method raises an exception from these specific classes. + +``max_task_retries`` applies to both exceptions and actor crashes. Ray searches for the first non-default value of ``max_task_retries`` in this order: + +.. - The method call's value, for example, `actor.method.options(_max_retries=2)`. Ray ignores this value if you didn't set it. +.. - The method definition's value, for example, `@ray.method(_max_retries=2)`. Ray ignores this value if you didn't set it. + +- The actor creation call's value, for example, `Actor.options(max_task_retries=2)`. Ray ignores this value if you didn't set it. +- The Actor class definition's value, for example, `@ray.remote(max_task_retries=2)` decorator. Ray ignores this value if you didn't set it. +- The default value,`0`. 
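For example, the following sketch combines ``max_task_retries`` with a per-method ``retry_exceptions`` allowlist (the class, method, and exception names are illustrative):

.. code-block:: python

    import ray

    class TransientError(Exception):
        pass

    @ray.remote(max_task_retries=3)
    class Fetcher:
        # Ray retries this method only when it raises TransientError.
        @ray.method(retry_exceptions=[TransientError])
        def fetch(self, url: str) -> str:
            ...

    fetcher = Fetcher.remote()
    # On TransientError, Ray re-executes fetch up to 3 more times.
    ref = fetcher.fetch.remote("https://example.com")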
+ +For example, if a method sets `max_retries=5` and `retry_exceptions=True`, and the actor sets `max_restarts=2`, Ray executes the method up to 6 times: once for the initial invocation, and 5 additional retries. The 6 invocations may include 2 actor crashes. After the 6th invocation, a `ray.get` call to the result Ray ObjectRef raises the exception raised in the last invocation, or `ray.exceptions.RayActorError` if the actor crashed in the last invocation. diff --git a/java/runtime/src/main/java/io/ray/runtime/task/NativeTaskSubmitter.java b/java/runtime/src/main/java/io/ray/runtime/task/NativeTaskSubmitter.java index aa82c7cb949d5..d52b7433cdfad 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/NativeTaskSubmitter.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/NativeTaskSubmitter.java @@ -80,6 +80,9 @@ public List submitActorTask( int numReturns, CallOptions options) { Preconditions.checkState(actor instanceof NativeActorHandle); + // TODO: Ray Java does not support per-method MaxRetries. It only supports + // setting Actor-level MaxTaskRetries for any method calls. + // See: src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc List returnIds = nativeSubmitActorTask( actor.getId().getBytes(), diff --git a/python/ray/_private/ray_option_utils.py b/python/ray/_private/ray_option_utils.py index de27a125b3241..ce9a4f3c59314 100644 --- a/python/ray/_private/ray_option_utils.py +++ b/python/ray/_private/ray_option_utils.py @@ -343,12 +343,6 @@ def validate_actor_options(options: Dict[str, Any], in_options: bool): "Setting 'concurrency_groups' is not supported in '.options()'." ) - if options.get("max_restarts", 0) == 0 and options.get("max_task_retries", 0) != 0: - raise ValueError( - "'max_task_retries' cannot be set if 'max_restarts' " - "is 0 or if 'max_restarts' is not set." - ) - if options.get("get_if_exists") and not options.get("name"): raise ValueError("The actor name must be specified to use `get_if_exists`.") diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 7d32523e23a44..8df84171fc6c0 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -968,7 +968,21 @@ cdef raise_if_dependency_failed(arg): raise arg +def serialize_retry_exception_allowlist(retry_exception_allowlist, function_descriptor): + try: + return ray_pickle.dumps(retry_exception_allowlist) + except TypeError as e: + msg = ( + "Could not serialize the retry exception allowlist" + f"{retry_exception_allowlist} for task {function_descriptor.repr}. " + "See " + "https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting " # noqa + "for more information.") + raise TypeError(msg) from e + + cdef c_bool determine_if_retryable( + c_bool should_retry_exceptions, Exception e, const c_string serialized_retry_exception_allowlist, FunctionDescriptor function_descriptor, @@ -983,6 +997,8 @@ cdef c_bool determine_if_retryable( - Deserialization of exception allowlist fails (TypeError) - Exception allowlist is not None and not a tuple (AssertionError) """ + if not should_retry_exceptions: + return False if len(serialized_retry_exception_allowlist) == 0: # No exception allowlist specified, default to all retryable. 
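As a rough, pure-Python model of the allowlist semantics implemented by `serialize_retry_exception_allowlist` and `determine_if_retryable` above (simplified: the real code uses `ray_pickle`, Cython types, and richer error handling; the function names below are only for illustration):

    import pickle
    from typing import Optional, Tuple

    def serialize_allowlist(allowlist: Optional[Tuple[type, ...]]) -> bytes:
        # None means "no allowlist": every user exception is retryable.
        return b"" if allowlist is None else pickle.dumps(allowlist)

    def is_retryable(should_retry_exceptions: bool,
                     exc: Exception,
                     serialized_allowlist: bytes) -> bool:
        if not should_retry_exceptions:
            return False
        if not serialized_allowlist:
            return True
        allowlist = pickle.loads(serialized_allowlist)
        assert isinstance(allowlist, tuple)
        return isinstance(exc, allowlist)

    assert is_retryable(True, ValueError("x"), serialize_allowlist((ValueError,)))
    assert not is_retryable(True, KeyError("x"), serialize_allowlist((ValueError,)))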
return True @@ -1505,12 +1521,13 @@ cdef create_generator_error_object( CoreWorker core_worker = worker.core_worker is_retryable_error[0] = determine_if_retryable( + should_retry_exceptions, e, serialized_retry_exception_allowlist, function_descriptor, ) - if is_retryable_error[0] and should_retry_exceptions: + if is_retryable_error[0]: logger.debug( "Task failed with retryable exception:" " {}.".format(task_id), exc_info=True) @@ -1571,11 +1588,12 @@ cdef execute_dynamic_generator_and_store_task_outputs( generator_id) except Exception as error: is_retryable_error[0] = determine_if_retryable( + should_retry_exceptions, error, serialized_retry_exception_allowlist, function_descriptor, ) - if is_retryable_error[0] and should_retry_exceptions: + if is_retryable_error[0]: logger.info("Task failed with retryable exception:" " {}.".format( core_worker.get_current_task_id()), @@ -1710,7 +1728,7 @@ cdef void execute_task( raise RayActorError( ActorDiedErrorContext( error_message=error_message, - actor_id=core_worker.get_actor_id(), + actor_id=core_worker.get_actor_id().binary(), class_name=class_name ) ) @@ -1872,11 +1890,12 @@ cdef void execute_task( exit_current_actor_if_asyncio() except Exception as e: is_retryable_error[0] = determine_if_retryable( - e, - serialized_retry_exception_allowlist, - function_descriptor, - ) - if is_retryable_error[0] and should_retry_exceptions: + should_retry_exceptions, + e, + serialized_retry_exception_allowlist, + function_descriptor, + ) + if is_retryable_error[0]: logger.debug("Task failed with retryable exception:" " {}.".format( core_worker.get_current_task_id()), @@ -3738,18 +3757,9 @@ cdef class CoreWorker: self.python_scheduling_strategy_to_c( scheduling_strategy, &c_scheduling_strategy) - try: - serialized_retry_exception_allowlist = ray_pickle.dumps( - retry_exception_allowlist, - ) - except TypeError as e: - msg = ( - "Could not serialize the retry exception allowlist" - f"{retry_exception_allowlist} for task {function_descriptor.repr}. " - "Check " - "https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting " # noqa - "for more information.") - raise TypeError(msg) from e + serialized_retry_exception_allowlist = serialize_retry_exception_allowlist( + retry_exception_allowlist, + function_descriptor) with self.profile_event(b"submit_task"): prepare_resources(resources, &c_resources) @@ -3946,6 +3956,9 @@ cdef class CoreWorker: args, c_string name, int num_returns, + int max_retries, + c_bool retry_exceptions, + retry_exception_allowlist, double num_method_cpus, c_string concurrency_group_name, int64_t generator_backpressure_num_objects): @@ -3962,6 +3975,11 @@ cdef class CoreWorker: # This task id is incorrect if async task is used. 
# In this case, we should use task_id_in_async_context TaskID current_task = self.get_current_task_id() + c_string serialized_retry_exception_allowlist + + serialized_retry_exception_allowlist = serialize_retry_exception_allowlist( + retry_exception_allowlist, + function_descriptor) with self.profile_event(b"submit_task"): if num_method_cpus > 0: @@ -3991,6 +4009,9 @@ cdef class CoreWorker: c_resources, concurrency_group_name, generator_backpressure_num_objects), + max_retries, + retry_exceptions, + serialized_retry_exception_allowlist, return_refs, current_c_task_id) # These arguments were serialized and put into the local object @@ -4093,6 +4114,7 @@ cdef class CoreWorker: dereference(c_actor_handle).ActorLanguage()) actor_creation_function_descriptor = CFunctionDescriptorToPython( dereference(c_actor_handle).ActorCreationTaskFunctionDescriptor()) + max_task_retries = dereference(c_actor_handle).MaxTaskRetries() if language == Language.PYTHON: assert isinstance(actor_creation_function_descriptor, PythonFunctionDescriptor) @@ -4106,21 +4128,26 @@ cdef class CoreWorker: job_id, actor_creation_function_descriptor) method_meta = ray.actor._ActorClassMethodMetadata.create( actor_class, actor_creation_function_descriptor) - return ray.actor.ActorHandle(language, actor_id, + return ray.actor.ActorHandle(language, actor_id, max_task_retries, method_meta.method_is_generator, method_meta.decorators, method_meta.signatures, method_meta.num_returns, + method_meta.max_retries, + method_meta.retry_exceptions, method_meta.generator_backpressure_num_objects, # noqa actor_method_cpu, actor_creation_function_descriptor, worker.current_session_and_job) else: return ray.actor.ActorHandle(language, actor_id, + 0, # max_task_retries, {}, # method is_generator {}, # method decorators {}, # method signatures {}, # method num_returns + {}, # method max_retries + {}, # method retry_exceptions {}, # generator_backpressure_num_objects 0, # actor method cpu actor_creation_function_descriptor, diff --git a/python/ray/actor.py b/python/ray/actor.py index e650ca84bd0f9..25cc550814abd 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -72,6 +72,8 @@ def bar(self): valid_kwargs = [ "num_returns", "concurrency_group", + "_max_retries", + "retry_exceptions", "_generator_backpressure_num_objects", ] error_string = ( @@ -90,6 +92,10 @@ def bar(self): def annotate_method(method): if "num_returns" in kwargs: method.__ray_num_returns__ = kwargs["num_returns"] + if "_max_retries" in kwargs: + method.__ray_max_retries__ = kwargs["_max_retries"] + if "retry_exceptions" in kwargs: + method.__ray_retry_exceptions__ = kwargs["retry_exceptions"] if "concurrency_group" in kwargs: method.__ray_concurrency_group__ = kwargs["concurrency_group"] if "_generator_backpressure_num_objects" in kwargs: @@ -117,6 +123,9 @@ class ActorMethod: invocation should return. If None is given, it uses DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS for a normal actor task and "streaming" for a generator task (when `is_generator` is True). + _max_retries: [Internal] Number of retries on method failure. + _retry_exceptions: Boolean of whether you want to retry all user-raised + exceptions, or a list of allowlist exceptions to retry. _is_generator: True if a given method is a Python generator. _generator_backpressure_num_objects: Generator-only config. 
If a number of unconsumed objects reach this threshold, @@ -135,6 +144,8 @@ def __init__( actor, method_name, num_returns: Optional[Union[int, str]], + _max_retries: int, + retry_exceptions: Union[bool, list, tuple], is_generator: bool, generator_backpressure_num_objects: int, decorator=None, @@ -143,7 +154,6 @@ def __init__( self._actor_ref = weakref.ref(actor) self._method_name = method_name self._num_returns = num_returns - self._is_generator = is_generator # Default case. if self._num_returns is None: @@ -152,6 +162,9 @@ def __init__( else: self._num_returns = ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS + self._max_retries = _max_retries + self._retry_exceptions = retry_exceptions + self._is_generator = is_generator self._generator_backpressure_num_objects = generator_backpressure_num_objects # This is a decorator that is used to wrap the function invocation (as # opposed to the function execution). The decorator must return a @@ -205,11 +218,20 @@ def _remote( kwargs=None, name="", num_returns=None, + _max_retries=None, + retry_exceptions=None, concurrency_group=None, _generator_backpressure_num_objects=None, ): if num_returns is None: num_returns = self._num_returns + max_retries = _max_retries + if max_retries is None: + max_retries = self._max_retries + if max_retries is None: + max_retries = 0 + if retry_exceptions is None: + retry_exceptions = self._retry_exceptions if _generator_backpressure_num_objects is None: _generator_backpressure_num_objects = ( self._generator_backpressure_num_objects @@ -225,6 +247,8 @@ def invocation(args, kwargs): kwargs=kwargs, name=name, num_returns=num_returns, + max_retries=max_retries, + retry_exceptions=retry_exceptions, concurrency_group_name=concurrency_group, generator_backpressure_num_objects=( _generator_backpressure_num_objects @@ -242,6 +266,8 @@ def __getstate__(self): "actor": self._actor_ref(), "method_name": self._method_name, "num_returns": self._num_returns, + "max_retries": self._max_retries, + "retry_exceptions": self._retry_exceptions, "decorator": self._decorator, "is_generator": self._is_generator, "generator_backpressure_num_objects": self._generator_backpressure_num_objects, # noqa @@ -252,6 +278,8 @@ def __setstate__(self, state): state["actor"], state["method_name"], state["num_returns"], + state["max_retries"], + state["retry_exceptions"], state["is_generator"], state["generator_backpressure_num_objects"], state["decorator"], @@ -271,6 +299,9 @@ class _ActorClassMethodMetadata(object): signatures: The signatures of the methods. num_returns: The default number of return values for each actor method. + max_retries: Number of retries on method failure. + retry_exceptions: Boolean of whether you want to retry all user-raised + exceptions, or a list of allowlist exceptions to retry, for each method. """ _cache = {} # This cache will be cleared in ray._private.worker.disconnect() @@ -306,6 +337,8 @@ def create(cls, modified_class, actor_creation_function_descriptor): self.decorators = {} self.signatures = {} self.num_returns = {} + self.max_retries = {} + self.retry_exceptions = {} self.method_is_generator = {} self.generator_backpressure_num_objects = {} self.concurrency_group_for_methods = {} @@ -332,6 +365,16 @@ def create(cls, modified_class, actor_creation_function_descriptor): else: self.num_returns[method_name] = None + # Only contains entries from `@ray.method(_max_retries=...)` + # Ray may not populate the others with max_task_retries here because you may + # have set in `actor.method.options(_max_retries=...)`. 
So Ray always stores + # both max_retries and max_task_retries, and favors the former. + if hasattr(method, "__ray_max_retries__"): + self.max_retries[method_name] = method.__ray_max_retries__ + + if hasattr(method, "__ray_retry_exceptions__"): + self.retry_exceptions[method_name] = method.__ray_retry_exceptions__ + if hasattr(method, "__ray_invocation_decorator__"): self.decorators[method_name] = method.__ray_invocation_decorator__ @@ -631,7 +674,9 @@ def options(self, **actor_options): system will retry the failed task up to n times, after which the task will throw a `RayActorError` exception upon :obj:`ray.get`. Note that Python exceptions are not considered system errors - and will not trigger retries. + and don't trigger retries. [Internal use: You can override this number + with the method's "_max_retries" option at @ray.method decorator or + at .option() time.] max_pending_calls: Set the max number of pending calls allowed on the actor handle. When this value is exceeded, PendingCallsLimitExceeded will be raised for further tasks. @@ -1059,10 +1104,13 @@ def _remote(self, args=None, kwargs=None, **actor_options): actor_handle = ActorHandle( meta.language, actor_id, + max_task_retries, meta.method_meta.method_is_generator, meta.method_meta.decorators, meta.method_meta.signatures, meta.method_meta.num_returns, + meta.method_meta.max_retries, + meta.method_meta.retry_exceptions, meta.method_meta.generator_backpressure_num_objects, actor_method_cpu, meta.actor_creation_function_descriptor, @@ -1107,8 +1155,12 @@ class ActorHandle: invocation side, whereas a regular decorator can be used to change the behavior on the execution side. _ray_method_signatures: The signatures of the actor methods. + _ray_method_max_retries: Max number of retries on method failure. _ray_method_num_returns: The default number of return values for each method. + _ray_method_retry_exceptions: The default value of boolean of whether you want + to retry all user-raised exceptions, or a list of allowlist exceptions to + retry. _ray_method_generator_backpressure_num_objects: Generator-only config. The max number of objects to generate before it starts pausing a generator. 
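A standalone sketch of the precedence that the handle construction and `ActorMethod._remote` changes above implement; the helper name is hypothetical, but the order mirrors the code: per-call `.options(_max_retries=...)`, then `@ray.method(_max_retries=...)`, then the actor's `max_task_retries`, then `0` (and `retry_exceptions` resolves the same way, defaulting to `False`).

    from typing import Optional

    def resolve_max_retries(options_max_retries: Optional[int],
                            method_max_retries: Optional[int],
                            actor_max_task_retries: Optional[int]) -> int:
        # 1. Per-call override: actor.method.options(_max_retries=...)
        if options_max_retries is not None:
            return options_max_retries
        # 2. Method annotation: @ray.method(_max_retries=...)
        if method_max_retries is not None:
            return method_max_retries
        # 3. Actor-level default: @ray.remote(max_task_retries=...) or
        #    Actor.options(max_task_retries=...)
        if actor_max_task_retries is not None:
            return actor_max_task_retries
        # 4. Fallback: no retries.
        return 0

    assert resolve_max_retries(None, 5, 3) == 5
    assert resolve_max_retries(2, 5, 3) == 2
    assert resolve_max_retries(None, None, None) == 0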
@@ -1125,10 +1177,13 @@ def __init__( self, language, actor_id, + max_task_retries: Optional[int], method_is_generator: Dict[str, bool], method_decorators, method_signatures, method_num_returns: Dict[str, int], + method_max_retries: Dict[str, int], + method_retry_exceptions: Dict[str, Union[bool, list, tuple]], method_generator_backpressure_num_objects: Dict[str, int], actor_method_cpus: int, actor_creation_function_descriptor, @@ -1137,11 +1192,14 @@ def __init__( ): self._ray_actor_language = language self._ray_actor_id = actor_id + self._ray_max_task_retries = max_task_retries self._ray_original_handle = original_handle self._ray_method_is_generator = method_is_generator self._ray_method_decorators = method_decorators self._ray_method_signatures = method_signatures self._ray_method_num_returns = method_num_returns + self._ray_method_max_retries = method_max_retries + self._ray_method_retry_exceptions = method_retry_exceptions self._ray_method_generator_backpressure_num_objects = ( method_generator_backpressure_num_objects ) @@ -1168,6 +1226,11 @@ def __init__( self, method_name, self._ray_method_num_returns[method_name], + self._ray_method_max_retries.get( + method_name, self._ray_max_task_retries + ) + or 0, # never None + self._ray_method_retry_exceptions.get(method_name), self._ray_method_is_generator[method_name], self._ray_method_generator_backpressure_num_objects.get( method_name @@ -1197,6 +1260,8 @@ def _actor_method_call( kwargs: Dict[str, Any] = None, name: str = "", num_returns: Optional[int] = None, + max_retries: int = None, + retry_exceptions: Union[bool, list, tuple] = None, concurrency_group_name: Optional[str] = None, generator_backpressure_num_objects: Optional[int] = None, ): @@ -1213,6 +1278,9 @@ def _actor_method_call( kwargs: A dictionary of keyword arguments for the actor method. name: The name to give the actor method call task. num_returns: The number of return values for the method. + max_retries: Number of retries when method fails. + retry_exceptions: Boolean of whether you want to retry all user-raised + exceptions, or a list of allowlist exceptions to retry. Returns: object_refs: A list of object refs returned by the remote actor @@ -1253,6 +1321,17 @@ def _actor_method_call( # Remove it when we migrate to the streaming generator. num_returns = ray._raylet.STREAMING_GENERATOR_RETURN + retry_exception_allowlist = None + if retry_exceptions is None: + retry_exceptions = False + elif isinstance(retry_exceptions, (list, tuple)): + retry_exception_allowlist = tuple(retry_exceptions) + retry_exceptions = True + assert isinstance( + retry_exceptions, bool + ), "retry_exceptions can either be \ + boolean or list/tuple of exception types." 
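A self-contained model of the normalization performed just above in `_actor_method_call` (the function name is hypothetical): callers may pass `None`/`False`, `True`, or a list/tuple of exception classes, and Ray reduces that to a boolean plus an optional allowlist tuple.

    from typing import Optional, Tuple, Union

    def normalize_retry_exceptions(
        retry_exceptions: Union[None, bool, list, tuple]
    ) -> Tuple[bool, Optional[tuple]]:
        if retry_exceptions is None or retry_exceptions is False:
            return False, None
        if retry_exceptions is True:
            return True, None
        assert isinstance(retry_exceptions, (list, tuple)), (
            "retry_exceptions can either be boolean or list/tuple of exception types."
        )
        return True, tuple(retry_exceptions)

    assert normalize_retry_exceptions(None) == (False, None)
    assert normalize_retry_exceptions(True) == (True, None)
    assert normalize_retry_exceptions([ValueError, TimeoutError]) == (
        True, (ValueError, TimeoutError))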
+ if generator_backpressure_num_objects is None: generator_backpressure_num_objects = -1 @@ -1263,6 +1342,9 @@ def _actor_method_call( list_args, name, num_returns, + max_retries, + retry_exceptions, + retry_exception_allowlist, self._ray_actor_method_cpus, concurrency_group_name if concurrency_group_name is not None else b"", generator_backpressure_num_objects, @@ -1305,10 +1387,12 @@ def remote(self, *args, **kwargs): return FakeActorMethod() return ActorMethod( - self, - item, + self, # actor + item, # method_name ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS, - False, + 0, # max_retries + False, # retry_exceptions + False, # is_generator self._ray_method_generator_backpressure_num_objects.get(item, -1), # Currently, cross-lang actor method not support decorator decorator=None, @@ -1347,10 +1431,13 @@ def _serialization_helper(self): { "actor_language": self._ray_actor_language, "actor_id": self._ray_actor_id, + "max_task_retries": self._ray_max_task_retries, "method_is_generator": self._ray_method_is_generator, "method_decorators": self._ray_method_decorators, "method_signatures": self._ray_method_signatures, "method_num_returns": self._ray_method_num_returns, + "method_max_retries": self._ray_method_max_retries, + "method_retry_exceptions": self._ray_method_retry_exceptions, "method_generator_backpressure_num_objects": ( self._ray_method_generator_backpressure_num_objects ), @@ -1388,10 +1475,13 @@ def _deserialization_helper(cls, state, outer_object_ref=None): # thread-safe. state["actor_language"], state["actor_id"], + state["max_task_retries"], state["method_is_generator"], state["method_decorators"], state["method_signatures"], state["method_num_returns"], + state["method_max_retries"], + state["method_retry_exceptions"], state["method_generator_backpressure_num_objects"], state["actor_method_cpus"], state["actor_creation_function_descriptor"], diff --git a/python/ray/includes/function_descriptor.pxi b/python/ray/includes/function_descriptor.pxi index 10a3875389703..693546e991392 100644 --- a/python/ray/includes/function_descriptor.pxi +++ b/python/ray/includes/function_descriptor.pxi @@ -55,6 +55,10 @@ cdef class FunctionDescriptor: d[k] = v.__get__(self) return d + @property + def repr(self): + return self.__repr__() + FunctionDescriptor_constructor_map[EmptyFunctionDescriptorType] = \ EmptyFunctionDescriptor.from_cpp @@ -72,6 +76,10 @@ cdef class EmptyFunctionDescriptor(FunctionDescriptor): cdef from_cpp(const CFunctionDescriptor &c_function_descriptor): return EmptyFunctionDescriptor() + @property + def repr(self): + return "FunctionDescriptor(empty)" + FunctionDescriptor_constructor_map[JavaFunctionDescriptorType] = \ JavaFunctionDescriptor.from_cpp @@ -132,6 +140,10 @@ cdef class JavaFunctionDescriptor(FunctionDescriptor): """ return self.typed_descriptor.Signature() + @property + def repr(self): + return f"{self.class_name}.{self.function_name}" + FunctionDescriptor_constructor_map[PythonFunctionDescriptorType] = \ PythonFunctionDescriptor.from_cpp @@ -378,3 +390,7 @@ cdef class CppFunctionDescriptor(FunctionDescriptor): The class name of the function descriptor. 
""" return self.typed_descriptor.ClassName() + + @property + def repr(self): + return f"{self.class_name}::{self.function_name}" diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index d849cdc168937..97035f419be71 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -98,6 +98,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CFunctionDescriptor ActorCreationTaskFunctionDescriptor() const c_string ExtensionData() const int MaxPendingCalls() const + int MaxTaskRetries() const cdef cppclass CCoreWorker "ray::core::CoreWorker": void ConnectToRaylet() @@ -130,6 +131,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CActorID &actor_id, const CRayFunction &function, const c_vector[unique_ptr[CTaskArg]] &args, const CTaskOptions &options, + int max_retries, + c_bool retry_exceptions, + c_string serialized_retry_exception_allowlist, c_vector[CObjectReference] &task_returns, const CTaskID current_task_id) CRayStatus KillActor( diff --git a/python/ray/serve/tests/test_proxy_state.py b/python/ray/serve/tests/test_proxy_state.py index 2704900b96a0e..bb34424c99889 100644 --- a/python/ray/serve/tests/test_proxy_state.py +++ b/python/ray/serve/tests/test_proxy_state.py @@ -640,7 +640,7 @@ def test_proxy_state_update_unhealthy_check_health_succeed(): @patch("ray.serve._private.proxy_state.PROXY_HEALTH_CHECK_TIMEOUT_S", 0) @patch("ray.serve._private.proxy_state.PROXY_HEALTH_CHECK_PERIOD_S", 0) def test_unhealthy_retry_correct_number_of_times(): - """Test the unhealthy retry logic retires the correct number of times. + """Test the unhealthy retry logic retries the correct number of times. When the health check fails 3 times (default retry threshold), the proxy state should change from HEALTHY to UNHEALTHY. diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 2e31c67fad2dc..181428450ad18 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -374,6 +374,7 @@ py_test_module_list( py_test_module_list( files = [ "test_actor.py", + "test_actor_retry.py", "test_actor_failures.py", "test_actor_resources.py", "test_failure.py", diff --git a/python/ray/tests/test_actor_retry.py b/python/ray/tests/test_actor_retry.py new file mode 100644 index 0000000000000..db23d14b0c995 --- /dev/null +++ b/python/ray/tests/test_actor_retry.py @@ -0,0 +1,364 @@ +import os +import sys + +import pytest + +import ray + + +class MyError(Exception): + pass + + +@ray.remote +class Counter: + def __init__(self) -> None: + self.count = 0 + + def increment(self) -> int: + c = self.count + self.count += 1 + return c + + def get_count(self) -> int: + return self.count + + +# TODO: also do work for async and threaded actors +@ray.remote(max_task_retries=3) +class TroubleMaker: + @ray.method(_max_retries=5, retry_exceptions=[MyError]) + def may_raise_n_times(self, counter, n): + """ + Raises if there were n calls before this call. + Returns the number of calls before this call, if it's > n. + """ + c = ray.get(counter.increment.remote()) + print(f"may_raise_n_times, n = {n}, count = {c}") + if c < n: + print(f"method raises in {c} th call, want {n} times") + raise MyError() + return c + + @ray.method(retry_exceptions=[MyError]) + def raise_or_exit(self, counter, actions): + """ + Increments the counter and performs an action based on the param actions[count]. + If count >= len(actions), return the count. + + Note: This method doesn't set `_max_retries`. Ray expects it to inherit + max_task_retries = 3. 
+ + @param actions: List["raise" | "exit"] + """ + + c = ray.get(counter.increment.remote()) + action = "return" if c >= len(actions) else actions[c] + print(f"raise_or_exit, action = {action}, count = {c}") + + if action == "raise": + raise MyError() + elif action == "exit": + sys.exit(1) + else: + return c + + +@ray.remote(max_task_retries=3) +class AsyncTroubleMaker: + """ + Same as TroubleMaker, just all methods are async. + """ + + @ray.method(_max_retries=5, retry_exceptions=[MyError]) + async def may_raise_n_times(self, counter, n): + c = ray.get(counter.increment.remote()) + print(f"may_raise_n_times, n = {n}, count = {c}") + if c < n: + print(f"method raises in {c} th call, want {n} times") + raise MyError() + return c + + @ray.method(retry_exceptions=[MyError]) + async def raise_or_exit(self, counter, actions): + c = await counter.increment.remote() + action = "return" if c >= len(actions) else actions[c] + print(f"raise_or_exit, action = {action}, count = {c}") + + if action == "raise": + raise MyError() + elif action == "exit": + # import signal + # sys.exit(1) -> hang + # ray.actor.exit_actor() -> failed, no retry + # os.kill(os.getpid(), signal.SIGTERM) -> ignored, continued to return + # os.kill(os.getpid(), signal.SIGKILL) -> retries + os._exit(0) + return -42 + else: + return c + + @ray.method(num_returns="streaming") # retry_exceptions=None aka False. + async def yield_or_raise(self, counter, actions): + while True: + c = await counter.increment.remote() + a = actions[c] + if isinstance(a, BaseException): + raise a + else: + yield a + if c == len(actions) - 1: + # don't over call counter. Only call #yield and #raise times. + return + + +def test_method_raise_5_times(shutdown_only): + counter = Counter.remote() + trouble_maker = TroubleMaker.remote() + assert ray.get(trouble_maker.may_raise_n_times.remote(counter, 5)) == 5 + assert ray.get(counter.get_count.remote()) == 6 + + +def test_method_raise_no_over_retry(shutdown_only): + counter = Counter.remote() + trouble_maker = TroubleMaker.remote() + with pytest.raises(MyError): + ray.get(trouble_maker.may_raise_n_times.remote(counter, 6)) + assert ray.get(counter.get_count.remote()) == 6 + + +def test_method_no_retry_without_retry_exceptions(shutdown_only): + counter = Counter.remote() + trouble_maker = TroubleMaker.remote() + with pytest.raises(MyError): + ray.get( + trouble_maker.may_raise_n_times.options(retry_exceptions=False).remote( + counter, 5 + ) + ) + assert ray.get(counter.get_count.remote()) == 1 + + +def test_generator_method_no_retry_without_retry_exceptions(shutdown_only): + counter = Counter.remote() + trouble_maker = AsyncTroubleMaker.remote() + + gen = trouble_maker.yield_or_raise.remote( + counter, + [ + # First round: 1 then raise + 1, + MyError(), + # No retry, no second round + 1, + 2, + ], + ) + assert ray.get(next(gen)) == 1 + with pytest.raises(MyError): + ray.get(next(gen)) + with pytest.raises(StopIteration): + ray.get(next(gen)) + assert ray.get(counter.get_count.remote()) == 2 + + +def test_generator_method_retry_exact_times(shutdown_only): + counter = Counter.remote() + trouble_maker = AsyncTroubleMaker.remote() + + # Should retry out max_task_retries=3 times + gen = trouble_maker.yield_or_raise.options(retry_exceptions=[MyError]).remote( + counter, + [ + # First round + 1, + MyError(), + # retry 1 + 1, + MyError(), + # retry 2 + 1, + MyError(), + # retry 3 + 1, + 2, + 3, + ], + ) + assert ray.get(next(gen)) == 1 + assert ray.get(next(gen)) == 2 + assert ray.get(next(gen)) == 3 + with 
pytest.raises(StopIteration): + ray.get(next(gen)) + assert ray.get(counter.get_count.remote()) == 9 + + +def test_generator_method_does_not_over_retry(shutdown_only): + counter = Counter.remote() + trouble_maker = AsyncTroubleMaker.remote() + + # Should retry out max_task_retries=3 times + gen = trouble_maker.yield_or_raise.options(retry_exceptions=[MyError]).remote( + counter, + [ + # First round + 1, + MyError(), + # retry 1 + 1, + MyError(), + # retry 2, + 1, + MyError(), + # retry 3 + 1, + MyError(), + # no retry 4! + 1, + 2, + ], + ) + assert ray.get(next(gen)) == 1 + with pytest.raises(MyError): + ray.get(next(gen)) + with pytest.raises(StopIteration): + ray.get(next(gen)) + assert ray.get(counter.get_count.remote()) == 8 + + +def test_options_takes_precedence(shutdown_only): + counter = Counter.remote() + trouble_maker = TroubleMaker.remote() + assert ( + ray.get( + trouble_maker.may_raise_n_times.options(_max_retries=10).remote(counter, 10) + ) + == 10 + ) + assert ray.get(counter.get_count.remote()) == 11 + + +def test_options_takes_precedence_no_over_retry(shutdown_only): + counter = Counter.remote() + trouble_maker = TroubleMaker.remote() + + with pytest.raises(MyError): + ray.get( + trouble_maker.may_raise_n_times.options(_max_retries=10).remote(counter, 11) + ) + assert ray.get(counter.get_count.remote()) == 11 + + +@pytest.mark.parametrize( + "actions", + [ + ["exit", "raise", "raise"], + ["raise", "exit", "raise"], + ["raise", "raise", "exit"], + ["raise", "raise", "raise"], + ], + ids=lambda lst: ",".join(lst), # test case show name +) +@pytest.mark.parametrize( + "is_async", [False, True], ids=lambda a: "async" if a else "sync" +) +@pytest.mark.parametrize("max_restarts", [-1, 4], ids=lambda r: f"max_restarts({r})") +@pytest.mark.parametrize("_max_retries", [-1, 4], ids=lambda r: f"_max_retries({r})") +def test_method_raise_and_exit( + actions, is_async, max_restarts, _max_retries, shutdown_only +): + """ + Test we can endure a mix of raises and exits. Note the number of exits we can endure + is subject to max_restarts. + + The retry behavior should work for Async actors. + The retry behavior should work if the _max_retries or max_restarts are -1 (infinite + retry). + """ + actor_class = AsyncTroubleMaker if is_async else TroubleMaker + counter = Counter.remote() + trouble_maker = actor_class.options(max_restarts=max_restarts).remote() + assert ( + ray.get( + trouble_maker.raise_or_exit.options(_max_retries=_max_retries).remote( + counter, actions + ) + ) + == 3 + ) + # 4 = 1 initial + 3 retries (with the 1 restart included) + assert ray.get(counter.get_count.remote()) == 4 + + +def test_method_exit_and_raise_no_over_retry(shutdown_only): + """ + Test we can endure a mix of raises and exits. Note the number of exits we can endure + is subject to max_restarts. + """ + counter = Counter.remote() + trouble_maker = TroubleMaker.options(max_restarts=1).remote() + with pytest.raises(MyError): + assert ray.get( + trouble_maker.raise_or_exit.options(_max_retries=2).remote( + counter, ["exit", "raise", "raise"] + ) + ) + assert ray.get(counter.get_count.remote()) == 3 + + +def test_method_exit_no_over_retry_max_restarts(shutdown_only): + """ + Even if we have enough _max_retries, we may still raise due to max_restarts. 
+ """ + counter = Counter.remote() + trouble_maker = TroubleMaker.options(max_restarts=1).remote() + with pytest.raises(ray.exceptions.RayActorError): + assert ray.get( + trouble_maker.raise_or_exit.options(_max_retries=4).remote( + counter, ["raise", "exit", "exit"] + ) + ) + # 2 calls: 1 initial + 1 exception-retry + 1 exit-retry (then no more) + assert ray.get(counter.get_count.remote()) == 3 + + +@pytest.mark.parametrize( + "is_async", [False, True], ids=lambda a: "async" if a else "sync" +) +def test_exit_only(is_async, shutdown_only): + """ + Sanity testing: only do exit-retry works + """ + actor_class = AsyncTroubleMaker if is_async else TroubleMaker + counter = Counter.remote() + trouble_maker = actor_class.options(max_restarts=2).remote() + with pytest.raises(ray.exceptions.RayActorError): + ret = ray.get( + trouble_maker.raise_or_exit.options(_max_retries=2).remote( + counter, ["exit", "exit", "exit"] + ) + ) + print(f"should not print: ret = {ret}") + # 3 = 1 initial + 2 retries (with the 2 restarts included) + assert ray.get(counter.get_count.remote()) == 3 + + +def test_exit_only_no_over_retry(shutdown_only): + """ + Sanity testing: only do exit-retry works + """ + counter = Counter.remote() + trouble_maker = TroubleMaker.options(max_restarts=2).remote() + ray.get( + trouble_maker.raise_or_exit.options(_max_retries=2).remote( + counter, ["exit", "exit"] + ) + ) == 2 + assert ray.get(counter.get_count.remote()) == 3 + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_chaos.py b/python/ray/tests/test_chaos.py index ca6d1cde47610..3cb7930702b28 100644 --- a/python/ray/tests/test_chaos.py +++ b/python/ray/tests/test_chaos.py @@ -263,7 +263,7 @@ def worker_to_kill(): time.sleep(3) # Run WorkerKillerActor to kill 3 tasks, and run remote "worker_to_kill" - # task with max_retires=3. 4 tasks in total (1 initial + 3 retries). + # task with max_retries=3. 4 tasks in total (1 initial + 3 retries). # First 3 tasks will be killed, the last retry will succeed. worker_killer = get_and_run_resource_killer( WorkerKillerActor, diff --git a/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py b/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py index bb38c21637106..92aab45dcaa64 100644 --- a/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py +++ b/release/air_tests/air_benchmarks/workloads/xgboost_benchmark.py @@ -127,6 +127,7 @@ def __call__(self, data: pd.DataFrame) -> Dict[str, np.ndarray]: batch_size=8192, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=None), fn_constructor_kwargs={"model": model}, + batch_format="pandas", ) for _ in result.iter_batches(): diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 47b0c8273022a..86f5c672e8f17 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -485,13 +485,13 @@ std::string TaskSpecification::DebugString() const { stream << ", task_id=" << TaskId() << ", task_name=" << GetName() << ", job_id=" << JobId() << ", num_args=" << NumArgs() - << ", num_returns=" << NumReturns() << ", depth=" << GetDepth() - << ", attempt_number=" << AttemptNumber(); + << ", num_returns=" << NumReturns() << ", max_retries=" << MaxRetries() + << ", depth=" << GetDepth() << ", attempt_number=" << AttemptNumber(); if (IsActorCreationTask()) { // Print actor creation task spec. 
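Referring back to the streaming-generator tests above (`yield_or_raise`), a usage sketch of the retry semantics from the commit message: a retried generator re-runs from the start, Ray skips the values that were already delivered, and the consumer receives each value exactly once. The names below are illustrative and the commented call requires a running Ray cluster.

    import ray

    class MyError(Exception):
        pass

    @ray.remote(max_task_retries=1)
    class Streamer:
        def __init__(self):
            self.attempts = 0

        @ray.method(num_returns="streaming", retry_exceptions=[MyError])
        def stream(self, n):
            self.attempts += 1
            for i in range(n):
                if self.attempts == 1 and i == 2:
                    # The first attempt dies after yielding 0 and 1; the retry
                    # re-runs the generator, and the caller still sees 0..n-1 once.
                    raise MyError()
                yield i

    # gen = Streamer.remote().stream.remote(4)
    # assert [ray.get(next(gen)) for _ in range(4)] == [0, 1, 2, 3]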
stream << ", actor_creation_task_spec={actor_id=" << ActorCreationId() - << ", max_restarts=" << MaxActorRestarts() << ", max_retries=" << MaxRetries() + << ", max_restarts=" << MaxActorRestarts() << ", max_concurrency=" << MaxActorConcurrency() << ", is_asyncio_actor=" << IsAsyncioActor() << ", is_detached=" << IsDetachedActor() << "}"; @@ -499,9 +499,7 @@ std::string TaskSpecification::DebugString() const { // Print actor task spec. stream << ", actor_task_spec={actor_id=" << ActorId() << ", actor_caller_id=" << CallerId() << ", actor_counter=" << ActorCounter() - << "}"; - } else if (IsNormalTask()) { - stream << ", max_retries=" << MaxRetries(); + << ", retry_exceptions=" << ShouldRetryExceptions() << "}"; } // Print non-sensitive runtime env info. diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 6e558cb163495..e9ce45a608d4a 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -263,10 +263,18 @@ class TaskSpecBuilder { /// See `common.proto` for meaning of the arguments. /// /// \return Reference to the builder object itself. - TaskSpecBuilder &SetActorTaskSpec(const ActorID &actor_id, - const ObjectID &actor_creation_dummy_object_id, - uint64_t actor_counter) { + TaskSpecBuilder &SetActorTaskSpec( + const ActorID &actor_id, + const ObjectID &actor_creation_dummy_object_id, + int max_retries, + bool retry_exceptions, + const std::string &serialized_retry_exception_allowlist, + uint64_t actor_counter) { message_->set_type(TaskType::ACTOR_TASK); + message_->set_max_retries(max_retries); + message_->set_retry_exceptions(retry_exceptions); + message_->set_serialized_retry_exception_allowlist( + serialized_retry_exception_allowlist); auto actor_spec = message_->mutable_actor_task_spec(); actor_spec->set_actor_id(actor_id.Binary()); actor_spec->set_actor_creation_dummy_object_id( diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index 12eaa5bd1254e..2003855e11f97 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -118,13 +118,23 @@ ActorHandle::ActorHandle(const rpc::ActorTableData &actor_table_data, const rpc::TaskSpec &task_spec) : ActorHandle(CreateInnerActorHandleFromActorData(actor_table_data, task_spec)) {} -void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor) { +void ActorHandle::SetActorTaskSpec( + TaskSpecBuilder &builder, + const ObjectID new_cursor, + int max_retries, + bool retry_exceptions, + const std::string &serialized_retry_exception_allowlist) { absl::MutexLock guard(&mutex_); // Build actor task spec. const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(GetActorID()); const ObjectID actor_creation_dummy_object_id = ObjectID::FromIndex(actor_creation_task_id, /*index=*/1); - builder.SetActorTaskSpec(GetActorID(), actor_creation_dummy_object_id, task_counter_++); + builder.SetActorTaskSpec(GetActorID(), + actor_creation_dummy_object_id, + max_retries, + retry_exceptions, + serialized_retry_exception_allowlist, + task_counter_++); } void ActorHandle::SetResubmittedActorTaskSpec(TaskSpecification &spec, diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h index 209fc8c149280..baa81aade02ea 100644 --- a/src/ray/core_worker/actor_handle.h +++ b/src/ray/core_worker/actor_handle.h @@ -76,7 +76,11 @@ class ActorHandle { /// \param[in] builder Task spec builder. /// \param[in] new_cursor Actor dummy object. 
This is legacy code needed for /// raylet-based actor restart. - void SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor); + void SetActorTaskSpec(TaskSpecBuilder &builder, + const ObjectID new_cursor, + int max_retries, + bool retry_exceptions, + const std::string &serialized_retry_exception_allowlist); /// Reset the actor task spec fields of an existing task so that the task can /// be re-executed. diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index a78ec52571842..0d1dbf8e1c097 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -41,7 +41,7 @@ ActorID ActorManager::RegisterActorHandle(std::unique_ptr actor_han return actor_id; } -std::shared_ptr ActorManager::GetActorHandle(const ActorID &actor_id) { +std::shared_ptr ActorManager::GetActorHandle(const ActorID &actor_id) const { absl::MutexLock lock(&mutex_); auto it = actor_handles_.find(actor_id); RAY_CHECK(it != actor_handles_.end()) diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h index 53dec9a3ff6df..373bf11fdd39f 100644 --- a/src/ray/core_worker/actor_manager.h +++ b/src/ray/core_worker/actor_manager.h @@ -67,7 +67,7 @@ class ActorManager { /// \param[in] actor_id The actor handle to get. /// \return reference to the actor_handle's pointer. /// NOTE: Returned actorHandle should not be stored anywhere. - std::shared_ptr GetActorHandle(const ActorID &actor_id); + std::shared_ptr GetActorHandle(const ActorID &actor_id) const; /// Get actor handle by name. /// We cache pair after getting the named actor from GCS, so that it can use diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index e5a5e6b94a297..b06aafd51939b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -374,7 +374,10 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ RAY_CHECK_OK(PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true)); }, /* retry_task_callback= */ - [this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) { + [this](TaskSpecification &spec, + bool object_recovery, + bool update_seqno, + uint32_t delay_ms) { spec.GetMutableMessage().set_attempt_number(spec.AttemptNumber() + 1); if (!object_recovery) { // Retry after a delay to emulate the existing Raylet reconstruction @@ -382,12 +385,14 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ RAY_LOG(INFO) << "Will resubmit task after a " << delay_ms << "ms delay: " << spec.DebugString(); absl::MutexLock lock(&mutex_); - TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec}; + TaskToRetry task_to_retry{current_time_ms() + delay_ms, spec, update_seqno}; to_resubmit_.push(std::move(task_to_retry)); } else { if (spec.IsActorTask()) { - auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId()); - actor_handle->SetResubmittedActorTaskSpec(spec, spec.ActorDummyObject()); + if (update_seqno) { + auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId()); + actor_handle->SetResubmittedActorTaskSpec(spec, spec.ActorDummyObject()); + } RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(spec)); } else { RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec)); @@ -981,18 +986,23 @@ void CoreWorker::ExitIfParentRayletDies() { void CoreWorker::InternalHeartbeat() { // Retry tasks. 
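A simplified Python model of the resubmission bookkeeping added here (a sketch only; names loosely mirror the C++). Each queued retry carries `update_seqno`: an exception retry goes to the same, still-alive actor instance, so its sequence number must be advanced, while a retry after an actor death targets a restarted actor whose sequence numbers start over.

    import heapq
    import time
    from dataclasses import dataclass, field

    @dataclass(order=True)
    class TaskToRetry:
        execution_time_ms: float
        task_spec: object = field(compare=False)
        update_seqno: bool = field(compare=False)

    class RetryQueue:
        """Delayed retries, drained by a periodic heartbeat."""

        def __init__(self):
            self._heap = []

        def push(self, task_spec, delay_ms: float, update_seqno: bool) -> None:
            when = time.time() * 1000 + delay_ms
            heapq.heappush(self._heap, TaskToRetry(when, task_spec, update_seqno))

        def pop_due(self):
            now_ms = time.time() * 1000
            while self._heap and self._heap[0].execution_time_ms <= now_ms:
                yield heapq.heappop(self._heap)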
- std::vector tasks_to_resubmit; + std::vector tasks_to_resubmit; { absl::MutexLock lock(&mutex_); while (!to_resubmit_.empty() && current_time_ms() > to_resubmit_.top().execution_time_ms) { - tasks_to_resubmit.push_back(std::move(to_resubmit_.top().task_spec)); + tasks_to_resubmit.push_back(std::move(to_resubmit_.top())); to_resubmit_.pop(); } } - for (auto &spec : tasks_to_resubmit) { + for (auto &task_to_retry : tasks_to_resubmit) { + auto &spec = task_to_retry.task_spec; if (spec.IsActorTask()) { + if (task_to_retry.update_seqno) { + auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId()); + actor_handle->SetResubmittedActorTaskSpec(spec, spec.ActorDummyObject()); + } RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(spec)); } else { RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec)); @@ -2200,12 +2210,16 @@ Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_gro } } -Status CoreWorker::SubmitActorTask(const ActorID &actor_id, - const RayFunction &function, - const std::vector> &args, - const TaskOptions &task_options, - std::vector &task_returns, - const TaskID current_task_id) { +Status CoreWorker::SubmitActorTask( + const ActorID &actor_id, + const RayFunction &function, + const std::vector> &args, + const TaskOptions &task_options, + int max_retries, + bool retry_exceptions, + const std::string &serialized_retry_exception_allowlist, + std::vector &task_returns, + const TaskID current_task_id) { absl::ReleasableMutexLock lock(&actor_task_mutex_); task_returns.clear(); if (!direct_actor_submitter_->CheckActorExists(actor_id)) { @@ -2271,7 +2285,11 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, // NOTE: placement_group_capture_child_tasks and runtime_env will // be ignored in the actor because we should always follow the actor's option. - actor_handle->SetActorTaskSpec(builder, ObjectID::Nil()); + actor_handle->SetActorTaskSpec(builder, + ObjectID::Nil(), + max_retries, + retry_exceptions, + serialized_retry_exception_allowlist); // Submit task. TaskSpecification task_spec = builder.Build(); RAY_LOG(DEBUG) << "Submitting actor task " << task_spec.DebugString(); @@ -2285,7 +2303,7 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, returned_refs = ExecuteTaskLocalMode(task_spec, actor_id); } else { returned_refs = task_manager_->AddPendingTask( - rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries()); + rpc_address_, task_spec, CurrentCallSite(), max_retries); RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(task_spec)); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 463296303655c..a8901345f1798 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -265,6 +265,9 @@ struct TaskToRetry { /// The details of the task. TaskSpecification task_spec; + + /// Updates the actor seqno if true. + bool update_seqno; }; /// Sorts TaskToRetry in descending order of the execution time. @@ -838,7 +841,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] function The remote function to execute. /// \param[in] args Arguments of this task. /// \param[in] task_options Options for this task. - /// \param[in] max_retires max number of retry when the task fails. + /// \param[in] max_retries max number of retry when the task fails. + /// \param[in] retry_exceptions whether a user exception/error is eligible to retry. /// \param[in] scheduling_strategy Strategy about how to schedule the task. 
/// \param[in] debugger_breakpoint breakpoint to drop into for the debugger after this /// task starts executing, or "" if we do not want to drop into the debugger. @@ -847,7 +851,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// that serves as an allowlist of frontend-language exceptions/errors that should be /// retried. Default is an empty string, which will be treated as an allow-all in the /// language worker. - /// param[in] current_task_id The current task_id that submits the task. + /// \param[in] current_task_id The current task_id that submits the task. /// If Nil() is given, it will be automatically propagated from worker_context. /// This is used when worker_context cannot reliably obtain the curernt task_id /// i.e., Python async actors. @@ -917,6 +921,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] function The remote function to execute. /// \param[in] args Arguments of this task. /// \param[in] task_options Options for this task. + /// \param[in] max_retries max number of retry when the task fails. + /// \param[in] serialized_retry_exception_allowlist A serialized exception list + /// that serves as an allowlist of frontend-language exceptions/errors that should be + /// retried. Empty string means an allow-all in the language worker. /// \param[out] task_returns The object returned by this task /// param[in] current_task_id The current task_id that submits the task. /// If Nil() is given, it will be automatically propagated from worker_context. @@ -928,6 +936,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const RayFunction &function, const std::vector> &args, const TaskOptions &task_options, + int max_retries, + bool retry_exceptions, + const std::string &serialized_retry_exception_allowlist, std::vector &task_returns, const TaskID current_task_id = TaskID::Nil()); diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index 5ba99ec481952..3b5a7ba1b3805 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -442,9 +442,25 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask( auto task_args = ToTaskArgs(env, args); RAY_CHECK(callOptions != nullptr); auto task_options = ToTaskOptions(env, numReturns, callOptions); + + // NOTE: An actor method call from Java ActorHandle only recognizes the actor's + // max_task_retries. It does NOT recognize per-method max_retries. It also only retries + // on actor death, not on user exceptions. The max_task_retries is read from CoreWorker. + // TODO: support Java max_retries and retry_exceptions. 
+ const auto native_actor_handle = + CoreWorkerProcess::GetCoreWorker().GetActorHandle(actor_id); + int max_retries = native_actor_handle->MaxTaskRetries(); + std::vector return_refs; auto status = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( - actor_id, ray_function, task_args, task_options, return_refs); + actor_id, + ray_function, + task_args, + task_options, + max_retries, + /*retry_exceptions=*/false, + /*serialized_retry_exception_allowlist=*/"", + return_refs); if (!status.ok()) { std::stringstream ss; ss << "The task " << ray_function.GetFunctionDescriptor()->ToString() diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 97c4e36f84806..d314759773852 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -335,7 +335,10 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector *tas RAY_LOG(INFO) << "Resubmitting task that produced lost plasma object, attempt #" << spec.AttemptNumber() << ": " << spec.DebugString(); - retry_task_callback_(spec, /*object_recovery*/ true, /*delay_ms*/ 0); + // We should actually detect if the actor for this task is dead, but let's just assume + // it's not for now. + retry_task_callback_( + spec, /*object_recovery*/ true, /*update_seqno=*/true, /*delay_ms*/ 0); } return true; @@ -895,6 +898,7 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id, int32_t num_retries_left = 0; int32_t num_oom_retries_left = 0; bool task_failed_due_to_oom = error_info.error_type() == rpc::ErrorType::OUT_OF_MEMORY; + bool actor_died = error_info.error_type() == rpc::ErrorType::ACTOR_DIED; { absl::MutexLock lock(&mu_); auto it = submissible_tasks_.find(task_id); @@ -947,7 +951,11 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id, spec.AttemptNumber(), RayConfig::instance().task_oom_retry_delay_base_ms()) : RayConfig::instance().task_retry_delay_ms(); - retry_task_callback_(spec, /*object_recovery*/ false, delay_ms); + // If actor is not dead, we should update the seq no. If an actor is dead and + // restarted, the seqno is reset, and we don't need to update it when resubmitting a + // task. + retry_task_callback_( + spec, /*object_recovery*/ false, /*update_seqno=*/!actor_died, delay_ms); return true; } else { RAY_LOG(INFO) << "No retries left for task " << spec.TaskId() @@ -1039,7 +1047,9 @@ bool TaskManager::FailOrRetryPendingTask(const TaskID &task_id, // loudly with ERROR here. RAY_LOG(DEBUG) << "Task attempt " << task_id << " failed with error " << rpc::ErrorType_Name(error_type) << " Fail immediately? " - << fail_immediately; + << fail_immediately << ", status " << *status << ", error info " + << (ray_error_info == nullptr ? 
"nullptr" + : ray_error_info->DebugString()); bool will_retry = false; if (!fail_immediately) { will_retry = RetryTaskIfPossible( @@ -1318,7 +1328,8 @@ void TaskManager::MarkTaskWaitingForExecution(const TaskID &task_id, if (it == submissible_tasks_.end()) { return; } - RAY_CHECK(it->second.GetStatus() == rpc::TaskStatus::PENDING_NODE_ASSIGNMENT); + RAY_CHECK(it->second.GetStatus() == rpc::TaskStatus::PENDING_NODE_ASSIGNMENT) + << ", task ID = " << it->first << ", status = " << it->second.GetStatus(); it->second.SetNodeId(node_id); it->second.SetStatus(rpc::TaskStatus::SUBMITTED_TO_WORKER); RecordTaskStatusEvent(it->second.spec.AttemptNumber(), diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index c151a24c16528..02efc6a0cd83e 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -79,8 +79,8 @@ class TaskResubmissionInterface { using TaskStatusCounter = CounterMap>; using PutInLocalPlasmaCallback = std::function; -using RetryTaskCallback = - std::function; +using RetryTaskCallback = std::function; using ReconstructObjectCallback = std::function; using PushErrorCallback = std::functionsecond.rpc_client) { @@ -453,6 +454,7 @@ void CoreWorkerDirectActorTaskSubmitter::ResendOutOfOrderTasks(const ActorID &ac RAY_CHECK(!client_queue.worker_id.empty()); auto out_of_order_completed_tasks = client_queue.actor_submit_queue->PopAllOutOfOrderCompletedTasks(); + for (const auto &completed_task : out_of_order_completed_tasks) { // Making a copy here because we are flipping a flag and the original value is // const. @@ -530,6 +532,7 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply( const auto actor_id = task_spec.ActorId(); const auto actor_counter = task_spec.ActorCounter(); const auto task_skipped = task_spec.GetMessage().skip_execution(); + const bool is_retryable_exception = status.ok() && reply.is_retryable_error(); /// Whether or not we will retry this actor task. auto will_retry = false; @@ -538,7 +541,9 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply( // reply for a previously completed task. We are not calling CompletePendingTask // because the tasks are pushed directly to the actor, not placed on any queues // in task_finisher_. - } else if (status.ok()) { + } else if (status.ok() && !is_retryable_exception) { + // status.ok() means the worker completed the reply, either succeeded or with a + // retryable failure (e.g. user exceptions). We complete only on non-retryable case. task_finisher_.CompletePendingTask( task_id, reply, addr, reply.is_application_error()); } else if (status.IsSchedulingCancelled()) { @@ -559,7 +564,12 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply( bool fail_immediatedly = false; rpc::ErrorType error_type; rpc::RayErrorInfo error_info; - { + if (status.ok()) { + // retryable user exception. + RAY_CHECK(is_retryable_exception); + error_type = rpc::ErrorType::TASK_EXECUTION_EXCEPTION; + error_info = gcs::GetRayErrorInfo(error_type, reply.task_execution_error()); + } else { // push task failed due to network error. For example, actor is dead // and no process response for the push task. absl::MutexLock lock(&mu_); @@ -589,12 +599,18 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply( &error_info, /*mark_task_object_failed*/ is_actor_dead, fail_immediatedly); - if (!is_actor_dead && !will_retry) { - // No retry == actor is dead. - // If actor is not dead yet, wait for the grace period until we mark the - // return object as failed. 
-      if (RayConfig::instance().timeout_ms_task_wait_for_death_info() != 0) {
+    // Ran out of retries: the last failure was either a retryable user exception or an
+    // actor death.
+    if (status.ok()) {
+      // The last failure was a user exception; complete the task with that failure.
+      RAY_CHECK(reply.is_retryable_error());
+
+      GetTaskFinisherWithoutMu().CompletePendingTask(
+          task_id, reply, addr, reply.is_application_error());
+    }
+    // The last failure was an actor death, but we still see the actor "alive", so we
+    // optionally wait for a grace period for the death info to arrive.
+    else if (RayConfig::instance().timeout_ms_task_wait_for_death_info() != 0) {
       int64_t death_info_grace_period_ms =
           current_time_ms() +
           RayConfig::instance().timeout_ms_task_wait_for_death_info();
@@ -626,8 +642,11 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply(
     auto queue_pair = client_queues_.find(actor_id);
     RAY_CHECK(queue_pair != client_queues_.end());
     auto &queue = queue_pair->second;
-    if (!will_retry) {
-      queue.actor_submit_queue->MarkTaskCompleted(actor_counter, task_spec);
+    // Every seqno handed out by the actor_submit_queue must eventually be marked
+    // completed via MarkSeqnoCompleted. We call it here when a retryable user exception
+    // will be retried (the retry is re-submitted under a new seqno), and when this was
+    // the final attempt, whether it ended in success, a user exception, or actor death.
+    if ((!will_retry) || is_retryable_exception) {
+      queue.actor_submit_queue->MarkSeqnoCompleted(actor_counter, task_spec);
     }
     queue.cur_pending_calls--;
   }
diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.h b/src/ray/core_worker/transport/direct_actor_task_submitter.h
index d861a7161578a..ba1bcb9221068 100644
--- a/src/ray/core_worker/transport/direct_actor_task_submitter.h
+++ b/src/ray/core_worker/transport/direct_actor_task_submitter.h
@@ -365,9 +365,14 @@ class CoreWorkerDirectActorTaskSubmitter
   /// Resend all previously-received, out-of-order, received tasks for an actor.
   /// When sending these tasks, the tasks will have the flag skip_execution=true.
   ///
+  /// This is useful because we want the replies to arrive in order. Out-of-order
+  /// completed tasks are re-sent to the restarted actor with skip_execution=true, and
+  /// their replies fill in the corresponding seqnos.
+  ///
   /// \param[in] actor_id Actor ID.
   /// \return Void.
-  void ResendOutOfOrderTasks(const ActorID &actor_id) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
+  void ResendOutOfOrderCompletedTasks(const ActorID &actor_id)
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
   /// Disconnect the RPC client for an actor.
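
The reply-handling paths above can be summarized by a small, self-contained C++ sketch (hypothetical names, not Ray code). Its three inputs are whether the RPC status is OK, whether the reply carries a retryable user exception, and whether the task manager decided to retry.

    // Hypothetical illustration of the decision implemented above; not Ray's code.
    struct ReplyAction {
      bool complete_task = false;    // hand the result (or final error) back to the caller now
      bool mark_seqno_done = false;  // release this attempt's slot in the submit queue
    };

    ReplyAction OnPushTaskReply(bool status_ok, bool retryable_exception, bool will_retry) {
      ReplyAction action;
      if (status_ok && !retryable_exception) {
        // Plain success or a non-retryable application error: finish immediately.
        action.complete_task = true;
        action.mark_seqno_done = true;
      } else if (will_retry) {
        // A retry is re-submitted under a new seqno, so the old slot is released only when
        // the actor stayed alive (retryable exception); on actor death the seqnos reset.
        action.mark_seqno_done = retryable_exception;
      } else {
        // Out of retries: a user exception completes with failure right away, while an
        // actor-death failure may first wait for the death-info grace period.
        action.complete_task = status_ok;
        action.mark_seqno_done = true;
      }
      return action;
    }
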
void DisconnectRpcClient(ClientQueue &queue) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); diff --git a/src/ray/core_worker/transport/out_of_order_actor_submit_queue.cc b/src/ray/core_worker/transport/out_of_order_actor_submit_queue.cc index 71bfaf0126d36..33572d98b7461 100644 --- a/src/ray/core_worker/transport/out_of_order_actor_submit_queue.cc +++ b/src/ray/core_worker/transport/out_of_order_actor_submit_queue.cc @@ -101,8 +101,8 @@ uint64_t OutofOrderActorSubmitQueue::GetSequenceNumber( return task_spec.ActorCounter(); } -void OutofOrderActorSubmitQueue::MarkTaskCompleted(uint64_t position, - const TaskSpecification &task_spec) {} +void OutofOrderActorSubmitQueue::MarkSeqnoCompleted(uint64_t position, + const TaskSpecification &task_spec) {} } // namespace core } // namespace ray diff --git a/src/ray/core_worker/transport/out_of_order_actor_submit_queue.h b/src/ray/core_worker/transport/out_of_order_actor_submit_queue.h index 553074c52c7b8..70b740a79b046 100644 --- a/src/ray/core_worker/transport/out_of_order_actor_submit_queue.h +++ b/src/ray/core_worker/transport/out_of_order_actor_submit_queue.h @@ -67,7 +67,7 @@ class OutofOrderActorSubmitQueue : public IActorSubmitQueue { /// This is ignored by the receivier but only for debugging purpose. uint64_t GetSequenceNumber(const TaskSpecification &task_spec) const override; /// Mark a task has been executed on the receiver side. - void MarkTaskCompleted(uint64_t position, const TaskSpecification &task_spec) override; + void MarkSeqnoCompleted(uint64_t position, const TaskSpecification &task_spec) override; private: ActorID kActorId; diff --git a/src/ray/core_worker/transport/sequential_actor_submit_queue.cc b/src/ray/core_worker/transport/sequential_actor_submit_queue.cc index fa6a8ce5baa26..2bb71a84f84b8 100644 --- a/src/ray/core_worker/transport/sequential_actor_submit_queue.cc +++ b/src/ray/core_worker/transport/sequential_actor_submit_queue.cc @@ -101,8 +101,8 @@ uint64_t SequentialActorSubmitQueue::GetSequenceNumber( return task_spec.ActorCounter() - caller_starts_at; } -void SequentialActorSubmitQueue::MarkTaskCompleted(uint64_t sequence_no, - const TaskSpecification &task_spec) { +void SequentialActorSubmitQueue::MarkSeqnoCompleted(uint64_t sequence_no, + const TaskSpecification &task_spec) { // Try to increment queue.next_task_reply_position consecutively until we // cannot. In the case of tasks not received in order, the following block // ensure queue.next_task_reply_position are incremented to the max possible diff --git a/src/ray/core_worker/transport/sequential_actor_submit_queue.h b/src/ray/core_worker/transport/sequential_actor_submit_queue.h index 2a3919c6cd349..4c787de73fd0a 100644 --- a/src/ray/core_worker/transport/sequential_actor_submit_queue.h +++ b/src/ray/core_worker/transport/sequential_actor_submit_queue.h @@ -63,8 +63,8 @@ class SequentialActorSubmitQueue : public IActorSubmitQueue { /// Get the task's sequence number according to the internal offset. uint64_t GetSequenceNumber(const TaskSpecification &task_spec) const override; /// Mark a task has been executed on the receiver side. - void MarkTaskCompleted(uint64_t sequence_no, - const TaskSpecification &task_spec) override; + void MarkSeqnoCompleted(uint64_t sequence_no, + const TaskSpecification &task_spec) override; private: /// The ID of the actor. @@ -109,6 +109,11 @@ class SequentialActorSubmitQueue : public IActorSubmitQueue { /// /// The send position of the next task to send to this actor. This sequence /// number increases monotonically. 
+  ///
+  /// If a task raised a retryable user exception, it is marked as "completed" via
+  /// `MarkSeqnoCompleted` and `next_task_reply_position` may be updated. Ray then
+  /// retries by submitting another task attempt, which is pushed to the back of the
+  /// queue and therefore executes after all tasks currently pending in the queue.
   uint64_t next_send_position = 0;
   /// The offset at which the actor should start its counter for this
   /// caller. This is used for actors that can be restarted, so that the new