Skip to content

Commit

Permalink
[dashboard] Add last_activity_at field to `/api/component_activitie…
Browse files Browse the repository at this point in the history
…s` (ray-project#27284)

Add optional last_activity_at field to /api/component_activities to record end time of most recently finished activity

Signed-off-by: Nikita Vemuri <[email protected]>
  • Loading branch information
nikitavemuri committed Aug 2, 2022
1 parent bda5026 commit 9a0b991
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
3 changes: 3 additions & 0 deletions dashboard/modules/snapshot/component_activities_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
},
"timestamp": {
"type": ["number"]
},
"last_activity_at": {
"type": ["number", "null"]
}
},
"required": ["is_active"]
Expand Down
30 changes: 29 additions & 1 deletion dashboard/modules/snapshot/snapshot_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ class RayActivityResponse(BaseModel, extra=Extra.allow):
"This is in the format of seconds since unix epoch."
),
)
last_activity_at: Optional[float] = Field(
None,
description=(
"Timestamp when last actvity of this Ray component finished in format of "
"seconds since unix epoch. This field does not need to be populated "
"for Ray components where it is not meaningful."
),
)

@validator("reason", always=True)
def reason_required(cls, v, values, **kwargs):
Expand Down Expand Up @@ -212,14 +220,31 @@ async def _get_job_activity_info(self, timeout: int) -> RayActivityResponse:
)

num_active_drivers = 0
latest_job_end_time = 0
for job_table_entry in reply.job_info_list:
is_dead = bool(job_table_entry.is_dead)
in_internal_namespace = job_table_entry.config.ray_namespace.startswith(
"_ray_internal_"
)
latest_job_end_time = (
max(latest_job_end_time, job_table_entry.end_time)
if job_table_entry.end_time
else latest_job_end_time
)
if not is_dead and not in_internal_namespace:
num_active_drivers += 1

current_timestamp = datetime.now().timestamp()
# Latest job end time must be before or equal to the current timestamp.
# Job end times may be provided in epoch milliseconds. Check if this
# is true, and convert to seconds
if latest_job_end_time > current_timestamp:
latest_job_end_time = latest_job_end_time / 1000
assert current_timestamp >= latest_job_end_time, (
f"Most recent job end time {latest_job_end_time} must be "
f"before or equal to the current timestamp {current_timestamp}"
)

is_active = (
RayActivityStatus.ACTIVE
if num_active_drivers > 0
Expand All @@ -230,7 +255,10 @@ async def _get_job_activity_info(self, timeout: int) -> RayActivityResponse:
reason=f"Number of active drivers: {num_active_drivers}"
if num_active_drivers
else None,
timestamp=datetime.now().timestamp(),
timestamp=current_timestamp,
# If latest_job_end_time == 0, no jobs have finished yet so don't
# populate last_activity_at
last_activity_at=latest_job_end_time if latest_job_end_time else None,
)
except Exception as e:
logger.exception("Failed to get activity status of Ray drivers.")
Expand Down
21 changes: 18 additions & 3 deletions dashboard/modules/snapshot/tests/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def test_active_component_activities(ray_start_with_dashboard):
ray.init(address="auto", namespace="{namespace}")
"""
run_string_as_driver_nonblocking(driver_template.format(namespace="my_namespace"))
# Wait for above driver to start and finish
time.sleep(2)

run_string_as_driver_nonblocking(driver_template.format(namespace="my_namespace"))
run_string_as_driver_nonblocking(
driver_template.format(namespace="_ray_internal_job_info_id1")
Expand All @@ -135,7 +138,7 @@ def test_active_component_activities(ray_start_with_dashboard):
driver_template.format(namespace="_ray_internal_dashboard")
)

# Wait 1 sec for drivers to start
# Wait 1.5 sec for drivers to start (but not finish)
time.sleep(1.5)

# Verify drivers are considered active after running script
Expand All @@ -158,10 +161,22 @@ def test_active_component_activities(ray_start_with_dashboard):

assert driver_ray_activity_response.is_active == "ACTIVE"
# Drivers with namespace starting with "_ray_internal" are not
# considered active drivers. Three active drivers are the two
# considered active drivers. Two active drivers are the second one
# run with namespace "my_namespace" and the one started
# from ray_start_with_dashboard
assert driver_ray_activity_response.reason == "Number of active drivers: 3"
assert driver_ray_activity_response.reason == "Number of active drivers: 2"

# Get expected_last_activity at from snapshot endpoint which returns details
# about all jobs
jobs_snapshot_data = requests.get(f"{webui_url}/api/snapshot").json()["data"][
"snapshot"
]["jobs"]
# Divide endTime by 1000 to convert from milliseconds to seconds
expected_last_activity_at = max(
[job.get("endTime", 0) / 1000 for (job_id, job) in jobs_snapshot_data.items()]
)

assert driver_ray_activity_response.last_activity_at == expected_last_activity_at


def test_snapshot(ray_start_with_dashboard):
Expand Down

0 comments on commit 9a0b991

Please sign in to comment.