forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gcs_utils.py
151 lines (121 loc) · 4.07 KB
/
gcs_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import logging
from typing import Optional
from ray._private import ray_constants
import ray._private.gcs_aio_client
from ray.core.generated.common_pb2 import ErrorType, JobConfig
from ray.core.generated.gcs_pb2 import (
ActorTableData,
AvailableResources,
ErrorTableData,
GcsEntry,
GcsNodeInfo,
JobTableData,
ObjectTableData,
PlacementGroupTableData,
PubSubMessage,
ResourceDemand,
ResourceLoad,
ResourceMap,
ResourcesData,
ResourceTableData,
ResourceUsageBatchData,
TablePrefix,
TablePubsub,
TaskEvents,
WorkerTableData,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Explicit public API: re-exported protobuf message types plus ErrorType /
# JobConfig imported above.
__all__ = [
    "ActorTableData",
    "GcsNodeInfo",
    "AvailableResources",
    "JobTableData",
    "JobConfig",
    "ErrorTableData",
    "ErrorType",
    "GcsEntry",
    "ResourceUsageBatchData",
    "ResourcesData",
    "ObjectTableData",
    "TablePrefix",
    "TablePubsub",
    "TaskEvents",
    "ResourceDemand",
    "ResourceLoad",
    "ResourceMap",
    "ResourceTableData",
    "PubSubMessage",
    "WorkerTableData",
    "PlacementGroupTableData",
]
# Process type codes — presumably mirror the WorkerType proto enum; TODO confirm.
WORKER = 0
DRIVER = 1
# Cap messages at 512MB
_MAX_MESSAGE_LENGTH = 512 * 1024 * 1024
# Send keepalive every 60s
_GRPC_KEEPALIVE_TIME_MS = 60 * 1000
# Keepalive should be replied < 60s
_GRPC_KEEPALIVE_TIMEOUT_MS = 60 * 1000
# Also relying on these defaults:
# grpc.keepalive_permit_without_calls=0: No keepalive without inflight calls.
# grpc.use_local_subchannel_pool=0: Subchannels are shared.
# Channel options used for every GCS channel: the global Ray defaults plus the
# message-size and keepalive settings above.
_GRPC_OPTIONS = [
    *ray_constants.GLOBAL_GRPC_OPTIONS,
    ("grpc.max_send_message_length", _MAX_MESSAGE_LENGTH),
    ("grpc.max_receive_message_length", _MAX_MESSAGE_LENGTH),
    ("grpc.keepalive_time_ms", _GRPC_KEEPALIVE_TIME_MS),
    ("grpc.keepalive_timeout_ms", _GRPC_KEEPALIVE_TIMEOUT_MS),
]
def create_gcs_channel(address: str, aio=False):
    """Create a GRPC channel pointed at the GCS server.

    Args:
        address: GCS address string, e.g. ip:port
        aio: Whether using grpc.aio

    Returns:
        grpc.Channel or grpc.aio.Channel to GCS
    """
    # Imported inside the function — presumably to avoid an import cycle at
    # module load time; keep it local.
    from ray._private.utils import init_grpc_channel

    return init_grpc_channel(
        address,
        options=_GRPC_OPTIONS,
        asynchronous=aio,
    )
class GcsChannel:
    """Holds the target GCS address and lazily creates the GRPC channel."""

    def __init__(self, gcs_address: Optional[str] = None, aio: bool = False):
        # Address and sync/async flag are stored; the channel itself is not
        # created until connect() is called.
        self._gcs_address = gcs_address
        self._aio = aio

    @property
    def address(self):
        """The GCS address this object was constructed with."""
        return self._gcs_address

    def connect(self):
        """Create the underlying channel to GCS.

        GCS server uses a cached port, so it should use the same port after
        restarting. This means GCS address should stay the same for the
        lifetime of the Ray cluster.
        """
        self._channel = create_gcs_channel(self._gcs_address, self._aio)

    def channel(self):
        """Return the channel created by a prior connect() call."""
        return self._channel
# re-export
# Bind GcsAioClient at this module's top level so callers can reference it as
# gcs_utils.GcsAioClient instead of the private gcs_aio_client module.
GcsAioClient = ray._private.gcs_aio_client.GcsAioClient
def cleanup_redis_storage(
    host: str, port: int, password: str, use_ssl: bool, storage_namespace: str
):
    """Delete a Ray cluster's data from the Redis storage backend.

    Until there is a better design for the storage backend, this can be used
    to delete stale data. It supports both Redis cluster and non-cluster mode.

    Args:
        host: The host address of the Redis.
        port: The port of the Redis.
        password: The password of the Redis.
        use_ssl: Whether to encrypt the connection.
        storage_namespace: The namespace of the storage to be deleted.

    Raises:
        ValueError: If host, password, or storage_namespace is not a string,
            or if port is negative.
        TypeError: If use_ssl is not a boolean.
    """
    # Validate arguments *before* importing the native extension so that bad
    # input fails fast with a clear error, instead of surfacing as an
    # unrelated ImportError when ray._raylet is unavailable.
    if not isinstance(host, str):
        raise ValueError("Host must be a string")
    if not isinstance(password, str):
        raise ValueError("Password must be a string")
    if port < 0:
        raise ValueError(f"Invalid port: {port}")
    if not isinstance(use_ssl, bool):
        raise TypeError("use_ssl must be a boolean")
    if not isinstance(storage_namespace, str):
        raise ValueError("storage namespace must be a string")

    from ray._raylet import del_key_from_storage  # type: ignore

    # Right now, GCS store all data into a hash set key by storage_namespace.
    # So we only need to delete the specific key to cleanup the cluster.
    return del_key_from_storage(host, port, password, use_ssl, storage_namespace)