diff --git a/doc/source/serve/deploying-serve.md b/doc/source/serve/deploying-serve.md index 49e144a556523..6298736041a73 100644 --- a/doc/source/serve/deploying-serve.md +++ b/doc/source/serve/deploying-serve.md @@ -254,44 +254,15 @@ failure recovery solutions. Although Ray is not currently highly available (HA), the long term roadmap and being actively worked on. ::: -Ray Serve added an experimental feature to help recovering the state. -This features enables Serve to write all your deployment configuration and code into a storage location. +Ray Serve provides the feature to help recovering the state. +This feature enables Serve to write all your deployment configuration and code into Global Control Store +(GCS). Upon Ray cluster failure and restarts, you can simply call Serve to reconstruct the state. -Here is how to use it: -:::{warning} -The API is experimental and subject to change. We welcome you to test it out -and leave us feedback through github issues or discussion forum! -::: - -You can use both the start argument and the CLI to specify it: - -```python -serve.start(_checkpoint_path=...) -``` - -or - -```shell -serve start --checkpoint-path ... -``` - -The checkpoint path argument accepts the following format: - -- `file://local_file_path` -- `s3://bucket/path` -- `gs://bucket/path` -- `custom://importable.custom_python.Class/path` - -While we have native support for on disk, AWS S3, and Google Cloud Storage (GCS), there is no reason we cannot support more. - -In Kubernetes environment, we recommend using [Persistent Volumes] to create a disk and mount it into the Ray head node. -For example, you can provision Azure Disk, AWS Elastic Block Store, or GCP Persistent Disk using the K8s [Persistent Volumes] API. -Alternatively, you can also directly write to object store like S3. +In Kubernetes environment, we recommend using KubeRay (a Kubernetes operator for Ray Serve) to help deploy your Serve applications with Kubernetes, and help you recover the node crash from Customized Resource. -You can easily try to plug into your own implementation using the `custom://` path and inherit the [KVStoreBase] class. -Feel free to open new github issues and contribute more storage backends! +Feel free to open new github issues if you hit any problems from Failure Recovery. [ingress]: https://kubernetes.io/docs/concepts/services-networking/ingress/ [kubernetes default config]: https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/kubernetes/example-full.yaml diff --git a/java/serve/src/main/java/io/ray/serve/api/Serve.java b/java/serve/src/main/java/io/ray/serve/api/Serve.java index 01e145a98ab48..f6080bc8d32c9 100644 --- a/java/serve/src/main/java/io/ray/serve/api/Serve.java +++ b/java/serve/src/main/java/io/ray/serve/api/Serve.java @@ -26,13 +26,11 @@ import java.util.Map; import java.util.Optional; import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Ray Serve global API. TODO: will be riched in the Java SDK/API PR. */ public class Serve { - private static final Logger LOGGER = LoggerFactory.getLogger(Serve.class); private static ReplicaContext INTERNAL_REPLICA_CONTEXT; @@ -50,13 +48,11 @@ public class Serve { * instance will live on the Ray cluster until it is explicitly stopped with Serve.shutdown(). * @param dedicatedCpu Whether to reserve a CPU core for the internal Serve controller actor. * Defaults to False. - * @param checkpointPath * @param config Configuration options for Serve. * @return */ public static synchronized ServeControllerClient start( - boolean detached, boolean dedicatedCpu, String checkpointPath, Map config) { - + boolean detached, boolean dedicatedCpu, Map config) { // Initialize ray if needed. if (!Ray.isInitialized()) { System.setProperty("ray.job.namespace", Constants.SERVE_NAMESPACE); @@ -66,7 +62,6 @@ public static synchronized ServeControllerClient start( try { ServeControllerClient client = getGlobalClient(true); LOGGER.info("Connecting to existing Serve app in namespace {}", Constants.SERVE_NAMESPACE); - checkCheckpointPath(client, checkpointPath); return client; } catch (RayServeException | IllegalStateException e) { LOGGER.info("There is no instance running on this Ray cluster. A new one will be started."); @@ -78,9 +73,6 @@ public static synchronized ServeControllerClient start( : CommonUtil.formatActorName( Constants.SERVE_CONTROLLER_NAME, RandomStringUtils.randomAlphabetic(6)); - if (StringUtils.isBlank(checkpointPath)) { - checkpointPath = Constants.DEFAULT_CHECKPOINT_PATH; - } int httpPort = Optional.ofNullable(config) .map(m -> m.get(RayServeConfig.PROXY_HTTP_PORT)) @@ -90,7 +82,6 @@ public static synchronized ServeControllerClient start( Ray.actor( PyActorClass.of("ray.serve.controller", "ServeControllerAvatar"), controllerName, - checkpointPath, detached, dedicatedCpu, httpPort) @@ -136,23 +127,12 @@ public static synchronized ServeControllerClient start( return client; } - private static void checkCheckpointPath(ServeControllerClient client, String checkpointPath) { - if (StringUtils.isNotBlank(checkpointPath) - && !StringUtils.equals(checkpointPath, client.getCheckpointPath())) { - LOGGER.warn( - "The new client checkpoint path '{}' is different from the existing one '{}'. The new checkpoint path is ignored.", - checkpointPath, - client.getCheckpointPath()); - } - } - /** * Completely shut down the connected Serve instance. * *

