Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Make the checkpoint and recover only from GCS #26753

Merged
merged 9 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 5 additions & 34 deletions doc/source/serve/deploying-serve.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,44 +254,15 @@ failure recovery solutions. Although Ray is not currently highly available (HA),
the long term roadmap and being actively worked on.
:::

Ray Serve added an experimental feature to help recovering the state.
This features enables Serve to write all your deployment configuration and code into a storage location.
Ray Serve provides the feature to help recovering the state.
This feature enables Serve to write all your deployment configuration and code into Global Control Store
(GCS).
Upon Ray cluster failure and restarts, you can simply call Serve to reconstruct the state.

Here is how to use it:

:::{warning}
The API is experimental and subject to change. We welcome you to test it out
and leave us feedback through github issues or discussion forum!
:::

You can use both the start argument and the CLI to specify it:

```python
serve.start(_checkpoint_path=...)
```

or

```shell
serve start --checkpoint-path ...
```

The checkpoint path argument accepts the following format:

- `file:https://local_file_path`
- `s3:https://bucket/path`
- `gs:https://bucket/path`
- `custom:https://importable.custom_python.Class/path`

While we have native support for on disk, AWS S3, and Google Cloud Storage (GCS), there is no reason we cannot support more.

In Kubernetes environment, we recommend using [Persistent Volumes] to create a disk and mount it into the Ray head node.
For example, you can provision Azure Disk, AWS Elastic Block Store, or GCP Persistent Disk using the K8s [Persistent Volumes] API.
Alternatively, you can also directly write to object store like S3.
In Kubernetes environment, we recommend using KubeRay (a Kubernetes operator for Ray Serve) to help deploy your Serve applications with Kubernetes, and help you recover the node crash from Customized Resource.

You can easily try to plug into your own implementation using the `custom:https://` path and inherit the [KVStoreBase] class.
Feel free to open new github issues and contribute more storage backends!
Feel free to open new github issues if you hit any problems from Failure Recovery.

