From 0bb014caf0e356863aad0cf9ccf2355d528cd17e Mon Sep 17 00:00:00 2001 From: Catch-Bull Date: Mon, 7 Mar 2022 19:54:35 +0800 Subject: [PATCH 01/11] change runtime env protocol between user code and core_worker --- python/ray/_raylet.pyx | 8 +- python/ray/actor.py | 75 ++++++++----------- python/ray/job_config.py | 28 ++++--- python/ray/remote_function.py | 57 ++++++++------ python/ray/runtime_env.py | 39 +++++++++- src/ray/common/runtime_env_common.cc | 4 + src/ray/common/runtime_env_common.h | 5 ++ src/ray/common/task/task_util.h | 9 +-- src/ray/core_worker/common.h | 20 +++-- src/ray/core_worker/core_worker.cc | 108 +++++++++++++++++---------- src/ray/core_worker/core_worker.h | 7 +- 11 files changed, 219 insertions(+), 141 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 75591cb45b77d..bd0ac678cd831 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1495,7 +1495,7 @@ cdef class CoreWorker: c_bool retry_exceptions, scheduling_strategy, c_string debugger_breakpoint, - c_string serialized_runtime_env, + c_string serialized_runtime_env_info, ): cdef: unordered_map[c_string, double] c_resources @@ -1523,7 +1523,7 @@ cdef class CoreWorker: ray_function, args_vector, CTaskOptions( name, num_returns, c_resources, b"", - serialized_runtime_env), + serialized_runtime_env_info), max_retries, retry_exceptions, c_scheduling_strategy, debugger_breakpoint) @@ -1555,7 +1555,7 @@ cdef class CoreWorker: c_string ray_namespace, c_bool is_asyncio, c_string extension_data, - c_string serialized_runtime_env, + c_string serialized_runtime_env_info, concurrency_groups_dict, int32_t max_pending_calls, scheduling_strategy, @@ -1600,7 +1600,7 @@ cdef class CoreWorker: ray_namespace, is_asyncio, c_scheduling_strategy, - serialized_runtime_env, + serialized_runtime_env_info, c_concurrency_groups, # execute out of order for # async or threaded actors. diff --git a/python/ray/actor.py b/python/ray/actor.py index 4a462ee6891e4..bfc4a55377ae2 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1,11 +1,12 @@ import inspect import logging import weakref +from typing import Union, Optional import ray.ray_constants as ray_constants import ray._raylet import ray._private.signature as signature -from ray.runtime_env import RuntimeEnv +from ray.runtime_env import RuntimeEnv, get_runtime_env_info import ray.worker from ray.util.annotations import PublicAPI from ray.util.placement_group import configure_placement_group_based_on_context @@ -405,6 +406,25 @@ def __call__(self, *args, **kwargs): f"use '{self.__ray_metadata__.class_name}.remote()'." ) + def _parse_runtime_env(runtime_env: Optional[Union[dict, RuntimeEnv]]): + # Parse local pip/conda config files here. If we instead did it in + # .remote(), it would get run in the Ray Client server, which runs on + # a remote node where the files aren't available. + if runtime_env: + if isinstance(runtime_env, RuntimeEnv): + return runtime_env + elif isinstance(runtime_env, dict): + return RuntimeEnv(**(runtime_env or {})) + raise TypeError( + "runtime_env must be dict or RuntimeEnv, ", + f"but got: {type(runtime_env)}", + ) + else: + # Keep the new_runtime_env as None. In .remote(), we need to know + # if runtime_env is None to know whether or not to fall back to the + # runtime_env specified in the @ray.remote decorator. + return None + @classmethod def _ray_from_modified_class( cls, @@ -464,16 +484,7 @@ def __init__(self, *args, **kwargs): modified_class.__ray_actor_class__ ) - # Parse local pip/conda config files here. If we instead did it in - # .remote(), it would get run in the Ray Client server, which runs on - # a remote node where the files aren't available. - if runtime_env: - if isinstance(runtime_env, str): - new_runtime_env = runtime_env - else: - new_runtime_env = RuntimeEnv(**runtime_env).serialize() - else: - new_runtime_env = None + new_runtime_env = self._parse_runtime_env(runtime_env) self.__ray_metadata__ = ActorClassMetadata( Language.PYTHON, @@ -512,16 +523,7 @@ def _ray_from_function_descriptor( ): self = ActorClass.__new__(ActorClass) - # Parse local pip/conda config files here. If we instead did it in - # .remote(), it would get run in the Ray Client server, which runs on - # a remote node where the files aren't available. - if runtime_env: - if isinstance(runtime_env, str): - new_runtime_env = runtime_env - else: - new_runtime_env = RuntimeEnv(**runtime_env).serialize() - else: - new_runtime_env = None + new_runtime_env = self._parse_runtime_env(runtime_env) self.__ray_metadata__ = ActorClassMetadata( language, @@ -600,19 +602,7 @@ def method(self): actor_cls = self - # Parse local pip/conda config files here. If we instead did it in - # .remote(), it would get run in the Ray Client server, which runs on - # a remote node where the files aren't available. - if runtime_env: - if isinstance(runtime_env, str): - new_runtime_env = runtime_env - else: - new_runtime_env = RuntimeEnv(**(runtime_env or {})).serialize() - else: - # Keep the new_runtime_env as None. In .remote(), we need to know - # if runtime_env is None to know whether or not to fall back to the - # runtime_env specified in the @ray.remote decorator. - new_runtime_env = None + new_runtime_env = self._parse_runtime_env(runtime_env) cls_options = dict( num_cpus=num_cpus, @@ -966,15 +956,16 @@ def _remote( scheduling_strategy = "DEFAULT" if runtime_env: - if isinstance(runtime_env, str): - # Serialzed protobuf runtime env from Ray client. - new_runtime_env = runtime_env - elif isinstance(runtime_env, RuntimeEnv): - new_runtime_env = runtime_env.serialize() - else: - raise TypeError(f"Error runtime env type {type(runtime_env)}") + new_runtime_env = self._parse_runtime_env(runtime_env) else: new_runtime_env = meta.runtime_env + serialized_runtime_env_info = None + if new_runtime_env is not None: + serialized_runtime_env_info = get_runtime_env_info( + new_runtime_env, + is_job_runtime_env=False, + serialize=True, + ) concurrency_groups_dict = {} for cg_name in meta.concurrency_groups: @@ -1021,7 +1012,7 @@ def _remote( is_asyncio, # Store actor_method_cpu in actor handle's extension data. extension_data=str(actor_method_cpu), - serialized_runtime_env=new_runtime_env or "{}", + serialized_runtime_env_info=serialized_runtime_env_info or "{}", concurrency_groups_dict=concurrency_groups_dict or dict(), max_pending_calls=max_pending_calls, scheduling_strategy=scheduling_strategy, diff --git a/python/ray/job_config.py b/python/ray/job_config.py index d7a32b4c280f5..368922bcb43f1 100644 --- a/python/ray/job_config.py +++ b/python/ray/job_config.py @@ -65,7 +65,7 @@ def set_runtime_env( """ self.runtime_env = runtime_env if runtime_env is not None else {} if validate: - self.runtime_env = self._validate_runtime_env()[0] + self.runtime_env = self._validate_runtime_env() self._cached_pb = None def set_ray_namespace(self, ray_namespace: str) -> None: @@ -91,15 +91,17 @@ def _validate_runtime_env(self): # this dependency and pass in a validated runtime_env instead. from ray.runtime_env import RuntimeEnv - eager_install = self.runtime_env.get("eager_install", True) - if not isinstance(eager_install, bool): - raise TypeError("eager_install must be a boolean.") if isinstance(self.runtime_env, RuntimeEnv): - return self.runtime_env, eager_install - return RuntimeEnv(**self.runtime_env), eager_install + return self.runtime_env + return RuntimeEnv(**self.runtime_env) def get_proto_job_config(self): """Return the protobuf structure of JobConfig.""" + # TODO(edoakes): this is really unfortunate, but JobConfig is imported + # all over the place so this causes circular imports. We should remove + # this dependency and pass in a validated runtime_env instead. + from ray.runtime_env import get_runtime_env_info + if self._cached_pb is None: pb = gcs_utils.JobConfig() if self.ray_namespace is None: @@ -112,10 +114,12 @@ def get_proto_job_config(self): for k, v in self.metadata.items(): pb.metadata[k] = v - parsed_env, eager_install = self._validate_runtime_env() - pb.runtime_env_info.uris[:] = parsed_env.get_uris() - pb.runtime_env_info.serialized_runtime_env = parsed_env.serialize() - pb.runtime_env_info.runtime_env_eager_install = eager_install + parsed_env = self._validate_runtime_env() + pb.runtime_env_info = get_runtime_env_info( + parsed_env, + is_job_runtime_env=True, + serialize=False, + ) if self._default_actor_lifetime is not None: pb.default_actor_lifetime = self._default_actor_lifetime @@ -125,11 +129,11 @@ def get_proto_job_config(self): def runtime_env_has_uris(self): """Whether there are uris in runtime env or not""" - return self._validate_runtime_env()[0].has_uris() + return self._validate_runtime_env().has_uris() def get_serialized_runtime_env(self) -> str: """Return the JSON-serialized parsed runtime env dict""" - return self._validate_runtime_env()[0].serialize() + return self._validate_runtime_env().serialize() @classmethod def from_json(cls, job_config_json): diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 65b736c6b812b..8b8e1b631d362 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -2,6 +2,7 @@ import inspect import logging import uuid +from typing import Union, Optional from ray import cloudpickle as pickle from ray.util.scheduling_strategies import ( @@ -14,7 +15,7 @@ from ray._private.client_mode_hook import client_mode_should_convert from ray.util.placement_group import configure_placement_group_based_on_context import ray._private.signature -from ray.runtime_env import RuntimeEnv +from ray.runtime_env import RuntimeEnv, get_runtime_env_info from ray.util.tracing.tracing_helper import ( _tracing_task_invocation, _inject_tracing_into_function, @@ -139,16 +140,9 @@ def __init__( if retry_exceptions is None else retry_exceptions ) - # Parse local pip/conda config files here. If we instead did it in - # .remote(), it would get run in the Ray Client server, which runs on - # a remote node where the files aren't available. - if runtime_env: - if isinstance(runtime_env, str): - self._runtime_env = runtime_env - else: - self._runtime_env = RuntimeEnv(**(runtime_env or {})).serialize() - else: - self._runtime_env = None + + self._runtime_env = self._parse_runtime_env(runtime_env) + self._placement_group = placement_group self._decorator = getattr(function, "__ray_invocation_decorator__", None) self._function_signature = ray._private.signature.extract_signature( @@ -166,6 +160,25 @@ def _remote_proxy(*args, **kwargs): self.remote = _remote_proxy + def _parse_runtime_env(runtime_env: Optional[Union[dict, RuntimeEnv]]): + # Parse local pip/conda config files here. If we instead did it in + # .remote(), it would get run in the Ray Client server, which runs on + # a remote node where the files aren't available. + if runtime_env: + if isinstance(runtime_env, RuntimeEnv): + return runtime_env + elif isinstance(runtime_env, dict): + return RuntimeEnv(**(runtime_env or {})) + raise TypeError( + "runtime_env must be dict or RuntimeEnv, ", + f"but got: {type(runtime_env)}", + ) + else: + # Keep the runtime_env as None. In .remote(), we need to know if + # runtime_env is None to know whether or not to fall back to the + # runtime_env specified in the @ray.remote decorator. + return None + def __call__(self, *args, **kwargs): raise TypeError( "Remote functions cannot be called directly. Instead " @@ -214,17 +227,8 @@ def f(): # Parse local pip/conda config files here. If we instead did it in # .remote(), it would get run in the Ray Client server, which runs on # a remote node where the files aren't available. - if runtime_env: - if isinstance(runtime_env, str): - # Serialzed protobuf runtime env from Ray client. - new_runtime_env = runtime_env - else: - new_runtime_env = RuntimeEnv(**runtime_env).serialize() - else: - # Keep the runtime_env as None. In .remote(), we need to know if - # runtime_env is None to know whether or not to fall back to the - # runtime_env specified in the @ray.remote decorator. - new_runtime_env = None + + new_runtime_env = self._parse_runtime_env(runtime_env) options = dict( num_returns=num_returns, @@ -419,6 +423,13 @@ def _remote( if not runtime_env or runtime_env == "{}": runtime_env = self._runtime_env + serialized_runtime_env_info = None + if runtime_env is not None: + serialized_runtime_env_info = get_runtime_env_info( + runtime_env, + is_job_runtime_env=False, + serialize=True, + ) def invocation(args, kwargs): if self._is_cross_language: @@ -445,7 +456,7 @@ def invocation(args, kwargs): retry_exceptions, scheduling_strategy, worker.debugger_breakpoint, - runtime_env or "{}", + serialized_runtime_env_info or "{}", ) # Reset worker's debug context from the last "remote" command # (which applies only to this .remote call). diff --git a/python/ray/runtime_env.py b/python/ray/runtime_env.py index 6800fadaecb21..1ea5188fe3614 100644 --- a/python/ray/runtime_env.py +++ b/python/ray/runtime_env.py @@ -6,7 +6,10 @@ from copy import deepcopy import ray -from ray.core.generated.runtime_env_common_pb2 import RuntimeEnv as ProtoRuntimeEnv +from ray.core.generated.runtime_env_common_pb2 import ( + RuntimeEnv as ProtoRuntimeEnv, + RuntimeEnvInfo as ProtoRuntimeEnvInfo, +) from ray._private.runtime_env.plugin import RuntimeEnvPlugin, encode_plugin_uri from ray._private.runtime_env.validation import OPTION_TO_VALIDATION_FN from ray._private.utils import import_attr @@ -575,3 +578,37 @@ def _build_proto_plugin_runtime_env(self, runtime_env: ProtoRuntimeEnv): plugin = runtime_env.python_runtime_env.plugin_runtime_env.plugins.add() plugin.class_path = class_path plugin.config = plugin_field + + +def get_runtime_env_info( + runtime_env: RuntimeEnv, + *, + is_job_runtime_env: bool = False, + serialize: bool = False, +): + """Create runtime env info from runtime env. + + In the user interface, the argument `runtime_env` contains some fields + which not contained in `ProtoRuntimeEnv` but in `ProtoRuntimeEnvInfo`, + such as `eager_install`. This function will extract those fields from + `RuntimeEnv` and create a new `ProtoRuntimeEnvInfo`, and serialize it. + """ + proto_runtime_env_info = ProtoRuntimeEnvInfo() + + proto_runtime_env_info.uris[:] = runtime_env.get_uris() + + eager_install = runtime_env.get("eager_install") + if is_job_runtime_env or eager_install is not None: + if not isinstance(eager_install, bool): + raise TypeError("eager_install must be a boolean.") + proto_runtime_env_info.runtime_env_eager_install = eager_install + + proto_runtime_env_info.serialized_runtime_env = runtime_env.serialize() + + if not serialize: + return proto_runtime_env_info + + return json.dumps( + json.loads(json_format.MessageToJson(proto_runtime_env_info)), + sort_keys=True, + ) diff --git a/src/ray/common/runtime_env_common.cc b/src/ray/common/runtime_env_common.cc index 891d390744758..a07c4e88cda90 100644 --- a/src/ray/common/runtime_env_common.cc +++ b/src/ray/common/runtime_env_common.cc @@ -19,4 +19,8 @@ bool IsRuntimeEnvEmpty(const std::string &serialized_runtime_env) { return serialized_runtime_env == "{}" || serialized_runtime_env == ""; } +bool IsRuntimeEnvInfoEmpty(const std::string &serialized_runtime_env_info) { + return serialized_runtime_env_info == "{}" || serialized_runtime_env_info == ""; +} + } // namespace ray diff --git a/src/ray/common/runtime_env_common.h b/src/ray/common/runtime_env_common.h index d025536615a59..89cc570fdf420 100644 --- a/src/ray/common/runtime_env_common.h +++ b/src/ray/common/runtime_env_common.h @@ -21,4 +21,9 @@ namespace ray { // or "{}" (from serializing an empty Python dict or a JSON file.) bool IsRuntimeEnvEmpty(const std::string &serialized_runtime_env); +// Return whether a string representation of a runtime env info represents an empty +// runtime env info. It could either be "" (from the default string value in protobuf), +// or "{}" (from serializing an empty Python dict or a JSON file.) +bool IsRuntimeEnvInfoEmpty(const std::string &serialized_runtime_env_info); + } // namespace ray diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 9e837d7f2fe67..c043b42332749 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -104,8 +104,7 @@ class TaskSpecBuilder { const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, const std::string &debugger_breakpoint, int64_t depth, - const std::string &serialized_runtime_env = "{}", - const std::vector &runtime_env_uris = {}, + const std::shared_ptr runtime_env_info = nullptr, const std::string &concurrency_group_name = "") { message_->set_type(TaskType::NORMAL_TASK); message_->set_name(name); @@ -124,10 +123,8 @@ class TaskSpecBuilder { required_placement_resources.begin(), required_placement_resources.end()); message_->set_debugger_breakpoint(debugger_breakpoint); message_->set_depth(depth); - message_->mutable_runtime_env_info()->set_serialized_runtime_env( - serialized_runtime_env); - for (const std::string &uri : runtime_env_uris) { - message_->mutable_runtime_env_info()->add_uris(uri); + if (runtime_env_info) { + message_->mutable_runtime_env_info()->CopyFrom(*runtime_env_info); } message_->set_concurrency_group_name(concurrency_group_name); return *this; diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 30794494d81e8..2f17fcf53b1c1 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -59,12 +59,12 @@ struct TaskOptions { TaskOptions(std::string name, int num_returns, std::unordered_map &resources, const std::string &concurrency_group_name = "", - const std::string &serialized_runtime_env = "{}") + const std::string &serialized_runtime_env_info = "{}") : name(name), num_returns(num_returns), resources(resources), concurrency_group_name(concurrency_group_name), - serialized_runtime_env(serialized_runtime_env) {} + serialized_runtime_env_info(serialized_runtime_env_info) {} /// The name of this task. std::string name; @@ -74,8 +74,10 @@ struct TaskOptions { std::unordered_map resources; /// The name of the concurrency group in which this task will be executed. std::string concurrency_group_name; - // Runtime Env used by this task. Propagated to child actors and tasks. - std::string serialized_runtime_env; + /// Runtime Env Info used by this task. It includes Runtime Env and some + /// fields which not contained in Runtime Env Info, such as eager_install. + /// Propagated to child actors and tasks. + std::string serialized_runtime_env_info; }; /// Options for actor creation tasks. @@ -89,7 +91,7 @@ struct ActorCreationOptions { std::optional is_detached, std::string &name, std::string &ray_namespace, bool is_asyncio, const rpc::SchedulingStrategy &scheduling_strategy, - const std::string &serialized_runtime_env = "{}", + const std::string &serialized_runtime_env_info = "{}", const std::vector &concurrency_groups = {}, bool execute_out_of_order = false, int32_t max_pending_calls = -1) : max_restarts(max_restarts), @@ -102,7 +104,7 @@ struct ActorCreationOptions { name(name), ray_namespace(ray_namespace), is_asyncio(is_asyncio), - serialized_runtime_env(serialized_runtime_env), + serialized_runtime_env_info(serialized_runtime_env_info), concurrency_groups(concurrency_groups.begin(), concurrency_groups.end()), execute_out_of_order(execute_out_of_order), max_pending_calls(max_pending_calls), @@ -138,8 +140,10 @@ struct ActorCreationOptions { const std::string ray_namespace; /// Whether to use async mode of direct actor call. const bool is_asyncio = false; - // Runtime Env used by this actor. Propagated to child actors and tasks. - std::string serialized_runtime_env; + /// Runtime Env Info used by this task. It includes Runtime Env and some + /// fields which not contained in Runtime Env Info, such as eager_install. + /// Propagated to child actors and tasks. + std::string serialized_runtime_env_info; /// The actor concurrency groups to indicate how this actor perform its /// methods concurrently. const std::vector concurrency_groups; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 959de5c72d1c0..d7b4389f7f422 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1440,56 +1440,83 @@ static std::vector GetUrisFromRuntimeEnv( return result; } -static std::vector GetUrisFromSerializedRuntimeEnv( - const std::string &serialized_runtime_env) { - rpc::RuntimeEnv runtime_env; - if (!google::protobuf::util::JsonStringToMessage(serialized_runtime_env, &runtime_env) - .ok()) { - RAY_LOG(WARNING) << "Parse runtime env failed for " << serialized_runtime_env; - // TODO(SongGuyang): We pass the raw string here and the task will fail after an - // exception raised in runtime env agent. Actually, we can fail the task here. - return {}; - } - return GetUrisFromRuntimeEnv(&runtime_env); -} - -std::string CoreWorker::OverrideTaskOrActorRuntimeEnv( - const std::string &serialized_runtime_env, - std::vector *runtime_env_uris) { +std::shared_ptr CoreWorker::OverrideTaskOrActorRuntimeEnvInfo( + const std::string &serialized_runtime_env_info) { + // TODO(Catch-Bull,SongGuyang): task runtime env not support the field eager_install + // yet, we will overwrite the filed eager_install when it did. std::shared_ptr parent = nullptr; + std::shared_ptr runtime_env_info = nullptr; + runtime_env_info.reset(new rpc::RuntimeEnvInfo()); + + if (IsRuntimeEnvInfoEmpty(serialized_runtime_env_info)) { + runtime_env_info->set_serialized_runtime_env( + job_config_->runtime_env_info().serialized_runtime_env()); + runtime_env_info->clear_uris(); + for (const std::string &uri : GetUrisFromRuntimeEnv(job_runtime_env_.get())) { + runtime_env_info->add_uris(uri); + } + + return runtime_env_info; + } + + RAY_CHECK(!google::protobuf::util::JsonStringToMessage(serialized_runtime_env_info, + runtime_env_info.get()) + .ok()); + if (options_.worker_type == WorkerType::DRIVER) { - if (IsRuntimeEnvEmpty(serialized_runtime_env)) { - *runtime_env_uris = GetUrisFromRuntimeEnv(job_runtime_env_.get()); - return job_config_->runtime_env_info().serialized_runtime_env(); + if (IsRuntimeEnvEmpty(runtime_env_info->serialized_runtime_env())) { + runtime_env_info->set_serialized_runtime_env( + job_config_->runtime_env_info().serialized_runtime_env()); + runtime_env_info->clear_uris(); + for (const std::string &uri : GetUrisFromRuntimeEnv(job_runtime_env_.get())) { + runtime_env_info->add_uris(uri); + } + + return runtime_env_info; } parent = job_runtime_env_; } else { - if (IsRuntimeEnvEmpty(serialized_runtime_env)) { - *runtime_env_uris = - GetUrisFromRuntimeEnv(worker_context_.GetCurrentRuntimeEnv().get()); - return worker_context_.GetCurrentSerializedRuntimeEnv(); + if (IsRuntimeEnvEmpty(runtime_env_info->serialized_runtime_env())) { + runtime_env_info->set_serialized_runtime_env( + worker_context_.GetCurrentSerializedRuntimeEnv()); + runtime_env_info->clear_uris(); + for (const std::string &uri : + GetUrisFromRuntimeEnv(worker_context_.GetCurrentRuntimeEnv().get())) { + runtime_env_info->add_uris(uri); + } + + return runtime_env_info; } parent = worker_context_.GetCurrentRuntimeEnv(); } if (parent) { + std::string serialized_runtime_env = runtime_env_info->serialized_runtime_env(); rpc::RuntimeEnv child_runtime_env; if (!google::protobuf::util::JsonStringToMessage(serialized_runtime_env, &child_runtime_env) .ok()) { - RAY_LOG(WARNING) << "Parse runtime env failed for " << serialized_runtime_env; + RAY_LOG(WARNING) << "Parse runtime env failed for " << serialized_runtime_env + << ". serialized runtime env info: " + << serialized_runtime_env_info; // TODO(SongGuyang): We pass the raw string here and the task will fail after an // exception raised in runtime env agent. Actually, we can fail the task here. - return serialized_runtime_env; + return runtime_env_info; } auto override_runtime_env = OverrideRuntimeEnv(child_runtime_env, parent); - std::string result; - RAY_CHECK( - google::protobuf::util::MessageToJsonString(override_runtime_env, &result).ok()); - *runtime_env_uris = GetUrisFromRuntimeEnv(&override_runtime_env); - return result; + std::string serialized_override_runtime_env; + RAY_CHECK(google::protobuf::util::MessageToJsonString( + override_runtime_env, &serialized_override_runtime_env) + .ok()); + + runtime_env_info->set_serialized_runtime_env(serialized_override_runtime_env); + runtime_env_info->clear_uris(); + for (const std::string &uri : GetUrisFromRuntimeEnv(&override_runtime_env)) { + runtime_env_info->add_uris(uri); + } + + return runtime_env_info; } else { - *runtime_env_uris = GetUrisFromSerializedRuntimeEnv(serialized_runtime_env); - return serialized_runtime_env; + return runtime_env_info; } } @@ -1501,17 +1528,16 @@ void CoreWorker::BuildCommonTaskSpec( const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, const std::string &debugger_breakpoint, int64_t depth, - const std::string &serialized_runtime_env, + const std::string &serialized_runtime_env_info, const std::string &concurrency_group_name) { // Build common task spec. - std::vector runtime_env_uris; - auto override_runtime_env = - OverrideTaskOrActorRuntimeEnv(serialized_runtime_env, &runtime_env_uris); + auto override_runtime_env_info = + OverrideTaskOrActorRuntimeEnvInfo(serialized_runtime_env_info); builder.SetCommonTaskSpec( task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, current_task_id, task_index, caller_id, address, num_returns, required_resources, - required_placement_resources, debugger_breakpoint, depth, override_runtime_env, - runtime_env_uris, concurrency_group_name); + required_placement_resources, debugger_breakpoint, depth, override_runtime_env_info, + concurrency_group_name); // Set task arguments. for (const auto &arg : args) { builder.AddArg(*arg); @@ -1544,7 +1570,7 @@ std::vector CoreWorker::SubmitTask( worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, task_options.num_returns, constrained_resources, required_resources, debugger_breakpoint, - depth, task_options.serialized_runtime_env); + depth, task_options.serialized_runtime_env_info); builder.SetNormalTaskSpec(max_retries, retry_exceptions, scheduling_strategy); TaskSpecification task_spec = builder.Build(); RAY_LOG(DEBUG) << "Submitting normal task " << task_spec.DebugString(); @@ -1612,7 +1638,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, 1, new_resource, new_placement_resources, "" /* debugger_breakpoint */, depth, - actor_creation_options.serialized_runtime_env); + actor_creation_options.serialized_runtime_env_info); // If the namespace is not specified, get it from the job. const auto &ray_namespace = (actor_creation_options.ray_namespace.empty() @@ -1801,7 +1827,7 @@ std::optional> CoreWorker::SubmitActorTask( rpc_address_, function, args, num_returns, task_options.resources, required_resources, "", /* debugger_breakpoint */ depth, /*depth*/ - "{}", /* serialized_runtime_env */ + "{}", /* serialized_runtime_env_info */ task_options.concurrency_group_name); // NOTE: placement_group_capture_child_tasks and runtime_env will // be ignored in the actor because we should always follow the actor's option. diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index d3c5ff4d7e04f..d12a41b89e255 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -791,9 +791,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { FRIEND_TEST(TestOverrideRuntimeEnv, TestCondaInherit); FRIEND_TEST(TestOverrideRuntimeEnv, TestCondaOverride); - std::string OverrideTaskOrActorRuntimeEnv( - const std::string &serialized_runtime_env, - std::vector *runtime_env_uris /* output */); + std::shared_ptr OverrideTaskOrActorRuntimeEnvInfo( + const std::string &serialized_runtime_env_info); void BuildCommonTaskSpec( TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, @@ -803,7 +802,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, const std::string &debugger_breakpoint, int64_t depth, - const std::string &serialized_runtime_env, + const std::string &serialized_runtime_env_info, const std::string &concurrency_group_name = ""); void SetCurrentTaskId(const TaskID &task_id, uint64_t attempt_number); From f7871ec76cbf9f0783a4df4015249ef54a0a3e08 Mon Sep 17 00:00:00 2001 From: Catch-Bull Date: Tue, 8 Mar 2022 01:11:02 +0800 Subject: [PATCH 02/11] fix UT --- python/ray/actor.py | 2 +- python/ray/remote_function.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index bfc4a55377ae2..f2e2345af06df 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -406,7 +406,7 @@ def __call__(self, *args, **kwargs): f"use '{self.__ray_metadata__.class_name}.remote()'." ) - def _parse_runtime_env(runtime_env: Optional[Union[dict, RuntimeEnv]]): + def _parse_runtime_env(self, runtime_env: Optional[Union[dict, RuntimeEnv]]): # Parse local pip/conda config files here. If we instead did it in # .remote(), it would get run in the Ray Client server, which runs on # a remote node where the files aren't available. diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 8b8e1b631d362..a518755f40530 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -160,7 +160,7 @@ def _remote_proxy(*args, **kwargs): self.remote = _remote_proxy - def _parse_runtime_env(runtime_env: Optional[Union[dict, RuntimeEnv]]): + def _parse_runtime_env(self, runtime_env: Optional[Union[dict, RuntimeEnv]]): # Parse local pip/conda config files here. If we instead did it in # .remote(), it would get run in the Ray Client server, which runs on # a remote node where the files aren't available. From 16eecb9e990d0e031af3a44a7691030f0fa7e5f8 Mon Sep 17 00:00:00 2001 From: Catch-Bull Date: Tue, 8 Mar 2022 16:36:32 +0800 Subject: [PATCH 03/11] fix UT --- python/ray/job_config.py | 10 ++++++---- python/ray/runtime_env.py | 16 ++++++++++++++-- src/ray/core_worker/core_worker.cc | 6 +++--- 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/python/ray/job_config.py b/python/ray/job_config.py index 368922bcb43f1..55c86dbc38eb8 100644 --- a/python/ray/job_config.py +++ b/python/ray/job_config.py @@ -115,10 +115,12 @@ def get_proto_job_config(self): pb.metadata[k] = v parsed_env = self._validate_runtime_env() - pb.runtime_env_info = get_runtime_env_info( - parsed_env, - is_job_runtime_env=True, - serialize=False, + pb.runtime_env_info.CopyFrom( + get_runtime_env_info( + parsed_env, + is_job_runtime_env=True, + serialize=False, + ) ) if self._default_actor_lifetime is not None: diff --git a/python/ray/runtime_env.py b/python/ray/runtime_env.py index 1ea5188fe3614..c7b30164c2f79 100644 --- a/python/ray/runtime_env.py +++ b/python/ray/runtime_env.py @@ -579,6 +579,14 @@ def _build_proto_plugin_runtime_env(self, runtime_env: ProtoRuntimeEnv): plugin.class_path = class_path plugin.config = plugin_field + def __getstate__(self): + return dict(**self) + + def __setstate__(self, state): + for k, v in state.items(): + self[k] = v + self.__proto_runtime_env = None + def get_runtime_env_info( runtime_env: RuntimeEnv, @@ -599,8 +607,12 @@ def get_runtime_env_info( eager_install = runtime_env.get("eager_install") if is_job_runtime_env or eager_install is not None: - if not isinstance(eager_install, bool): - raise TypeError("eager_install must be a boolean.") + if eager_install is None: + eager_install = True + elif not isinstance(eager_install, bool): + raise TypeError( + f"eager_install must be a boolean. got {type(eager_install)}" + ) proto_runtime_env_info.runtime_env_eager_install = eager_install proto_runtime_env_info.serialized_runtime_env = runtime_env.serialize() diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index d7b4389f7f422..7f13467b1e928 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1459,9 +1459,9 @@ std::shared_ptr CoreWorker::OverrideTaskOrActorRuntimeEnvIn return runtime_env_info; } - RAY_CHECK(!google::protobuf::util::JsonStringToMessage(serialized_runtime_env_info, - runtime_env_info.get()) - .ok()); + RAY_CHECK(google::protobuf::util::JsonStringToMessage(serialized_runtime_env_info, + runtime_env_info.get()) + .ok()); if (options_.worker_type == WorkerType::DRIVER) { if (IsRuntimeEnvEmpty(runtime_env_info->serialized_runtime_env())) { From 59281a3de47506785d787f3b44884e410c46fa97 Mon Sep 17 00:00:00 2001 From: Catch-Bull Date: Tue, 8 Mar 2022 23:55:10 +0800 Subject: [PATCH 04/11] Fix lint --- .../ray/api/options/ActorCreationOptions.java | 7 +++- .../java/io/ray/api/runtime/RayRuntime.java | 4 +++ .../io/ray/api/runtimeenv/RuntimeEnvInfo.java | 23 +++++++++++++ .../io/ray/runtime/AbstractRayRuntime.java | 7 ++++ .../runtimeenv/RuntimeEnvInfoImpl.java | 31 ++++++++++++++++++ python/ray/tests/test_runtime_env_env_vars.py | 2 ++ src/ray/core_worker/core_worker.cc | 19 +++-------- .../scheduling/cluster_task_manager_test.cc | 32 ++++++++++++------- 8 files changed, 98 insertions(+), 27 deletions(-) create mode 100644 java/api/src/main/java/io/ray/api/runtimeenv/RuntimeEnvInfo.java create mode 100644 java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvInfoImpl.java diff --git a/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java b/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java index 6c8e1b426397d..e954164614788 100644 --- a/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java +++ b/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java @@ -4,6 +4,7 @@ import io.ray.api.concurrencygroup.ConcurrencyGroup; import io.ray.api.placementgroup.PlacementGroup; import io.ray.api.runtimeenv.RuntimeEnv; +import io.ray.api.runtimeenv.RuntimeEnvInfo; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -183,6 +184,10 @@ public Builder setPlacementGroup(PlacementGroup group, int bundleIndex) { } public ActorCreationOptions build() { + RuntimeEnvInfo runtimeEnvInfo = + new RuntimeEnvInfo.Builder() + .setSerializedRuntimeEnv(runtimeEnv != null ? runtimeEnv.toJsonBytes() : "{}") + .build(); return new ActorCreationOptions( name, lifetime, @@ -193,7 +198,7 @@ public ActorCreationOptions build() { group, bundleIndex, concurrencyGroups, - runtimeEnv != null ? runtimeEnv.toJsonBytes() : "", + runtimeEnvInfo.toJsonBytes(), maxPendingCalls); } diff --git a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java index be439927163bc..771be2b0dd7a7 100644 --- a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java @@ -20,6 +20,7 @@ import io.ray.api.runtimecontext.ResourceValue; import io.ray.api.runtimecontext.RuntimeContext; import io.ray.api.runtimeenv.RuntimeEnv; +import io.ray.api.runtimeenv.RuntimeEnvInfo; import java.util.List; import java.util.Map; import java.util.Optional; @@ -284,4 +285,7 @@ ActorHandle createActor( /** Create runtime env instance at runtime. */ RuntimeEnv createRuntimeEnv(Map envVars); + + /** Create runtime env info instance at runtime. */ + RuntimeEnvInfo createRuntimeEnvInfo(String serializedRuntimeEnv); } diff --git a/java/api/src/main/java/io/ray/api/runtimeenv/RuntimeEnvInfo.java b/java/api/src/main/java/io/ray/api/runtimeenv/RuntimeEnvInfo.java new file mode 100644 index 0000000000000..5ba31aa765751 --- /dev/null +++ b/java/api/src/main/java/io/ray/api/runtimeenv/RuntimeEnvInfo.java @@ -0,0 +1,23 @@ +package io.ray.api.runtimeenv; + +import io.ray.api.Ray; + +/** This is an experimental API to let you set runtime environments info for your actors. */ +public interface RuntimeEnvInfo { + + String toJsonBytes(); + + public static class Builder { + + String serializedRuntimeEnv; + + public Builder setSerializedRuntimeEnv(String serializedRuntimeEnv) { + this.serializedRuntimeEnv = serializedRuntimeEnv; + return this; + } + + public RuntimeEnvInfo build() { + return Ray.internal().createRuntimeEnvInfo(serializedRuntimeEnv); + } + } +} diff --git a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java index bcf04fe9e14aa..ead14a566393b 100644 --- a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java @@ -22,6 +22,7 @@ import io.ray.api.placementgroup.PlacementGroup; import io.ray.api.runtimecontext.RuntimeContext; import io.ray.api.runtimeenv.RuntimeEnv; +import io.ray.api.runtimeenv.RuntimeEnvInfo; import io.ray.runtime.config.RayConfig; import io.ray.runtime.config.RunMode; import io.ray.runtime.context.RuntimeContextImpl; @@ -35,6 +36,7 @@ import io.ray.runtime.object.ObjectRefImpl; import io.ray.runtime.object.ObjectStore; import io.ray.runtime.runtimeenv.RuntimeEnvImpl; +import io.ray.runtime.runtimeenv.RuntimeEnvInfoImpl; import io.ray.runtime.task.ArgumentsBuilder; import io.ray.runtime.task.FunctionArg; import io.ray.runtime.task.TaskExecutor; @@ -293,6 +295,11 @@ public RuntimeEnv createRuntimeEnv(Map envVars) { return new RuntimeEnvImpl(envVars); } + @Override + public RuntimeEnvInfo createRuntimeEnvInfo(String serializedRuntimeEnv) { + return new RuntimeEnvInfoImpl(serializedRuntimeEnv); + } + private ObjectRef callNormalFunction( FunctionDescriptor functionDescriptor, Object[] args, diff --git a/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvInfoImpl.java b/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvInfoImpl.java new file mode 100644 index 0000000000000..ecb2210cbc423 --- /dev/null +++ b/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvInfoImpl.java @@ -0,0 +1,31 @@ +package io.ray.runtime.runtimeenv; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import io.ray.api.runtimeenv.RuntimeEnvInfo; +import io.ray.runtime.generated.RuntimeEnvCommon; + +public class RuntimeEnvInfoImpl implements RuntimeEnvInfo { + + private String serializedRuntimeEnv = "{}"; + + public RuntimeEnvInfoImpl(String serializedRuntimeEnv) { + this.serializedRuntimeEnv = serializedRuntimeEnv; + } + + @Override + public String toJsonBytes() { + if (serializedRuntimeEnv.equals("{}") || serializedRuntimeEnv.isEmpty()) { + return "{}"; + } + RuntimeEnvCommon.RuntimeEnvInfo.Builder protoRuntimeEnvInfoBuilder = + RuntimeEnvCommon.RuntimeEnvInfo.newBuilder(); + protoRuntimeEnvInfoBuilder.setSerializedRuntimeEnv(serializedRuntimeEnv); + JsonFormat.Printer printer = JsonFormat.printer(); + try { + return printer.print(protoRuntimeEnvInfoBuilder); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } +} diff --git a/python/ray/tests/test_runtime_env_env_vars.py b/python/ray/tests/test_runtime_env_env_vars.py index 4991f11d84560..a1691e6c8b71a 100644 --- a/python/ray/tests/test_runtime_env_env_vars.py +++ b/python/ray/tests/test_runtime_env_env_vars.py @@ -48,10 +48,12 @@ def get(self, key): def test_environment_variables_nested_task(ray_start_regular): @ray.remote def get_env(key): + print(os.environ) return os.environ.get(key) @ray.remote def get_env_wrapper(key): + assert os.environ.get(key) == "b" return ray.get(get_env.remote(key)) assert ( diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 7f13467b1e928..d301f28658abe 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1448,21 +1448,12 @@ std::shared_ptr CoreWorker::OverrideTaskOrActorRuntimeEnvIn std::shared_ptr runtime_env_info = nullptr; runtime_env_info.reset(new rpc::RuntimeEnvInfo()); - if (IsRuntimeEnvInfoEmpty(serialized_runtime_env_info)) { - runtime_env_info->set_serialized_runtime_env( - job_config_->runtime_env_info().serialized_runtime_env()); - runtime_env_info->clear_uris(); - for (const std::string &uri : GetUrisFromRuntimeEnv(job_runtime_env_.get())) { - runtime_env_info->add_uris(uri); - } - - return runtime_env_info; + if (!IsRuntimeEnvInfoEmpty(serialized_runtime_env_info)) { + RAY_CHECK(google::protobuf::util::JsonStringToMessage(serialized_runtime_env_info, + runtime_env_info.get()) + .ok()); } - RAY_CHECK(google::protobuf::util::JsonStringToMessage(serialized_runtime_env_info, - runtime_env_info.get()) - .ok()); - if (options_.worker_type == WorkerType::DRIVER) { if (IsRuntimeEnvEmpty(runtime_env_info->serialized_runtime_env())) { runtime_env_info->set_serialized_runtime_env( @@ -1507,13 +1498,11 @@ std::shared_ptr CoreWorker::OverrideTaskOrActorRuntimeEnvIn RAY_CHECK(google::protobuf::util::MessageToJsonString( override_runtime_env, &serialized_override_runtime_env) .ok()); - runtime_env_info->set_serialized_runtime_env(serialized_override_runtime_env); runtime_env_info->clear_uris(); for (const std::string &uri : GetUrisFromRuntimeEnv(&override_runtime_env)) { runtime_env_info->add_uris(uri); } - return runtime_env_info; } else { return runtime_env_info; diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 93f87b610ce58..640a2126a22ca 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -137,8 +137,7 @@ std::shared_ptr CreateSingleNodeScheduler( RayTask CreateTask(const std::unordered_map &required_resources, int num_args = 0, std::vector args = {}, - const std::string &serialized_runtime_env = "{}", - const std::vector &runtime_env_uris = {}) { + std::shared_ptr runtime_env_info = nullptr) { TaskSpecBuilder spec_builder; TaskID id = RandomTaskId(); JobID job_id = RandomJobId(); @@ -146,8 +145,7 @@ RayTask CreateTask(const std::unordered_map &required_resou spec_builder.SetCommonTaskSpec(id, "dummy_task", Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""), job_id, TaskID::Nil(), 0, TaskID::Nil(), address, 0, - required_resources, {}, "", 0, serialized_runtime_env, - runtime_env_uris); + required_resources, {}, "", 0, runtime_env_info); if (!args.empty()) { for (auto &arg : args) { @@ -474,8 +472,12 @@ TEST_F(ClusterTaskManagerTest, DispatchQueueNonBlockingTest) { {ray::kCPU_ResourceLabel, 4}}; std::string serialized_runtime_env_A = "mock_env_A"; - RayTask task_A = CreateTask(required_resources, /*num_args=*/0, /*args=*/{}, - serialized_runtime_env_A); + std::shared_ptr runtime_env_info_A = nullptr; + runtime_env_info_A.reset(new rpc::RuntimeEnvInfo()); + runtime_env_info_A->set_serialized_runtime_env(serialized_runtime_env_A); + + RayTask task_A = + CreateTask(required_resources, /*num_args=*/0, /*args=*/{}, runtime_env_info_A); rpc::RequestWorkerLeaseReply reply_A; bool callback_occurred = false; bool *callback_occurred_ptr = &callback_occurred; @@ -485,10 +487,14 @@ TEST_F(ClusterTaskManagerTest, DispatchQueueNonBlockingTest) { }; std::string serialized_runtime_env_B = "mock_env_B"; - RayTask task_B_1 = CreateTask(required_resources, /*num_args=*/0, /*args=*/{}, - serialized_runtime_env_B); - RayTask task_B_2 = CreateTask(required_resources, /*num_args=*/0, /*args=*/{}, - serialized_runtime_env_B); + std::shared_ptr runtime_env_info_B = nullptr; + runtime_env_info_B.reset(new rpc::RuntimeEnvInfo()); + runtime_env_info_B->set_serialized_runtime_env(serialized_runtime_env_B); + + RayTask task_B_1 = + CreateTask(required_resources, /*num_args=*/0, /*args=*/{}, runtime_env_info_B); + RayTask task_B_2 = + CreateTask(required_resources, /*num_args=*/0, /*args=*/{}, runtime_env_info_B); rpc::RequestWorkerLeaseReply reply_B_1; rpc::RequestWorkerLeaseReply reply_B_2; auto empty_callback = [](Status, std::function, std::function) {}; @@ -1785,8 +1791,12 @@ TEST_F(ClusterTaskManagerTest, TestResourceDiff) { TEST_F(ClusterTaskManagerTest, PopWorkerExactlyOnce) { // Create and queue one task. std::string serialized_runtime_env = "mock_env"; + std::shared_ptr runtime_env_info = nullptr; + runtime_env_info.reset(new rpc::RuntimeEnvInfo()); + runtime_env_info->set_serialized_runtime_env(serialized_runtime_env); + RayTask task = CreateTask({{ray::kCPU_ResourceLabel, 4}}, /*num_args=*/0, /*args=*/{}, - serialized_runtime_env); + runtime_env_info); auto runtime_env_hash = task.GetTaskSpecification().GetRuntimeEnvHash(); rpc::RequestWorkerLeaseReply reply; bool callback_occurred = false; From 619afb0ef1c9faddefa56528c23e3440e1c1974b Mon Sep 17 00:00:00 2001 From: Catch-Bull Date: Wed, 9 Mar 2022 02:57:09 +0800 Subject: [PATCH 05/11] Fix lint --- python/ray/actor.py | 4 +--- python/ray/remote_function.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index f2e2345af06df..c098325744b81 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -411,9 +411,7 @@ def _parse_runtime_env(self, runtime_env: Optional[Union[dict, RuntimeEnv]]): # .remote(), it would get run in the Ray Client server, which runs on # a remote node where the files aren't available. if runtime_env: - if isinstance(runtime_env, RuntimeEnv): - return runtime_env - elif isinstance(runtime_env, dict): + if isinstance(runtime_env, dict): return RuntimeEnv(**(runtime_env or {})) raise TypeError( "runtime_env must be dict or RuntimeEnv, ", diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index a518755f40530..24232b7edc998 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -165,9 +165,7 @@ def _parse_runtime_env(self, runtime_env: Optional[Union[dict, RuntimeEnv]]): # .remote(), it would get run in the Ray Client server, which runs on # a remote node where the files aren't available. if runtime_env: - if isinstance(runtime_env, RuntimeEnv): - return runtime_env - elif isinstance(runtime_env, dict): + if isinstance(runtime_env, dict): return RuntimeEnv(**(runtime_env or {})) raise TypeError( "runtime_env must be dict or RuntimeEnv, ", From 1c13badb9331c26b4421ccb9e63312ae9fa8963e Mon Sep 17 00:00:00 2001 From: Catch-Bull Date: Wed, 9 Mar 2022 11:56:10 +0800 Subject: [PATCH 06/11] fix UT --- python/ray/runtime_env.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/ray/runtime_env.py b/python/ray/runtime_env.py index c7b30164c2f79..9d4ecdb2f286e 100644 --- a/python/ray/runtime_env.py +++ b/python/ray/runtime_env.py @@ -580,6 +580,8 @@ def _build_proto_plugin_runtime_env(self, runtime_env: ProtoRuntimeEnv): plugin.config = plugin_field def __getstate__(self): + # When pickle serialization, exclude some fields + # which can't be serialized by pickle return dict(**self) def __setstate__(self, state): @@ -605,6 +607,12 @@ def get_runtime_env_info( proto_runtime_env_info.uris[:] = runtime_env.get_uris() + # Normally, `RuntimeEnv` should guarantee the accuracy of field eager_install, + # but so far, the internal code has not completely prohibited direct + # modification of fields in RuntimeEnv, so we should check it for insurance. + # TODO(Catch-Bull): overload `__setitem__` for `RuntimeEnv`, change the + # runtime_env of all internal code from dict to RuntimeEnv. + eager_install = runtime_env.get("eager_install") if is_job_runtime_env or eager_install is not None: if eager_install is None: From b06c3dc6fe7cb0ae15bd7808a1b300578bd5b463 Mon Sep 17 00:00:00 2001 From: Catch-Bull Date: Wed, 9 Mar 2022 20:24:17 +0800 Subject: [PATCH 07/11] fix java by comment --- .../ray/api/options/ActorCreationOptions.java | 7 +---- .../java/io/ray/api/runtime/RayRuntime.java | 4 --- .../io/ray/api/runtimeenv/RuntimeEnvInfo.java | 23 -------------- .../io/ray/runtime/AbstractRayRuntime.java | 7 ----- .../runtime/runtimeenv/RuntimeEnvImpl.java | 20 ++++++++++-- .../runtimeenv/RuntimeEnvInfoImpl.java | 31 ------------------- 6 files changed, 19 insertions(+), 73 deletions(-) delete mode 100644 java/api/src/main/java/io/ray/api/runtimeenv/RuntimeEnvInfo.java delete mode 100644 java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvInfoImpl.java diff --git a/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java b/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java index e954164614788..6c8e1b426397d 100644 --- a/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java +++ b/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java @@ -4,7 +4,6 @@ import io.ray.api.concurrencygroup.ConcurrencyGroup; import io.ray.api.placementgroup.PlacementGroup; import io.ray.api.runtimeenv.RuntimeEnv; -import io.ray.api.runtimeenv.RuntimeEnvInfo; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -184,10 +183,6 @@ public Builder setPlacementGroup(PlacementGroup group, int bundleIndex) { } public ActorCreationOptions build() { - RuntimeEnvInfo runtimeEnvInfo = - new RuntimeEnvInfo.Builder() - .setSerializedRuntimeEnv(runtimeEnv != null ? runtimeEnv.toJsonBytes() : "{}") - .build(); return new ActorCreationOptions( name, lifetime, @@ -198,7 +193,7 @@ public ActorCreationOptions build() { group, bundleIndex, concurrencyGroups, - runtimeEnvInfo.toJsonBytes(), + runtimeEnv != null ? runtimeEnv.toJsonBytes() : "", maxPendingCalls); } diff --git a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java index 771be2b0dd7a7..be439927163bc 100644 --- a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java @@ -20,7 +20,6 @@ import io.ray.api.runtimecontext.ResourceValue; import io.ray.api.runtimecontext.RuntimeContext; import io.ray.api.runtimeenv.RuntimeEnv; -import io.ray.api.runtimeenv.RuntimeEnvInfo; import java.util.List; import java.util.Map; import java.util.Optional; @@ -285,7 +284,4 @@ ActorHandle createActor( /** Create runtime env instance at runtime. */ RuntimeEnv createRuntimeEnv(Map envVars); - - /** Create runtime env info instance at runtime. */ - RuntimeEnvInfo createRuntimeEnvInfo(String serializedRuntimeEnv); } diff --git a/java/api/src/main/java/io/ray/api/runtimeenv/RuntimeEnvInfo.java b/java/api/src/main/java/io/ray/api/runtimeenv/RuntimeEnvInfo.java deleted file mode 100644 index 5ba31aa765751..0000000000000 --- a/java/api/src/main/java/io/ray/api/runtimeenv/RuntimeEnvInfo.java +++ /dev/null @@ -1,23 +0,0 @@ -package io.ray.api.runtimeenv; - -import io.ray.api.Ray; - -/** This is an experimental API to let you set runtime environments info for your actors. */ -public interface RuntimeEnvInfo { - - String toJsonBytes(); - - public static class Builder { - - String serializedRuntimeEnv; - - public Builder setSerializedRuntimeEnv(String serializedRuntimeEnv) { - this.serializedRuntimeEnv = serializedRuntimeEnv; - return this; - } - - public RuntimeEnvInfo build() { - return Ray.internal().createRuntimeEnvInfo(serializedRuntimeEnv); - } - } -} diff --git a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java index ead14a566393b..bcf04fe9e14aa 100644 --- a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java @@ -22,7 +22,6 @@ import io.ray.api.placementgroup.PlacementGroup; import io.ray.api.runtimecontext.RuntimeContext; import io.ray.api.runtimeenv.RuntimeEnv; -import io.ray.api.runtimeenv.RuntimeEnvInfo; import io.ray.runtime.config.RayConfig; import io.ray.runtime.config.RunMode; import io.ray.runtime.context.RuntimeContextImpl; @@ -36,7 +35,6 @@ import io.ray.runtime.object.ObjectRefImpl; import io.ray.runtime.object.ObjectStore; import io.ray.runtime.runtimeenv.RuntimeEnvImpl; -import io.ray.runtime.runtimeenv.RuntimeEnvInfoImpl; import io.ray.runtime.task.ArgumentsBuilder; import io.ray.runtime.task.FunctionArg; import io.ray.runtime.task.TaskExecutor; @@ -295,11 +293,6 @@ public RuntimeEnv createRuntimeEnv(Map envVars) { return new RuntimeEnvImpl(envVars); } - @Override - public RuntimeEnvInfo createRuntimeEnvInfo(String serializedRuntimeEnv) { - return new RuntimeEnvInfoImpl(serializedRuntimeEnv); - } - private ObjectRef callNormalFunction( FunctionDescriptor functionDescriptor, Object[] args, diff --git a/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvImpl.java b/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvImpl.java index cdc7b7d7f444e..97c012426ae78 100644 --- a/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvImpl.java @@ -17,17 +17,33 @@ public RuntimeEnvImpl(Map envVars) { @Override public String toJsonBytes() { + // Get serializedRuntimeEnv + String serializedRuntimeEnv = "{}"; if (!envVars.isEmpty()) { RuntimeEnvCommon.RuntimeEnv.Builder protoRuntimeEnvBuilder = RuntimeEnvCommon.RuntimeEnv.newBuilder(); protoRuntimeEnvBuilder.putAllEnvVars(envVars); JsonFormat.Printer printer = JsonFormat.printer(); try { - return printer.print(protoRuntimeEnvBuilder); + serializedRuntimeEnv = printer.print(protoRuntimeEnvBuilder); } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } } - return "{}"; + + // Get serializedRuntimeEnvInfo + if (serializedRuntimeEnv.equals("{}") || serializedRuntimeEnv.isEmpty()) { + return "{}"; + } + RuntimeEnvCommon.RuntimeEnvInfo.Builder protoRuntimeEnvInfoBuilder = + RuntimeEnvCommon.RuntimeEnvInfo.newBuilder(); + protoRuntimeEnvInfoBuilder.setSerializedRuntimeEnv(serializedRuntimeEnv); + JsonFormat.Printer printer = JsonFormat.printer(); + try { + return printer.print(protoRuntimeEnvInfoBuilder); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } } diff --git a/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvInfoImpl.java b/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvInfoImpl.java deleted file mode 100644 index ecb2210cbc423..0000000000000 --- a/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvInfoImpl.java +++ /dev/null @@ -1,31 +0,0 @@ -package io.ray.runtime.runtimeenv; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; -import io.ray.api.runtimeenv.RuntimeEnvInfo; -import io.ray.runtime.generated.RuntimeEnvCommon; - -public class RuntimeEnvInfoImpl implements RuntimeEnvInfo { - - private String serializedRuntimeEnv = "{}"; - - public RuntimeEnvInfoImpl(String serializedRuntimeEnv) { - this.serializedRuntimeEnv = serializedRuntimeEnv; - } - - @Override - public String toJsonBytes() { - if (serializedRuntimeEnv.equals("{}") || serializedRuntimeEnv.isEmpty()) { - return "{}"; - } - RuntimeEnvCommon.RuntimeEnvInfo.Builder protoRuntimeEnvInfoBuilder = - RuntimeEnvCommon.RuntimeEnvInfo.newBuilder(); - protoRuntimeEnvInfoBuilder.setSerializedRuntimeEnv(serializedRuntimeEnv); - JsonFormat.Printer printer = JsonFormat.printer(); - try { - return printer.print(protoRuntimeEnvInfoBuilder); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - } -} From 059a7cac04e95a5c1c64524a70d8e30d6fb466d3 Mon Sep 17 00:00:00 2001 From: Catch-Bull Date: Wed, 9 Mar 2022 21:18:17 +0800 Subject: [PATCH 08/11] fix by comment --- .../java/io/ray/runtime/runtimeenv/RuntimeEnvImpl.java | 1 - python/ray/tests/test_runtime_env_env_vars.py | 1 - src/ray/core_worker/common.h | 4 ++-- src/ray/raylet/scheduling/cluster_task_manager_test.cc | 7 ++++--- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvImpl.java b/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvImpl.java index 97c012426ae78..ce3d736fdbeef 100644 --- a/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/runtimeenv/RuntimeEnvImpl.java @@ -44,6 +44,5 @@ public String toJsonBytes() { } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } - } } diff --git a/python/ray/tests/test_runtime_env_env_vars.py b/python/ray/tests/test_runtime_env_env_vars.py index a1691e6c8b71a..9a0f269f1905d 100644 --- a/python/ray/tests/test_runtime_env_env_vars.py +++ b/python/ray/tests/test_runtime_env_env_vars.py @@ -53,7 +53,6 @@ def get_env(key): @ray.remote def get_env_wrapper(key): - assert os.environ.get(key) == "b" return ray.get(get_env.remote(key)) assert ( diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 2f17fcf53b1c1..64ce5fa559fe9 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -75,7 +75,7 @@ struct TaskOptions { /// The name of the concurrency group in which this task will be executed. std::string concurrency_group_name; /// Runtime Env Info used by this task. It includes Runtime Env and some - /// fields which not contained in Runtime Env Info, such as eager_install. + /// fields which not contained in Runtime Env, such as eager_install. /// Propagated to child actors and tasks. std::string serialized_runtime_env_info; }; @@ -141,7 +141,7 @@ struct ActorCreationOptions { /// Whether to use async mode of direct actor call. const bool is_asyncio = false; /// Runtime Env Info used by this task. It includes Runtime Env and some - /// fields which not contained in Runtime Env Info, such as eager_install. + /// fields which not contained in Runtime Env, such as eager_install. /// Propagated to child actors and tasks. std::string serialized_runtime_env_info; /// The actor concurrency groups to indicate how this actor perform its diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 640a2126a22ca..74a5cf65cf083 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -135,9 +135,10 @@ std::shared_ptr CreateSingleNodeScheduler( return scheduler; } -RayTask CreateTask(const std::unordered_map &required_resources, - int num_args = 0, std::vector args = {}, - std::shared_ptr runtime_env_info = nullptr) { +RayTask CreateTask( + const std::unordered_map &required_resources, int num_args = 0, + std::vector args = {}, + const std::shared_ptr runtime_env_info = nullptr) { TaskSpecBuilder spec_builder; TaskID id = RandomTaskId(); JobID job_id = RandomJobId(); From 999a6107aa48de801e164a13c8a176881ff14b45 Mon Sep 17 00:00:00 2001 From: Catch-Bull Date: Thu, 10 Mar 2022 19:17:19 +0800 Subject: [PATCH 09/11] fix bt comment --- python/ray/actor.py | 28 +++++----------------------- python/ray/remote_function.py | 28 +++------------------------- python/ray/runtime_env.py | 23 +++++++++++++++++++---- 3 files changed, 27 insertions(+), 52 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index c098325744b81..f65924ff46601 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1,12 +1,11 @@ import inspect import logging import weakref -from typing import Union, Optional import ray.ray_constants as ray_constants import ray._raylet import ray._private.signature as signature -from ray.runtime_env import RuntimeEnv, get_runtime_env_info +from ray.runtime_env import get_runtime_env_info, parse_runtime_env import ray.worker from ray.util.annotations import PublicAPI from ray.util.placement_group import configure_placement_group_based_on_context @@ -406,23 +405,6 @@ def __call__(self, *args, **kwargs): f"use '{self.__ray_metadata__.class_name}.remote()'." ) - def _parse_runtime_env(self, runtime_env: Optional[Union[dict, RuntimeEnv]]): - # Parse local pip/conda config files here. If we instead did it in - # .remote(), it would get run in the Ray Client server, which runs on - # a remote node where the files aren't available. - if runtime_env: - if isinstance(runtime_env, dict): - return RuntimeEnv(**(runtime_env or {})) - raise TypeError( - "runtime_env must be dict or RuntimeEnv, ", - f"but got: {type(runtime_env)}", - ) - else: - # Keep the new_runtime_env as None. In .remote(), we need to know - # if runtime_env is None to know whether or not to fall back to the - # runtime_env specified in the @ray.remote decorator. - return None - @classmethod def _ray_from_modified_class( cls, @@ -482,7 +464,7 @@ def __init__(self, *args, **kwargs): modified_class.__ray_actor_class__ ) - new_runtime_env = self._parse_runtime_env(runtime_env) + new_runtime_env = parse_runtime_env(runtime_env) self.__ray_metadata__ = ActorClassMetadata( Language.PYTHON, @@ -521,7 +503,7 @@ def _ray_from_function_descriptor( ): self = ActorClass.__new__(ActorClass) - new_runtime_env = self._parse_runtime_env(runtime_env) + new_runtime_env = parse_runtime_env(runtime_env) self.__ray_metadata__ = ActorClassMetadata( language, @@ -600,7 +582,7 @@ def method(self): actor_cls = self - new_runtime_env = self._parse_runtime_env(runtime_env) + new_runtime_env = parse_runtime_env(runtime_env) cls_options = dict( num_cpus=num_cpus, @@ -954,7 +936,7 @@ def _remote( scheduling_strategy = "DEFAULT" if runtime_env: - new_runtime_env = self._parse_runtime_env(runtime_env) + new_runtime_env = parse_runtime_env(runtime_env) else: new_runtime_env = meta.runtime_env serialized_runtime_env_info = None diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 24232b7edc998..fb92a5619a3b6 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -2,7 +2,6 @@ import inspect import logging import uuid -from typing import Union, Optional from ray import cloudpickle as pickle from ray.util.scheduling_strategies import ( @@ -15,7 +14,7 @@ from ray._private.client_mode_hook import client_mode_should_convert from ray.util.placement_group import configure_placement_group_based_on_context import ray._private.signature -from ray.runtime_env import RuntimeEnv, get_runtime_env_info +from ray.runtime_env import get_runtime_env_info, parse_runtime_env from ray.util.tracing.tracing_helper import ( _tracing_task_invocation, _inject_tracing_into_function, @@ -141,7 +140,7 @@ def __init__( else retry_exceptions ) - self._runtime_env = self._parse_runtime_env(runtime_env) + self._runtime_env = parse_runtime_env(runtime_env) self._placement_group = placement_group self._decorator = getattr(function, "__ray_invocation_decorator__", None) @@ -160,23 +159,6 @@ def _remote_proxy(*args, **kwargs): self.remote = _remote_proxy - def _parse_runtime_env(self, runtime_env: Optional[Union[dict, RuntimeEnv]]): - # Parse local pip/conda config files here. If we instead did it in - # .remote(), it would get run in the Ray Client server, which runs on - # a remote node where the files aren't available. - if runtime_env: - if isinstance(runtime_env, dict): - return RuntimeEnv(**(runtime_env or {})) - raise TypeError( - "runtime_env must be dict or RuntimeEnv, ", - f"but got: {type(runtime_env)}", - ) - else: - # Keep the runtime_env as None. In .remote(), we need to know if - # runtime_env is None to know whether or not to fall back to the - # runtime_env specified in the @ray.remote decorator. - return None - def __call__(self, *args, **kwargs): raise TypeError( "Remote functions cannot be called directly. Instead " @@ -222,11 +204,7 @@ def f(): """ func_cls = self - # Parse local pip/conda config files here. If we instead did it in - # .remote(), it would get run in the Ray Client server, which runs on - # a remote node where the files aren't available. - - new_runtime_env = self._parse_runtime_env(runtime_env) + new_runtime_env = parse_runtime_env(runtime_env) options = dict( num_returns=num_returns, diff --git a/python/ray/runtime_env.py b/python/ray/runtime_env.py index 9d4ecdb2f286e..ee7ff6833477b 100644 --- a/python/ray/runtime_env.py +++ b/python/ray/runtime_env.py @@ -628,7 +628,22 @@ def get_runtime_env_info( if not serialize: return proto_runtime_env_info - return json.dumps( - json.loads(json_format.MessageToJson(proto_runtime_env_info)), - sort_keys=True, - ) + return json_format.MessageToJson(proto_runtime_env_info) + + +def parse_runtime_env(runtime_env: Optional[Union[Dict, RuntimeEnv]]): + # Parse local pip/conda config files here. If we instead did it in + # .remote(), it would get run in the Ray Client server, which runs on + # a remote node where the files aren't available. + if runtime_env: + if isinstance(runtime_env, dict): + return RuntimeEnv(**(runtime_env or {})) + raise TypeError( + "runtime_env must be dict or RuntimeEnv, ", + f"but got: {type(runtime_env)}", + ) + else: + # Keep the new_runtime_env as None. In .remote(), we need to know + # if runtime_env is None to know whether or not to fall back to the + # runtime_env specified in the @ray.remote decorator. + return None From f9976be226336af10c38d4f8da8b61c61f65b3c9 Mon Sep 17 00:00:00 2001 From: Catch-Bull Date: Thu, 10 Mar 2022 19:26:50 +0800 Subject: [PATCH 10/11] move get_runtime_env_info and parse_runtime_env to ray.utils --- python/ray/actor.py | 2 +- python/ray/job_config.py | 2 +- python/ray/remote_function.py | 2 +- python/ray/runtime_env.py | 64 +-------------------------------- python/ray/utils.py | 66 +++++++++++++++++++++++++++++++++++ 5 files changed, 70 insertions(+), 66 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index f65924ff46601..416cc0e8c3f8b 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -5,7 +5,7 @@ import ray.ray_constants as ray_constants import ray._raylet import ray._private.signature as signature -from ray.runtime_env import get_runtime_env_info, parse_runtime_env +from ray.utils import get_runtime_env_info, parse_runtime_env import ray.worker from ray.util.annotations import PublicAPI from ray.util.placement_group import configure_placement_group_based_on_context diff --git a/python/ray/job_config.py b/python/ray/job_config.py index 55c86dbc38eb8..77c817feaa983 100644 --- a/python/ray/job_config.py +++ b/python/ray/job_config.py @@ -100,7 +100,7 @@ def get_proto_job_config(self): # TODO(edoakes): this is really unfortunate, but JobConfig is imported # all over the place so this causes circular imports. We should remove # this dependency and pass in a validated runtime_env instead. - from ray.runtime_env import get_runtime_env_info + from ray.utils import get_runtime_env_info if self._cached_pb is None: pb = gcs_utils.JobConfig() diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index fb92a5619a3b6..760db761a2d96 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -14,7 +14,7 @@ from ray._private.client_mode_hook import client_mode_should_convert from ray.util.placement_group import configure_placement_group_based_on_context import ray._private.signature -from ray.runtime_env import get_runtime_env_info, parse_runtime_env +from ray.utils import get_runtime_env_info, parse_runtime_env from ray.util.tracing.tracing_helper import ( _tracing_task_invocation, _inject_tracing_into_function, diff --git a/python/ray/runtime_env.py b/python/ray/runtime_env.py index ee7ff6833477b..85442f7a482a3 100644 --- a/python/ray/runtime_env.py +++ b/python/ray/runtime_env.py @@ -6,10 +6,7 @@ from copy import deepcopy import ray -from ray.core.generated.runtime_env_common_pb2 import ( - RuntimeEnv as ProtoRuntimeEnv, - RuntimeEnvInfo as ProtoRuntimeEnvInfo, -) +from ray.core.generated.runtime_env_common_pb2 import RuntimeEnv as ProtoRuntimeEnv from ray._private.runtime_env.plugin import RuntimeEnvPlugin, encode_plugin_uri from ray._private.runtime_env.validation import OPTION_TO_VALIDATION_FN from ray._private.utils import import_attr @@ -588,62 +585,3 @@ def __setstate__(self, state): for k, v in state.items(): self[k] = v self.__proto_runtime_env = None - - -def get_runtime_env_info( - runtime_env: RuntimeEnv, - *, - is_job_runtime_env: bool = False, - serialize: bool = False, -): - """Create runtime env info from runtime env. - - In the user interface, the argument `runtime_env` contains some fields - which not contained in `ProtoRuntimeEnv` but in `ProtoRuntimeEnvInfo`, - such as `eager_install`. This function will extract those fields from - `RuntimeEnv` and create a new `ProtoRuntimeEnvInfo`, and serialize it. - """ - proto_runtime_env_info = ProtoRuntimeEnvInfo() - - proto_runtime_env_info.uris[:] = runtime_env.get_uris() - - # Normally, `RuntimeEnv` should guarantee the accuracy of field eager_install, - # but so far, the internal code has not completely prohibited direct - # modification of fields in RuntimeEnv, so we should check it for insurance. - # TODO(Catch-Bull): overload `__setitem__` for `RuntimeEnv`, change the - # runtime_env of all internal code from dict to RuntimeEnv. - - eager_install = runtime_env.get("eager_install") - if is_job_runtime_env or eager_install is not None: - if eager_install is None: - eager_install = True - elif not isinstance(eager_install, bool): - raise TypeError( - f"eager_install must be a boolean. got {type(eager_install)}" - ) - proto_runtime_env_info.runtime_env_eager_install = eager_install - - proto_runtime_env_info.serialized_runtime_env = runtime_env.serialize() - - if not serialize: - return proto_runtime_env_info - - return json_format.MessageToJson(proto_runtime_env_info) - - -def parse_runtime_env(runtime_env: Optional[Union[Dict, RuntimeEnv]]): - # Parse local pip/conda config files here. If we instead did it in - # .remote(), it would get run in the Ray Client server, which runs on - # a remote node where the files aren't available. - if runtime_env: - if isinstance(runtime_env, dict): - return RuntimeEnv(**(runtime_env or {})) - raise TypeError( - "runtime_env must be dict or RuntimeEnv, ", - f"but got: {type(runtime_env)}", - ) - else: - # Keep the new_runtime_env as None. In .remote(), we need to know - # if runtime_env is None to know whether or not to fall back to the - # runtime_env specified in the @ray.remote decorator. - return None diff --git a/python/ray/utils.py b/python/ray/utils.py index aff4f519fc974..770d70bde4093 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -1,4 +1,11 @@ +from typing import Dict, Union, Optional +from google.protobuf import json_format + import ray._private.utils as private_utils +from ray.runtime import RuntimeEnv +from ray.core.generated.runtime_env_common_pb2 import ( + RuntimeEnvInfo as ProtoRuntimeEnvInfo, +) deprecated = private_utils.deprecated( "If you need to use this function, open a feature request issue on " "GitHub.", @@ -7,3 +14,62 @@ ) get_system_memory = deprecated(private_utils.get_system_memory) + + +def get_runtime_env_info( + runtime_env: RuntimeEnv, + *, + is_job_runtime_env: bool = False, + serialize: bool = False, +): + """Create runtime env info from runtime env. + + In the user interface, the argument `runtime_env` contains some fields + which not contained in `ProtoRuntimeEnv` but in `ProtoRuntimeEnvInfo`, + such as `eager_install`. This function will extract those fields from + `RuntimeEnv` and create a new `ProtoRuntimeEnvInfo`, and serialize it. + """ + proto_runtime_env_info = ProtoRuntimeEnvInfo() + + proto_runtime_env_info.uris[:] = runtime_env.get_uris() + + # Normally, `RuntimeEnv` should guarantee the accuracy of field eager_install, + # but so far, the internal code has not completely prohibited direct + # modification of fields in RuntimeEnv, so we should check it for insurance. + # TODO(Catch-Bull): overload `__setitem__` for `RuntimeEnv`, change the + # runtime_env of all internal code from dict to RuntimeEnv. + + eager_install = runtime_env.get("eager_install") + if is_job_runtime_env or eager_install is not None: + if eager_install is None: + eager_install = True + elif not isinstance(eager_install, bool): + raise TypeError( + f"eager_install must be a boolean. got {type(eager_install)}" + ) + proto_runtime_env_info.runtime_env_eager_install = eager_install + + proto_runtime_env_info.serialized_runtime_env = runtime_env.serialize() + + if not serialize: + return proto_runtime_env_info + + return json_format.MessageToJson(proto_runtime_env_info) + + +def parse_runtime_env(runtime_env: Optional[Union[Dict, RuntimeEnv]]): + # Parse local pip/conda config files here. If we instead did it in + # .remote(), it would get run in the Ray Client server, which runs on + # a remote node where the files aren't available. + if runtime_env: + if isinstance(runtime_env, dict): + return RuntimeEnv(**(runtime_env or {})) + raise TypeError( + "runtime_env must be dict or RuntimeEnv, ", + f"but got: {type(runtime_env)}", + ) + else: + # Keep the new_runtime_env as None. In .remote(), we need to know + # if runtime_env is None to know whether or not to fall back to the + # runtime_env specified in the @ray.remote decorator. + return None From fb41a04f3078a8d4c87b96ae5acb54dd8753b10e Mon Sep 17 00:00:00 2001 From: Catch-Bull Date: Thu, 10 Mar 2022 19:30:29 +0800 Subject: [PATCH 11/11] fix --- python/ray/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/utils.py b/python/ray/utils.py index 770d70bde4093..c01d6d304b8a4 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -2,7 +2,7 @@ from google.protobuf import json_format import ray._private.utils as private_utils -from ray.runtime import RuntimeEnv +from ray.runtime_env import RuntimeEnv from ray.core.generated.runtime_env_common_pb2 import ( RuntimeEnvInfo as ProtoRuntimeEnvInfo, )