Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dashboard] Start the new dashboard #9860

Merged
merged 28 commits into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f54d8eb
Use new dashboard if environment var RAY_USE_NEW_DASHBOARD exists; ne…
Jul 27, 2020
c89e64a
Make fake client/build/static directory for dashboard
Jul 27, 2020
39a3143
Add test_dashboard.py for new dashboard
Jul 27, 2020
96042fd
Travis CI enable new dashboard test
Jul 27, 2020
ae042ec
Update new dashboard
Jul 31, 2020
fd8bbb2
Agent manager service
Jul 31, 2020
7abc734
Merge remote-tracking branch 'origin_ray/master' into dashboard_switch
Jul 31, 2020
841c745
Add agent manager
Aug 1, 2020
bdfdd56
Merge remote-tracking branch 'origin_ray/master' into dashboard_switch
Aug 1, 2020
848d2fd
Register agent to agent manager
Aug 1, 2020
b7c3d97
Add a new line to the end of agent_manager.cc
Aug 1, 2020
d4c1076
Merge remote-tracking branch 'origin_ray/master' into dashboard_switch
Aug 5, 2020
8b900d9
Fix merge; Fix lint
Aug 5, 2020
1fc7207
Update dashboard/agent.py
fyrestone Aug 5, 2020
bca3fb3
Update dashboard/head.py
fyrestone Aug 5, 2020
8c9544f
Fix bug
Aug 5, 2020
e0dd2a9
Add tests for dashboard
Aug 6, 2020
a2b3393
Fix
Aug 6, 2020
7e63177
Merge remote-tracking branch 'origin_ray/master' into dashboard_switch
Aug 7, 2020
4969f56
Remove const from Process::Kill() & Fix bugs
Aug 7, 2020
7082824
Revert error check of execute_after
Aug 8, 2020
71c220d
Raise exception from DashboardAgent.run
Aug 10, 2020
8084c8e
Add more tests.
Aug 10, 2020
b98543f
Fix compile on Linux
Aug 11, 2020
08bbd48
Merge remote-tracking branch 'origin_ray/master' into dashboard_switch
Aug 12, 2020
3b212ff
Use dict comprehension instead of dict(generator)
Aug 12, 2020
f2df79b
Fix lint
Aug 12, 2020
c2e3f08
Merge remote-tracking branch 'origin_ray/master' into dashboard_switch
Aug 12, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ script:
# ray dashboard tests
- if [ "$RAY_CI_DASHBOARD_AFFECTED" == "1" ]; then ./ci/keep_alive bazel test python/ray/dashboard/...; fi

# ray new dashboard tests
- if [ "$RAY_CI_DASHBOARD_AFFECTED" == "1" ]; then ./ci/keep_alive bazel test python/ray/new_dashboard/...; fi

# ray operator tests
- (cd deploy/ray-operator && export CC=gcc && suppress_output go build && suppress_output go test ./...)

Expand Down
34 changes: 34 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,30 @@ cc_library(
],
)

# Agent manager.
cc_grpc_library(
name = "agent_manager_cc_grpc",
srcs = ["//src/ray/protobuf:agent_manager_proto"],
grpc_only = True,
deps = ["//src/ray/protobuf:agent_manager_cc_proto"],
)

cc_library(
name = "agent_manager_rpc",
hdrs = glob([
"src/ray/rpc/agent_manager/*.h",
]),
strip_include_prefix = "src",
copts = COPTS,
deps = [
":grpc_common_lib",
":agent_manager_cc_grpc",
":ray_common",
"@boost//:asio",
"@com_github_grpc_grpc//:grpc++",
],
)

# === End of rpc definitions ===

