Skip to content

Commit

Permalink
[Dashboard] Enable dashboard in the minimal ray installation (ray-pro…
Browse files Browse the repository at this point in the history
…ject#21896)

This is the last PR needed to enable the dashboard in the minimal Ray installation.

See https://docs.google.com/document/d/12qP3x5uaqZSKS-A_kK0ylPOp0E02_l-deAbmm8YtdFw/edit# for more details.
  • Loading branch information
rkooo567 committed Feb 1, 2022
1 parent fd20cf3 commit 3566cfd
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 158 deletions.
2 changes: 2 additions & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@
python/ray/tests/test_runtime_env_validation
- bazel test --test_output=streamed --config=ci --test_env=RAY_MINIMAL=1 $(./scripts/bazel_export_options)
python/ray/tests/test_serve_ray_minimal
- bazel test --test_output=streamed --config=ci --test_env=RAY_MINIMAL=1 $(./scripts/bazel_export_options)
python/ray/dashboard/test_dashboard

- label: ":python: (Small & Client)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
Expand Down
88 changes: 8 additions & 80 deletions dashboard/dashboard.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,24 @@
import sys

import argparse
import asyncio
import errno
import logging
import logging.handlers
import os
import platform
import traceback

import ray.dashboard.consts as dashboard_consts
import ray.dashboard.head as dashboard_head
import ray.dashboard.optional_utils as dashboard_optional_utils
import ray.dashboard.utils as dashboard_utils
import ray.ray_constants as ray_constants
import ray._private.gcs_utils as gcs_utils
import ray._private.services
import ray._private.utils
from ray._private.gcs_pubsub import gcs_pubsub_enabled, GcsPublisher
from ray._private.ray_logging import setup_component_logger
from ray._private.metrics_agent import PrometheusServiceDiscoveryWriter

# All third-party dependencies that are not included in the minimal Ray
# installation must be included in this file. This allows us to determine if
# the agent has the necessary dependencies to be started.
from ray.dashboard.optional_deps import aiohttp

# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray provides a default configuration at
# entry/init points.
logger = logging.getLogger(__name__)
routes = dashboard_optional_utils.ClassMethodRouteTable


class FrontendNotFoundError(OSError):
    """Raised when the dashboard frontend build directory cannot be found."""


def setup_static_dir():
    """Register the dashboard's static frontend assets and return the build dir.

    The frontend build is expected at ``client/build`` next to this file.
    When it is present, its ``static`` subdirectory is registered on
    ``routes`` under ``/static``.

    Returns:
        The absolute path of the frontend build directory.

    Raises:
        FrontendNotFoundError: If the build directory does not exist.
    """
    # Resolve the package directory once, from the absolute path, so that the
    # module name used in the hint below is correct even when ``__file__`` is
    # a bare relative filename (``os.path.dirname`` would then return "").
    package_dir = os.path.dirname(os.path.abspath(__file__))
    build_dir = os.path.join(package_dir, "client", "build")
    if not os.path.isdir(build_dir):
        module_name = os.path.basename(package_dir)
        raise FrontendNotFoundError(
            errno.ENOENT,
            "Dashboard build directory not found. If installing "
            "from source, please follow the additional steps "
            "required to build the dashboard "
            f"(cd python/ray/{module_name}/client "
            "&& npm install "
            "&& npm ci "
            "&& npm run build)",
            build_dir,
        )

    static_dir = os.path.join(build_dir, "static")
    routes.static("/static", static_dir, follow_symlinks=True)
    return build_dir


class Dashboard:
Expand Down Expand Up @@ -85,6 +47,8 @@ def __init__(
redis_address,
redis_password=None,
log_dir=None,
temp_dir=None,
minimal=False,
):
self.dashboard_head = dashboard_head.DashboardHead(
http_host=host,
Expand All @@ -94,36 +58,8 @@ def __init__(
redis_address=redis_address,
redis_password=redis_password,
log_dir=log_dir,
)

# Setup Dashboard Routes
try:
build_dir = setup_static_dir()
logger.info("Setup static dir for dashboard: %s", build_dir)
except FrontendNotFoundError as ex:
# Not to raise FrontendNotFoundError due to NPM incompatibilities
# with Windows.
# Please refer to ci.sh::build_dashboard_front_end()
if sys.platform in ["win32", "cygwin"]:
logger.warning(ex)
else:
raise ex
dashboard_optional_utils.ClassMethodRouteTable.bind(self)

@routes.get("/")
async def get_index(self, req) -> aiohttp.web.FileResponse:
    """Serve ``client/build/index.html``, located relative to this file."""
    here = os.path.dirname(os.path.abspath(__file__))
    index_path = os.path.join(here, "client/build/index.html")
    return aiohttp.web.FileResponse(index_path)

@routes.get("/favicon.ico")
async def get_favicon(self, req) -> aiohttp.web.FileResponse:
return aiohttp.web.FileResponse(
os.path.join(
os.path.dirname(os.path.abspath(__file__)), "client/build/favicon.ico"
)
temp_dir=temp_dir,
minimal=minimal,
)

async def run(self):
Expand Down Expand Up @@ -250,17 +186,9 @@ async def run(self):
args.redis_address,
redis_password=args.redis_password,
log_dir=args.log_dir,
temp_dir=args.temp_dir,
minimal=args.minimal,
)
# TODO(fyrestone): Avoid using ray.state in dashboard, it's not
# asynchronous and will lead to low performance. ray disconnect()
# will be hang when the ray.state is connected and the GCS is exit.
# Please refer to: https://github.com/ray-project/ray/issues/16328
service_discovery = PrometheusServiceDiscoveryWriter(
args.redis_address, args.redis_password, args.gcs_address, args.temp_dir
)
# Need daemon True to avoid dashboard hangs at exit.
service_discovery.daemon = True
service_discovery.start()
loop = asyncio.get_event_loop()
loop.run_until_complete(dashboard.run())
except Exception as e:
Expand All @@ -270,7 +198,7 @@ async def run(self):
f"failed with the following "
f"error:\n{traceback_str}"
)
if isinstance(e, FrontendNotFoundError):
if isinstance(e, dashboard_utils.FrontendNotFoundError):
logger.warning(message)
else:
logger.error(message)
Expand Down
89 changes: 34 additions & 55 deletions dashboard/head.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import socket
import asyncio
import logging
import ipaddress
import threading
from concurrent.futures import Future
from queue import Queue