Shuts down all processes and deletes all state associated with the instance. */ public static void shutdown() { - ServeControllerClient client = null; try { client = getGlobalClient(); @@ -263,7 +243,6 @@ public static void setGlobalClient(ServeControllerClient client) { * @return */ public static ServeControllerClient connect() { - // Initialize ray if needed. if (!Ray.isInitialized()) { System.setProperty("ray.job.namespace", Constants.SERVE_NAMESPACE); diff --git a/java/serve/src/main/java/io/ray/serve/api/ServeControllerClient.java b/java/serve/src/main/java/io/ray/serve/api/ServeControllerClient.java index cdded4412f252..aae180d58b547 100644 --- a/java/serve/src/main/java/io/ray/serve/api/ServeControllerClient.java +++ b/java/serve/src/main/java/io/ray/serve/api/ServeControllerClient.java @@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory; public class ServeControllerClient { - private static final Logger LOGGER = LoggerFactory.getLogger(ServeControllerClient.class); private static long CLIENT_POLLING_INTERVAL_S = 1; @@ -49,8 +48,6 @@ public class ServeControllerClient { private String rootUrl; - private String checkpointPath; - @SuppressWarnings("unchecked") public ServeControllerClient( BaseActorHandle controller, String controllerName, boolean detached) { @@ -65,17 +62,6 @@ public ServeControllerClient( .task(ServeController::getRootUrl) .remote() .get(); - this.checkpointPath = - controller instanceof PyActorHandle - ? (String) - ((PyActorHandle) controller) - .task(PyActorMethod.of("get_checkpoint_path")) - .remote() - .get() - : ((ActorHandle) controller) - .task(ServeController::getCheckpointPath) - .remote() - .get(); } /** @@ -87,7 +73,6 @@ public ServeControllerClient( */ @SuppressWarnings("unchecked") public RayServeHandle getHandle(String deploymentName, boolean missingOk) { - String cacheKey = deploymentName + "#" + missingOk; if (handleCache.containsKey(cacheKey)) { return handleCache.get(cacheKey); @@ -132,7 +117,6 @@ public void deploy( String routePrefix, String url, Boolean blocking) { - if (deploymentConfig == null) { deploymentConfig = new DeploymentConfig(); } @@ -202,7 +186,6 @@ public void waitForDeploymentHealthy(String name, Long timeoutS) { long start = System.currentTimeMillis(); boolean isTimeout = true; while (timeoutS == null || System.currentTimeMillis() - start < timeoutS * 1000) { - DeploymentStatusInfo status = getDeploymentStatus(name); if (status == null) { throw new RayServeException( @@ -311,10 +294,6 @@ public String getRootUrl() { return rootUrl; } - public String getCheckpointPath() { - return checkpointPath; - } - public DeploymentRoute getDeploymentInfo(String name) { return DeploymentRoute.fromProtoBytes( (byte[]) @@ -325,7 +304,6 @@ public DeploymentRoute getDeploymentInfo(String name) { } public Map listDeployments() { - DeploymentRouteList deploymentRouteList = ServeProtoUtil.bytesToProto( (byte[]) diff --git a/java/serve/src/main/java/io/ray/serve/controller/ServeController.java b/java/serve/src/main/java/io/ray/serve/controller/ServeController.java index a86f4b087f602..0946e0d67d350 100644 --- a/java/serve/src/main/java/io/ray/serve/controller/ServeController.java +++ b/java/serve/src/main/java/io/ray/serve/controller/ServeController.java @@ -4,12 +4,9 @@ import io.ray.serve.poll.LongPollResult; public interface ServeController { - byte[] getAllEndpoints(); LongPollResult listenForChange(LongPollRequest longPollRequest); String getRootUrl(); - - String getCheckpointPath(); } diff --git a/java/serve/src/test/java/io/ray/serve/BaseServeTest.java b/java/serve/src/test/java/io/ray/serve/BaseServeTest.java index d04c3c3155552..3f58ad13d2df0 100644 --- a/java/serve/src/test/java/io/ray/serve/BaseServeTest.java +++ b/java/serve/src/test/java/io/ray/serve/BaseServeTest.java @@ -23,7 +23,7 @@ public void setUpBase(Method method) { Map config = Maps.newHashMap(); // The default port 8000 is occupied by other processes on the ci platform config.put(RayServeConfig.PROXY_HTTP_PORT, "8341"); - client = Serve.start(true, false, null, config); + client = Serve.start(true, false, config); } @AfterMethod(alwaysRun = true) diff --git a/java/serve/src/test/java/io/ray/serve/DummyServeController.java b/java/serve/src/test/java/io/ray/serve/DummyServeController.java index f2c2453095033..fc3507766f73e 100644 --- a/java/serve/src/test/java/io/ray/serve/DummyServeController.java +++ b/java/serve/src/test/java/io/ray/serve/DummyServeController.java @@ -8,18 +8,14 @@ import java.util.Map; public class DummyServeController implements ServeController { - private Map endpoints; private LongPollResult longPollResult; private String rootUrl; - private String checkpointPath; - - public DummyServeController(String rootUrl, String checkpointPath) { + public DummyServeController(String rootUrl) { this.rootUrl = rootUrl; - this.checkpointPath = checkpointPath; } @Override @@ -51,9 +47,4 @@ public String getRootUrl() { public void setRootUrl(String rootUrl) { this.rootUrl = rootUrl; } - - @Override - public String getCheckpointPath() { - return checkpointPath; - } } diff --git a/java/serve/src/test/java/io/ray/serve/HttpProxyTest.java b/java/serve/src/test/java/io/ray/serve/HttpProxyTest.java index 175005d72e214..a47cae2a8c07c 100644 --- a/java/serve/src/test/java/io/ray/serve/HttpProxyTest.java +++ b/java/serve/src/test/java/io/ray/serve/HttpProxyTest.java @@ -22,7 +22,6 @@ import org.testng.annotations.Test; public class HttpProxyTest extends BaseTest { - @Test public void test() throws IOException { init(); @@ -38,7 +37,7 @@ public void test() throws IOException { // Controller ActorHandle controllerHandle = - Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote(); + Ray.actor(DummyServeController::new, "").setName(controllerName).remote(); Map endpointInfos = new HashMap<>(); endpointInfos.put( @@ -61,7 +60,6 @@ public void test() throws IOException { HttpPost httpPost = new HttpPost("http://localhost:" + httpProxy.getPort() + route); try (CloseableHttpResponse httpResponse = (CloseableHttpResponse) httpClient.execute(httpPost)) { - // No replica, so error is expected. int status = httpResponse.getCode(); Assert.assertEquals(status, HttpURLConnection.HTTP_INTERNAL_ERROR); diff --git a/java/serve/src/test/java/io/ray/serve/ProxyActorTest.java b/java/serve/src/test/java/io/ray/serve/ProxyActorTest.java index c913d3618f64c..5e135df5002ca 100644 --- a/java/serve/src/test/java/io/ray/serve/ProxyActorTest.java +++ b/java/serve/src/test/java/io/ray/serve/ProxyActorTest.java @@ -30,7 +30,6 @@ import org.testng.annotations.Test; public class ProxyActorTest extends BaseTest { - @Test public void test() throws IOException { init(); @@ -50,7 +49,7 @@ public void test() throws IOException { // Controller ActorHandle controller = - Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote(); + Ray.actor(DummyServeController::new, "").setName(controllerName).remote(); Map endpointInfos = new HashMap<>(); endpointInfos.put( endpointName, @@ -95,7 +94,6 @@ public void test() throws IOException { + route); try (CloseableHttpResponse httpResponse = (CloseableHttpResponse) httpClient.execute(httpPost)) { - int status = httpResponse.getCode(); Assert.assertEquals(status, HttpURLConnection.HTTP_OK); Object result = diff --git a/java/serve/src/test/java/io/ray/serve/ProxyRouterTest.java b/java/serve/src/test/java/io/ray/serve/ProxyRouterTest.java index 89c30b8805d48..473f90f374b10 100644 --- a/java/serve/src/test/java/io/ray/serve/ProxyRouterTest.java +++ b/java/serve/src/test/java/io/ray/serve/ProxyRouterTest.java @@ -16,7 +16,6 @@ import org.testng.annotations.Test; public class ProxyRouterTest extends BaseTest { - @Test public void test() { init(); @@ -33,7 +32,7 @@ public void test() { // Controller ActorHandle controllerHandle = - Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote(); + Ray.actor(DummyServeController::new, "").setName(controllerName).remote(); Map endpointInfos = new HashMap<>(); endpointInfos.put( endpointName1, diff --git a/java/serve/src/test/java/io/ray/serve/RayServeHandleTest.java b/java/serve/src/test/java/io/ray/serve/RayServeHandleTest.java index 95d42e68b6ec9..de0b3f2c1d2b9 100644 --- a/java/serve/src/test/java/io/ray/serve/RayServeHandleTest.java +++ b/java/serve/src/test/java/io/ray/serve/RayServeHandleTest.java @@ -22,7 +22,6 @@ import org.testng.annotations.Test; public class RayServeHandleTest extends BaseTest { - @Test public void test() { init(); @@ -40,7 +39,7 @@ public void test() { // Controller ActorHandle controllerHandle = - Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote(); + Ray.actor(DummyServeController::new, "").setName(controllerName).remote(); // Set ReplicaContext Serve.setInternalReplicaContext(null, null, controllerName, null, config); diff --git a/java/serve/src/test/java/io/ray/serve/RayServeReplicaTest.java b/java/serve/src/test/java/io/ray/serve/RayServeReplicaTest.java index 98df64033c64c..ceca54a5c8f1b 100644 --- a/java/serve/src/test/java/io/ray/serve/RayServeReplicaTest.java +++ b/java/serve/src/test/java/io/ray/serve/RayServeReplicaTest.java @@ -22,7 +22,6 @@ import org.testng.annotations.Test; public class RayServeReplicaTest extends BaseTest { - @SuppressWarnings("unused") @Test public void test() throws IOException { @@ -37,7 +36,7 @@ public void test() throws IOException { config.put(RayServeConfig.LONG_POOL_CLIENT_ENABLED, "false"); ActorHandle controllerHandle = - Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote(); + Ray.actor(DummyServeController::new, "").setName(controllerName).remote(); DeploymentConfig deploymentConfig = new DeploymentConfig().setDeploymentLanguage(DeploymentLanguage.JAVA); diff --git a/java/serve/src/test/java/io/ray/serve/ReplicaSetTest.java b/java/serve/src/test/java/io/ray/serve/ReplicaSetTest.java index a49eda9bdaf64..76930f823b675 100644 --- a/java/serve/src/test/java/io/ray/serve/ReplicaSetTest.java +++ b/java/serve/src/test/java/io/ray/serve/ReplicaSetTest.java @@ -21,7 +21,6 @@ import org.testng.annotations.Test; public class ReplicaSetTest extends BaseTest { - private String deploymentName = "ReplicaSetTest"; @Test @@ -48,7 +47,7 @@ public void assignReplicaTest() { // Controller ActorHandle controllerHandle = - Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote(); + Ray.actor(DummyServeController::new, "").setName(controllerName).remote(); // Replica DeploymentConfig deploymentConfig = diff --git a/java/serve/src/test/java/io/ray/serve/RouterTest.java b/java/serve/src/test/java/io/ray/serve/RouterTest.java index 7375877bdfc01..cbad0f6593e75 100644 --- a/java/serve/src/test/java/io/ray/serve/RouterTest.java +++ b/java/serve/src/test/java/io/ray/serve/RouterTest.java @@ -23,7 +23,6 @@ import org.testng.annotations.Test; public class RouterTest extends BaseTest { - @Test public void test() { init(); @@ -41,7 +40,7 @@ public void test() { // Controller ActorHandle controllerHandle = - Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote(); + Ray.actor(DummyServeController::new, "").setName(controllerName).remote(); // Replica DeploymentConfig deploymentConfig = diff --git a/java/serve/src/test/java/io/ray/serve/api/ClientTest.java b/java/serve/src/test/java/io/ray/serve/api/ClientTest.java index 3e1d85d9a2c7a..596da728f0402 100644 --- a/java/serve/src/test/java/io/ray/serve/api/ClientTest.java +++ b/java/serve/src/test/java/io/ray/serve/api/ClientTest.java @@ -15,7 +15,6 @@ import org.testng.annotations.Test; public class ClientTest { - @Test public void getHandleTest() { boolean inited = Ray.isInitialized(); @@ -34,7 +33,7 @@ public void getHandleTest() { // Controller. ActorHandle controllerHandle = - Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote(); + Ray.actor(DummyServeController::new, "").setName(controllerName).remote(); // Set ReplicaContext Serve.setInternalReplicaContext(null, null, controllerName, null, config); diff --git a/java/serve/src/test/java/io/ray/serve/api/ServeTest.java b/java/serve/src/test/java/io/ray/serve/api/ServeTest.java index 0d5120e6add8b..11f72ec4c92f0 100644 --- a/java/serve/src/test/java/io/ray/serve/api/ServeTest.java +++ b/java/serve/src/test/java/io/ray/serve/api/ServeTest.java @@ -12,10 +12,8 @@ import org.testng.annotations.Test; public class ServeTest extends BaseTest { - @Test public void replicaContextTest() { - try { // Test context setting and getting. String deploymentName = "deploymentName"; @@ -52,7 +50,7 @@ public void getGlobalClientTest() { CommonUtil.formatActorName( Constants.SERVE_CONTROLLER_NAME, RandomStringUtils.randomAlphabetic(6)); ActorHandle actorHandle = - Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote(); + Ray.actor(DummyServeController::new, "").setName(controllerName).remote(); Serve.setInternalReplicaContext(null, null, controllerName, null, null); client = Serve.getGlobalClient(); Assert.assertNotNull(client); diff --git a/java/serve/src/test/java/io/ray/serve/poll/LongPollClientTest.java b/java/serve/src/test/java/io/ray/serve/poll/LongPollClientTest.java index d6803004eb45b..db32d3aa0dafb 100644 --- a/java/serve/src/test/java/io/ray/serve/poll/LongPollClientTest.java +++ b/java/serve/src/test/java/io/ray/serve/poll/LongPollClientTest.java @@ -18,7 +18,6 @@ import org.testng.annotations.Test; public class LongPollClientTest { - @Test public void disableTest() throws Throwable { Map config = new HashMap<>(); @@ -48,7 +47,7 @@ public void normalTest() throws Throwable { CommonUtil.formatActorName( Constants.SERVE_CONTROLLER_NAME, RandomStringUtils.randomAlphabetic(6)); ActorHandle controllerHandle = - Ray.actor(DummyServeController::new, "", "").setName(controllerName).remote(); + Ray.actor(DummyServeController::new, "").setName(controllerName).remote(); Serve.setInternalReplicaContext(null, null, controllerName, null, null); diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index c4026eafc3992..44641e923f526 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -21,7 +21,6 @@ from ray.serve.config import AutoscalingConfig, DeploymentConfig, HTTPOptions from ray.serve.constants import ( CONTROLLER_MAX_CONCURRENCY, - DEFAULT_CHECKPOINT_PATH, DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, HTTP_PROXY_TIMEOUT, @@ -60,7 +59,6 @@ def start( detached: bool = False, http_options: Optional[Union[dict, HTTPOptions]] = None, dedicated_cpu: bool = False, - _checkpoint_path: str = DEFAULT_CHECKPOINT_PATH, **kwargs, ) -> ServeControllerClient: """Initialize a serve instance. @@ -121,7 +119,7 @@ def start( f'Connecting to existing Serve app in namespace "{SERVE_NAMESPACE}".' ) - _check_http_and_checkpoint_options(client, http_options, _checkpoint_path) + _check_http_options(client, http_options) return client except RayServeException: pass @@ -154,7 +152,6 @@ def start( ).remote( controller_name, http_config=http_options, - checkpoint_path=_checkpoint_path, head_node_id=head_node_id, detached=detached, ) @@ -642,18 +639,9 @@ def build(target: Union[ClassNode, FunctionNode]) -> Application: return Application(pipeline_build(target)) -def _check_http_and_checkpoint_options( - client: ServeControllerClient, - http_options: Union[dict, HTTPOptions], - checkpoint_path: str, +def _check_http_options( + client: ServeControllerClient, http_options: Union[dict, HTTPOptions] ) -> None: - if checkpoint_path and checkpoint_path != client.checkpoint_path: - logger.warning( - f"The new client checkpoint path '{checkpoint_path}' " - f"is different from the existing one '{client.checkpoint_path}'. " - "The new checkpoint path is ignored." - ) - if http_options: client_http_options = client.http_config new_http_options = ( diff --git a/python/ray/serve/client.py b/python/ray/serve/client.py index 0f4ad21d70c2b..7069b3ec3c529 100644 --- a/python/ray/serve/client.py +++ b/python/ray/serve/client.py @@ -51,7 +51,6 @@ def __init__( self._shutdown = False self._http_config: HTTPOptions = ray.get(controller.get_http_config.remote()) self._root_url = ray.get(controller.get_root_url.remote()) - self._checkpoint_path = ray.get(controller.get_checkpoint_path.remote()) # Each handle has the overhead of long poll client, therefore cached. self.handle_cache = dict() @@ -75,10 +74,6 @@ def root_url(self): def http_config(self): return self._http_config - @property - def checkpoint_path(self): - return self._checkpoint_path - def __del__(self): if not self._detached: logger.debug( diff --git a/python/ray/serve/constants.py b/python/ray/serve/constants.py index 9d2975aecc740..aec04a6a58c94 100644 --- a/python/ray/serve/constants.py +++ b/python/ray/serve/constants.py @@ -25,9 +25,6 @@ #: HTTP Port DEFAULT_HTTP_PORT = 8000 -#: Controller checkpoint path -DEFAULT_CHECKPOINT_PATH = "ray://" - #: Max concurrency ASYNC_CONCURRENCY = int(1e6) diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index 41d492e70c8b9..43227d32a216e 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -38,7 +38,6 @@ from ray.serve.logging_utils import configure_component_logger from ray.serve.long_poll import LongPollHost from ray.serve.schema import ServeApplicationSchema -from ray.serve.storage.checkpoint_path import make_kv_store from ray.serve.storage.kv_store import RayInternalKVStore from ray.serve.utils import ( override_runtime_envs_except_env_vars, @@ -86,7 +85,6 @@ async def __init__( controller_name: str, *, http_config: HTTPOptions, - checkpoint_path: str, head_node_id: str, detached: bool = False, ): @@ -97,9 +95,8 @@ async def __init__( # Used to read/write checkpoints. self.ray_worker_namespace = ray.get_runtime_context().namespace self.controller_name = controller_name - self.checkpoint_path = checkpoint_path kv_store_namespace = f"{self.controller_name}-{self.ray_worker_namespace}" - self.kv_store = make_kv_store(checkpoint_path, namespace=kv_store_namespace) + self.kv_store = RayInternalKVStore(kv_store_namespace) self.snapshot_store = RayInternalKVStore(namespace=kv_store_namespace) # Dictionary of deployment_name -> proxy_name -> queue length. @@ -193,9 +190,6 @@ async def listen_for_change_java(self, keys_to_snapshot_ids_bytes: bytes): self.long_poll_host.listen_for_change_java(keys_to_snapshot_ids_bytes) ) - def get_checkpoint_path(self) -> str: - return self.checkpoint_path - def get_all_endpoints(self) -> Dict[EndpointTag, Dict[str, Any]]: """Returns a dictionary of deployment name to config.""" return self.endpoint_state.get_endpoints() @@ -626,7 +620,6 @@ class ServeControllerAvatar: def __init__( self, controller_name: str, - checkpoint_path: str, detached: bool = False, dedicated_cpu: bool = False, http_proxy_port: int = 8000, @@ -657,7 +650,6 @@ def __init__( ).remote( controller_name, http_config=http_config, - checkpoint_path=checkpoint_path, head_node_id=head_node_id, detached=detached, ) diff --git a/python/ray/serve/scripts.py b/python/ray/serve/scripts.py index 94f1614ff1c8b..ab5f46c041028 100644 --- a/python/ray/serve/scripts.py +++ b/python/ray/serve/scripts.py @@ -17,7 +17,6 @@ from ray.serve.api import build as build_app from ray.serve.config import DeploymentMode from ray.serve.constants import ( - DEFAULT_CHECKPOINT_PATH, DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, SERVE_NAMESPACE, @@ -78,20 +77,7 @@ def cli(): type=click.Choice(list(DeploymentMode)), help="Location of the HTTP servers. Defaults to HeadOnly.", ) -@click.option( - "--checkpoint-path", - default=DEFAULT_CHECKPOINT_PATH, - required=False, - type=str, - hidden=True, -) -def start( - address, - http_host, - http_port, - http_location, - checkpoint_path, -): +def start(address, http_host, http_port, http_location): ray.init( address=address, namespace=SERVE_NAMESPACE, @@ -103,7 +89,6 @@ def start( port=http_port, location=http_location, ), - _checkpoint_path=checkpoint_path, ) diff --git a/python/ray/serve/storage/checkpoint_path.py b/python/ray/serve/storage/checkpoint_path.py deleted file mode 100644 index 8feaa09b4942d..0000000000000 --- a/python/ray/serve/storage/checkpoint_path.py +++ /dev/null @@ -1,86 +0,0 @@ -import logging -from urllib.parse import parse_qsl, urlparse - -from ray._private.utils import import_attr -from ray.serve.constants import DEFAULT_CHECKPOINT_PATH, SERVE_LOGGER_NAME -from ray.serve.storage.kv_store import RayInternalKVStore, RayLocalKVStore, RayS3KVStore -from ray.serve.storage.kv_store_base import KVStoreBase -from ray.serve.storage.ray_gcs_kv_store import RayGcsKVStore - -logger = logging.getLogger(SERVE_LOGGER_NAME) - - -def make_kv_store(checkpoint_path, namespace): - """Create KVStore instance based on checkpoint_path configuration""" - - if checkpoint_path == DEFAULT_CHECKPOINT_PATH: - logger.info("Using RayInternalKVStore for controller checkpoint and recovery.") - return RayInternalKVStore(namespace) - else: - parsed_url = urlparse(checkpoint_path) - if parsed_url.scheme not in {"gs", "s3", "file", "custom"}: - raise ValueError( - f"Checkpoint must be one of `{DEFAULT_CHECKPOINT_PATH}`, " - "`file://path...`, `gs://path...`, `s3://path...`, or " - "`custom://my_module.ClassName?arg1=val1`. But it is " - f"{checkpoint_path}" - ) - - if parsed_url.scheme == "file": - db_path = parsed_url.netloc + parsed_url.path - logger.info( - "Using RayLocalKVStore for controller " - f"checkpoint and recovery: path={db_path}" - ) - return RayLocalKVStore(namespace, db_path) - - if parsed_url.scheme == "gs": - bucket = parsed_url.netloc - # We need to strip leading "/" in path as right key to use in - # gcs. Ex: gs://bucket/folder/file.zip -> key = "folder/file.zip" - prefix = parsed_url.path.lstrip("/") - logger.info( - "Using Ray GCS KVStore for controller checkpoint and" - " recovery: " - f"bucket={bucket} checkpoint_path={checkpoint_path}" - ) - return RayGcsKVStore( - namespace, - bucket=bucket, - prefix=prefix, - ) - - if parsed_url.scheme == "s3": - bucket = parsed_url.netloc - # We need to strip leading "/" in path as right key to use in - # boto3. Ex: s3://bucket/folder/file.zip -> key = "folder/file.zip" - prefix = parsed_url.path.lstrip("/") - logger.info( - "Using Ray S3 KVStore for controller checkpoint and recovery: " - f"bucket={bucket} checkpoint_path={checkpoint_path}" - ) - return RayS3KVStore( - namespace, - bucket=bucket, - prefix=prefix, - ) - - if parsed_url.scheme == "custom": - kwargs = dict(parse_qsl(parsed_url.query)) - - # Prepare the parameters to initialize imported class. - checkpoint_provider = parsed_url.netloc - KVStoreClass = import_attr(checkpoint_provider) - if not issubclass(KVStoreClass, KVStoreBase): - raise ValueError( - f"{KVStoreClass} doesn't inherit from " - "`ray.serve.storage.kv_store_base.KVStoreBase`." - ) - - logger.info( - f"Using {checkpoint_provider} for controller checkpoint and " - f"recovery: kwargs={kwargs}" - ) - return KVStoreClass(namespace=namespace, **kwargs) - - raise RuntimeError("This shouldn't be reachable.") diff --git a/python/ray/serve/storage/kv_store.py b/python/ray/serve/storage/kv_store.py index 66e01372ce17e..1bc58f51d9297 100644 --- a/python/ray/serve/storage/kv_store.py +++ b/python/ray/serve/storage/kv_store.py @@ -1,6 +1,4 @@ import logging -import os -import sqlite3 from typing import Optional import ray @@ -9,12 +7,6 @@ from ray.serve.constants import RAY_SERVE_KV_TIMEOUT_S, SERVE_LOGGER_NAME from ray.serve.storage.kv_store_base import KVStoreBase -try: - import boto3 - from botocore.exceptions import ClientError -except ImportError: - boto3 = None - logger = logging.getLogger(SERVE_LOGGER_NAME) @@ -111,213 +103,3 @@ def delete(self, key: str): ) except Exception as e: raise KVStoreError(e.code()) - - -class RayLocalKVStore(KVStoreBase): - """Persistent version of KVStoreBase for cluster fault - tolerance. Writes to local disk by sqlite3 with given db_path. - - Supports only string type for key and bytes type for value, - caller must handle serialization. - """ - - def __init__( - self, - namepsace: str, - db_path: str, - ): - if len(db_path) == 0: - raise ValueError("LocalKVStore's path shouldn't be empty.") - - # Ensture that parent directory is created. - parent_dir = os.path.split(db_path)[0] - if parent_dir: - os.makedirs(parent_dir, exist_ok=True) - - self._namespace = namepsace - self._conn = sqlite3.connect(db_path) - cursor = self._conn.cursor() - cursor.execute( - f'CREATE TABLE IF NOT EXISTS "{self._namespace}"' - "(key TEXT UNIQUE, value BLOB)" - ) - self._conn.commit() - - def get_storage_key(self, key: str) -> str: - return "{ns}-{key}".format(ns=self._namespace, key=key) - - def put(self, key: str, val: bytes) -> bool: - """Put the key-value pair into the store. - - Args: - key (str) - val (bytes) - """ - if not isinstance(key, str): - raise TypeError(f"key must be a string, got: {type(key)}.") - if not isinstance(val, bytes): - raise TypeError(f"val must be bytes, got: {type(val)}.") - - cursor = self._conn.cursor() - cursor.execute( - f'INSERT OR REPLACE INTO "{self._namespace}" ' "(key, value) VALUES (?,?)", - (self.get_storage_key(key), val), - ) - self._conn.commit() - return True - - def get(self, key: str) -> Optional[bytes]: - """Get the value associated with the given key from the store. - - Args: - key (str) - - Returns: - The bytes value. If the key wasn't found, returns None. - """ - if not isinstance(key, str): - raise TypeError(f"key must be a string, got: {type(key)}.") - - cursor = self._conn.cursor() - result = list( - cursor.execute( - f'SELECT value FROM "{self._namespace}" WHERE key = (?)', - (self.get_storage_key(key),), - ) - ) - if len(result) == 0: - return None - else: - # Due to UNIQUE constraint, there can only be one value. - value, *_ = result[0] - return value - - def delete(self, key: str): - """Delete the value associated with the given key from the store. - - Args: - key (str) - """ - - if not isinstance(key, str): - raise TypeError("key must be a string, got: {}.".format(type(key))) - - cursor = self._conn.cursor() - cursor.execute( - f'DELETE FROM "{self._namespace}" ' "WHERE key = (?)", - (self.get_storage_key(key),), - ) - self._conn.commit() - - -class RayS3KVStore(KVStoreBase): - """Persistent version of KVStoreBase for cluster fault - tolerance. Writes to S3 bucket with provided path and credentials. - - Supports only string type for key and bytes type for value, - caller must handle serialization. - """ - - def __init__( - self, - namespace: str, - bucket="", - prefix="", - region_name="us-west-2", - aws_access_key_id=None, - aws_secret_access_key=None, - aws_session_token=None, - endpoint_url=None, - ): - self._namespace = namespace - self._bucket = bucket - self._prefix = prefix - if not boto3: - raise ImportError( - "You tried to use S3KVstore client without boto3 installed." - "Please run `pip install boto3`" - ) - self._s3 = boto3.client( - "s3", - region_name=region_name, - aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key, - aws_session_token=aws_session_token, - endpoint_url=endpoint_url, - ) - - def get_storage_key(self, key: str) -> str: - return f"{self._prefix}/{self._namespace}-{key}" - - def put(self, key: str, val: bytes) -> bool: - """Put the key-value pair into the store. - - Args: - key (str) - val (bytes) - """ - if not isinstance(key, str): - raise TypeError("key must be a string, got: {}.".format(type(key))) - if not isinstance(val, bytes): - raise TypeError("val must be bytes, got: {}.".format(type(val))) - - try: - self._s3.put_object( - Body=val, Bucket=self._bucket, Key=self.get_storage_key(key) - ) - except ClientError as e: - message = e.response["Error"]["Message"] - logger.error( - f"Encountered ClientError while calling put() " - f"in RayExternalKVStore: {message}" - ) - raise e - - def get(self, key: str) -> Optional[bytes]: - """Get the value associated with the given key from the store. - - Args: - key (str) - - Returns: - The bytes value. If the key wasn't found, returns None. - """ - if not isinstance(key, str): - raise TypeError("key must be a string, got: {}.".format(type(key))) - - try: - response = self._s3.get_object( - Bucket=self._bucket, Key=self.get_storage_key(key) - ) - return response["Body"].read() - except ClientError as e: - if e.response["Error"]["Code"] == "NoSuchKey": - logger.warning(f"No such key in s3 for key = {key}") - return None - else: - message = e.response["Error"]["Message"] - logger.error( - f"Encountered ClientError while calling get() " - f"in RayExternalKVStore: {message}" - ) - raise e - - def delete(self, key: str): - """Delete the value associated with the given key from the store. - - Args: - key (str) - """ - - if not isinstance(key, str): - raise TypeError("key must be a string, got: {}.".format(type(key))) - - try: - self._s3.delete_object(Bucket=self._bucket, Key=self.get_storage_key(key)) - except ClientError as e: - message = e.response["Error"]["Message"] - logger.error( - f"Encountered ClientError while calling delete() " - f"in RayExternalKVStore: {message}" - ) - raise e diff --git a/python/ray/serve/storage/ray_gcs_kv_store.py b/python/ray/serve/storage/ray_gcs_kv_store.py deleted file mode 100644 index f48ee32081c0b..0000000000000 --- a/python/ray/serve/storage/ray_gcs_kv_store.py +++ /dev/null @@ -1,108 +0,0 @@ -import io -import logging -from typing import Optional - -try: - from google.cloud import storage - from google.cloud.exceptions import NotFound -except ImportError: - storage = None - -from ray.serve.constants import SERVE_LOGGER_NAME -from ray.serve.storage.kv_store_base import KVStoreBase - -logger = logging.getLogger(SERVE_LOGGER_NAME) - - -class RayGcsKVStore(KVStoreBase): - """Persistent version of KVStoreBase for cluster fault - tolerance. Writes to GCS bucket with provided path and credentials. - - Supports only string type for key and bytes type for value, - caller must handle serialization. - """ - - def __init__( - self, - namespace: str, - bucket="", - prefix="", - ): - self._namespace = namespace - self._bucket = bucket - self._prefix = prefix + "/" if prefix else "" - if not storage: - raise ImportError( - "You tried to use RayGcsKVStore client without" - "google-cloud-storage installed." - "Please run `pip install google-cloud-storage`" - ) - self._gcs = storage.Client() - self._bucket = self._gcs.bucket(bucket) - - def get_storage_key(self, key: str) -> str: - return f"{self._prefix}{self._namespace}-{key}" - - def put(self, key: str, val: bytes) -> bool: - """Put the key-value pair into the store. - - Args: - key (str) - val (bytes) - """ - if not isinstance(key, str): - raise TypeError("key must be a string, got: {}.".format(type(key))) - if not isinstance(val, bytes): - raise TypeError("val must be bytes, got: {}.".format(type(val))) - - try: - blob = self._bucket.blob(blob_name=self.get_storage_key(key)) - f = io.BytesIO(val) - blob.upload_from_file(f, num_retries=5) - except Exception as e: - message = str(e) - logger.error( - f"Encountered ClientError while calling put() " - f"in RayExternalKVStore: {message}" - ) - raise e - - def get(self, key: str) -> Optional[bytes]: - """Get the value associated with the given key from the store. - - Args: - key (str) - - Returns: - The bytes value. If the key wasn't found, returns None. - """ - if not isinstance(key, str): - raise TypeError("key must be a string, got: {}.".format(type(key))) - - try: - blob = self._bucket.blob(blob_name=self.get_storage_key(key)) - return blob.download_as_bytes() - except NotFound: - logger.warning(f"No such key in GCS for key = {key}") - return None - - def delete(self, key: str): - """Delete the value associated with the given key from the store. - - Args: - key (str) - """ - - if not isinstance(key, str): - raise TypeError("key must be a string, got: {}.".format(type(key))) - - try: - blob_name = self.get_storage_key(key) - blob = self._bucket.blob(blob_name=blob_name) - blob.delete() - except NotFound: - logger.error( - f"Encountered ClientError while calling delete() " - f"in RayExternalKVStore - " - f"Blob {blob_name} was not found!" - ) diff --git a/python/ray/serve/tests/storage_tests/test_kv_store.py b/python/ray/serve/tests/storage_tests/test_kv_store.py index 26460248de5cd..640908220a4e1 100644 --- a/python/ray/serve/tests/storage_tests/test_kv_store.py +++ b/python/ray/serve/tests/storage_tests/test_kv_store.py @@ -1,15 +1,6 @@ -import os -import tempfile -from typing import Optional - import pytest -from ray._private.test_utils import simulate_storage -from ray.serve.constants import DEFAULT_CHECKPOINT_PATH -from ray.serve.storage.checkpoint_path import make_kv_store -from ray.serve.storage.kv_store import RayInternalKVStore, RayLocalKVStore, RayS3KVStore -from ray.serve.storage.kv_store_base import KVStoreBase -from ray.serve.storage.ray_gcs_kv_store import RayGcsKVStore +from ray.serve.storage.kv_store import RayInternalKVStore def test_ray_internal_kv(serve_instance): # noqa: F811 @@ -49,138 +40,6 @@ def test_ray_internal_kv_collisions(serve_instance): # noqa: F811 assert kv1.get("1") == b"1" -def _test_operations(kv_store): - # Trival get & put - kv_store.put("1", b"1") - assert kv_store.get("1") == b"1" - kv_store.put("2", b"2") - assert kv_store.get("1") == b"1" - assert kv_store.get("2") == b"2" - - # Overwrite same key - kv_store.put("1", b"-1") - assert kv_store.get("1") == b"-1" - - # Get non-existing key - assert kv_store.get("3") is None - - # Delete existing key - kv_store.delete("1") - kv_store.delete("2") - assert kv_store.get("1") is None - assert kv_store.get("2") is None - - # Delete non-existing key - kv_store.delete("3") - - -def test_external_kv_local_disk(): - kv_store = RayLocalKVStore( - "namespace", os.path.join(tempfile.gettempdir(), "test_kv_store.db") - ) - - _test_operations(kv_store) - - -def test_external_kv_aws_s3(): - with simulate_storage("s3", "serve-test") as uri: - from urllib.parse import parse_qs, urlparse - - o = urlparse(uri) - qs = parse_qs(o.query) - region_name = qs["region"][0] - endpoint_url = qs["endpoint_override"][0] - - import boto3 - - s3 = boto3.client( - "s3", - region_name=region_name, - endpoint_url=endpoint_url, - ) - s3.create_bucket( - Bucket="serve-test", - CreateBucketConfiguration={"LocationConstraint": "us-west-2"}, - ) - - kv_store = RayS3KVStore( - "namespace", - bucket="serve-test", - prefix="checkpoint", - region_name=region_name, - endpoint_url=endpoint_url, - ) - - _test_operations(kv_store) - - -@pytest.mark.skip(reason="Need to figure out credentials for testing") -def test_external_kv_gcs(): - kv_store = RayGcsKVStore( - "namespace", - bucket="jiao-test", - prefix="/checkpoint", - ) - - _test_operations(kv_store) - - -class MyNonCompliantStoreCls: - pass - - -class MyCustomStorageCls(KVStoreBase): - def __init__(self, namespace, **kwargs): - self.namespace = namespace - self.kwargs = kwargs - - def delete(self, key: str) -> None: - return super().delete(key) - - def get(self, key: str) -> Optional[bytes]: - return super().get(key) - - def get_storage_key(self, key: str) -> str: - return super().get_storage_key(key) - - def put(self, key: str, val: bytes) -> bool: - return super().put(key, val) - - -def test_make_kv_store(serve_instance): - namespace = "ns" - assert isinstance( - make_kv_store(DEFAULT_CHECKPOINT_PATH, namespace), RayInternalKVStore - ) - assert isinstance( - make_kv_store("file:///tmp/deep/dir/my_path", namespace), RayLocalKVStore - ) - assert isinstance( - make_kv_store("s3://object_store/my_path", namespace), RayS3KVStore - ) - - with pytest.raises(ValueError, match="shouldn't be empty"): - # Empty path - make_kv_store("file://", namespace) - - with pytest.raises(ValueError, match="must be one of"): - # Wrong prefix - make_kv_store("s4://some_path", namespace) - - module_name = "ray.serve.tests.storage_tests.test_kv_store" - with pytest.raises(ValueError, match="doesn't inherit"): - make_kv_store( - f"custom://{module_name}.MyNonCompliantStoreCls", namespace=namespace - ) - - store = make_kv_store( - f"custom://{module_name}.MyCustomStorageCls?arg1=val1&arg2=val2", - namespace=namespace, - ) - assert store.namespace == namespace - assert store.kwargs == {"arg1": "val1", "arg2": "val2"} - - if __name__ == "__main__": import sys diff --git a/python/ray/serve/tests/test_deployment_state.py b/python/ray/serve/tests/test_deployment_state.py index ceb05484ba3f2..e654e2d4b3b3e 100644 --- a/python/ray/serve/tests/test_deployment_state.py +++ b/python/ray/serve/tests/test_deployment_state.py @@ -1,5 +1,4 @@ from dataclasses import dataclass -import os import sys import time from typing import Any, Dict, List, Optional, Tuple @@ -26,10 +25,9 @@ ReplicaState, ReplicaStateContainer, VersionedReplica, - CHECKPOINT_KEY, rank_replicas_for_stopping, ) -from ray.serve.storage.kv_store import RayLocalKVStore +from ray.serve.storage.kv_store import RayInternalKVStore from ray.serve.utils import get_random_letters @@ -1937,6 +1935,7 @@ def test_deploy_with_transient_constructor_failure(mock_deployment_state): @pytest.fixture def mock_deployment_state_manager() -> Tuple[DeploymentStateManager, Mock]: + ray.init() timer = MockTimer() with patch( "ray.serve.deployment_state.ActorReplicaWrapper", new=MockReplicaActorWrapper @@ -1944,7 +1943,7 @@ def mock_deployment_state_manager() -> Tuple[DeploymentStateManager, Mock]: "ray.serve.long_poll.LongPollHost" ) as mock_long_poll: - kv_store = RayLocalKVStore("TEST_DB", "test_kv_store.db") + kv_store = RayInternalKVStore("test") all_current_actor_names = [] deployment_state_manager = DeploymentStateManager( "name", @@ -1954,11 +1953,7 @@ def mock_deployment_state_manager() -> Tuple[DeploymentStateManager, Mock]: all_current_actor_names, ) yield deployment_state_manager, timer - # Clear checkpoint at the end of each test - kv_store.delete(CHECKPOINT_KEY) - if sys.platform != "win32": - # This line fails on windows with a PermissionError. - os.remove("test_kv_store.db") + ray.shutdown() def test_shutdown(mock_deployment_state_manager): diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index f3cb00fd4c1f0..56493e4ca5bd8 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -5,10 +5,8 @@ import logging import os import socket -import subprocess import sys import time -from tempfile import mkstemp import pydantic import pytest @@ -568,69 +566,11 @@ class A: ) -def test_local_store_recovery(ray_shutdown): - _, tmp_path = mkstemp() - - @serve.deployment - def hello(_): - return "hello" - - # https://github.com/ray-project/ray/issues/19987 - @serve.deployment - def world(_): - return "world" - - def check(name, raise_error=False): - try: - resp = requests.get(f"http://localhost:8000/{name}") - assert resp.text == name - return True - except Exception as e: - if raise_error: - raise e - return False - - # https://github.com/ray-project/ray/issues/20159 - # https://github.com/ray-project/ray/issues/20158 - def clean_up_leaked_processes(): - import psutil - - for proc in psutil.process_iter(): - try: - cmdline = " ".join(proc.cmdline()) - if "ray::" in cmdline: - print(f"Kill {proc} {cmdline}") - proc.kill() - except Exception: - pass - - def crash(): - subprocess.call(["ray", "stop", "--force"]) - clean_up_leaked_processes() - ray.shutdown() - serve.shutdown() - - serve.start(detached=True, _checkpoint_path=f"file://{tmp_path}") - hello.deploy() - world.deploy() - assert check("hello", raise_error=True) - assert check("world", raise_error=True) - crash() - - # Simulate a crash - - serve.start(detached=True, _checkpoint_path=f"file://{tmp_path}") - wait_for_condition(lambda: check("hello")) - # wait_for_condition(lambda: check("world")) - crash() - - @pytest.mark.parametrize("ray_start_with_dashboard", [{"num_cpus": 4}], indirect=True) def test_snapshot_always_written_to_internal_kv( ray_start_with_dashboard, ray_shutdown # noqa: F811 ): # https://github.com/ray-project/ray/issues/19752 - _, tmp_path = mkstemp() @serve.deployment() def hello(_): @@ -644,7 +584,7 @@ def check(): except Exception: return False - serve.start(detached=True, _checkpoint_path=f"file://{tmp_path}") + serve.start(detached=True) serve.run(hello.bind()) check() @@ -687,12 +627,10 @@ def emit(self, record): # create a different config test_http = dict(host="127.1.1.8", port=new_port()) - _, tmp_path = mkstemp() - test_ckpt = f"file://{tmp_path}" - serve.start(detached=True, http_options=test_http, _checkpoint_path=test_ckpt) + serve.start(detached=True, http_options=test_http) - for test_config, msg in zip([[test_ckpt], ["host", "port"]], warning_msg): + for test_config, msg in zip([["host", "port"]], warning_msg): for test_msg in test_config: if "Autoscaling metrics pusher thread" in msg: continue diff --git a/release/serve_tests/workloads/serve_test_cluster_utils.py b/release/serve_tests/workloads/serve_test_cluster_utils.py index 21e67bf82e3ee..9f7ce39d7e133 100644 --- a/release/serve_tests/workloads/serve_test_cluster_utils.py +++ b/release/serve_tests/workloads/serve_test_cluster_utils.py @@ -7,7 +7,6 @@ from ray._private.test_utils import monitor_memory_usage from ray.cluster_utils import Cluster from ray.serve.config import DeploymentMode -from ray.serve.constants import DEFAULT_CHECKPOINT_PATH logger = logging.getLogger(__file__) @@ -19,7 +18,6 @@ def setup_local_single_node_cluster( num_nodes: int, num_cpu_per_node=NUM_CPU_PER_NODE, - checkpoint_path: str = DEFAULT_CHECKPOINT_PATH, namespace="serve", ): """Setup ray cluster locally via ray.init() and Cluster() @@ -37,15 +35,13 @@ def setup_local_single_node_cluster( ) ray.init(address=cluster.address, dashboard_host="0.0.0.0", namespace=namespace) serve_client = serve.start( - detached=True, - http_options={"location": DeploymentMode.EveryNode}, - _checkpoint_path=checkpoint_path, + detached=True, http_options={"location": DeploymentMode.EveryNode} ) return serve_client, cluster -def setup_anyscale_cluster(checkpoint_path: str = DEFAULT_CHECKPOINT_PATH): +def setup_anyscale_cluster(): """Setup ray cluster at anyscale via ray.client() Note this is by default large scale and should be kicked off @@ -61,10 +57,7 @@ def setup_anyscale_cluster(checkpoint_path: str = DEFAULT_CHECKPOINT_PATH): # to reduce spam. runtime_env={"env_vars": {"SERVE_ENABLE_SCALING_LOG": "0"}}, ) - serve_client = serve.start( - http_options={"location": DeploymentMode.EveryNode}, - _checkpoint_path=checkpoint_path, - ) + serve_client = serve.start(http_options={"location": DeploymentMode.EveryNode}) # Print memory usage on the head node to help diagnose/debug memory leaks. monitor_memory_usage()