Skip to content

Commit

Permalink
[Core] [Dashboard] Allow ray:https:// ("Ray Client") addresses for Submis…
Browse files Browse the repository at this point in the history
…sionClients (ray-project#28643)

* Allow ray:https:// addresses for SubmissionClients

* Remove stray TODO

* Fix issue ray-project#27954 where "ray:https://" was prepended

* Remove stray comment

* Fix bug where address_env_var wasn't used properly

* Fix env var logic

* Fix NoneType has no attribute startswith

* Fix

* Fix split_address

Signed-off-by: Archit Kulkarni <[email protected]>

* Fix API server URL check

Signed-off-by: Archit Kulkarni <[email protected]>

* Fix import

Signed-off-by: Archit Kulkarni <[email protected]>

* Directly get dashboard_url from ClientContext

Signed-off-by: Archit Kulkarni <[email protected]>

* Fix unit tests

Signed-off-by: Archit Kulkarni <[email protected]>

* Fix parse_cluster_info unit tests

Signed-off-by: Archit Kulkarni <[email protected]>

* Lint

Signed-off-by: Archit Kulkarni <[email protected]>

Signed-off-by: Archit Kulkarni <[email protected]>
  • Loading branch information
architkulkarni committed Oct 4, 2022
1 parent ce7f927 commit 1bd3f94
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 90 deletions.
42 changes: 26 additions & 16 deletions dashboard/modules/dashboard_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from ray._private.runtime_env.working_dir import upload_working_dir_if_needed
from ray.dashboard.modules.job.common import uri_to_http_components

from ray.util.annotations import PublicAPI
from ray.client_builder import _split_address
from ray.util.annotations import DeveloperAPI, PublicAPI
from ray._private.utils import split_address
from ray.autoscaler._private.cli_logger import cli_logger

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -125,6 +125,7 @@ def parse_cluster_info(
metadata: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, Any]] = None,
) -> ClusterInfo:
"""Create a cluster if needed and return its address, cookies, and metadata."""
if address is None:
if (
ray.is_initialized()
Expand All @@ -135,26 +136,30 @@ def parse_cluster_info(
"http:https://"
f"{ray._private.worker.global_worker.node.address_info['webui_url']}"
)
logger.info(
f"No address provided but Ray is running; using address {address}."
)
else:
logger.info(
f"No address provided, defaulting to {DEFAULT_DASHBOARD_ADDRESS}."
)
address = DEFAULT_DASHBOARD_ADDRESS

module_string, inner_address = _split_address(address)
if address == "auto":
raise ValueError("Internal error: unexpected address 'auto'.")

# If user passes in ray:https://, raise error. Dashboard submission should
# not use a Ray client address.
if module_string == "ray":
raise ValueError(
f'Got an unexpected Ray client address "{address}" while trying '
"to connect to the Ray dashboard. The dashboard SDK requires the "
"Ray dashboard server's HTTP(S) address (which should start with "
'"http:https://" or "https://", not "ray:https://"). If this address '
"wasn't passed explicitly, it may be set in the RAY_ADDRESS "
"environment variable."
if ":https://" not in address:
# Default to HTTP.
logger.info(
"No scheme (e.g. 'http:https://') or module string (e.g. 'ray:https://') "
f"provided in address {address}, defaulting to HTTP."
)
address = f"http:https://{address}"

module_string, inner_address = split_address(address)

if module_string == "ray":
raise ValueError(f"Internal error: unexpected Ray Client address {address}.")
# If user passes http(s):https://, go through normal parsing.
if module_string in {"http", "https"}:
return get_job_submission_client_cluster_info(
Expand All @@ -163,7 +168,7 @@ def parse_cluster_info(
cookies=cookies,
metadata=metadata,
headers=headers,
_use_tls=module_string == "https",
_use_tls=(module_string == "https"),
)
# Try to dynamically import the function to get cluster info.
else:
Expand All @@ -172,11 +177,12 @@ def parse_cluster_info(
except Exception:
raise RuntimeError(
f"Module: {module_string} does not exist.\n"
f"This module was parsed from Address: {address}"
f"This module was parsed from address: {address}"
) from None
assert "get_job_submission_client_cluster_info" in dir(module), (
f"Module: {module_string} does "
"not have `get_job_submission_client_cluster_info`."
"not have `get_job_submission_client_cluster_info`.\n"
f"This module was parsed from address: {address}"
)

return module.get_job_submission_client_cluster_info(
Expand Down Expand Up @@ -381,3 +387,7 @@ def get_version(self) -> str:
return r.json().get("version")
else:
self._raise_error(r)

@DeveloperAPI
def get_address(self) -> str:
return self._address
10 changes: 4 additions & 6 deletions dashboard/modules/job/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
def _get_sdk_client(
address: Optional[str], create_cluster_if_needed: bool = False
) -> JobSubmissionClient:

if address is None and "RAY_ADDRESS" in os.environ:
address = os.environ["RAY_ADDRESS"]

cli_logger.labeled_value("Job submission server address", address)
return JobSubmissionClient(address, create_cluster_if_needed)
client = JobSubmissionClient(address, create_cluster_if_needed)
client_address = client.get_address()
cli_logger.labeled_value("Job submission server address", client_address)
return client


def _log_big_success_msg(success_msg):
Expand Down
57 changes: 41 additions & 16 deletions dashboard/modules/job/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import logging
from typing import Any, Dict, Iterator, List, Optional

from ray.dashboard.utils import get_address_for_submission_client

try:
import aiohttp
import requests
Expand Down Expand Up @@ -35,6 +37,25 @@ class JobSubmissionClient(SubmissionClient):
"""A local client for submitting and interacting with jobs on a remote cluster.
Submits requests over HTTP to the job server on the cluster using the REST API.
Args:
address: Either (1) the address of the Ray cluster, or (2) the HTTP address
of the dashboard server on the head node, e.g. "http:https://<head-node-ip>:8265".
In case (1) it must be specified as an address that can be passed to
ray.init(), e.g. a Ray Client address (ray:https://<head_node_host>:10001),
or "auto", or "localhost:<port>". If unspecified, will try to connect to
a running local Ray cluster. This argument is always overridden by the
RAY_ADDRESS environment variable.
create_cluster_if_needed: Indicates whether the cluster at the specified
address needs to already be running. Ray doesn't start a cluster
before interacting with jobs, but third-party job managers may do so.
cookies: Cookies to use when sending requests to the HTTP job server.
metadata: Arbitrary metadata to store along with all jobs. New metadata
specified per job will be merged with the global metadata provided here
via a simple dict update.
headers: Headers to use when sending requests to the HTTP job server, used
for cases like authentication to a remote cluster.
"""

def __init__(
Expand All @@ -45,28 +66,32 @@ def __init__(
metadata: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, Any]] = None,
):
"""Initialize a JobSubmissionClient and check the connection to the cluster.
Args:
address: The IP address and port of the head node. Defaults to
http:https://localhost:8265.
create_cluster_if_needed: Indicates whether the cluster at the specified
address needs to already be running. Ray doesn't start a cluster
before interacting with jobs, but external job managers may do so.
cookies: Cookies to use when sending requests to the HTTP job server.
metadata: Arbitrary metadata to store along with all jobs. New metadata
specified per job will be merged with the global metadata provided here
via a simple dict update.
headers: Headers to use when sending requests to the HTTP job server, used
for cases like authentication to a remote cluster.
"""
"""Initialize a JobSubmissionClient and check the connection to the cluster."""
if requests is None:
raise RuntimeError(
"The Ray jobs CLI & SDK require the ray[default] "
"installation: `pip install 'ray[default']``"
)

# Check types of arguments
if address is not None and not isinstance(address, str):
raise TypeError(f"address must be a string, got {type(address)}")
if not isinstance(create_cluster_if_needed, bool):
raise TypeError(
f"create_cluster_if_needed must be a bool, got"
f" {type(create_cluster_if_needed)}"
)
if cookies is not None and not isinstance(cookies, dict):
raise TypeError(f"cookies must be a dict, got {type(cookies)}")
if metadata is not None and not isinstance(metadata, dict):
raise TypeError(f"metadata must be a dict, got {type(metadata)}")
if headers is not None and not isinstance(headers, dict):
raise TypeError(f"headers must be a dict, got {type(headers)}")

api_server_url = get_address_for_submission_client(address)

super().__init__(
address=address,
address=api_server_url,
create_cluster_if_needed=create_cluster_if_needed,
cookies=cookies,
metadata=metadata,
Expand Down
16 changes: 16 additions & 0 deletions dashboard/modules/job/tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
wait_until_server_available,
)

from ray._private.utils import split_address

from ray.dashboard.modules.dashboard_sdk import (
ClusterInfo,
DEFAULT_DASHBOARD_ADDRESS,
Expand Down Expand Up @@ -142,5 +144,19 @@ def test_temporary_uri_reference(monkeypatch, expiration_s):
print("Internal KV was GC'ed at time ", time.time() - start)


def test_split_address():
assert split_address("ray:https://my_cluster") == ("ray", "my_cluster")
assert split_address("ray:https://my_cluster:1234") == ("ray", "my_cluster:1234")
assert split_address("ray:https://") == ("ray", "")
assert split_address("ray:https://:1234") == ("ray", ":1234")
assert split_address("ray:https://my_cluster:1234?foo=bar") == (
"ray",
"my_cluster:1234?foo=bar",
)
assert split_address("http:https://localhost:10001") == ("http", "localhost:10001")
with pytest.raises(ValueError):
split_address("localhost:10001")


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
22 changes: 22 additions & 0 deletions dashboard/modules/serve/sdk.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any, Dict, Optional, Union
from ray._private.utils import split_address

try:
import aiohttp
Expand Down Expand Up @@ -30,6 +31,27 @@ def __init__(
"The Serve CLI requires the ray[default] "
"installation: `pip install 'ray[default']``"
)

invalid_address_message = (
"Got an unexpected address"
f'"{dashboard_agent_address}" while trying '
"to connect to the Ray dashboard agent. The Serve SDK/CLI requires the "
"Ray dashboard agent's HTTP(S) address (which should start with "
'"http:https://" or "https://". If this address '
"wasn't passed explicitly, it may be set in the RAY_AGENT_ADDRESS "
"environment variable."
)

if ":https://" not in dashboard_agent_address:
raise ValueError(invalid_address_message)

module_string, _ = split_address(dashboard_agent_address)

# If user passes in ray:https://, raise error. Serve submission should
# not use a Ray client address.
if module_string not in ["http", "https"]:
raise ValueError(invalid_address_message)

super().__init__(
address=dashboard_agent_address,
create_cluster_if_needed=create_cluster_if_needed,
Expand Down
96 changes: 96 additions & 0 deletions dashboard/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@
import importlib
import json
import logging
import os
import pkgutil
from abc import ABCMeta, abstractmethod
from base64 import b64decode
from collections import namedtuple
from collections.abc import MutableMapping, Mapping, Sequence
from typing import Optional
import ray
import ray._private.ray_constants as ray_constants
import ray._private.services as services
from ray._private.gcs_utils import GcsClient
from ray._private.utils import split_address

import aiosignal # noqa: F401

Expand Down Expand Up @@ -507,3 +514,92 @@ async def _looper(*args, **kwargs):
return _looper

return _wrapper


def ray_client_address_to_api_server_url(address: str):
"""Convert a Ray Client address of a running Ray cluster to its API server URL.
Args:
address: The Ray Client address, e.g. "ray:https://my-cluster".
Returns:
str: The API server URL of the cluster, e.g. "http:https://<head-node-ip>:8265".
"""
with ray.init(address=address) as client_context:
dashboard_url = client_context.dashboard_url

return f"http:https://{dashboard_url}"


def ray_address_to_api_server_url(address: Optional[str]) -> str:
"""Parse a Ray cluster address into API server URL.
When an address is provided, it will be used to query GCS for
API server address from GCS, so a Ray cluster must be running.
When an address is not provided, it will first try to auto-detect
a running Ray instance, or look for local GCS process.
Args:
address: Ray cluster bootstrap address or Ray Client address.
Could also be `auto`.
Returns:
API server HTTP URL.
"""

address = services.canonicalize_bootstrap_address_or_die(address)
gcs_client = GcsClient(address=address, nums_reconnect_retry=0)

ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
api_server_url = ray._private.utils.internal_kv_get_with_retry(
gcs_client,
ray_constants.DASHBOARD_ADDRESS,
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
num_retries=20,
)

if api_server_url is None:
raise ValueError(
(
"Couldn't obtain the API server address from GCS. It is likely that "
"the GCS server is down. Check gcs_server.[out | err] to see if it is "
"still alive."
)
)
api_server_url = f"http:https://{api_server_url.decode()}"
return api_server_url


def get_address_for_submission_client(address: Optional[str]) -> str:
"""Get Ray API server address from Ray bootstrap or Client address.
If None, it will try to auto-detect a running Ray instance, or look
for local GCS process.
`address` is always overridden by the RAY_ADDRESS environment
variable, just like the `address` argument in `ray.init()`.
Args:
address: Ray cluster bootstrap address or Ray Client address.
Could also be "auto".
Returns:
API server HTTP URL, e.g. "http:https://<head-node-ip>:8265".
"""
if os.environ.get("RAY_ADDRESS"):
logger.debug(f"Using RAY_ADDRESS={os.environ['RAY_ADDRESS']}")
address = os.environ["RAY_ADDRESS"]

if address and ":https://" in address:
module_string, _ = split_address(address)
if module_string == "ray":
logger.debug(
f"Retrieving API server address from Ray Client address {address}..."
)
address = ray_client_address_to_api_server_url(address)
else:
# User specified a non-Ray-Client Ray cluster address.
address = ray_address_to_api_server_url(address)
logger.debug(f"Using API server address {address}.")
return address
27 changes: 27 additions & 0 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1512,3 +1512,30 @@ def parse_runtime_env(runtime_env: Optional[Union[Dict, "RuntimeEnv"]]):
# if runtime_env is None to know whether or not to fall back to the
# runtime_env specified in the @ray.remote decorator.
return None


def split_address(address: str) -> Tuple[str, str]:
"""Splits address into a module string (scheme) and an inner_address.
We use a custom splitting function instead of urllib because
PEP allows "underscores" in a module names, while URL schemes do not
allow them.
Args:
address: The address to split.
Returns:
A tuple of (scheme, inner_address).
Raises:
ValueError: If the address does not contain ':https://'.
Examples:
>>> split_address("ray:https://my_cluster")
("ray", "my_cluster")
"""
if ":https://" not in address:
raise ValueError("Address must contain ':https://'")

module_string, inner_address = address.split(":https://", maxsplit=1)
return (module_string, inner_address)
Loading

0 comments on commit 1bd3f94

Please sign in to comment.