from distutils.version import LooseVersion
import grpc

try:
Expand All @@ -22,7 +20,6 @@
import ray._private.services
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.utils as dashboard_utils
import ray.dashboard.optional_utils as dashboard_optional_utils
from ray import ray_constants
from ray._private.gcs_pubsub import (
gcs_pubsub_enabled,
Expand All @@ -34,13 +31,7 @@
from ray.dashboard.datacenter import DataOrganizer
from ray.dashboard.utils import async_loop_forever

# All third-party dependencies that are not included in the minimal Ray
# installation must be included in this file. This allows us to determine if
# the agent has the necessary dependencies to be started.
from ray.dashboard.optional_deps import aiohttp, hdrs

logger = logging.getLogger(__name__)
routes = dashboard_optional_utils.ClassMethodRouteTable

aiogrpc.init_grpc_aio()
GRPC_CHANNEL_OPTIONS = (
Expand Down Expand Up @@ -118,7 +109,10 @@ def __init__(
redis_address,
redis_password,
log_dir,
temp_dir,
minimal,
):
self.minimal = minimal
self.health_check_thread: GCSHealthCheckThread = None
self._gcs_rpc_error_counter = 0
# Public attributes are accessible for all head modules.
Expand All @@ -127,6 +121,9 @@ def __init__(
self.http_port = http_port
self.http_port_retries = http_port_retries

self.gcs_address = None
self.redis_address = None
self.redis_password = None
if use_gcs_for_bootstrap():
assert gcs_address is not None
self.gcs_address = gcs_address
Expand All @@ -135,11 +132,11 @@ def __init__(
self.redis_password = redis_password

self.log_dir = log_dir
self.temp_dir = temp_dir
self.aioredis_client = None
self.aiogrpc_gcs_channel = None
self.gcs_error_subscriber = None
self.gcs_log_subscriber = None
self.http_session = None
self.ip = ray.util.get_node_ip_address()
if not use_gcs_for_bootstrap():
ip, port = redis_address.split(":")
Expand All @@ -152,6 +149,23 @@ def __init__(
self.server, f"{grpc_ip}:0"
)
logger.info("Dashboard head grpc address: %s:%s", grpc_ip, self.grpc_port)
# If the dashboard is started in non-minimal mode, the http server should
# be configured to expose the APIs.
self.http_server = None

async def _configure_http_server(self, modules):
    """Create the dashboard HTTP server, run it with *modules*, and return it.

    The import is local to this method -- presumably so the HTTP server's
    dependencies are only required when it is actually started; TODO confirm.
    """
    from ray.dashboard.http_server_head import HttpServerDashboardHead

    server = HttpServerDashboardHead(
        self.ip, self.http_host, self.http_port, self.http_port_retries
    )
    await server.run(modules)
    return server

@property
def http_session(self):
    """Return the HTTP session owned by the dashboard's HTTP server.

    Raises:
        AssertionError: If no HTTP server is set on this instance
            (``self.http_server`` is falsy).
    """
    # Raise explicitly instead of using ``assert``: assert statements are
    # stripped under ``python -O``, which would turn this guard into an
    # opaque AttributeError on ``None``.
    if not self.http_server:
        raise AssertionError("Accessing unsupported API in a minimal ray.")
    return self.http_server.http_session

@async_loop_forever(dashboard_consts.GCS_CHECK_ALIVE_INTERVAL_SECONDS)
async def _gcs_check_alive(self):
Expand Down Expand Up @@ -199,7 +213,6 @@ def _load_modules(self):
"Loading %s: %s", dashboard_utils.DashboardHeadModule.__name__, cls
)
c = cls(self)
dashboard_optional_utils.ClassMethodRouteTable.bind(c)
modules.append(c)
logger.info("Loaded %d modules.", len(modules))
return modules
Expand All @@ -225,14 +238,6 @@ async def get_gcs_address(self):
return await get_gcs_address_with_retry(self.aioredis_client)

async def run(self):

# Create a http session for all modules.
# aiohttp<4.0.0 uses a 'loop' variable, aiohttp>=4.0.0 doesn't anymore
if LooseVersion(aiohttp.__version__) < LooseVersion("4.0.0"):
self.http_session = aiohttp.ClientSession(loop=asyncio.get_event_loop())
else:
self.http_session = aiohttp.ClientSession()

gcs_address = await self.get_gcs_address()

# Dashboard will handle connection failure automatically
Expand Down Expand Up @@ -264,53 +269,24 @@ async def _async_notify():

modules = self._load_modules()

# Http server should be initialized after all modules loaded.
# working_dir uploads for job submission can be up to 100MiB.
app = aiohttp.web.Application(client_max_size=100 * 1024 ** 2)
app.add_routes(routes=routes.bound_routes())

runner = aiohttp.web.AppRunner(app)
await runner.setup()
last_ex = None
for i in range(1 + self.http_port_retries):
try:
site = aiohttp.web.TCPSite(runner, self.http_host, self.http_port)
await site.start()
break
except OSError as e:
last_ex = e
self.http_port += 1
logger.warning("Try to use port %s: %s", self.http_port, e)
else:
raise Exception(
f"Failed to find a valid port for dashboard after "
f"{self.http_port_retries} retries: {last_ex}"
)
http_host, http_port, *_ = site._server.sockets[0].getsockname()
http_host = (
self.ip if ipaddress.ip_address(http_host).is_unspecified else http_host
)
logger.info("Dashboard head http address: %s:%s", http_host, http_port)

# TODO: Use async version if performance is an issue
# Write the dashboard head port to gcs kv.
http_host, http_port = self.http_host, self.http_port
if not self.minimal:
self.http_server = await self._configure_http_server(modules)
http_host, http_port = self.http_server.get_address()
internal_kv._internal_kv_put(
ray_constants.DASHBOARD_ADDRESS,
f"{http_host}:{http_port}",
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)

# TODO: Use async version if performance is an issue
# Write the dashboard head port to gcs kv.
internal_kv._internal_kv_put(
dashboard_consts.DASHBOARD_RPC_ADDRESS,
f"{self.ip}:{self.grpc_port}",
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)

# Dump registered http routes.
dump_routes = [r for r in app.router.routes() if r.method != hdrs.METH_HEAD]
for r in dump_routes:
logger.info(r)
logger.info("Registered %s routes.", len(dump_routes))

# Freeze signal after all modules loaded.
dashboard_utils.SignalManager.freeze()
concurrent_tasks = [
Expand All @@ -321,3 +297,6 @@ async def _async_notify():
]
await asyncio.gather(*concurrent_tasks, *(m.run(self.server) for m in modules))
await self.server.wait_for_termination()

if self.http_server:
await self.http_server.cleanup()
Loading

0 comments on commit 3566cfd

Please sign in to comment.