-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Redis port option to startup script #232
Changes from 1 commit
d0986c1
519ddd6
e041919
933ea67
beabd4f
18de1ff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -101,7 +101,7 @@ def cleanup(): | |
"""When running in local mode, shutdown the Ray processes. | ||
|
||
This method is used to shutdown processes that were started with | ||
services.start_ray_local(). It kills all scheduler, object store, and worker | ||
services.start_ray_head(). It kills all scheduler, object store, and worker | ||
processes that were started by this services module. Driver processes are | ||
started and disconnected by worker.py. | ||
""" | ||
|
@@ -178,7 +178,7 @@ def wait_for_redis_to_start(redis_host, redis_port, num_retries=5): | |
if counter == num_retries: | ||
raise Exception("Unable to connect to Redis. If the Redis instance is on a different machine, check that your firewall is configured properly.") | ||
|
||
def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=False): | ||
def start_redis(node_ip_address, port=None, num_retries=20, cleanup=True, redirect_output=False): | ||
"""Start a Redis server. | ||
|
||
Args: | ||
|
@@ -200,10 +200,14 @@ def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=F | |
assert os.path.isfile(redis_filepath) | ||
assert os.path.isfile(redis_module) | ||
counter = 0 | ||
if port: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we use |
||
if num_retries != 1: | ||
raise Exception("Num retries must be 1 if port is specified") | ||
else: | ||
port = new_port() | ||
while counter < num_retries: | ||
if counter > 0: | ||
print("Redis failed to start, retrying now.") | ||
port = new_port() | ||
with open(os.devnull, "w") as FNULL: | ||
stdout = FNULL if redirect_output else None | ||
stderr = FNULL if redirect_output else None | ||
|
@@ -215,6 +219,7 @@ def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=F | |
if cleanup: | ||
all_processes[PROCESS_TYPE_REDIS_SERVER].append(p) | ||
break | ||
port = new_port() | ||
counter += 1 | ||
if counter == num_retries: | ||
raise Exception("Couldn't start Redis.") | ||
|
@@ -370,7 +375,8 @@ def start_ray_processes(address_info=None, | |
worker_path=None, | ||
cleanup=True, | ||
redirect_output=False, | ||
include_global_scheduler=False): | ||
include_global_scheduler=False, | ||
include_redis=False): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Document this argument. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Have added this. |
||
"""Helper method to start Ray processes. | ||
|
||
Args: | ||
|
@@ -410,11 +416,23 @@ def start_ray_processes(address_info=None, | |
# warning messages when it starts up. Instead of suppressing the output, we | ||
# should address the warnings. | ||
redis_address = address_info.get("redis_address") | ||
if redis_address is None: | ||
redis_address = start_redis(node_ip_address, cleanup=cleanup, | ||
redirect_output=redirect_output) | ||
address_info["redis_address"] = redis_address | ||
time.sleep(0.1) | ||
if include_redis: | ||
if redis_address is None: | ||
redis_address = start_redis(node_ip_address, cleanup=cleanup, | ||
redirect_output=redirect_output) | ||
address_info["redis_address"] = redis_address | ||
time.sleep(0.1) | ||
else: | ||
redis_host, redis_port = redis_address.split(":") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The current terminology is blah_ip_address. I like blah_host better than blah_ip_address, but we should make it the same everywhere. |
||
redis_address = start_redis(redis_host, | ||
port=int(redis_port), | ||
num_retries=1, | ||
cleanup=cleanup, | ||
redirect_output=redirect_output) | ||
else: | ||
if redis_address is None: | ||
raise Exception("Redis address expected") | ||
|
||
redis_port = get_port(redis_address) | ||
|
||
# Start the global scheduler, if necessary. | ||
|
@@ -519,7 +537,7 @@ def start_ray_node(node_ip_address, | |
cleanup=cleanup, | ||
redirect_output=redirect_output) | ||
|
||
def start_ray_local(address_info=None, | ||
def start_ray_head(address_info=None, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The indentation in the lines below needs to be fixed. |
||
node_ip_address="127.0.0.1", | ||
num_workers=0, | ||
num_local_schedulers=1, | ||
|
@@ -558,4 +576,5 @@ def start_ray_local(address_info=None, | |
worker_path=worker_path, | ||
cleanup=cleanup, | ||
redirect_output=redirect_output, | ||
include_global_scheduler=True) | ||
include_global_scheduler=True, | ||
include_redis=True) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,8 @@ | |
|
||
parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.") | ||
parser.add_argument("--node-ip-address", required=False, type=str, help="the IP address of the worker's node") | ||
parser.add_argument("--redis-address", required=False, type=str, help="the address to use for Redis") | ||
parser.add_argument("--redis-address", required=False, type=str, help="the address to use for connecting to Redis") | ||
parser.add_argument("--redis-port", required=False, type=str, help="the port to use for starting Redis") | ||
parser.add_argument("--num-workers", default=10, required=False, type=int, help="the number of workers to start on this node") | ||
parser.add_argument("--head", action="store_true", help="provide this argument for the head node") | ||
|
||
|
@@ -41,13 +42,21 @@ def check_no_existing_redis_clients(node_ip_address, redis_address): | |
# Start Ray on the head node. | ||
if args.redis_address is not None: | ||
raise Exception("If --head is passed in, a Redis server will be started, so a Redis address should not be provided.") | ||
|
||
# Get the node IP address if one is not provided. | ||
if args.node_ip_address is None: | ||
node_ip_address = services.get_node_ip_address() | ||
else: | ||
node_ip_address = args.node_ip_address | ||
print("Using IP address {} for this node.".format(node_ip_address)) | ||
address_info = services.start_ray_local(node_ip_address=node_ip_address, | ||
|
||
if args.redis_port is not None: | ||
address_info_in = { "redis_address" : "{}:{}".format(node_ip_address, args.redis_port) } | ||
else: | ||
address_info_in = None | ||
|
||
address_info = services.start_ray_head(address_info=address_info_in, | ||
node_ip_address=node_ip_address, | ||
num_workers=args.num_workers, | ||
cleanup=False, | ||
redirect_output=True) | ||
|
@@ -69,6 +78,8 @@ def check_no_existing_redis_clients(node_ip_address, redis_address): | |
address_info["redis_address"])) | ||
else: | ||
# Start Ray on a non-head node. | ||
if args.redis_port: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we use There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we use |
||
raise Exception("If --head is not passed in, --redis-port is not allowed") | ||
if args.redis_address is None: | ||
raise Exception("If --head is not passed in, --redis-address must be provided.") | ||
redis_host, redis_port = args.redis_address.split(":") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For completeness, let's add documentation for "port".