Skip to content

Commit

Permalink
Support recording usage in ray client (ray-project#26957)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao [email protected]

# Why are these changes needed?

Fix internval kv operations in ray client mode
Support recording usages in ray client. Currently we may miss library usages if users use ray client to run their program.
  • Loading branch information
jjyao committed Jul 27, 2022
1 parent 4c30325 commit 6c2bfcc
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 93 deletions.
29 changes: 19 additions & 10 deletions python/ray/_private/usage/usage_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,9 @@ def _put_library_usage(library_usage: str):
assert _internal_kv_initialized()
try:
_internal_kv_put(
f"{usage_constant.LIBRARY_USAGE_PREFIX}{library_usage}",
"",
namespace=usage_constant.USAGE_STATS_NAMESPACE,
f"{usage_constant.LIBRARY_USAGE_PREFIX}{library_usage}".encode(),
b"",
namespace=usage_constant.USAGE_STATS_NAMESPACE.encode(),
)
except Exception as e:
logger.debug(f"Failed to put library usage, {e}")
Expand Down Expand Up @@ -275,9 +275,9 @@ def _put_extra_usage_tag(key: str, value: str):
assert _internal_kv_initialized()
try:
_internal_kv_put(
f"{usage_constant.EXTRA_USAGE_TAG_PREFIX}{key}",
value,
namespace=usage_constant.USAGE_STATS_NAMESPACE,
f"{usage_constant.EXTRA_USAGE_TAG_PREFIX}{key}".encode(),
value.encode(),
namespace=usage_constant.USAGE_STATS_NAMESPACE.encode(),
)
except Exception as e:
logger.debug(f"Failed to put extra usage tag, {e}")
Expand All @@ -299,12 +299,13 @@ def record_library_usage(library_usage: str):
# This happens if the library is imported before ray.init
return

# Only report lib usage for driver / workers. Otherwise,
# Only report lib usage for driver / ray client / workers. Otherwise,
# it can be reported if the library is imported from
# e.g., API server.
if (
ray._private.worker.global_worker.mode == ray.SCRIPT_MODE
or ray._private.worker.global_worker.mode == ray.WORKER_MODE
or ray.util.client.ray.is_connected()
):
_put_library_usage(library_usage)

Expand All @@ -314,8 +315,12 @@ def _put_pre_init_library_usages():
# NOTE: When the lib is imported from a worker, ray should
# always be initialized, so there's no need to register the
# pre init hook.
if ray._private.worker.global_worker.mode != ray.SCRIPT_MODE:
if not (
ray._private.worker.global_worker.mode == ray.SCRIPT_MODE
or ray.util.client.ray.is_connected()
):
return

for library_usage in _recorded_library_usages:
_put_library_usage(library_usage)

Expand All @@ -326,8 +331,12 @@ def _put_pre_init_extra_usage_tags():
_put_extra_usage_tag(k, v)


ray._private.worker._post_init_hooks.append(_put_pre_init_library_usages)
ray._private.worker._post_init_hooks.append(_put_pre_init_extra_usage_tags)
def put_pre_init_usage_stats():
_put_pre_init_library_usages()
_put_pre_init_extra_usage_tags()


ray._private.worker._post_init_hooks.append(put_pre_init_usage_stats)


def _usage_stats_report_url():
Expand Down
10 changes: 9 additions & 1 deletion python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1226,7 +1226,15 @@ def init(
passed_kwargs[argument_name] = passed_value
passed_kwargs.update(kwargs)
builder._init_args(**passed_kwargs)
return builder.connect()
ctx = builder.connect()
from ray._private.usage import usage_lib

if passed_kwargs.get("allow_multiple") is True:
with ctx:
usage_lib.put_pre_init_usage_stats()
else:
usage_lib.put_pre_init_usage_stats()
return ctx

if kwargs:
# User passed in extra keyword arguments but isn't connecting through
Expand Down
6 changes: 2 additions & 4 deletions python/ray/tests/test_basic_5.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import ray
import ray.cluster_utils
from ray._private.test_utils import (
client_test_enabled,
run_string_as_driver,
wait_for_pid_to_exit,
)
Expand Down Expand Up @@ -83,16 +82,15 @@ def foo(self):
assert ray.get(worker_2.foo.remote()) is None


@pytest.mark.skipif(
client_test_enabled(), reason="client api doesn't support namespace right now."
)
def test_internal_kv(ray_start_regular):
import ray.experimental.internal_kv as kv

assert kv._internal_kv_get("k1") is None
assert kv._internal_kv_put("k1", "v1") is False
assert kv._internal_kv_put("k1", "v1") is True
assert kv._internal_kv_get("k1") == b"v1"
assert kv._internal_kv_exists(b"k1") is True
assert kv._internal_kv_exists(b"k2") is False

assert kv._internal_kv_get("k1", namespace="n") is None
assert kv._internal_kv_put("k1", "v1", namespace="n") is False
Expand Down
Loading

0 comments on commit 6c2bfcc

Please sign in to comment.