Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Add telemetry for lightweight config updates #34039

Merged
merged 3 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add telemetry for lightweight config updates
Signed-off-by: Cindy Zhang <[email protected]>
  • Loading branch information
zcin committed Apr 4, 2023
commit 817ebe6ecd2093d8e54b5f14a4d047ac791fdcfa
3 changes: 3 additions & 0 deletions python/ray/serve/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,9 @@ def dict_keys_snake_to_camel_case(snake_dict: dict) -> dict:
"SERVE_GRPC_INGRESS_USED": TagKey.SERVE_GRPC_INGRESS_USED,
"SERVE_REST_API_VERSION": TagKey.SERVE_REST_API_VERSION,
"SERVE_NUM_APPS": TagKey.SERVE_NUM_APPS,
"SERVE_NUM_REPLICAS_UPDATED": TagKey.SERVE_NUM_REPLICAS_UPDATED,
"SERVE_USER_CONFIG_UPDATED": TagKey.SERVE_USER_CONFIG_UPDATED,
"SERVE_AUTOSCALING_CONFIG_UPDATED": TagKey.SERVE_AUTOSCALING_CONFIG_UPDATED,
}


Expand Down
28 changes: 18 additions & 10 deletions python/ray/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
get_random_letters,
)
from ray.serve._private.application_state import ApplicationStateManager
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag

logger = logging.getLogger(SERVE_LOGGER_NAME)

Expand Down Expand Up @@ -866,16 +867,17 @@ def _generate_deployment_config_versions(
d["name"]: d for d in last_deployed_config.get("deployments", [])
}

lightweight_update_options = {
"num_replicas": TagKey.SERVE_NUM_REPLICAS_UPDATED,
"user_config": TagKey.SERVE_USER_CONFIG_UPDATED,
"autoscaling_config": TagKey.SERVE_AUTOSCALING_CONFIG_UPDATED,
}

def exclude_lightweight_update_options(dict):
# Exclude config options from dict that qualify for a lightweight config
# update. Changes in any other config options are considered a code change,
# and require a version change to trigger an update that tears
# down existing replicas and replaces them with updated ones.
lightweight_update_options = [
"num_replicas",
"user_config",
"autoscaling_config",
]
return {
option: dict[option]
for option in dict
Expand All @@ -884,15 +886,21 @@ def exclude_lightweight_update_options(dict):

updated_versions = {}
for name in new_deployments:
new_deployment = exclude_lightweight_update_options(new_deployments[name])
old_deployment = exclude_lightweight_update_options(
old_deployments.get(name, {})
)
old_deployment = old_deployments.get(name, {})
new_deployment = new_deployments[name]
new_deployment_filtered = exclude_lightweight_update_options(new_deployment)
old_deployment_filtered = exclude_lightweight_update_options(old_deployment)

# If config options haven't changed, version stays the same
# otherwise, generate a new random version
if old_deployment == new_deployment:
if old_deployment_filtered == new_deployment_filtered:
updated_versions[name] = last_deployed_versions[name]

# If the rest of the options haven't changed, but a lightweight option has
# changed, then Serve will execute a lightweight update
for option, tagkey in lightweight_update_options.items():
if old_deployment.get(option) != new_deployment.get(option):
record_extra_usage_tag(tagkey, "True")
else:
updated_versions[name] = get_random_letters()

Expand Down
105 changes: 105 additions & 0 deletions python/ray/serve/tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ray.serve.context import get_global_client
from ray.serve._private.common import ApplicationStatus
from ray._private.usage.usage_lib import get_extra_usage_tags_to_report
from ray.serve.schema import ServeDeploySchema


TELEMETRY_ROUTE_PREFIX = "/telemetry"
Expand Down Expand Up @@ -382,5 +383,109 @@ def test_rest_api(manage_ray, tmp_dir, version):
assert int(report["extra_usage_tags"]["serve_num_deployments"]) == 1


@serve.deployment(ray_actor_options={"num_cpus": 0})
class Tester:
def __call__(self, *args):
pass

def reconfigure(self, *args):
pass


tester = Tester.bind()


@pytest.mark.parametrize(
"lightweight_option,value",
[
("num_replicas", 2),
("user_config", {"some_setting": 10}),
("autoscaling_config", {"max_replicas": 5}),
],
)
def test_lightweight_config_options(manage_ray, lightweight_option, value):
"""
Check that lightweight config options are detected by telemetry.
"""

lightweight_tagkeys = [
"serve_num_replicas_updated",
"serve_user_config_updated",
"serve_autoscaling_config_updated",
]

subprocess.check_output(["ray", "start", "--head"])
wait_for_condition(check_ray_started, timeout=5)
storage = TelemetryStorage.remote()

config = {
"applications": [
{
"name": "receiver_app",
"import_path": "ray.serve.tests.test_telemetry.receiver_app",
"route_prefix": TELEMETRY_ROUTE_PREFIX,
},
{
"name": "test_app",
"import_path": "ray.serve.tests.test_telemetry.tester",
"deployments": [{"name": "Tester"}],
},
]
}

# Deploy first config
client = serve.start(detached=True)
client.deploy_apps(ServeDeploySchema(**config))
wait_for_condition(
lambda: client.get_serve_status("receiver_app").app_status.status
== ApplicationStatus.RUNNING,
timeout=15,
)
wait_for_condition(
lambda: client.get_serve_status("test_app").app_status.status
== ApplicationStatus.RUNNING,
timeout=15,
)
wait_for_condition(
lambda: ray.get(storage.get_reports_received.remote()) > 0, timeout=5
)
report = ray.get(storage.get_report.remote())

# Check
assert int(report["extra_usage_tags"]["serve_num_apps"]) == 2
assert report["extra_usage_tags"]["serve_api_version"] == "v2"
for tagkey in lightweight_tagkeys:
assert tagkey not in report["extra_usage_tags"]

# Change config and deploy again
config["applications"][1]["deployments"][0][lightweight_option] = value
client.deploy_apps(ServeDeploySchema(**config))
wait_for_condition(
lambda: client.get_serve_status("receiver_app").app_status.status
== ApplicationStatus.DEPLOYING,
timeout=15,
)
wait_for_condition(
lambda: client.get_serve_status("test_app").app_status.status
== ApplicationStatus.RUNNING,
timeout=15,
)

# Check again
wait_for_condition(
lambda: ray.get(storage.get_report.remote())["extra_usage_tags"][
f"serve_{lightweight_option}_updated"
]
== "True",
timeout=5,
)
report = ray.get(storage.get_report.remote())
assert int(report["extra_usage_tags"]["serve_num_apps"]) == 2
assert report["extra_usage_tags"]["serve_api_version"] == "v2"
for tagkey in lightweight_tagkeys:
if not tagkey == f"serve_{lightweight_option}_updated":
assert tagkey not in report["extra_usage_tags"]


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))
6 changes: 6 additions & 0 deletions src/ray/protobuf/usage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ enum TagKey {
SERVE_REST_API_VERSION = 13;
// The number of serve apps running in the cluster as a string.
SERVE_NUM_APPS = 14;
// Whether num replicas changed
SERVE_NUM_REPLICAS_UPDATED = 15;
// Whether user config changed
SERVE_USER_CONFIG_UPDATED = 16;
// Whether autoscaling config changed
SERVE_AUTOSCALING_CONFIG_UPDATED = 17;

// Ray Core State API
// NOTE(rickyxx): Currently only setting "1" for tracking existence of
Expand Down