Skip to content

Commit

Permalink
[dashboard] Remove redis in dashboard (ray-project#22788)
Browse files Browse the repository at this point in the history
As Ray is becoming Redis-less by default, the dashboard no longer needs to talk to Redis directly. Instead, it should talk to GCS, and GCS can talk to Redis.
  • Loading branch information
fishbone committed Mar 4, 2022
1 parent 3fe6f3b commit 11bbf00
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 420 deletions.
81 changes: 9 additions & 72 deletions dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import os
import platform
import sys
import socket
import json
import traceback

Expand All @@ -21,12 +20,8 @@
import ray.ray_constants as ray_constants
import ray._private.services
import ray._private.utils
from ray._private.gcs_pubsub import gcs_pubsub_enabled, GcsPublisher
from ray._private.gcs_utils import (
GcsClient,
get_gcs_address_from_redis,
use_gcs_for_bootstrap,
)
from ray._private.gcs_pubsub import GcsPublisher
from ray._private.gcs_utils import GcsClient
from ray.core.generated import agent_manager_pb2
from ray.core.generated import agent_manager_pb2_grpc
from ray._private.ray_logging import setup_component_logger
Expand All @@ -48,11 +43,9 @@ class DashboardAgent(object):
def __init__(
self,
node_ip_address,
redis_address,
dashboard_agent_port,
gcs_address,
minimal,
redis_password=None,
temp_dir=None,
session_dir=None,
runtime_env_dir=None,
Expand All @@ -69,14 +62,8 @@ def __init__(
self.ip = node_ip_address
self.minimal = minimal

if use_gcs_for_bootstrap():
assert gcs_address is not None
self.gcs_address = gcs_address
else:
self.redis_address = dashboard_utils.address_tuple(redis_address)
self.redis_password = redis_password
self.aioredis_client = None
self.gcs_address = None
assert gcs_address is not None
self.gcs_address = gcs_address

self.temp_dir = temp_dir
self.session_dir = session_dir
Expand Down Expand Up @@ -159,32 +146,10 @@ async def _check_parent():
if sys.platform not in ["win32", "cygwin"]:
check_parent_task = create_task(_check_parent())

if not use_gcs_for_bootstrap():
# Create an aioredis client for all modules.
try:
self.aioredis_client = await dashboard_utils.get_aioredis_client(
self.redis_address,
self.redis_password,
dashboard_consts.CONNECT_REDIS_INTERNAL_SECONDS,
dashboard_consts.RETRY_REDIS_CONNECTION_TIMES,
)
except (socket.gaierror, ConnectionRefusedError):
logger.error(
"Dashboard agent exiting: " "Failed to connect to redis at %s",
self.redis_address,
)
sys.exit(-1)

# Start a grpc asyncio server.
await self.server.start()

if not use_gcs_for_bootstrap():
gcs_address = await self.aioredis_client.get(
dashboard_consts.GCS_SERVER_ADDRESS
)
self.gcs_client = GcsClient(address=gcs_address.decode())
else:
self.gcs_client = GcsClient(address=self.gcs_address)
self.gcs_client = GcsClient(address=self.gcs_address)
modules = self._load_modules()

# Setup http server if necessary.
Expand All @@ -196,7 +161,7 @@ async def _check_parent():
# included in the minimal ray package.
self.http_server = await self._configure_http_server(modules)

# Write the dashboard agent port to redis.
# Write the dashboard agent port to kv.
# TODO: Use async version if performance is an issue
# -1 should indicate that http server is not started.
http_port = -1 if not self.http_server else self.http_server.http_port
Expand Down Expand Up @@ -239,10 +204,7 @@ async def _check_parent():
help="the IP address of this node.",
)
parser.add_argument(
"--gcs-address", required=False, type=str, help="The address (ip:port) of GCS."
)
parser.add_argument(
"--redis-address", required=True, type=str, help="The address to use for Redis."
"--gcs-address", required=True, type=str, help="The address (ip:port) of GCS."
)
parser.add_argument(
"--metrics-export-port",
Expand Down Expand Up @@ -283,13 +245,6 @@ async def _check_parent():
default=None,
help="The socket path of the raylet process",
)
parser.add_argument(
"--redis-password",
required=False,
type=str,
default=None,
help="The password to use for Redis",
)
parser.add_argument(
"--logging-level",
required=False,
Expand Down Expand Up @@ -384,11 +339,9 @@ async def _check_parent():

agent = DashboardAgent(
args.node_ip_address,
args.redis_address,
args.dashboard_agent_port,
args.gcs_address,
args.minimal,
redis_password=args.redis_password,
temp_dir=args.temp_dir,
session_dir=args.session_dir,
runtime_env_dir=args.runtime_env_dir,
Expand Down Expand Up @@ -416,23 +369,7 @@ async def _check_parent():
# Agent is failed to be started many times.
# Push an error to all drivers, so that users can know the
# impact of the issue.
redis_client = None
gcs_publisher = None
if gcs_pubsub_enabled():
if use_gcs_for_bootstrap():
gcs_publisher = GcsPublisher(args.gcs_address)
else:
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password
)
gcs_publisher = GcsPublisher(
address=get_gcs_address_from_redis(redis_client)
)
else:
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password
)

gcs_publisher = GcsPublisher(args.gcs_address)
traceback_str = ray._private.utils.format_error_message(
traceback.format_exc()
)
Expand All @@ -451,7 +388,7 @@ async def _check_parent():
ray._private.utils.publish_error_to_driver(
ray_constants.DASHBOARD_AGENT_DIED_ERROR,
message,
redis_client=redis_client,
redis_client=None,
gcs_publisher=gcs_publisher,
)
logger.error(message)
Expand Down
53 changes: 6 additions & 47 deletions dashboard/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import ray.dashboard.head as dashboard_head
import ray.dashboard.utils as dashboard_utils
import ray.ray_constants as ray_constants
import ray._private.gcs_utils as gcs_utils
import ray._private.services
import ray._private.utils
from ray._private.gcs_pubsub import gcs_pubsub_enabled, GcsPublisher
from ray._private.gcs_pubsub import GcsPublisher
from ray._private.ray_logging import setup_component_logger

# Logger for this module. It should be configured at the entry point
Expand All @@ -33,8 +32,6 @@ class Dashboard:
port(int): Port number of dashboard aiohttp server.
port_retries(int): The retry times to select a valid port.
gcs_address(str): GCS address of the cluster
redis_address(str): Redis address of a Ray cluster
redis_password(str): Redis password to access GCS
log_dir(str): Log directory of dashboard.
"""

Expand All @@ -44,8 +41,6 @@ def __init__(
port,
port_retries,
gcs_address,
redis_address,
redis_password=None,
log_dir=None,
temp_dir=None,
session_dir=None,
Expand All @@ -56,8 +51,6 @@ def __init__(
http_port=port,
http_port_retries=port_retries,
gcs_address=gcs_address,
redis_address=redis_address,
redis_password=redis_password,
log_dir=log_dir,
temp_dir=temp_dir,
session_dir=session_dir,
Expand All @@ -84,17 +77,7 @@ async def run(self):
help="The retry times to select a valid port.",
)
parser.add_argument(
"--gcs-address", required=False, type=str, help="The address (ip:port) of GCS."
)
parser.add_argument(
"--redis-address", required=True, type=str, help="The address to use for Redis."
)
parser.add_argument(
"--redis-password",
required=False,
type=str,
default=None,
help="The password to use for Redis",
"--gcs-address", required=True, type=str, help="The address (ip:port) of GCS."
)
parser.add_argument(
"--logging-level",
Expand Down Expand Up @@ -171,12 +154,6 @@ async def run(self):

args = parser.parse_args()

if gcs_utils.use_gcs_for_bootstrap():
args.redis_address = None
args.redis_password = None
else:
args.gcs_address = None

try:
setup_component_logger(
logging_level=args.logging_level,
Expand All @@ -192,8 +169,6 @@ async def run(self):
args.port,
args.port_retries,
args.gcs_address,
args.redis_address,
redis_password=args.redis_password,
log_dir=args.log_dir,
temp_dir=args.temp_dir,
session_dir=args.session_dir,
Expand All @@ -215,28 +190,12 @@ async def run(self):
raise e

# Something went wrong, so push an error to all drivers.
redis_client = None
gcs_publisher = None
if gcs_pubsub_enabled():
if gcs_utils.use_gcs_for_bootstrap():
gcs_publisher = GcsPublisher(args.gcs_address)
else:
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password
)
gcs_publisher = GcsPublisher(
address=gcs_utils.get_gcs_address_from_redis(redis_client)
)
redis_client = None
else:
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password
)
gcs_publisher = GcsPublisher(args.gcs_address)

ray._private.utils.publish_error_to_driver(
redis_client,
ray_constants.DASHBOARD_DIED_ERROR,
message,
redis_client=redis_client,
gcs_publisher=gcs_publisher,
None,
None,
gcs_publisher,
)
Loading

0 comments on commit 11bbf00

Please sign in to comment.