# === Begin of plasma definitions ===
Expand Down Expand Up @@ -414,6 +438,7 @@ cc_library(
":gcs_service_rpc",
":gcs_table_storage_lib",
":node_manager_rpc",
":agent_manager_rpc",
":raylet_client_lib",
":worker_rpc",
],
Expand Down Expand Up @@ -503,6 +528,7 @@ cc_library(
":gcs",
":node_manager_fbs",
":node_manager_rpc",
":agent_manager_rpc",
":object_manager",
":plasma_client",
":ray_common",
Expand Down Expand Up @@ -545,6 +571,7 @@ cc_library(
deps = [
":node_manager_fbs",
":node_manager_rpc",
":agent_manager_rpc",
":ray_common",
":ray_util",
"//src/ray/protobuf:gcs_cc_proto",
Expand Down Expand Up @@ -1463,6 +1490,7 @@ cc_library(
":hiredis",
":node_manager_fbs",
":node_manager_rpc",
":agent_manager_rpc",
":ray_common",
":ray_util",
":stats_lib",
Expand Down Expand Up @@ -1801,9 +1829,11 @@ filegroup(
srcs = [
"//src/ray/protobuf:common_py_proto",
"//src/ray/protobuf:core_worker_py_proto",
"//src/ray/protobuf:gcs_service_py_proto",
"//src/ray/protobuf:gcs_py_proto",
"//src/ray/protobuf:node_manager_py_proto",
"//src/ray/protobuf:reporter_py_proto",
"//src/ray/protobuf:agent_manager_py_proto",
],
)

Expand Down Expand Up @@ -1895,10 +1925,14 @@ genrule(
python/ray/core/generated/common_pb2.py
python/ray/core/generated/node_manager_pb2.py
python/ray/core/generated/node_manager_pb2_grpc.py
python/ray/core/generated/agent_manager_pb2.py
python/ray/core/generated/agent_manager_pb2_grpc.py
python/ray/core/generated/reporter_pb2.py
python/ray/core/generated/reporter_pb2_grpc.py
python/ray/core/generated/core_worker_pb2.py
python/ray/core/generated/core_worker_pb2_grpc.py
python/ray/core/generated/gcs_service_pb2.py
python/ray/core/generated/gcs_service_pb2_grpc.py
)
sed -i -E 's/from src.ray.protobuf/from ./' "$${files[@]}"
echo "$${PWD}" > $@
Expand Down
2 changes: 2 additions & 0 deletions ci/travis/determine_tests_to_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def list_changed_files(commit_range):
RAY_CI_MACOS_WHEELS_AFFECTED = 1
elif changed_file.startswith("python/ray/dashboard"):
RAY_CI_DASHBOARD_AFFECTED = 1
elif changed_file.startswith("dashboard"):
RAY_CI_DASHBOARD_AFFECTED = 1
elif changed_file.startswith("python/"):
RAY_CI_TUNE_AFFECTED = 1
RAY_CI_SGD_AFFECTED = 1
Expand Down
13 changes: 13 additions & 0 deletions dashboard/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# This is a dummy test dependency that causes the above tests to be
# re-run if any of these files changes.
py_library(
name = "dashboard_lib",
srcs = glob(["**/*.py"],exclude=["tests/*"]),
)

py_test(
name = "test_dashboard",
size = "small",
srcs = glob(["tests/*.py"]),
deps = [":dashboard_lib"]
)
115 changes: 91 additions & 24 deletions dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
import logging.handlers
import os
import sys
import socket
import json
import traceback

import aiohttp
import aioredis
import aiohttp.web
import aiohttp_cors
from aiohttp import hdrs
from grpc.experimental import aio as aiogrpc

import ray
Expand All @@ -16,9 +20,12 @@
import ray.ray_constants as ray_constants
import ray.services
import ray.utils
from ray.core.generated import agent_manager_pb2
from ray.core.generated import agent_manager_pb2_grpc
import psutil

logger = logging.getLogger(__name__)
routes = dashboard_utils.ClassMethodRouteTable

aiogrpc.init_grpc_aio()

Expand All @@ -33,11 +40,8 @@ def __init__(self,
object_store_name=None,
raylet_name=None):
"""Initialize the DashboardAgent object."""
self._agent_cls_list = dashboard_utils.get_all_modules(
dashboard_utils.DashboardAgentModule)
ip, port = redis_address.split(":")
# Public attributes are accessible for all agent modules.
self.redis_address = (ip, int(port))
self.redis_address = dashboard_utils.address_tuple(redis_address)
self.redis_password = redis_password
self.temp_dir = temp_dir
self.log_dir = log_dir
Expand All @@ -46,39 +50,29 @@ def __init__(self,
self.raylet_name = raylet_name
self.ip = ray.services.get_node_ip_address()
self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0), ))
listen_address = "[::]:0"
logger.info("Dashboard agent listen at: %s", listen_address)
self.port = self.server.add_insecure_port(listen_address)
self.grpc_port = self.server.add_insecure_port("[::]:0")
logger.info("Dashboard agent grpc address: %s:%s", self.ip,
self.grpc_port)
self.aioredis_client = None
self.aiogrpc_raylet_channel = aiogrpc.insecure_channel("{}:{}".format(
self.ip, self.node_manager_port))
self.http_session = aiohttp.ClientSession(
loop=asyncio.get_event_loop())
self.http_session = None

def _load_modules(self):
"""Load dashboard agent modules."""
modules = []
for cls in self._agent_cls_list:
agent_cls_list = dashboard_utils.get_all_modules(
dashboard_utils.DashboardAgentModule)
for cls in agent_cls_list:
logger.info("Load %s: %s",
dashboard_utils.DashboardAgentModule.__name__, cls)
c = cls(self)
dashboard_utils.ClassMethodRouteTable.bind(c)
modules.append(c)
logger.info("Load {} modules.".format(len(modules)))
return modules

async def run(self):
# Create an aioredis client for all modules.
self.aioredis_client = await aioredis.create_redis_pool(
address=self.redis_address, password=self.redis_password)

# Start a grpc asyncio server.
await self.server.start()

# Write the dashboard agent port to redis.
await self.aioredis_client.set(
"{}{}".format(dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX,
self.ip), self.port)

async def _check_parent():
"""Check if raylet is dead."""
curr_proc = psutil.Process()
Expand All @@ -92,10 +86,83 @@ async def _check_parent():
dashboard_consts.
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS)

check_parent_task = asyncio.create_task(_check_parent())

# Create an aioredis client for all modules.
fyrestone marked this conversation as resolved.
Show resolved Hide resolved
try:
self.aioredis_client = await dashboard_utils.get_aioredis_client(
self.redis_address, self.redis_password,
dashboard_consts.CONNECT_REDIS_INTERNAL_SECONDS,
dashboard_consts.RETRY_REDIS_CONNECTION_TIMES)
except (socket.gaierror, ConnectionRefusedError):
logger.error(
"Dashboard agent suicide, "
fyrestone marked this conversation as resolved.
Show resolved Hide resolved
"Failed to connect to redis at %s", self.redis_address)
sys.exit(-1)

# Create a http session for all modules.
self.http_session = aiohttp.ClientSession(
loop=asyncio.get_event_loop())

# Start a grpc asyncio server.
await self.server.start()

modules = self._load_modules()
await asyncio.gather(_check_parent(),

# Http server should be initialized after all modules loaded.
app = aiohttp.web.Application()
app.add_routes(routes=routes.bound_routes())

# Enable CORS on all routes.
cors = aiohttp_cors.setup(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, we disabled CORS in dev mode because the frontend usually wants to start a React dev server. Aren't we doing that anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, CORS is enabled for the new log module. Each agent starts an HTTP server for log file access. This allows the new dashboard frontend to access log files directly as static resources.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, so we allow access to all methods on all routes of the app. Isn't this a security risk?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dashboard has the same access level as the agent. We can access all routes of the dashboard, so why disallow access to all routes of the agent?

The log HTTP server on each agent can reduce the load on the dashboard.

app,
defaults={
"*": aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers="*",
allow_methods="*",
allow_headers=("Content-Type", "X-Header"),
)
})
for route in list(app.router.routes()):
cors.add(route)

runner = aiohttp.web.AppRunner(app)
await runner.setup()
site = aiohttp.web.TCPSite(runner, self.ip, 0)
await site.start()
http_host, http_port = site._server.sockets[0].getsockname()
logger.info("Dashboard agent http address: %s:%s", http_host,
http_port)

# Dump registered http routes.
dump_routes = [
r for r in app.router.routes() if r.method != hdrs.METH_HEAD
]
for r in dump_routes:
logger.info(r)
logger.info("Registered %s routes.", len(dump_routes))

# Write the dashboard agent port to redis.
await self.aioredis_client.set(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove this from Redis when agents are shut down. One of the most common workflows of Ray is using autoscaling clusters (and we will invest lots of resources in this), which means agents will occasionally be killed and removed from the cluster.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The agent will be restarted after being killed, and the new value will be set in Redis. So can we leave this entry in Redis when agents are shut down?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a node is removed from the cluster, will the redis entry for the agent from that node be cleaned up?

Copy link
Contributor Author

@fyrestone fyrestone Aug 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. If a node is removed from the cluster, the entry for the agent will not be cleaned up. The dashboard uses the node IP to access the agent entry, so a dead agent entry is not a problem. If we really need to clean up dead agent entries, we can:

  • clean up the agent entry via GCS (the node itself can't guarantee the cleanup)
  • set a TTL on the agent entry, and have the agent refresh the TTL periodically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not a big deal for now if the only concern is having unused entries in GCS.

"{}{}".format(dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX,
self.ip), json.dumps([http_port, self.grpc_port]))

# Register agent to agent manager.
raylet_stub = agent_manager_pb2_grpc.AgentManagerServiceStub(
self.aiogrpc_raylet_channel)

await raylet_stub.RegisterAgent(
agent_manager_pb2.RegisterAgentRequest(
agent_pid=os.getpid(),
agent_port=self.grpc_port,
agent_ip_address=self.ip))

await asyncio.gather(check_parent_task,
*(m.run(self.server) for m in modules))
await self.server.wait_for_termination()
# Wait for finish signal.
await runner.cleanup()


if __name__ == "__main__":
Expand Down
Empty file.
Empty file.
6 changes: 5 additions & 1 deletion dashboard/consts.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
DASHBOARD_LOG_FILENAME = "dashboard.log"
DASHBOARD_AGENT_PORT_PREFIX = "DASHBOARD_AGENT_PORT_PREFIX:"
DASHBOARD_AGENT_LOG_FILENAME = "dashboard_agent.log"
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS = 2
MAX_COUNT_OF_GCS_RPC_ERROR = 10
RETRY_REDIS_CONNECTION_TIMES = 10
UPDATE_NODES_INTERVAL_SECONDS = 5
CONNECT_GCS_INTERVAL_SECONDS = 2
CONNECT_REDIS_INTERNAL_SECONDS = 2
PURGE_DATA_INTERVAL_SECONDS = 60 * 10
REDIS_KEY_DASHBOARD = "dashboard"
REDIS_KEY_DASHBOARD_RPC = "dashboard_rpc"
REDIS_KEY_GCS_SERVER_ADDRESS = "GcsServerAddress"
REPORT_METRICS_TIMEOUT_SECONDS = 2
REPORT_METRICS_INTERVAL_SECONDS = 10
# Named signals
SIGNAL_NODE_INFO_FETCHED = "node_info_fetched"
# Default param for RotatingFileHandler
LOGGING_ROTATE_BYTES = 100 * 1000 # maxBytes
LOGGING_ROTATE_BYTES = 100 * 1000 * 1000 # maxBytes
LOGGING_ROTATE_BACKUP_COUNT = 5 # backupCount
Loading