Skip to content

Commit

Permalink
[Serve] Make the checkpoint and recover only from GCS (ray-project#26753
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sihanwang41 authored and Scott Graham committed Aug 15, 2022
1 parent b21d2aa commit 351fe55
Show file tree
Hide file tree
Showing 28 changed files with 34 additions and 801 deletions.
39 changes: 5 additions & 34 deletions doc/source/serve/deploying-serve.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,44 +254,15 @@ failure recovery solutions. Although Ray is not currently highly available (HA),
the long term roadmap and being actively worked on.
:::

Ray Serve added an experimental feature to help recover the state.
This feature enables Serve to write all your deployment configuration and code into a storage location.
Ray Serve provides a feature to help recover the state.
This feature enables Serve to write all your deployment configuration and code into the Global Control Store
(GCS).
Upon Ray cluster failure and restarts, you can simply call Serve to reconstruct the state.

Here is how to use it:

:::{warning}
The API is experimental and subject to change. We welcome you to test it out
and leave us feedback through GitHub issues or the discussion forum!
:::

You can use both the start argument and the CLI to specify it:

```python
serve.start(_checkpoint_path=...)
```

or

```shell
serve start --checkpoint-path ...
```

The checkpoint path argument accepts the following format:

- `file://local_file_path`
- `s3://bucket/path`
- `gs://bucket/path`
- `custom://importable.custom_python.Class/path`

While we have native support for on disk, AWS S3, and Google Cloud Storage (GCS), there is no reason we cannot support more.

In a Kubernetes environment, we recommend using [Persistent Volumes] to create a disk and mount it into the Ray head node.
For example, you can provision Azure Disk, AWS Elastic Block Store, or GCP Persistent Disk using the K8s [Persistent Volumes] API.
Alternatively, you can also write directly to an object store like S3.
In a Kubernetes environment, we recommend using KubeRay (a Kubernetes operator for Ray Serve) to help deploy your Serve applications with Kubernetes, and to help you recover from node crashes through its Custom Resource.

You can easily plug in your own implementation by using the `custom://` path and inheriting from the [KVStoreBase] class.
Feel free to open new GitHub issues and contribute more storage backends!
Feel free to open a new GitHub issue if you hit any problems with failure recovery.

[ingress]: https://kubernetes.io/docs/concepts/services-networking/ingress/
[kubernetes default config]: https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/kubernetes/example-full.yaml
Expand Down
23 changes: 1 addition & 22 deletions java/serve/src/main/java/io/ray/serve/api/Serve.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Ray Serve global API. TODO: will be enriched in the Java SDK/API PR. */
public class Serve {

private static final Logger LOGGER = LoggerFactory.getLogger(Serve.class);

private static ReplicaContext INTERNAL_REPLICA_CONTEXT;
Expand All @@ -50,13 +48,11 @@ public class Serve {
* instance will live on the Ray cluster until it is explicitly stopped with Serve.shutdown().
* @param dedicatedCpu Whether to reserve a CPU core for the internal Serve controller actor.
* Defaults to False.
* @param checkpointPath
* @param config Configuration options for Serve.
* @return
*/
public static synchronized ServeControllerClient start(
boolean detached, boolean dedicatedCpu, String checkpointPath, Map<String, String> config) {

boolean detached, boolean dedicatedCpu, Map<String, String> config) {
// Initialize ray if needed.
if (!Ray.isInitialized()) {
System.setProperty("ray.job.namespace", Constants.SERVE_NAMESPACE);
Expand All @@ -66,7 +62,6 @@ public static synchronized ServeControllerClient start(
try {
ServeControllerClient client = getGlobalClient(true);
LOGGER.info("Connecting to existing Serve app in namespace {}", Constants.SERVE_NAMESPACE);
checkCheckpointPath(client, checkpointPath);
return client;
} catch (RayServeException | IllegalStateException e) {
LOGGER.info("There is no instance running on this Ray cluster. A new one will be started.");
Expand All @@ -78,9 +73,6 @@ public static synchronized ServeControllerClient start(
: CommonUtil.formatActorName(
Constants.SERVE_CONTROLLER_NAME, RandomStringUtils.randomAlphabetic(6));

if (StringUtils.isBlank(checkpointPath)) {
checkpointPath = Constants.DEFAULT_CHECKPOINT_PATH;
}
int httpPort =
Optional.ofNullable(config)
.map(m -> m.get(RayServeConfig.PROXY_HTTP_PORT))
Expand All @@ -90,7 +82,6 @@ public static synchronized ServeControllerClient start(
Ray.actor(
PyActorClass.of("ray.serve.controller", "ServeControllerAvatar"),
controllerName,
checkpointPath,
detached,
dedicatedCpu,
httpPort)
Expand Down Expand Up @@ -136,23 +127,12 @@ public static synchronized ServeControllerClient start(
return client;
}

private static void checkCheckpointPath(ServeControllerClient client, String checkpointPath) {
if (StringUtils.isNotBlank(checkpointPath)
&& !StringUtils.equals(checkpointPath, client.getCheckpointPath())) {
LOGGER.warn(
"The new client checkpoint path '{}' is different from the existing one '{}'. The new checkpoint path is ignored.",
checkpointPath,
client.getCheckpointPath());
}
}

/**
* Completely shut down the connected Serve instance.
*
* <p>Shuts down all processes and deletes all state associated with the instance.
*/
public static void shutdown() {

ServeControllerClient client = null;
try {
client = getGlobalClient();
Expand Down Expand Up @@ -263,7 +243,6 @@ public static void setGlobalClient(ServeControllerClient client) {
* @return
*/
public static ServeControllerClient connect() {

// Initialize ray if needed.
if (!Ray.isInitialized()) {
System.setProperty("ray.job.namespace", Constants.SERVE_NAMESPACE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.slf4j.LoggerFactory;

public class ServeControllerClient {

private static final Logger LOGGER = LoggerFactory.getLogger(ServeControllerClient.class);

private static long CLIENT_POLLING_INTERVAL_S = 1;
Expand All @@ -49,8 +48,6 @@ public class ServeControllerClient {

private String rootUrl;

private String checkpointPath;

@SuppressWarnings("unchecked")
public ServeControllerClient(
BaseActorHandle controller, String controllerName, boolean detached) {
Expand All @@ -65,17 +62,6 @@ public ServeControllerClient(
.task(ServeController::getRootUrl)
.remote()
.get();
this.checkpointPath =
controller instanceof PyActorHandle
? (String)
((PyActorHandle) controller)
.task(PyActorMethod.of("get_checkpoint_path"))
.remote()
.get()
: ((ActorHandle<ServeController>) controller)
.task(ServeController::getCheckpointPath)
.remote()
.get();
}

/**
Expand All @@ -87,7 +73,6 @@ public ServeControllerClient(
*/
@SuppressWarnings("unchecked")
public RayServeHandle getHandle(String deploymentName, boolean missingOk) {

String cacheKey = deploymentName + "#" + missingOk;
if (handleCache.containsKey(cacheKey)) {
return handleCache.get(cacheKey);
Expand Down Expand Up @@ -132,7 +117,6 @@ public void deploy(
String routePrefix,
String url,
Boolean blocking) {

if (deploymentConfig == null) {
deploymentConfig = new DeploymentConfig();
}
Expand Down Expand Up @@ -202,7 +186,6 @@ public void waitForDeploymentHealthy(String name, Long timeoutS) {
long start = System.currentTimeMillis();
boolean isTimeout = true;
while (timeoutS == null || System.currentTimeMillis() - start < timeoutS * 1000) {

DeploymentStatusInfo status = getDeploymentStatus(name);
if (status == null) {
throw new RayServeException(
Expand Down Expand Up @@ -311,10 +294,6 @@ public String getRootUrl() {
return rootUrl;
}

public String getCheckpointPath() {
return checkpointPath;
}

public DeploymentRoute getDeploymentInfo(String name) {
return DeploymentRoute.fromProtoBytes(
(byte[])
Expand All @@ -325,7 +304,6 @@ public DeploymentRoute getDeploymentInfo(String name) {
}

public Map<String, DeploymentRoute> listDeployments() {

DeploymentRouteList deploymentRouteList =
ServeProtoUtil.bytesToProto(
(byte[])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
import io.ray.serve.poll.LongPollResult;

public interface ServeController {

byte[] getAllEndpoints();

LongPollResult listenForChange(LongPollRequest longPollRequest);

String getRootUrl();

String getCheckpointPath();
}
2 changes: 1 addition & 1 deletion java/serve/src/test/java/io/ray/serve/BaseServeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void setUpBase(Method method) {
Map<String, String> config = Maps.newHashMap();
// The default port 8000 is occupied by other processes on the ci platform
config.put(RayServeConfig.PROXY_HTTP_PORT, "8341");
client = Serve.start(true, false, null, config);
client = Serve.start(true, false, config);
}

@AfterMethod(alwaysRun = true)
Expand Down
11 changes: 1 addition & 10 deletions java/serve/src/test/java/io/ray/serve/DummyServeController.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@
import java.util.Map;

public class DummyServeController implements ServeController {

private Map<String, EndpointInfo> endpoints;

private LongPollResult longPollResult;

private String rootUrl;

private String checkpointPath;

public DummyServeController(String rootUrl, String checkpointPath) {
public DummyServeController(String rootUrl) {
this.rootUrl = rootUrl;
this.checkpointPath = checkpointPath;
}

@Override
Expand Down Expand Up @@ -51,9 +47,4 @@ public String getRootUrl() {
public void setRootUrl(String rootUrl) {
this.rootUrl = rootUrl;
}

@Override
public String getCheckpointPath() {
return checkpointPath;
}
}
4 changes: 1 addition & 3 deletions java/serve/src/test/java/io/ray/serve/HttpProxyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.testng.annotations.Test;

public class HttpProxyTest extends BaseTest {

@Test
public void test() throws IOException {
init();
Expand All @@ -38,7 +37,7 @@ public void test() throws IOException {

// Controller
ActorHandle<DummyServeController> controllerHandle =
Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote();
Ray.actor(DummyServeController::new, "").setName(controllerName).remote();

Map<String, EndpointInfo> endpointInfos = new HashMap<>();
endpointInfos.put(
Expand All @@ -61,7 +60,6 @@ public void test() throws IOException {
HttpPost httpPost = new HttpPost("http:https://localhost:" + httpProxy.getPort() + route);
try (CloseableHttpResponse httpResponse =
(CloseableHttpResponse) httpClient.execute(httpPost)) {

// No replica, so error is expected.
int status = httpResponse.getCode();
Assert.assertEquals(status, HttpURLConnection.HTTP_INTERNAL_ERROR);
Expand Down
4 changes: 1 addition & 3 deletions java/serve/src/test/java/io/ray/serve/ProxyActorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.testng.annotations.Test;

public class ProxyActorTest extends BaseTest {

@Test
public void test() throws IOException {
init();
Expand All @@ -50,7 +49,7 @@ public void test() throws IOException {

// Controller
ActorHandle<DummyServeController> controller =
Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote();
Ray.actor(DummyServeController::new, "").setName(controllerName).remote();
Map<String, EndpointInfo> endpointInfos = new HashMap<>();
endpointInfos.put(
endpointName,
Expand Down Expand Up @@ -95,7 +94,6 @@ public void test() throws IOException {
+ route);
try (CloseableHttpResponse httpResponse =
(CloseableHttpResponse) httpClient.execute(httpPost)) {

int status = httpResponse.getCode();
Assert.assertEquals(status, HttpURLConnection.HTTP_OK);
Object result =
Expand Down
3 changes: 1 addition & 2 deletions java/serve/src/test/java/io/ray/serve/ProxyRouterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.testng.annotations.Test;

public class ProxyRouterTest extends BaseTest {

@Test
public void test() {
init();
Expand All @@ -33,7 +32,7 @@ public void test() {

// Controller
ActorHandle<DummyServeController> controllerHandle =
Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote();
Ray.actor(DummyServeController::new, "").setName(controllerName).remote();
Map<String, EndpointInfo> endpointInfos = new HashMap<>();
endpointInfos.put(
endpointName1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.testng.annotations.Test;

public class RayServeHandleTest extends BaseTest {

@Test
public void test() {
init();
Expand All @@ -40,7 +39,7 @@ public void test() {

// Controller
ActorHandle<DummyServeController> controllerHandle =
Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote();
Ray.actor(DummyServeController::new, "").setName(controllerName).remote();

// Set ReplicaContext
Serve.setInternalReplicaContext(null, null, controllerName, null, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.testng.annotations.Test;

public class RayServeReplicaTest extends BaseTest {

@SuppressWarnings("unused")
@Test
public void test() throws IOException {
Expand All @@ -37,7 +36,7 @@ public void test() throws IOException {
config.put(RayServeConfig.LONG_POOL_CLIENT_ENABLED, "false");

ActorHandle<DummyServeController> controllerHandle =
Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote();
Ray.actor(DummyServeController::new, "").setName(controllerName).remote();

DeploymentConfig deploymentConfig =
new DeploymentConfig().setDeploymentLanguage(DeploymentLanguage.JAVA);
Expand Down
3 changes: 1 addition & 2 deletions java/serve/src/test/java/io/ray/serve/ReplicaSetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.testng.annotations.Test;

public class ReplicaSetTest extends BaseTest {

private String deploymentName = "ReplicaSetTest";

@Test
Expand All @@ -48,7 +47,7 @@ public void assignReplicaTest() {

// Controller
ActorHandle<DummyServeController> controllerHandle =
Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote();
Ray.actor(DummyServeController::new, "").setName(controllerName).remote();

// Replica
DeploymentConfig deploymentConfig =
Expand Down
Loading

0 comments on commit 351fe55

Please sign in to comment.