Skip to content

Commit

Permalink
[dashboard] Remove /api/snapshot endpoint (#40269)
Browse files Browse the repository at this point in the history
No longer supported.
  • Loading branch information
edoakes committed Oct 17, 2023
1 parent a2ef28d commit 58cd807
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 960 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/src/ray/protobuf/common.proto @wuisawesome @ericl @ameerhajali @robertnishihara @pcmoritz @raulchen @ray-project/ray-core
/src/ray/protobuf/gcs.proto @wuisawesome @ericl @ameerhajali @robertnishihara @pcmoritz @raulchen @ray-project/ray-core
/src/ray/protobuf/gcs_service.proto @wuisawesome @ericl @ameerhajali @robertnishihara @pcmoritz @raulchen @ray-project/ray-core
/dashboard/modules/snapshot @wuisawesome @ijrsvt @edoakes @alanwguo @architkulkarni
/dashboard/modules/snapshot @alanwguo @nikitavemuri
/python/ray/autoscaler/_private/monitor.py @wuisawesome @DmitriGekhtman

# Autoscaler
Expand Down
2 changes: 0 additions & 2 deletions dashboard/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ py_test_run_all_subdirectory(
"modules/job/tests/backwards_compatibility_scripts/test_backwards_compatibility.sh",
"modules/job/tests/backwards_compatibility_scripts/script.py",
"modules/job/tests/pip_install_test-0.5-py3-none-any.whl",
"modules/snapshot/snapshot_schema.json",
"modules/tests/test_config_files/basic_runtime_env.yaml",
] + glob([
"modules/job/tests/subprocess_driver_scripts/*.py",
Expand All @@ -59,7 +58,6 @@ py_test(
"modules/job/tests/backwards_compatibility_scripts/test_backwards_compatibility.sh",
"modules/job/tests/backwards_compatibility_scripts/script.py",
"modules/job/tests/pip_install_test-0.5-py3-none-any.whl",
"modules/snapshot/snapshot_schema.json",
"modules/tests/test_config_files/basic_runtime_env.yaml",
] + glob([
"modules/job/tests/subprocess_driver_scripts/*.py",
Expand Down
134 changes: 3 additions & 131 deletions dashboard/modules/snapshot/snapshot_head.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
import asyncio
import concurrent.futures
from datetime import datetime
import enum
import logging
import json
import os
from typing import Dict, Optional
from typing import Optional

import aiohttp.web

import ray
from ray.dashboard.consts import RAY_CLUSTER_ACTIVITY_HOOK
import ray.dashboard.optional_utils as dashboard_optional_utils
import ray.dashboard.utils as dashboard_utils
from ray._private.storage import _load_class
from ray.core.generated import gcs_pb2, gcs_service_pb2, gcs_service_pb2_grpc
from ray.dashboard.modules.job.common import JOB_ID_METADATA_KEY, JobInfoStorageClient
from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc
from ray.dashboard.modules.job.common import JobInfoStorageClient
from ray._private.pydantic_compat import BaseModel, Extra, Field, validator

from ray.job_submission import JobInfo
from ray.runtime_env import RuntimeEnv

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -117,32 +113,6 @@ async def kill_actor_gcs(self, req) -> aiohttp.web.Response:

return dashboard_optional_utils.rest_response(success=True, message=message)

@routes.get("/api/snapshot")
async def snapshot(self, req):
    """Serve a combined snapshot of jobs, job submissions and actors.

    Query parameters read from ``req.query``:
        timeout: forwarded to each of the three fetch helpers. Falls back to
            ``SNAPSHOT_API_TIMEOUT_SECONDS`` when missing, empty, or not made
            up of digits.
        actor_limit: forwarded to ``get_actor_info`` as the actor limit.
            Falls back to 1000 when missing or not parseable as an integer.

    Returns:
        The value produced by ``dashboard_optional_utils.rest_response`` with
        the assembled snapshot dict passed as the ``snapshot`` keyword.
    """
    default_actor_limit = 1000

    timeout = req.query.get("timeout", None)
    if timeout and timeout.isdigit():
        timeout = int(timeout)
    else:
        timeout = SNAPSHOT_API_TIMEOUT_SECONDS

    # A malformed ``actor_limit`` used to raise ValueError out of the handler,
    # whereas a malformed ``timeout`` quietly fell back to its default. Treat
    # both parameters the same way: bad input means "use the default".
    try:
        actor_limit = int(req.query.get("actor_limit", str(default_actor_limit)))
    except ValueError:
        actor_limit = default_actor_limit

    # The three lookups do not depend on one another, so run them concurrently.
    (job_info, job_submission_data, actor_data) = await asyncio.gather(
        self.get_job_info(timeout),
        self.get_job_submission_info(timeout),
        self.get_actor_info(actor_limit, timeout),
    )
    snapshot = {
        "jobs": job_info,
        "job_submission": job_submission_data,
        "actors": actor_data,
        "session_name": self._dashboard_head.session_name,
        "ray_version": ray.__version__,
        "ray_commit": ray.__commit__,
    }
    return dashboard_optional_utils.rest_response(
        success=True, message="hello", snapshot=snapshot
    )

@routes.get("/api/component_activities")
async def get_component_activities(self, req) -> aiohttp.web.Response:
timeout = req.query.get("timeout", None)
Expand Down Expand Up @@ -264,104 +234,6 @@ async def _get_job_activity_info(self, timeout: int) -> RayActivityResponse:
timestamp=datetime.now().timestamp(),
)

async def _get_job_info(self, metadata: Dict[str, str]) -> Optional[JobInfo]:
    """Fetch the job-submission record referenced by a driver's metadata.

    Args:
        metadata: the job config metadata; the submission id is read from it
            under ``JOB_ID_METADATA_KEY`` (``None`` when the key is absent).

    Returns:
        Whatever ``self._job_info_client.get_info`` yields for that id.
    """
    # Once a job submission ID has been attached to a job, its status is
    # guaranteed to be returned by the lookup below.
    return await self._job_info_client.get_info(
        metadata.get(JOB_ID_METADATA_KEY)
    )

async def get_job_info(self, timeout: int = SNAPSHOT_API_TIMEOUT_SECONDS):
    """Return a dict of per-job details keyed by hex job id.

    A "job" here is a Ray driver. Each entry combines fields from the GCS job
    table with the status and message of the matching job-submission record,
    when one is found.

    Args:
        timeout: timeout passed to the ``GetAllJobInfo`` GCS call.
    """
    reply = await self._gcs_job_info_stub.GetAllJobInfo(
        gcs_service_pb2.GetAllJobInfoRequest(), timeout=timeout
    )

    result = {}
    for row in reply.job_info_list:
        job_config = row.config
        metadata = dict(job_config.metadata)
        runtime_env = RuntimeEnv.deserialize(
            job_config.runtime_env_info.serialized_runtime_env
        )

        # Status fields stay None when no submission record is returned.
        submission = await self._get_job_info(metadata)
        if submission is None:
            status, status_message = None, None
        else:
            status, status_message = submission.status, submission.message

        result[row.job_id.hex()] = {
            "status": status,
            "status_message": status_message,
            "is_dead": row.is_dead,
            "start_time": row.start_time,
            "end_time": row.end_time,
            "config": {
                "namespace": job_config.ray_namespace,
                "metadata": metadata,
                "runtime_env": runtime_env,
            },
        }

    return result

async def get_job_submission_info(
    self, timeout: int = SNAPSHOT_API_TIMEOUT_SECONDS
):
    """Return Ray job-submission details keyed by submission id.

    In this view a job may have zero or many drivers. Submissions whose
    fetched info is ``None`` are left out of the result.

    Args:
        timeout: passed through to ``self._job_info_client.get_all_jobs``.
    """
    submissions = await self._job_info_client.get_all_jobs(timeout)
    return {
        submission_id: {
            "job_submission_id": submission_id,
            "status": details.status,
            "message": details.message,
            "error_type": details.error_type,
            "start_time": details.start_time,
            "end_time": details.end_time,
            "metadata": details.metadata,
            "runtime_env": details.runtime_env,
            "entrypoint": details.entrypoint,
        }
        for submission_id, details in submissions.items()
        if details is not None
    }

async def get_actor_info(
    self, limit: int = 1000, timeout: int = SNAPSHOT_API_TIMEOUT_SECONDS
):
    """Return a dict of actor details keyed by hex actor id.

    Args:
        limit: value assigned to the request's ``limit`` field.
        timeout: timeout passed to the ``GetAllActorInfo`` GCS call.
    """
    # TODO (Alex): GCS still needs to return actors from dead jobs.
    actor_request = gcs_service_pb2.GetAllActorInfoRequest()
    actor_request.show_dead_jobs = True
    actor_request.limit = limit
    reply = await self._gcs_actor_info_stub.GetAllActorInfo(
        actor_request, timeout=timeout
    )

    result = {}
    for row in reply.actor_table_data:
        address = row.address
        # The runtime env is stored as a JSON string on the table entry.
        parsed_runtime_env = json.loads(row.serialized_runtime_env)
        # Convert the numeric state into its enum member name.
        state_name = gcs_pb2.ActorTableData.ActorState.Name(row.state)
        result[row.actor_id.hex()] = {
            "job_id": row.job_id.hex(),
            "state": state_name,
            "name": row.name,
            "namespace": row.ray_namespace,
            "runtime_env": parsed_runtime_env,
            "start_time": row.start_time,
            "end_time": row.end_time,
            "is_detached": row.is_detached,
            "resources": dict(row.required_resources),
            "actor_class": row.class_name,
            "current_worker_id": address.worker_id.hex(),
            "current_raylet_id": address.raylet_id.hex(),
            "ip_address": address.ip_address,
            "port": address.port,
            "metadata": {},
        }

    return result

async def run(self, server):
self._gcs_job_info_stub = gcs_service_pb2_grpc.JobInfoGcsServiceStub(
self._dashboard_head.aiogrpc_gcs_channel
Expand Down
Loading

0 comments on commit 58cd807

Please sign in to comment.