[ingress]: https://kubernetes.io/docs/concepts/services-networking/ingress/
[kubernetes default config]: https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/kubernetes/example-full.yaml
Expand Down
18 changes: 3 additions & 15 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from ray.serve.config import AutoscalingConfig, DeploymentConfig, HTTPOptions
from ray.serve.constants import (
CONTROLLER_MAX_CONCURRENCY,
DEFAULT_CHECKPOINT_PATH,
DEFAULT_HTTP_HOST,
DEFAULT_HTTP_PORT,
HTTP_PROXY_TIMEOUT,
Expand Down Expand Up @@ -60,7 +59,6 @@ def start(
detached: bool = False,
http_options: Optional[Union[dict, HTTPOptions]] = None,
dedicated_cpu: bool = False,
_checkpoint_path: str = DEFAULT_CHECKPOINT_PATH,
**kwargs,
) -> ServeControllerClient:
"""Initialize a serve instance.
Expand Down Expand Up @@ -121,7 +119,7 @@ def start(
f'Connecting to existing Serve app in namespace "{SERVE_NAMESPACE}".'
)

_check_http_and_checkpoint_options(client, http_options, _checkpoint_path)
_check_http_options(client, http_options)
return client
except RayServeException:
pass
Expand Down Expand Up @@ -154,7 +152,6 @@ def start(
).remote(
controller_name,
http_config=http_options,
checkpoint_path=_checkpoint_path,
head_node_id=head_node_id,
detached=detached,
)
Expand Down Expand Up @@ -642,18 +639,9 @@ def build(target: Union[ClassNode, FunctionNode]) -> Application:
return Application(pipeline_build(target))


def _check_http_and_checkpoint_options(
client: ServeControllerClient,
http_options: Union[dict, HTTPOptions],
checkpoint_path: str,
def _check_http_options(
client: ServeControllerClient, http_options: Union[dict, HTTPOptions]
) -> None:
if checkpoint_path and checkpoint_path != client.checkpoint_path:
logger.warning(
f"The new client checkpoint path '{checkpoint_path}' "
f"is different from the existing one '{client.checkpoint_path}'. "
"The new checkpoint path is ignored."
)

if http_options:
client_http_options = client.http_config
new_http_options = (
Expand Down
5 changes: 0 additions & 5 deletions python/ray/serve/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def __init__(
self._shutdown = False
self._http_config: HTTPOptions = ray.get(controller.get_http_config.remote())
self._root_url = ray.get(controller.get_root_url.remote())
self._checkpoint_path = ray.get(controller.get_checkpoint_path.remote())

# Each handle has the overhead of long poll client, therefore cached.
self.handle_cache = dict()
Expand All @@ -75,10 +74,6 @@ def root_url(self):
def http_config(self):
return self._http_config

@property
def checkpoint_path(self):
return self._checkpoint_path

def __del__(self):
if not self._detached:
logger.debug(
Expand Down
3 changes: 0 additions & 3 deletions python/ray/serve/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
#: HTTP Port
DEFAULT_HTTP_PORT = 8000

#: Controller checkpoint path
DEFAULT_CHECKPOINT_PATH = "ray:https://"

#: Max concurrency
ASYNC_CONCURRENCY = int(1e6)

Expand Down
9 changes: 1 addition & 8 deletions python/ray/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from ray.serve.logging_utils import configure_component_logger
from ray.serve.long_poll import LongPollHost
from ray.serve.schema import ServeApplicationSchema
from ray.serve.storage.checkpoint_path import make_kv_store
from ray.serve.storage.kv_store import RayInternalKVStore
from ray.serve.utils import (
override_runtime_envs_except_env_vars,
Expand Down Expand Up @@ -86,7 +85,6 @@ async def __init__(
controller_name: str,
*,
http_config: HTTPOptions,
checkpoint_path: str,
head_node_id: str,
detached: bool = False,
):
Expand All @@ -97,9 +95,8 @@ async def __init__(
# Used to read/write checkpoints.
self.ray_worker_namespace = ray.get_runtime_context().namespace
self.controller_name = controller_name
self.checkpoint_path = checkpoint_path
kv_store_namespace = f"{self.controller_name}-{self.ray_worker_namespace}"
self.kv_store = make_kv_store(checkpoint_path, namespace=kv_store_namespace)
self.kv_store = RayInternalKVStore(kv_store_namespace)
self.snapshot_store = RayInternalKVStore(namespace=kv_store_namespace)

# Dictionary of deployment_name -> proxy_name -> queue length.
Expand Down Expand Up @@ -193,9 +190,6 @@ async def listen_for_change_java(self, keys_to_snapshot_ids_bytes: bytes):
self.long_poll_host.listen_for_change_java(keys_to_snapshot_ids_bytes)
)

def get_checkpoint_path(self) -> str:
return self.checkpoint_path

def get_all_endpoints(self) -> Dict[EndpointTag, Dict[str, Any]]:
"""Returns a dictionary of deployment name to config."""
return self.endpoint_state.get_endpoints()
Expand Down Expand Up @@ -626,7 +620,6 @@ class ServeControllerAvatar:
def __init__(
self,
controller_name: str,
checkpoint_path: str,
detached: bool = False,
dedicated_cpu: bool = False,
http_proxy_port: int = 8000,
Expand Down
17 changes: 1 addition & 16 deletions python/ray/serve/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from ray.serve.api import build as build_app
from ray.serve.config import DeploymentMode
from ray.serve.constants import (
DEFAULT_CHECKPOINT_PATH,
DEFAULT_HTTP_HOST,
DEFAULT_HTTP_PORT,
SERVE_NAMESPACE,
Expand Down Expand Up @@ -78,20 +77,7 @@ def cli():
type=click.Choice(list(DeploymentMode)),
help="Location of the HTTP servers. Defaults to HeadOnly.",
)
@click.option(
"--checkpoint-path",
default=DEFAULT_CHECKPOINT_PATH,
required=False,
type=str,
hidden=True,
)
def start(
address,
http_host,
http_port,
http_location,
checkpoint_path,
):
def start(address, http_host, http_port, http_location):
ray.init(
address=address,
namespace=SERVE_NAMESPACE,
Expand All @@ -103,7 +89,6 @@ def start(
port=http_port,
location=http_location,
),
_checkpoint_path=checkpoint_path,
)


Expand Down
86 changes: 0 additions & 86 deletions python/ray/serve/storage/checkpoint_path.py

This file was deleted.

Loading