Skip to content

Commit

Permalink
Revert "Revert "[Core] Add "env_vars" field to runtime_env"" (ray-pro…
Browse files Browse the repository at this point in the history
  • Loading branch information
architkulkarni committed May 27, 2021
1 parent 94dc06d commit 65eab8f
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 261 deletions.
1 change: 1 addition & 0 deletions ci/travis/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ test_python() {
-python/ray/tests:test_placement_group # timeout and OOM
-python/ray/tests:test_ray_init # test_redis_port() seems to fail here, but pass in isolation
-python/ray/tests:test_resource_demand_scheduler
-python/ray/tests:test_runtime_env_env_vars # runtime_env not supported on Windows
-python/ray/tests:test_stress # timeout
-python/ray/tests:test_stress_sharded # timeout
-python/ray/tests:test_k8s_operator_unit_tests
Expand Down
32 changes: 18 additions & 14 deletions python/ray/_private/runtime_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,29 +124,33 @@ def __init__(self, runtime_env_json: dict):
if "_ray_release" in runtime_env_json:
self._dict["_ray_release"] = runtime_env_json["_ray_release"]

self._dict["env_vars"] = None
if "env_vars" in runtime_env_json:
env_vars = runtime_env_json["env_vars"]
self._dict["env_vars"] = env_vars
if not (isinstance(env_vars, dict) and all(
isinstance(k, str) and isinstance(v, str)
for (k, v) in env_vars.items())):
raise TypeError("runtime_env['env_vars'] must be of type"
"Dict[str, str]")

if self._dict.get("working_dir"):
if self._dict["env_vars"] is None:
self._dict["env_vars"] = {}
# TODO(ekl): env vars is probably not the right long term impl.
self._dict["env_vars"].update(
RAY_RUNTIME_ENV_FILES=self._dict["working_dir"])

# TODO(ekl) we should have better schema validation here.
# TODO(ekl) support py_modules
# TODO(architkulkarni) support env_vars, docker
# TODO(architkulkarni) support docker

# TODO(architkulkarni) This is to make it easy for the worker caching
# code in C++ to check if the env is empty without deserializing and
# parsing it. We should use a less confusing approach here.
if all(val is None for val in self._dict.values()):
self._dict = {}

def to_worker_env_vars(self, override_environment_variables: dict) -> dict:
"""Given existing worker env vars, return an updated dict.
This sets any necessary env vars to setup the runtime env.
TODO(ekl): env vars is probably not the right long term impl.
"""
if override_environment_variables is None:
override_environment_variables = {}
if self._dict.get("working_dir"):
override_environment_variables.update(
RAY_RUNTIME_ENV_FILES=self._dict["working_dir"])
return override_environment_variables

def get_parsed_dict(self) -> dict:
return self._dict

Expand Down
14 changes: 9 additions & 5 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,13 +701,17 @@ def _remote(self,
creation_args = signature.flatten_args(function_signature, args,
kwargs)
if runtime_env:
parsed_runtime_env = runtime_support.RuntimeEnvDict(runtime_env)
override_environment_variables = (
parsed_runtime_env.to_worker_env_vars(
override_environment_variables))
runtime_env_dict = parsed_runtime_env.get_parsed_dict()
runtime_env_dict = runtime_support.RuntimeEnvDict(
runtime_env).get_parsed_dict()
else:
runtime_env_dict = {}

if override_environment_variables:
logger.warning("override_environment_variables is deprecated and "
"will be removed in Ray 1.5. Please use "
".options(runtime_env={'env_vars': {...}}).remote()"
"instead.")

actor_id = worker.core_worker.create_actor(
meta.language,
meta.actor_creation_function_descriptor,
Expand Down
5 changes: 3 additions & 2 deletions python/ray/job_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ def set_runtime_env(self, runtime_env: Optional[Dict[str, Any]]) -> None:
del without_dir["working_dir"]
self._parsed_runtime_env = runtime_support.RuntimeEnvDict(
without_dir)
self.worker_env = self._parsed_runtime_env.to_worker_env_vars(
self.worker_env)
self.worker_env.update(
self._parsed_runtime_env.get_parsed_dict().get("env_vars")
or {})
else:
self._parsed_runtime_env = runtime_support.RuntimeEnvDict({})
self.runtime_env = runtime_env or dict()
Expand Down
13 changes: 8 additions & 5 deletions python/ray/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,17 @@ def _remote(self,
accelerator_type)

if runtime_env:
parsed_runtime_env = runtime_support.RuntimeEnvDict(runtime_env)
override_environment_variables = (
parsed_runtime_env.to_worker_env_vars(
override_environment_variables))
runtime_env_dict = parsed_runtime_env.get_parsed_dict()
runtime_env_dict = runtime_support.RuntimeEnvDict(
runtime_env).get_parsed_dict()
else:
runtime_env_dict = {}

if override_environment_variables:
logger.warning("override_environment_variables is deprecated and "
"will be removed in Ray 1.5. Please use "
".options(runtime_env={'env_vars': {...}}).remote()"
"instead.")

def invocation(args, kwargs):
if self._is_cross_language:
list_args = cross_language.format_args(worker, args, kwargs)
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ py_test_module_list(
"test_reconstruction.py",
"test_reference_counting.py",
"test_resource_demand_scheduler.py",
"test_runtime_env_env_vars.py",
"test_scheduling.py",
"test_serialization.py",
"test_stress.py",
Expand Down
235 changes: 0 additions & 235 deletions python/ray/tests/test_advanced_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,241 +733,6 @@ def test_k8s_cpu():
assert 50 < k8s_utils.cpu_percent() < 60


def test_override_environment_variables_task(ray_start_regular):
@ray.remote
def get_env(key):
return os.environ.get(key)

assert (ray.get(
get_env.options(override_environment_variables={
"a": "b",
}).remote("a")) == "b")


def test_override_environment_variables_actor(ray_start_regular):
@ray.remote
class EnvGetter:
def get(self, key):
return os.environ.get(key)

a = EnvGetter.options(override_environment_variables={
"a": "b",
"c": "d",
}).remote()
assert (ray.get(a.get.remote("a")) == "b")
assert (ray.get(a.get.remote("c")) == "d")


def test_override_environment_variables_nested_task(ray_start_regular):
@ray.remote
def get_env(key):
return os.environ.get(key)

@ray.remote
def get_env_wrapper(key):
return ray.get(get_env.remote(key))

assert (ray.get(
get_env_wrapper.options(override_environment_variables={
"a": "b",
}).remote("a")) == "b")


def test_override_environment_variables_multitenancy(shutdown_only):
ray.init(
job_config=ray.job_config.JobConfig(worker_env={
"foo1": "bar1",
"foo2": "bar2",
}))

@ray.remote
def get_env(key):
return os.environ.get(key)

assert ray.get(get_env.remote("foo1")) == "bar1"
assert ray.get(get_env.remote("foo2")) == "bar2"
assert ray.get(
get_env.options(override_environment_variables={
"foo1": "baz1",
}).remote("foo1")) == "baz1"
assert ray.get(
get_env.options(override_environment_variables={
"foo1": "baz1",
}).remote("foo2")) == "bar2"


def test_override_environment_variables_complex(shutdown_only):
ray.init(
job_config=ray.job_config.JobConfig(worker_env={
"a": "job_a",
"b": "job_b",
"z": "job_z",
}))

@ray.remote
def get_env(key):
return os.environ.get(key)

@ray.remote
class NestedEnvGetter:
def get(self, key):
return os.environ.get(key)

def get_task(self, key):
return ray.get(get_env.remote(key))

@ray.remote
class EnvGetter:
def get(self, key):
return os.environ.get(key)

def get_task(self, key):
return ray.get(get_env.remote(key))

def nested_get(self, key):
aa = NestedEnvGetter.options(override_environment_variables={
"c": "e",
"d": "dd",
}).remote()
return ray.get(aa.get.remote(key))

a = EnvGetter.options(override_environment_variables={
"a": "b",
"c": "d",
}).remote()
assert (ray.get(a.get.remote("a")) == "b")
assert (ray.get(a.get_task.remote("a")) == "b")
assert (ray.get(a.nested_get.remote("a")) == "b")
assert (ray.get(a.nested_get.remote("c")) == "e")
assert (ray.get(a.nested_get.remote("d")) == "dd")
assert (ray.get(
get_env.options(override_environment_variables={
"a": "b",
}).remote("a")) == "b")

assert (ray.get(a.get.remote("z")) == "job_z")
assert (ray.get(a.get_task.remote("z")) == "job_z")
assert (ray.get(a.nested_get.remote("z")) == "job_z")
assert (ray.get(
get_env.options(override_environment_variables={
"a": "b",
}).remote("z")) == "job_z")


def test_override_environment_variables_reuse(shutdown_only):
"""Test that previously set env vars don't pollute newer calls."""
ray.init()

env_var_name = "TEST123"
val1 = "VAL1"
val2 = "VAL2"
assert os.environ.get(env_var_name) is None

@ray.remote
def f():
return os.environ.get(env_var_name)

@ray.remote
def g():
return os.environ.get(env_var_name)

assert ray.get(f.remote()) is None
assert ray.get(
f.options(override_environment_variables={
env_var_name: val1
}).remote()) == val1
assert ray.get(f.remote()) is None
assert ray.get(g.remote()) is None
assert ray.get(
f.options(override_environment_variables={
env_var_name: val2
}).remote()) == val2
assert ray.get(g.remote()) is None
assert ray.get(f.remote()) is None


def test_override_environment_variables_env_caching(shutdown_only):
"""Test that workers with specified envs are cached and reused.
When a new task or actor is created with a new runtime env, a
new worker process is started. If a subsequent task or actor
uses the same runtime env, the same worker process should be
used. This function checks the pid of the worker to test this.
"""
ray.init()

env_var_name = "TEST123"
val1 = "VAL1"
val2 = "VAL2"
assert os.environ.get(env_var_name) is None

def task():
return os.environ.get(env_var_name), os.getpid()

@ray.remote
def f():
return task()

@ray.remote
def g():
return task()

# Empty runtime env does not set our env var.
assert ray.get(f.remote())[0] is None

# Worker pid1 should have an env var set.
ret_val1, pid1 = ray.get(
f.options(override_environment_variables={
env_var_name: val1
}).remote())
assert ret_val1 == val1

# Worker pid2 should have an env var set to something different.
ret_val2, pid2 = ray.get(
g.options(override_environment_variables={
env_var_name: val2
}).remote())
assert ret_val2 == val2

# Because the runtime env is different, it should use a different process.
assert pid1 != pid2

# Call g with an empty runtime env. It shouldn't reuse pid2, because
# pid2 has an env var set.
_, pid3 = ray.get(g.remote())
assert pid2 != pid3

# Call g with the same runtime env as pid2. Check it uses the same process.
_, pid4 = ray.get(
g.options(override_environment_variables={
env_var_name: val2
}).remote())
assert pid4 == pid2

# Call f with a different runtime env from pid1. Check that it uses a new
# process.
_, pid5 = ray.get(
f.options(override_environment_variables={
env_var_name: val2
}).remote())
assert pid5 != pid1

# Call f with the same runtime env as pid1. Check it uses the same
# process.
_, pid6 = ray.get(
f.options(override_environment_variables={
env_var_name: val1
}).remote())
assert pid6 == pid1

# Same as above but with g instead of f. Shouldn't affect the outcome.
_, pid7 = ray.get(
g.options(override_environment_variables={
env_var_name: val1
}).remote())
assert pid7 == pid1


def test_sync_job_config(shutdown_only):
num_java_workers_per_process = 8
worker_env = {
Expand Down
Loading

0 comments on commit 65eab8f

Please sign in to comment.