Skip to content

Commit

Permalink
[core] Add stats for the gcs backend for telemetry. (ray-project#27876)
Browse files Browse the repository at this point in the history
## Why are these changes needed?

To get a better understanding of how GCS FT is used, this change adds a usage metric for the GCS storage backend.

Test:
```
cat /tmp/ray/session_latest/usage_stats.json
{"usage_stats": {"ray_version": "3.0.0.dev0", "python_version": "3.9.12", "schema_version": "0.1", "source": "OSS", "session_id": "70d3ecd3-5b16-40c3-9301-fd05404ea92a", "git_commit": "{{RAY_COMMIT_SHA}}", "os": "linux", "collect_timestamp_ms": 1660587366806, "session_start_timestamp_ms": 1660587351586, "cloud_provider": null, "min_workers": null, "max_workers": null, "head_node_instance_type": null, "worker_node_instance_types": null, "total_num_cpus": 16, "total_num_gpus": null, "total_memory_gb": 16.10752945020795, "total_object_store_memory_gb": 8.053764724172652, "library_usages": ["serve"], "total_success": 0, "total_failed": 13, "seq_number": 13, "extra_usage_tags": {"serve_api_version": "v1", "gcs_storage": "redis", "serve_num_deployments": "1"}, "total_num_nodes": 2, "total_num_running_jobs": 2}}
```
  • Loading branch information
fishbone authored and JiahaoYao committed Aug 30, 2022
1 parent c48f8a3 commit 3323b74
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 6 deletions.
2 changes: 2 additions & 0 deletions ci/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,8 @@ test_minimal() {
bazel test --test_output=streamed --config=ci --test_env=RAY_MINIMAL=1 ${BAZEL_EXPORT_OPTIONS} python/ray/dashboard/test_dashboard
# shellcheck disable=SC2086
bazel test --test_output=streamed --config=ci --test_env=RAY_MINIMAL=1 ${BAZEL_EXPORT_OPTIONS} python/ray/tests/test_usage_stats
# shellcheck disable=SC2086
bazel test --test_output=streamed --config=ci --test_env=RAY_MINIMAL=1 --test_env=TEST_EXTERNAL_REDIS=1 ${BAZEL_EXPORT_OPTIONS} python/ray/tests/test_usage_stats
}

_main() {
Expand Down
15 changes: 15 additions & 0 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ def __init__(
# Makes sure the Node object has valid addresses after setup.
self.validate_ip_port(self.address)
self.validate_ip_port(self.gcs_address)
self._record_stats()

@staticmethod
def validate_ip_port(ip_port):
Expand Down Expand Up @@ -1501,3 +1502,17 @@ def _remove_protocol_from_url(self, url: Optional[str]) -> str:
scheme = "%s:https://" % parsed_url.scheme
return parsed_url.geturl().replace(scheme, "", 1)
return url

def _record_stats(self):
    """Record usage-stats tags for this node.

    Makes sure the internal KV store is initialized so that usage tags can
    be written. On the head node only, it then records the GCS storage
    backend under ``TagKey.GCS_STORAGE``: ``"redis"`` when the
    ``RAY_REDIS_ADDRESS`` environment variable is set, ``"memory"``
    otherwise.
    """
    # Kept as a function-scope import, as in the original code.
    from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag

    internal_kv = ray.experimental.internal_kv

    # The internal kv must be initialized before any tag can be put.
    # NOTE(review): `self.get_gcs_client` is passed without being called --
    # confirm whether `_initialize_internal_kv` expects the client object
    # itself or whether this attribute is a property.
    if not internal_kv._internal_kv_initialized():
        internal_kv._initialize_internal_kv(self.get_gcs_client)
    assert internal_kv._internal_kv_initialized()

    # Only the head node reports the storage backend.
    if not self.head:
        return

    # The variable merely being present (even if empty) counts as "redis".
    redis_address = os.environ.get("RAY_REDIS_ADDRESS")
    if redis_address is None:
        gcs_storage_type = "memory"
    else:
        gcs_storage_type = "redis"
    record_extra_usage_tag(TagKey.GCS_STORAGE, gcs_storage_type)
3 changes: 3 additions & 0 deletions python/ray/_private/usage/usage_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ class TagKey(Enum):
# The total number of running serve deployments as a string.
SERVE_NUM_DEPLOYMENTS = auto()

# The GCS storage type, which could be memory or redis
GCS_STORAGE = auto()


def record_extra_usage_tag(key: TagKey, value: str):
"""Record extra kv usage tag.
Expand Down
37 changes: 31 additions & 6 deletions python/ray/tests/test_usage_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ def print_dashboard_log():
pprint(contents)


@pytest.fixture
def gcs_storage_type():
    """Yield the GCS storage tag value the tests should expect.

    Yields ``"redis"`` when the ``RAY_REDIS_ADDRESS`` environment variable
    is set to a non-empty value, and ``"memory"`` otherwise.
    """
    if os.environ.get("RAY_REDIS_ADDRESS"):
        expected_backend = "redis"
    else:
        expected_backend = "memory"
    yield expected_backend


@pytest.fixture
def reset_usage_stats():
yield
Expand All @@ -116,7 +122,7 @@ def reset_ray_version_commit():

@pytest.mark.parametrize("ray_client", [True, False])
def test_get_extra_usage_tags_to_report(
monkeypatch, call_ray_start, reset_usage_stats, ray_client
monkeypatch, call_ray_start, reset_usage_stats, ray_client, gcs_storage_type
):
with monkeypatch.context() as m:
# Test a normal case.
Expand Down Expand Up @@ -172,13 +178,23 @@ def test_get_extra_usage_tags_to_report(
result = ray_usage_lib.get_extra_usage_tags_to_report(
ray.experimental.internal_kv.internal_kv_get_gcs_client()
)
assert result == {"key": "val", "_test1": "val1", "_test2": "val2"}
assert result == {
"key": "val",
"_test1": "val1",
"_test2": "val2",
"gcs_storage": gcs_storage_type,
}
# Make sure the value is overwritten.
ray_usage_lib.record_extra_usage_tag(ray_usage_lib.TagKey._TEST2, "val3")
result = ray_usage_lib.get_extra_usage_tags_to_report(
ray.experimental.internal_kv.internal_kv_get_gcs_client()
)
assert result == {"key": "val", "_test1": "val1", "_test2": "val3"}
assert result == {
"key": "val",
"_test1": "val1",
"_test2": "val3",
"gcs_storage": gcs_storage_type,
}


def test_usage_stats_enabledness(monkeypatch, tmp_path, reset_usage_stats):
Expand Down Expand Up @@ -843,7 +859,9 @@ def ready(self):
sys.platform == "win32",
reason="Test depends on runtime env feature not supported on Windows.",
)
def test_usage_report_e2e(monkeypatch, ray_start_cluster, tmp_path, reset_usage_stats):
def test_usage_report_e2e(
monkeypatch, ray_start_cluster, tmp_path, reset_usage_stats, gcs_storage_type
):
"""
Test usage report works e2e with env vars.
"""
Expand Down Expand Up @@ -957,6 +975,7 @@ def ready(self):
"_test2": "extra_v3",
"serve_num_deployments": "1",
"serve_api_version": "v1",
"gcs_storage": gcs_storage_type,
}
assert payload["total_num_nodes"] == 1
assert payload["total_num_running_jobs"] == 1
Expand Down Expand Up @@ -1248,7 +1267,9 @@ def verify():
wait_for_condition(verify)


def test_usage_stats_tags(monkeypatch, ray_start_cluster, reset_usage_stats):
def test_usage_stats_tags(
monkeypatch, ray_start_cluster, reset_usage_stats, gcs_storage_type
):
"""
Test usage tags are correctly reported.
"""
Expand All @@ -1272,7 +1293,11 @@ def test_usage_stats_tags(monkeypatch, ray_start_cluster, reset_usage_stats):
def verify():
tags = read_file(temp_dir, "usage_stats")["extra_usage_tags"]
num_nodes = read_file(temp_dir, "usage_stats")["total_num_nodes"]
assert tags == {"key": "val", "key2": "val2"}
assert tags == {
"key": "val",
"key2": "val2",
"gcs_storage": gcs_storage_type,
}
assert num_nodes == 2
return True

Expand Down

0 comments on commit 3323b74

Please sign in to comment.