From bd383e912a55f0afbd9cc3c239771dbbf3dcb900 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Wed, 26 Jun 2024 01:47:21 -0700 Subject: [PATCH 01/23] [Core] Add docker run options (#3682) * Add docker run options * Add docs * Add warning for docker run options in kubernetes * Update docs/source/reference/config.rst Co-authored-by: Romil Bhardwaj * update * update doc * Stream logs * allow changing the `run_options` --------- Co-authored-by: Romil Bhardwaj --- docs/source/reference/config.rst | 25 +++++++++++++ sky/backends/backend_utils.py | 15 ++++++++ sky/provision/docker_utils.py | 6 ++-- sky/provision/instance_setup.py | 55 ++++++++++++++++------------- sky/templates/aws-ray.yml.j2 | 3 ++ sky/templates/azure-ray.yml.j2 | 3 ++ sky/templates/gcp-ray.yml.j2 | 3 ++ sky/templates/paperspace-ray.yml.j2 | 3 ++ sky/utils/schemas.py | 18 ++++++++++ 9 files changed, 105 insertions(+), 26 deletions(-) diff --git a/docs/source/reference/config.rst b/docs/source/reference/config.rst index 96be48e71e3..ea744f925f1 100644 --- a/docs/source/reference/config.rst +++ b/docs/source/reference/config.rst @@ -40,6 +40,31 @@ Available fields and semantics: - gcp - kubernetes + docker: + # Additional Docker run options (optional). + # + # When image_id: docker: is used in a task YAML, additional + # run options for starting the Docker container can be specified here. + # These options will be passed directly as command line args to `docker run`, + # see: https://docs.docker.com/reference/cli/docker/container/run/ + # + # The following run options are applied by default and cannot be overridden: + # --net=host + # --cap-add=SYS_ADMIN + # --device=/dev/fuse + # --security-opt=apparmor:unconfined + # --runtime=nvidia # Applied if nvidia GPUs are detected on the host + # + # This field can be useful for mounting volumes and other advanced Docker + # configurations. You can specify a list of arguments or a string, where the + # former will be combined into a single string with spaces. The following is + # an example option for allowing running Docker inside Docker and increase + # the size of /dev/shm.: + # sky launch --cloud aws --image-id docker:continuumio/miniconda3 "apt update; apt install -y docker.io; docker run hello-world" + run_options: + - -v /var/run/docker.sock:/var/run/docker.sock + - --shm-size=2g + nvidia_gpus: # Disable ECC for NVIDIA GPUs (optional). # diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 0989a3f9122..e760132068b 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -146,6 +146,7 @@ # Clouds with new provisioner has docker_login_config in the # docker field, instead of the provider field. ('docker', 'docker_login_config'), + ('docker', 'run_options'), # Other clouds ('provider', 'docker_login_config'), ('provider', 'firewall_rule'), @@ -873,6 +874,17 @@ def write_cluster_config( f'open(os.path.expanduser("{constants.SKY_REMOTE_RAY_PORT_FILE}"), "w", encoding="utf-8"))\'' ) + # Docker run options + docker_run_options = skypilot_config.get_nested(('docker', 'run_options'), + []) + if isinstance(docker_run_options, str): + docker_run_options = [docker_run_options] + if docker_run_options and isinstance(to_provision.cloud, clouds.Kubernetes): + logger.warning(f'{colorama.Style.DIM}Docker run options are specified, ' + 'but ignored for Kubernetes: ' + f'{" ".join(docker_run_options)}' + f'{colorama.Style.RESET_ALL}') + # Use a tmp file path to avoid incomplete YAML file being re-used in the # future. initial_setup_commands = [] @@ -923,6 +935,9 @@ def write_cluster_config( wheel_hash).replace('{cloud}', str(cloud).lower())), + # Docker + 'docker_run_options': docker_run_options, + # Port of Ray (GCS server). # Ray's default port 6379 is conflicted with Redis. 'ray_port': constants.SKY_REMOTE_RAY_PORT, diff --git a/sky/provision/docker_utils.py b/sky/provision/docker_utils.py index 046800ca9d1..9fbc19c2959 100644 --- a/sky/provision/docker_utils.py +++ b/sky/provision/docker_utils.py @@ -176,8 +176,10 @@ def _run(self, subprocess_utils.handle_returncode( rc, cmd, - error_msg='Failed to run docker setup commands', - stderr=stdout + stderr) + error_msg='Failed to run docker setup commands.', + stderr=stdout + stderr, + # Print out the error message if the command failed. + stream_logs=True) return stdout.strip() def initialize(self) -> str: diff --git a/sky/provision/instance_setup.py b/sky/provision/instance_setup.py index c81ecd78db4..1fb80ba542a 100644 --- a/sky/provision/instance_setup.py +++ b/sky/provision/instance_setup.py @@ -6,8 +6,9 @@ import os import resource import time -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple +from sky import exceptions from sky import provision from sky import sky_logging from sky.provision import common @@ -68,29 +69,34 @@ 'sky.skylet.attempt_skylet;') -def _auto_retry(func): +def _auto_retry(should_retry: Callable[[Exception], bool] = lambda _: True): """Decorator that retries the function if it fails. This decorator is mostly for SSH disconnection issues, which might happen during the setup of instances. """ - @functools.wraps(func) - def retry(*args, **kwargs): - backoff = common_utils.Backoff(initial_backoff=1, max_backoff_factor=5) - for retry_cnt in range(_MAX_RETRY): - try: - return func(*args, **kwargs) - except Exception as e: # pylint: disable=broad-except - if retry_cnt >= _MAX_RETRY - 1: - raise e - sleep = backoff.current_backoff() - logger.info( - f'{func.__name__}: Retrying in {sleep:.1f} seconds, ' - f'due to {e}') - time.sleep(sleep) - - return retry + def decorator(func): + + @functools.wraps(func) + def retry(*args, **kwargs): + backoff = common_utils.Backoff(initial_backoff=1, + max_backoff_factor=5) + for retry_cnt in range(_MAX_RETRY): + try: + return func(*args, **kwargs) + except Exception as e: # pylint: disable=broad-except + if not should_retry(e) or retry_cnt >= _MAX_RETRY - 1: + raise + sleep = backoff.current_backoff() + logger.info( + f'{func.__name__}: Retrying in {sleep:.1f} seconds, ' + f'due to {e}') + time.sleep(sleep) + + return retry + + return decorator def _log_start_end(func): @@ -156,7 +162,8 @@ def initialize_docker(cluster_name: str, docker_config: Dict[str, Any], return None _hint_worker_log_path(cluster_name, cluster_info, 'initialize_docker') - @_auto_retry + @_auto_retry(should_retry=lambda e: isinstance(e, exceptions.CommandError) + and e.returncode == 255) def _initialize_docker(runner: command_runner.CommandRunner, log_path: str): docker_user = docker_utils.DockerInitializer(docker_config, runner, log_path).initialize() @@ -193,7 +200,7 @@ def setup_runtime_on_cluster(cluster_name: str, setup_commands: List[str], hasher.update(d) digest = hasher.hexdigest() - @_auto_retry + @_auto_retry() def _setup_node(runner: command_runner.CommandRunner, log_path: str): for cmd in setup_commands: returncode, stdout, stderr = runner.run( @@ -254,7 +261,7 @@ def _ray_gpu_options(custom_resource: str) -> str: @_log_start_end -@_auto_retry +@_auto_retry() def start_ray_on_head_node(cluster_name: str, custom_resource: Optional[str], cluster_info: common.ClusterInfo, ssh_credentials: Dict[str, Any]) -> None: @@ -314,7 +321,7 @@ def start_ray_on_head_node(cluster_name: str, custom_resource: Optional[str], @_log_start_end -@_auto_retry +@_auto_retry() def start_ray_on_worker_nodes(cluster_name: str, no_restart: bool, custom_resource: Optional[str], ray_port: int, cluster_info: common.ClusterInfo, @@ -411,7 +418,7 @@ def _setup_ray_worker(runner_and_id: Tuple[command_runner.CommandRunner, @_log_start_end -@_auto_retry +@_auto_retry() def start_skylet_on_head_node(cluster_name: str, cluster_info: common.ClusterInfo, ssh_credentials: Dict[str, Any]) -> None: @@ -437,7 +444,7 @@ def start_skylet_on_head_node(cluster_name: str, f'===== stderr ====={stderr}') -@_auto_retry +@_auto_retry() def _internal_file_mounts(file_mounts: Dict, runner: command_runner.CommandRunner, log_path: str) -> None: diff --git a/sky/templates/aws-ray.yml.j2 b/sky/templates/aws-ray.yml.j2 index 778c64f6926..ac84f8a4fd3 100644 --- a/sky/templates/aws-ray.yml.j2 +++ b/sky/templates/aws-ray.yml.j2 @@ -14,6 +14,9 @@ docker: {%- if custom_resources is not none %} --gpus all {%- endif %} + {%- for run_option in docker_run_options %} + - {{run_option}} + {%- endfor %} {%- if docker_login_config is not none %} docker_login_config: username: |- diff --git a/sky/templates/azure-ray.yml.j2 b/sky/templates/azure-ray.yml.j2 index 803327f1032..66eac439453 100644 --- a/sky/templates/azure-ray.yml.j2 +++ b/sky/templates/azure-ray.yml.j2 @@ -14,6 +14,9 @@ docker: {%- if custom_resources is not none %} --gpus all {%- endif %} + {%- for run_option in docker_run_options %} + - {{run_option}} + {%- endfor %} {%- endif %} provider: diff --git a/sky/templates/gcp-ray.yml.j2 b/sky/templates/gcp-ray.yml.j2 index 42f1d179498..e01ed351bfa 100644 --- a/sky/templates/gcp-ray.yml.j2 +++ b/sky/templates/gcp-ray.yml.j2 @@ -15,6 +15,9 @@ docker: {%- if gpu is not none %} --gpus all {%- endif %} + {%- for run_option in docker_run_options %} + - {{run_option}} + {%- endfor %} {%- if docker_login_config is not none %} docker_login_config: username: |- diff --git a/sky/templates/paperspace-ray.yml.j2 b/sky/templates/paperspace-ray.yml.j2 index 005f30b5233..400714978b9 100644 --- a/sky/templates/paperspace-ray.yml.j2 +++ b/sky/templates/paperspace-ray.yml.j2 @@ -14,6 +14,9 @@ docker: {%- if custom_resources is not none %} --gpus all {%- endif %} + {%- for run_option in docker_run_options %} + - {{run_option}} + {%- endfor %} {%- if docker_login_config is not none %} docker_login_config: username: |- diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index 97b46113da4..2f1dd649ade 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -757,6 +757,23 @@ def get_config_schema(): } } + docker_configs = { + 'type': 'object', + 'required': [], + 'additionalProperties': False, + 'properties': { + 'run_options': { + 'anyOf': [{ + 'type': 'string', + }, { + 'type': 'array', + 'items': { + 'type': 'string', + } + }] + } + } + } gpu_configs = { 'type': 'object', 'required': [], @@ -785,6 +802,7 @@ def get_config_schema(): 'spot': controller_resources_schema, 'serve': controller_resources_schema, 'allowed_clouds': allowed_clouds, + 'docker': docker_configs, 'nvidia_gpus': gpu_configs, **cloud_configs, }, From a51b50793b40f07686fc66b12eb781b898396566 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Thu, 27 Jun 2024 10:34:37 -0700 Subject: [PATCH 02/23] [Examples] Add vLLM container example (#3694) * add docker example * fix link --- llm/vllm/README.md | 2 ++ llm/vllm/serve-openai-api-docker.yaml | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+) create mode 100644 llm/vllm/serve-openai-api-docker.yaml diff --git a/llm/vllm/README.md b/llm/vllm/README.md index 61932cd8571..e3a2befbecc 100644 --- a/llm/vllm/README.md +++ b/llm/vllm/README.md @@ -33,6 +33,8 @@ sky launch -c vllm-llama2 serve-openai-api.yaml --env HF_TOKEN=YOUR_HUGGING_FACE ```bash sky launch -c vllm-llama2 serve-openai-api.yaml --gpus V100:1 --env HF_TOKEN=YOUR_HUGGING_FACE_API_TOKEN ``` +**Tip**: You can also use the vLLM docker container for faster setup. Refer to [serve-openai-api-docker.yaml](https://github.com/skypilot-org/skypilot/tree/master/llm/vllm/serve-openai-api-docker.yaml) for more. + 2. Check the IP for the cluster with: ``` IP=$(sky status --ip vllm-llama2) diff --git a/llm/vllm/serve-openai-api-docker.yaml b/llm/vllm/serve-openai-api-docker.yaml new file mode 100644 index 00000000000..0a980092e99 --- /dev/null +++ b/llm/vllm/serve-openai-api-docker.yaml @@ -0,0 +1,20 @@ +envs: + MODEL_NAME: meta-llama/Llama-2-7b-chat-hf + HF_TOKEN: # TODO: Fill with your own huggingface token, or use --env to pass. + +resources: + image_id: docker:vllm/vllm-openai:latest + accelerators: {L4:1, A10G:1, A10:1, A100:1, A100-80GB:1} + ports: + - 8000 + +setup: | + conda deactivate + python3 -c "import huggingface_hub; huggingface_hub.login('${HF_TOKEN}')" + +run: | + conda deactivate + echo 'Starting vllm openai api server...' + python -m vllm.entrypoints.openai.api_server \ + --model $MODEL_NAME --tokenizer hf-internal-testing/llama-tokenizer \ + --host 0.0.0.0 From 4821f70b3f4998821dd68c2afcdc7ff61b54ec46 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Fri, 28 Jun 2024 18:48:22 -0700 Subject: [PATCH 03/23] [Azure] Avoid azure reconfig everytime, speed up launch by up to 5.8x (#3697) * Avoid azure reconfig everytime * Add debug message * format * Fix error handling * format * skip deployment recreation when deployment exist * Add retry for subscription ID * fix logging * format * comment --- sky/adaptors/azure.py | 23 ++++++++++-- sky/skylet/providers/azure/config.py | 40 +++++++++++++++------ sky/skylet/providers/azure/node_provider.py | 37 +++++++++---------- sky/utils/common_utils.py | 2 +- 4 files changed, 68 insertions(+), 34 deletions(-) diff --git a/sky/adaptors/azure.py b/sky/adaptors/azure.py index 44618a8f64f..6bd57bc6bec 100644 --- a/sky/adaptors/azure.py +++ b/sky/adaptors/azure.py @@ -3,8 +3,10 @@ # pylint: disable=import-outside-toplevel import functools import threading +import time from sky.adaptors import common +from sky.utils import common_utils azure = common.LazyImport( 'azure', @@ -13,13 +15,30 @@ _LAZY_MODULES = (azure,) _session_creation_lock = threading.RLock() +_MAX_RETRY_FOR_GET_SUBSCRIPTION_ID = 5 @common.load_lazy_modules(modules=_LAZY_MODULES) +@functools.lru_cache() def get_subscription_id() -> str: """Get the default subscription id.""" from azure.common import credentials - return credentials.get_cli_profile().get_subscription_id() + retry = 0 + backoff = common_utils.Backoff(initial_backoff=0.5, max_backoff_factor=4) + while True: + try: + return credentials.get_cli_profile().get_subscription_id() + except Exception as e: + if ('Please run \'az login\' to setup account.' in str(e) and + retry < _MAX_RETRY_FOR_GET_SUBSCRIPTION_ID): + # When there are multiple processes trying to get the + # subscription id, it may fail with the above error message. + # Retry will fix the issue. + retry += 1 + + time.sleep(backoff.current_backoff()) + continue + raise @common.load_lazy_modules(modules=_LAZY_MODULES) @@ -36,8 +55,8 @@ def exceptions(): return azure_exceptions -@functools.lru_cache() @common.load_lazy_modules(modules=_LAZY_MODULES) +@functools.lru_cache() def get_client(name: str, subscription_id: str): # Sky only supports Azure CLI credential for now. # Increase the timeout to fix the Azure get-access-token timeout issue. diff --git a/sky/skylet/providers/azure/config.py b/sky/skylet/providers/azure/config.py index a19273761ba..35008ef13d7 100644 --- a/sky/skylet/providers/azure/config.py +++ b/sky/skylet/providers/azure/config.py @@ -12,6 +12,7 @@ from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.resource.resources.models import DeploymentMode +from sky.adaptors import azure from sky.utils import common_utils UNIQUE_ID_LEN = 4 @@ -120,17 +121,36 @@ def _configure_resource_group(config): create_or_update = get_azure_sdk_function( client=resource_client.deployments, function_name="create_or_update" ) - # TODO (skypilot): this takes a long time (> 40 seconds) for stopping an - # azure VM, and this can be called twice during ray down. - outputs = ( - create_or_update( - resource_group_name=resource_group, - deployment_name="ray-config", - parameters=parameters, - ) - .result() - .properties.outputs + # Skip creating or updating the deployment if the deployment already exists + # and the cluster name is the same. + get_deployment = get_azure_sdk_function( + client=resource_client.deployments, function_name="get" ) + deployment_exists = False + try: + deployment = get_deployment( + resource_group_name=resource_group, deployment_name="ray-config" + ) + logger.info("Deployment already exists. Skipping deployment creation.") + + outputs = deployment.properties.outputs + if outputs is not None: + deployment_exists = True + except azure.exceptions().ResourceNotFoundError: + deployment_exists = False + + if not deployment_exists: + # This takes a long time (> 40 seconds), we should be careful calling + # this function. + outputs = ( + create_or_update( + resource_group_name=resource_group, + deployment_name="ray-config", + parameters=parameters, + ) + .result() + .properties.outputs + ) # We should wait for the NSG to be created before opening any ports # to avoid overriding the newly-added NSG rules. diff --git a/sky/skylet/providers/azure/node_provider.py b/sky/skylet/providers/azure/node_provider.py index 068930eb390..b4a1c656688 100644 --- a/sky/skylet/providers/azure/node_provider.py +++ b/sky/skylet/providers/azure/node_provider.py @@ -11,11 +11,11 @@ from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.resource.resources.models import DeploymentMode +from sky.adaptors import azure from sky.skylet.providers.azure.config import ( bootstrap_azure, get_azure_sdk_function, ) -from sky.skylet import autostop_lib from sky.skylet.providers.command_runner import SkyDockerCommandRunner from sky.provision import docker_utils @@ -62,23 +62,7 @@ class AzureNodeProvider(NodeProvider): def __init__(self, provider_config, cluster_name): NodeProvider.__init__(self, provider_config, cluster_name) - if not autostop_lib.get_is_autostopping(): - # TODO(suquark): This is a temporary patch for resource group. - # By default, Ray autoscaler assumes the resource group is still - # here even after the whole cluster is destroyed. However, now we - # deletes the resource group after tearing down the cluster. To - # comfort the autoscaler, we need to create/update it here, so the - # resource group always exists. - # - # We should not re-configure the resource group again, when it is - # running on the remote VM and the autostopping is in progress, - # because the VM is running which guarantees the resource group - # exists. - from sky.skylet.providers.azure.config import _configure_resource_group - - _configure_resource_group( - {"cluster_name": cluster_name, "provider": provider_config} - ) + subscription_id = provider_config["subscription_id"] self.cache_stopped_nodes = provider_config.get("cache_stopped_nodes", True) # Sky only supports Azure CLI credential for now. @@ -106,9 +90,20 @@ def match_tags(vm): return False return True - vms = self.compute_client.virtual_machines.list( - resource_group_name=self.provider_config["resource_group"] - ) + try: + vms = list( + self.compute_client.virtual_machines.list( + resource_group_name=self.provider_config["resource_group"] + ) + ) + except azure.exceptions().ResourceNotFoundError as e: + if "Code: ResourceGroupNotFound" in e.exc_msg: + logger.debug( + "Resource group not found. VMs should have been terminated." + ) + vms = [] + else: + raise nodes = [self._extract_metadata(vm) for vm in filter(match_tags, vms)] self.cached_nodes = {node["name"]: node for node in nodes} diff --git a/sky/utils/common_utils.py b/sky/utils/common_utils.py index 103c834000c..a9227fb4c20 100644 --- a/sky/utils/common_utils.py +++ b/sky/utils/common_utils.py @@ -233,7 +233,7 @@ class Backoff: MULTIPLIER = 1.6 JITTER = 0.4 - def __init__(self, initial_backoff: int = 5, max_backoff_factor: int = 5): + def __init__(self, initial_backoff: float = 5, max_backoff_factor: int = 5): self._initial = True self._backoff = 0.0 self._initial_backoff = initial_backoff From 7633d2e829d351833e876974910f4eb82d283cfd Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Sun, 30 Jun 2024 10:50:15 -0700 Subject: [PATCH 04/23] [k8s] Remove SSH jump pod for port-forward mode (#3657) * working prototype of direct-to-pod port-forwarding * lint * switch to using head as jump * removed ssh jump pod * remove sleep * update note * comments * remove vestiges * updates * remove slash * add ssh_user placeholder * fix private key * lint --- sky/authentication.py | 41 ++++--- sky/backends/backend_utils.py | 6 + sky/backends/cloud_vm_ray_backend.py | 5 +- sky/clouds/kubernetes.py | 4 +- sky/provision/kubernetes/config.py | 7 +- sky/provision/kubernetes/instance.py | 17 ++- sky/provision/kubernetes/network_utils.py | 17 +++ sky/provision/kubernetes/utils.py | 108 ++++++++++++------ sky/skylet/constants.py | 4 + ... kubernetes-port-forward-proxy-command.sh} | 15 ++- sky/templates/kubernetes-ray.yml.j2 | 3 + 11 files changed, 157 insertions(+), 70 deletions(-) rename sky/templates/{kubernetes-port-forward-proxy-command.sh.j2 => kubernetes-port-forward-proxy-command.sh} (83%) diff --git a/sky/authentication.py b/sky/authentication.py index 966dad670c5..c61e0ce36c8 100644 --- a/sky/authentication.py +++ b/sky/authentication.py @@ -439,29 +439,38 @@ def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]: f'Key {secret_name} does not exist in the cluster, creating it...') kubernetes.core_api().create_namespaced_secret(namespace, secret) - ssh_jump_name = clouds.Kubernetes.SKY_SSH_JUMP_NAME + private_key_path, _ = get_or_generate_keys() if network_mode == nodeport_mode: + ssh_jump_name = clouds.Kubernetes.SKY_SSH_JUMP_NAME service_type = kubernetes_enums.KubernetesServiceType.NODEPORT + # Setup service for SSH jump pod. We create the SSH jump service here + # because we need to know the service IP address and port to set the + # ssh_proxy_command in the autoscaler config. + kubernetes_utils.setup_ssh_jump_svc(ssh_jump_name, namespace, + service_type) + ssh_proxy_cmd = kubernetes_utils.get_ssh_proxy_command( + ssh_jump_name, + nodeport_mode, + private_key_path=private_key_path, + namespace=namespace) elif network_mode == port_forward_mode: + # Using `kubectl port-forward` creates a direct tunnel to the pod and + # does not require a ssh jump pod. kubernetes_utils.check_port_forward_mode_dependencies() - # Using `kubectl port-forward` creates a direct tunnel to jump pod and - # does not require opening any ports on Kubernetes nodes. As a result, - # the service can be a simple ClusterIP service which we access with - # `kubectl port-forward`. - service_type = kubernetes_enums.KubernetesServiceType.CLUSTERIP + # TODO(romilb): This can be further optimized. Instead of using the + # head node as a jump pod for worker nodes, we can also directly + # set the ssh_target to the worker node. However, that requires + # changes in the downstream code to return a mapping of node IPs to + # pod names (to be used as ssh_target) and updating the upstream + # SSHConfigHelper to use a different ProxyCommand for each pod. + # This optimization can reduce SSH time from ~0.35s to ~0.25s, tested + # on GKE. + ssh_target = config['cluster_name'] + '-head' + ssh_proxy_cmd = kubernetes_utils.get_ssh_proxy_command( + ssh_target, port_forward_mode, private_key_path=private_key_path) else: # This should never happen because we check for this in from_str above. raise ValueError(f'Unsupported networking mode: {network_mode_str}') - # Setup service for SSH jump pod. We create the SSH jump service here - # because we need to know the service IP address and port to set the - # ssh_proxy_command in the autoscaler config. - kubernetes_utils.setup_ssh_jump_svc(ssh_jump_name, namespace, service_type) - - ssh_proxy_cmd = kubernetes_utils.get_ssh_proxy_command( - PRIVATE_SSH_KEY_PATH, ssh_jump_name, network_mode, namespace, - clouds.Kubernetes.PORT_FORWARD_PROXY_CMD_PATH, - clouds.Kubernetes.PORT_FORWARD_PROXY_CMD_TEMPLATE) - config['auth']['ssh_proxy_command'] = ssh_proxy_cmd return config diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index e760132068b..a1c86fdb624 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1251,6 +1251,12 @@ def ssh_credential_from_yaml( ssh_private_key = auth_section.get('ssh_private_key') ssh_control_name = config.get('cluster_name', '__default__') ssh_proxy_command = auth_section.get('ssh_proxy_command') + + # Update the ssh_user placeholder in proxy command, if required + if (ssh_proxy_command is not None and + constants.SKY_SSH_USER_PLACEHOLDER in ssh_proxy_command): + ssh_proxy_command = ssh_proxy_command.replace( + constants.SKY_SSH_USER_PLACEHOLDER, ssh_user) credentials = { 'ssh_user': ssh_user, 'ssh_private_key': ssh_private_key, diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 7f490743f8b..a92d13fd214 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -3065,7 +3065,10 @@ def _update_after_cluster_provisioned( ) usage_lib.messages.usage.update_final_cluster_status( status_lib.ClusterStatus.UP) - auth_config = common_utils.read_yaml(handle.cluster_yaml)['auth'] + auth_config = backend_utils.ssh_credential_from_yaml( + handle.cluster_yaml, + ssh_user=handle.ssh_user, + docker_user=handle.docker_user) backend_utils.SSHConfigHelper.add_cluster(handle.cluster_name, ip_list, auth_config, ssh_port_list, diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py index 5d9e57568b9..1e307f475c8 100644 --- a/sky/clouds/kubernetes.py +++ b/sky/clouds/kubernetes.py @@ -38,9 +38,6 @@ class Kubernetes(clouds.Cloud): SKY_SSH_KEY_SECRET_NAME = 'sky-ssh-keys' SKY_SSH_JUMP_NAME = 'sky-ssh-jump-pod' - PORT_FORWARD_PROXY_CMD_TEMPLATE = \ - 'kubernetes-port-forward-proxy-command.sh.j2' - PORT_FORWARD_PROXY_CMD_PATH = '~/.sky/port-forward-proxy-cmd.sh' # Timeout for resource provisioning. This timeout determines how long to # wait for pod to be in pending status before giving up. # Larger timeout may be required for autoscaling clusters, since autoscaler @@ -323,6 +320,7 @@ def make_deploy_resources_variables( 'k8s_namespace': kubernetes_utils.get_current_kube_config_context_namespace(), 'k8s_port_mode': port_mode.value, + 'k8s_networking_mode': network_utils.get_networking_mode().value, 'k8s_ssh_key_secret_name': self.SKY_SSH_KEY_SECRET_NAME, 'k8s_acc_label_key': k8s_acc_label_key, 'k8s_acc_label_value': k8s_acc_label_value, diff --git a/sky/provision/kubernetes/config.py b/sky/provision/kubernetes/config.py index c4c834d85fe..05fe1df19ec 100644 --- a/sky/provision/kubernetes/config.py +++ b/sky/provision/kubernetes/config.py @@ -9,7 +9,9 @@ from sky.adaptors import kubernetes from sky.provision import common +from sky.provision.kubernetes import network_utils from sky.provision.kubernetes import utils as kubernetes_utils +from sky.utils import kubernetes_enums logger = logging.getLogger(__name__) @@ -25,7 +27,10 @@ def bootstrap_instances( _configure_services(namespace, config.provider_config) - config = _configure_ssh_jump(namespace, config) + networking_mode = network_utils.get_networking_mode( + config.provider_config.get('networking_mode')) + if networking_mode == kubernetes_enums.KubernetesNetworkingMode.NODEPORT: + config = _configure_ssh_jump(namespace, config) requested_service_account = config.node_config['spec']['serviceAccountName'] if (requested_service_account == diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index 91102efdff0..052cbe1640f 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -12,6 +12,7 @@ from sky.provision import common from sky.provision import docker_utils from sky.provision.kubernetes import config as config_lib +from sky.provision.kubernetes import network_utils from sky.provision.kubernetes import utils as kubernetes_utils from sky.utils import command_runner from sky.utils import common_utils @@ -495,14 +496,18 @@ def _create_pods(region: str, cluster_name_on_cloud: str, if head_pod_name is None: head_pod_name = pod.metadata.name - # Adding the jump pod to the new_nodes list as well so it can be - # checked if it's scheduled and running along with other pods. - ssh_jump_pod_name = pod_spec['metadata']['labels']['skypilot-ssh-jump'] - jump_pod = kubernetes.core_api().read_namespaced_pod( - ssh_jump_pod_name, namespace) wait_pods_dict = _filter_pods(namespace, tags, ['Pending']) wait_pods = list(wait_pods_dict.values()) - wait_pods.append(jump_pod) + + networking_mode = network_utils.get_networking_mode( + config.provider_config.get('networking_mode')) + if networking_mode == kubernetes_enums.KubernetesNetworkingMode.NODEPORT: + # Adding the jump pod to the new_nodes list as well so it can be + # checked if it's scheduled and running along with other pods. + ssh_jump_pod_name = pod_spec['metadata']['labels']['skypilot-ssh-jump'] + jump_pod = kubernetes.core_api().read_namespaced_pod( + ssh_jump_pod_name, namespace) + wait_pods.append(jump_pod) provision_timeout = provider_config['timeout'] wait_str = ('indefinitely' diff --git a/sky/provision/kubernetes/network_utils.py b/sky/provision/kubernetes/network_utils.py index 836d75af41f..c42ffee2f1c 100644 --- a/sky/provision/kubernetes/network_utils.py +++ b/sky/provision/kubernetes/network_utils.py @@ -43,6 +43,23 @@ def get_port_mode( return port_mode +def get_networking_mode( + mode_str: Optional[str] = None +) -> kubernetes_enums.KubernetesNetworkingMode: + """Get the networking mode from the provider config.""" + mode_str = mode_str or skypilot_config.get_nested( + ('kubernetes', 'networking_mode'), + kubernetes_enums.KubernetesNetworkingMode.PORTFORWARD.value) + try: + networking_mode = kubernetes_enums.KubernetesNetworkingMode.from_str( + mode_str) + except ValueError as e: + with ux_utils.print_exception_no_traceback(): + raise ValueError(str(e) + + ' Please check: ~/.sky/config.yaml.') from None + return networking_mode + + def fill_loadbalancer_template(namespace: str, service_name: str, ports: List[int], selector_key: str, selector_value: str) -> Dict: diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index c599a5738d0..fbf79130424 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -3,6 +3,7 @@ import math import os import re +import shutil import subprocess from typing import Any, Dict, List, Optional, Set, Tuple, Union from urllib.parse import urlparse @@ -16,6 +17,7 @@ from sky import skypilot_config from sky.adaptors import kubernetes from sky.provision.kubernetes import network_utils +from sky.skylet import constants from sky.utils import common_utils from sky.utils import env_options from sky.utils import kubernetes_enums @@ -53,6 +55,10 @@ KIND_CONTEXT_NAME = 'kind-skypilot' # Context name used by sky local up +# Port-forward proxy command constants +PORT_FORWARD_PROXY_CMD_TEMPLATE = 'kubernetes-port-forward-proxy-command.sh' +PORT_FORWARD_PROXY_CMD_PATH = '~/.sky/kubernetes-port-forward-proxy-command.sh' + logger = sky_logging.init_logger(__name__) @@ -911,30 +917,38 @@ def __str__(self): return self.name -def construct_ssh_jump_command(private_key_path: str, - ssh_jump_ip: str, - ssh_jump_port: Optional[int] = None, - proxy_cmd_path: Optional[str] = None) -> str: +def construct_ssh_jump_command( + private_key_path: str, + ssh_jump_ip: str, + ssh_jump_port: Optional[int] = None, + ssh_jump_user: str = 'sky', + proxy_cmd_path: Optional[str] = None, + proxy_cmd_target_pod: Optional[str] = None) -> str: ssh_jump_proxy_command = (f'ssh -tt -i {private_key_path} ' '-o StrictHostKeyChecking=no ' '-o UserKnownHostsFile=/dev/null ' f'-o IdentitiesOnly=yes ' - f'-W %h:%p sky@{ssh_jump_ip}') + f'-W %h:%p {ssh_jump_user}@{ssh_jump_ip}') if ssh_jump_port is not None: ssh_jump_proxy_command += f' -p {ssh_jump_port} ' if proxy_cmd_path is not None: proxy_cmd_path = os.path.expanduser(proxy_cmd_path) # adding execution permission to the proxy command script os.chmod(proxy_cmd_path, os.stat(proxy_cmd_path).st_mode | 0o111) - ssh_jump_proxy_command += f' -o ProxyCommand=\'{proxy_cmd_path}\' ' + ssh_jump_proxy_command += (f' -o ProxyCommand=\'{proxy_cmd_path} ' + f'{proxy_cmd_target_pod}\' ') return ssh_jump_proxy_command def get_ssh_proxy_command( - private_key_path: str, ssh_jump_name: str, - network_mode: kubernetes_enums.KubernetesNetworkingMode, namespace: str, - port_fwd_proxy_cmd_path: str, port_fwd_proxy_cmd_template: str) -> str: - """Generates the SSH proxy command to connect through the SSH jump pod. + k8s_ssh_target: str, + network_mode: kubernetes_enums.KubernetesNetworkingMode, + private_key_path: Optional[str] = None, + namespace: Optional[str] = None) -> str: + """Generates the SSH proxy command to connect to the pod. + + Uses a jump pod if the network mode is NODEPORT, and direct port-forwarding + if the network mode is PORTFORWARD. By default, establishing an SSH connection creates a communication channel to a remote node by setting up a TCP connection. When a @@ -950,57 +964,77 @@ def get_ssh_proxy_command( With the NodePort networking mode, a NodePort service is launched. This service opens an external port on the node which redirects to the desired - port within the pod. When establishing an SSH session in this mode, the + port to a SSH jump pod. When establishing an SSH session in this mode, the ProxyCommand makes use of this external port to create a communication channel directly to port 22, which is the default port ssh server listens on, of the jump pod. With Port-forward mode, instead of directly exposing an external port, 'kubectl port-forward' sets up a tunnel between a local port - (127.0.0.1:23100) and port 22 of the jump pod. Then we establish a TCP + (127.0.0.1:23100) and port 22 of the provisioned pod. Then we establish TCP connection to the local end of this tunnel, 127.0.0.1:23100, using 'socat'. - This is setup in the inner ProxyCommand of the nested ProxyCommand, and the - rest is the same as NodePort approach, which the outer ProxyCommand - establishes a communication channel between 127.0.0.1:23100 and port 22 on - the jump pod. Consequently, any stdin provided on the local machine is - forwarded through this tunnel to the application (SSH server) listening in - the pod. Similarly, any output from the application in the pod is tunneled - back and displayed in the terminal on the local machine. + All of this is done in a ProxyCommand script. Any stdin provided on the + local machine is forwarded through this tunnel to the application + (SSH server) listening in the pod. Similarly, any output from the + application in the pod is tunneled back and displayed in the terminal on + the local machine. Args: - private_key_path: str; Path to the private key to use for SSH. - This key must be authorized to access the SSH jump pod. - ssh_jump_name: str; Name of the SSH jump service to use + k8s_ssh_target: str; The Kubernetes object that will be used as the + target for SSH. If network_mode is NODEPORT, this is the name of the + service. If network_mode is PORTFORWARD, this is the pod name. network_mode: KubernetesNetworkingMode; networking mode for ssh session. It is either 'NODEPORT' or 'PORTFORWARD' - namespace: Kubernetes namespace to use - port_fwd_proxy_cmd_path: str; path to the script used as Proxycommand - with 'kubectl port-forward' - port_fwd_proxy_cmd_template: str; template used to create - 'kubectl port-forward' Proxycommand + private_key_path: str; Path to the private key to use for SSH. + This key must be authorized to access the SSH jump pod. + Required for NODEPORT networking mode. + namespace: Kubernetes namespace to use. + Required for NODEPORT networking mode. """ # Fetch IP to connect to for the jump svc ssh_jump_ip = get_external_ip(network_mode) + assert private_key_path is not None, 'Private key path must be provided' if network_mode == kubernetes_enums.KubernetesNetworkingMode.NODEPORT: - ssh_jump_port = get_port(ssh_jump_name, namespace) + assert namespace is not None, 'Namespace must be provided for NodePort' + ssh_jump_port = get_port(k8s_ssh_target, namespace) ssh_jump_proxy_command = construct_ssh_jump_command( private_key_path, ssh_jump_ip, ssh_jump_port=ssh_jump_port) - # Setting kubectl port-forward/socat to establish ssh session using - # ClusterIP service to disallow any ports opened else: - vars_to_fill = { - 'ssh_jump_name': ssh_jump_name, - } - common_utils.fill_template(port_fwd_proxy_cmd_template, - vars_to_fill, - output_path=port_fwd_proxy_cmd_path) + ssh_jump_proxy_command_path = create_proxy_command_script() ssh_jump_proxy_command = construct_ssh_jump_command( private_key_path, ssh_jump_ip, - proxy_cmd_path=port_fwd_proxy_cmd_path) + ssh_jump_user=constants.SKY_SSH_USER_PLACEHOLDER, + proxy_cmd_path=ssh_jump_proxy_command_path, + proxy_cmd_target_pod=k8s_ssh_target) return ssh_jump_proxy_command +def create_proxy_command_script() -> str: + """Creates a ProxyCommand script that uses kubectl port-forward to setup + a tunnel between a local port and the SSH server in the pod. + + Returns: + str: Path to the ProxyCommand script. + """ + port_fwd_proxy_cmd_path = os.path.expanduser(PORT_FORWARD_PROXY_CMD_PATH) + os.makedirs(os.path.dirname(port_fwd_proxy_cmd_path), + exist_ok=True, + mode=0o700) + + root_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + template_path = os.path.join(root_dir, 'templates', + PORT_FORWARD_PROXY_CMD_TEMPLATE) + # Copy the template to the proxy command path. We create a copy to allow + # different users sharing the same SkyPilot installation to have their own + # proxy command scripts. + shutil.copy(template_path, port_fwd_proxy_cmd_path) + # Set the permissions to 700 to ensure only the owner can read, write, + # and execute the file. + os.chmod(port_fwd_proxy_cmd_path, 0o700) + return port_fwd_proxy_cmd_path + + def setup_ssh_jump_svc(ssh_jump_name: str, namespace: str, service_type: kubernetes_enums.KubernetesServiceType): """Sets up Kubernetes service resource to access for SSH jump pod. diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py index bfec3ad8cac..c456b48b306 100644 --- a/sky/skylet/constants.py +++ b/sky/skylet/constants.py @@ -257,3 +257,7 @@ SKYPILOT_NODE_IPS = 'SKYPILOT_NODE_IPS' SKYPILOT_NUM_GPUS_PER_NODE = 'SKYPILOT_NUM_GPUS_PER_NODE' SKYPILOT_NODE_RANK = 'SKYPILOT_NODE_RANK' + +# Placeholder for the SSH user in proxy command, replaced when the ssh_user is +# known after provisioning. +SKY_SSH_USER_PLACEHOLDER = 'skypilot:ssh_user' diff --git a/sky/templates/kubernetes-port-forward-proxy-command.sh.j2 b/sky/templates/kubernetes-port-forward-proxy-command.sh similarity index 83% rename from sky/templates/kubernetes-port-forward-proxy-command.sh.j2 rename to sky/templates/kubernetes-port-forward-proxy-command.sh index 39159eb15b9..d9e409b5545 100644 --- a/sky/templates/kubernetes-port-forward-proxy-command.sh.j2 +++ b/sky/templates/kubernetes-port-forward-proxy-command.sh @@ -1,6 +1,14 @@ #!/usr/bin/env bash set -uo pipefail +# Check if pod name is passed as an argument +if [ $# -eq 0 ]; then + echo "Usage: $0 " >&2 + exit 1 +fi + +POD_NAME="$1" # The first argument is the name of the pod + # Checks if socat is installed if ! command -v socat > /dev/null; then echo "Using 'port-forward' mode to run ssh session on Kubernetes instances requires 'socat' to be installed. Please install 'socat'" >&2 @@ -18,7 +26,7 @@ fi # This is preferred because of socket re-use issues in kubectl port-forward, # see - https://github.com/kubernetes/kubernetes/issues/74551#issuecomment-769185879 KUBECTL_OUTPUT=$(mktemp) -kubectl port-forward svc/{{ ssh_jump_name }} :22 > "${KUBECTL_OUTPUT}" 2>&1 & +kubectl port-forward pod/"${POD_NAME}" :22 > "${KUBECTL_OUTPUT}" 2>&1 & # Capture the PID for the backgrounded kubectl command K8S_PORT_FWD_PID=$! @@ -49,11 +57,6 @@ while ! nc -z 127.0.0.1 "${local_port}"; do sleep 0.1 done -# To avoid errors when many concurrent requests are sent (see https://github.com/skypilot-org/skypilot/issues/2628), -# we add a random delay before establishing the socat connection. -# Empirically, this needs to be at least 1 second. We set this to be random between 1 and 2 seconds. -sleep $(shuf -i 10-20 -n 1 | awk '{printf "%f", $1/10}') - # Establishes two directional byte streams to handle stdin/stdout between # terminal and the jump pod. # socat process terminates when port-forward terminates. diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index 20c35b15641..bd4bafd43d5 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -24,6 +24,9 @@ provider: # This should be one of KubernetesPortMode port_mode: {{k8s_port_mode}} + # The networking mode used to ssh to pods. One of KubernetesNetworkingMode. + networking_mode: {{k8s_networking_mode}} + # We use internal IPs since we set up a port-forward between the kubernetes # cluster and the local machine, or directly use NodePort to reach the # head node. From d3c1f8c8b77e5d3b4eee856c6e92b5da31a70b08 Mon Sep 17 00:00:00 2001 From: Andrew Aikawa Date: Sun, 30 Jun 2024 10:51:00 -0700 Subject: [PATCH 05/23] [k8s] suppress connection error warnings when disconnected from k8s (#3674) * suppress connection error warnings when disconnected from k8s format set urllib3 log level set to ERROR level * Update sky/provision/kubernetes/utils.py Co-authored-by: Zhanghao Wu * format * decorate k8s apis to suppress logs * Add docstr * lint --------- Co-authored-by: Zhanghao Wu Co-authored-by: Romil Bhardwaj --- sky/adaptors/kubernetes.py | 37 ++++++++++++++++++++++++++++++++++++- sky/sky_logging.py | 11 +++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/sky/adaptors/kubernetes.py b/sky/adaptors/kubernetes.py index 7cdb3ff3059..7f52a099f56 100644 --- a/sky/adaptors/kubernetes.py +++ b/sky/adaptors/kubernetes.py @@ -2,9 +2,11 @@ # pylint: disable=import-outside-toplevel +import logging import os from sky.adaptors import common +from sky.sky_logging import set_logging_level from sky.utils import env_options from sky.utils import ux_utils @@ -28,6 +30,33 @@ API_TIMEOUT = 5 +def _decorate_methods(obj, decorator): + for attr_name in dir(obj): + attr = getattr(obj, attr_name) + if callable(attr) and not attr_name.startswith('__'): + setattr(obj, attr_name, decorator(attr)) + return obj + + +def _api_logging_decorator(logger: str, level: int): + """Decorator to set logging level for API calls. + + This is used to suppress the verbose logging from urllib3 when calls to the + Kubernetes API timeout. + """ + + def decorated_api(api): + + def wrapped(*args, **kwargs): + obj = api(*args, **kwargs) + _decorate_methods(obj, set_logging_level(logger, level)) + return obj + + return wrapped + + return decorated_api + + def _load_config(): global _configured if _configured: @@ -65,15 +94,16 @@ def _load_config(): _configured = True +@_api_logging_decorator('urllib3', logging.ERROR) def core_api(): global _core_api if _core_api is None: _load_config() _core_api = kubernetes.client.CoreV1Api() - return _core_api +@_api_logging_decorator('urllib3', logging.ERROR) def auth_api(): global _auth_api if _auth_api is None: @@ -83,6 +113,7 @@ def auth_api(): return _auth_api +@_api_logging_decorator('urllib3', logging.ERROR) def networking_api(): global _networking_api if _networking_api is None: @@ -92,6 +123,7 @@ def networking_api(): return _networking_api +@_api_logging_decorator('urllib3', logging.ERROR) def custom_objects_api(): global _custom_objects_api if _custom_objects_api is None: @@ -101,6 +133,7 @@ def custom_objects_api(): return _custom_objects_api +@_api_logging_decorator('urllib3', logging.ERROR) def node_api(): global _node_api if _node_api is None: @@ -110,6 +143,7 @@ def node_api(): return _node_api +@_api_logging_decorator('urllib3', logging.ERROR) def apps_api(): global _apps_api if _apps_api is None: @@ -119,6 +153,7 @@ def apps_api(): return _apps_api +@_api_logging_decorator('urllib3', logging.ERROR) def api_client(): global _api_client if _api_client is None: diff --git a/sky/sky_logging.py b/sky/sky_logging.py index dbaf1dd0479..c8a243c72cf 100644 --- a/sky/sky_logging.py +++ b/sky/sky_logging.py @@ -95,6 +95,17 @@ def init_logger(name: str): return logging.getLogger(name) +@contextlib.contextmanager +def set_logging_level(logger: str, level: int): + logger = logging.getLogger(logger) + original_level = logger.level + logger.setLevel(level) + try: + yield + finally: + logger.setLevel(original_level) + + @contextlib.contextmanager def silent(): """Make all sky_logging.print() and logger.{info, warning...} silent. From 3d9c6cabbb37fba452cdfd4cf6595a3fa8f72e20 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Sun, 30 Jun 2024 22:10:10 -0700 Subject: [PATCH 06/23] [Azure] Use SkyPilot provisioner for status query (#3696) * Use SkyPilot for status query * format * Avoid reconfig * Add todo * Fix filtering for autodown clusters * remove comment * Address comments * typing --- sky/clouds/azure.py | 90 +-------------------- sky/provision/azure/__init__.py | 1 + sky/provision/azure/instance.py | 113 +++++++++++++++++++++++++++ sky/provision/common.py | 19 +++++ sky/provision/instance_setup.py | 27 ++----- sky/skylet/providers/azure/config.py | 2 + 6 files changed, 142 insertions(+), 110 deletions(-) diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py index 4df1cd4a4bf..852af5c0c77 100644 --- a/sky/clouds/azure.py +++ b/sky/clouds/azure.py @@ -14,10 +14,8 @@ from sky import clouds from sky import exceptions from sky import sky_logging -from sky import status_lib from sky.adaptors import azure from sky.clouds import service_catalog -from sky.skylet import log_lib from sky.utils import common_utils from sky.utils import resources_utils from sky.utils import ux_utils @@ -70,6 +68,7 @@ class Azure(clouds.Cloud): _INDENT_PREFIX = ' ' * 4 PROVISIONER_VERSION = clouds.ProvisionerVersion.RAY_AUTOSCALER + STATUS_VERSION = clouds.StatusVersion.SKYPILOT @classmethod def _unsupported_features_for_resources( @@ -613,90 +612,3 @@ def _get_disk_type(cls, resources_utils.DiskTier.LOW: 'Standard_LRS', } return tier2name[tier] - - @classmethod - def query_status(cls, name: str, tag_filters: Dict[str, str], - region: Optional[str], zone: Optional[str], - **kwargs) -> List[status_lib.ClusterStatus]: - del zone # unused - status_map = { - 'VM starting': status_lib.ClusterStatus.INIT, - 'VM running': status_lib.ClusterStatus.UP, - # 'VM stopped' in Azure means Stopped (Allocated), which still bills - # for the VM. - 'VM stopping': status_lib.ClusterStatus.INIT, - 'VM stopped': status_lib.ClusterStatus.INIT, - # 'VM deallocated' in Azure means Stopped (Deallocated), which does not - # bill for the VM. - 'VM deallocating': status_lib.ClusterStatus.STOPPED, - 'VM deallocated': status_lib.ClusterStatus.STOPPED, - } - tag_filter_str = ' '.join( - f'tags.\\"{k}\\"==\'{v}\'' for k, v in tag_filters.items()) - - query_node_id = (f'az vm list --query "[?{tag_filter_str}].id" -o json') - returncode, stdout, stderr = log_lib.run_with_log(query_node_id, - '/dev/null', - require_outputs=True, - shell=True) - logger.debug(f'{query_node_id} returned {returncode}.\n' - '**** STDOUT ****\n' - f'{stdout}\n' - '**** STDERR ****\n' - f'{stderr}') - if returncode == 0: - if not stdout.strip(): - return [] - node_ids = json.loads(stdout.strip()) - if not node_ids: - return [] - state_str = '[].powerState' - if len(node_ids) == 1: - state_str = 'powerState' - node_ids_str = '\t'.join(node_ids) - query_cmd = ( - f'az vm show -d --ids {node_ids_str} --query "{state_str}" -o json' - ) - returncode, stdout, stderr = log_lib.run_with_log( - query_cmd, '/dev/null', require_outputs=True, shell=True) - logger.debug(f'{query_cmd} returned {returncode}.\n' - '**** STDOUT ****\n' - f'{stdout}\n' - '**** STDERR ****\n' - f'{stderr}') - - # NOTE: Azure cli should be handled carefully. The query command above - # takes about 1 second to run. - # An alternative is the following command, but it will take more than - # 20 seconds to run. - # query_cmd = ( - # f'az vm list --show-details --query "[' - # f'?tags.\\"ray-cluster-name\\" == \'{handle.cluster_name}\' ' - # '&& tags.\\"ray-node-type\\" == \'head\'].powerState" -o tsv' - # ) - - if returncode != 0: - with ux_utils.print_exception_no_traceback(): - raise exceptions.ClusterStatusFetchingError( - f'Failed to query Azure cluster {name!r} status: ' - f'{stdout + stderr}') - - assert stdout.strip(), f'No status returned for {name!r}' - - original_statuses_list = json.loads(stdout.strip()) - if not original_statuses_list: - # No nodes found. The original_statuses_list will be empty string. - # Return empty list. - return [] - if not isinstance(original_statuses_list, list): - original_statuses_list = [original_statuses_list] - statuses = [] - for s in original_statuses_list: - if s not in status_map: - with ux_utils.print_exception_no_traceback(): - raise exceptions.ClusterStatusFetchingError( - f'Failed to parse status from Azure response: {stdout}') - node_status = status_map[s] - if node_status is not None: - statuses.append(node_status) - return statuses diff --git a/sky/provision/azure/__init__.py b/sky/provision/azure/__init__.py index b83dbb462d9..b28c161a866 100644 --- a/sky/provision/azure/__init__.py +++ b/sky/provision/azure/__init__.py @@ -2,3 +2,4 @@ from sky.provision.azure.instance import cleanup_ports from sky.provision.azure.instance import open_ports +from sky.provision.azure.instance import query_instances diff --git a/sky/provision/azure/instance.py b/sky/provision/azure/instance.py index de5c7cbf0e9..6693427d8ff 100644 --- a/sky/provision/azure/instance.py +++ b/sky/provision/azure/instance.py @@ -1,11 +1,19 @@ """Azure instance provisioning.""" import logging +from multiprocessing import pool +import typing from typing import Any, Callable, Dict, List, Optional +from sky import exceptions from sky import sky_logging +from sky import status_lib from sky.adaptors import azure +from sky.utils import common_utils from sky.utils import ux_utils +if typing.TYPE_CHECKING: + from azure.mgmt import compute as azure_compute + logger = sky_logging.init_logger(__name__) # Suppress noisy logs from Azure SDK. Reference: @@ -17,6 +25,8 @@ TAG_RAY_CLUSTER_NAME = 'ray-cluster-name' TAG_RAY_NODE_KIND = 'ray-node-type' +_RESOURCE_GROUP_NOT_FOUND_ERROR_MESSAGE = 'ResourceGroupNotFound' + def get_azure_sdk_function(client: Any, function_name: str) -> Callable: """Retrieve a callable function from Azure SDK client object. @@ -93,3 +103,106 @@ def cleanup_ports( # Azure will automatically cleanup network security groups when cleanup # resource group. So we don't need to do anything here. del cluster_name_on_cloud, ports, provider_config # Unused. + + +def _get_vm_status(compute_client: 'azure_compute.ComputeManagementClient', + vm_name: str, resource_group: str) -> str: + instance = compute_client.virtual_machines.instance_view( + resource_group_name=resource_group, vm_name=vm_name).as_dict() + for status in instance['statuses']: + code_state = status['code'].split('/') + # It is possible that sometimes the 'code' is empty string, and we + # should skip them. + if len(code_state) != 2: + continue + code, state = code_state + # skip provisioning status + if code == 'PowerState': + return state + raise ValueError(f'Failed to get status for VM {vm_name}') + + +def _filter_instances( + compute_client: 'azure_compute.ComputeManagementClient', + filters: Dict[str, str], + resource_group: str) -> List['azure_compute.models.VirtualMachine']: + + def match_tags(vm): + for k, v in filters.items(): + if vm.tags.get(k) != v: + return False + return True + + try: + list_virtual_machines = get_azure_sdk_function( + client=compute_client.virtual_machines, function_name='list') + vms = list_virtual_machines(resource_group_name=resource_group) + nodes = list(filter(match_tags, vms)) + except azure.exceptions().ResourceNotFoundError as e: + if _RESOURCE_GROUP_NOT_FOUND_ERROR_MESSAGE in str(e): + return [] + raise + return nodes + + +@common_utils.retry +def query_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + non_terminated_only: bool = True, +) -> Dict[str, Optional[status_lib.ClusterStatus]]: + """See sky/provision/__init__.py""" + assert provider_config is not None, cluster_name_on_cloud + status_map = { + 'starting': status_lib.ClusterStatus.INIT, + 'running': status_lib.ClusterStatus.UP, + # 'stopped' in Azure means Stopped (Allocated), which still bills + # for the VM. + 'stopping': status_lib.ClusterStatus.INIT, + 'stopped': status_lib.ClusterStatus.INIT, + # 'VM deallocated' in Azure means Stopped (Deallocated), which does not + # bill for the VM. + 'deallocating': status_lib.ClusterStatus.STOPPED, + 'deallocated': status_lib.ClusterStatus.STOPPED, + } + provisioning_state_map = { + 'Creating': status_lib.ClusterStatus.INIT, + 'Updating': status_lib.ClusterStatus.INIT, + 'Failed': status_lib.ClusterStatus.INIT, + 'Migrating': status_lib.ClusterStatus.INIT, + 'Deleting': None, + # Succeeded in provisioning state means the VM is provisioned but not + # necessarily running. We exclude Succeeded state here, and the caller + # should determine the status of the VM based on the power state. + # 'Succeeded': status_lib.ClusterStatus.UP, + } + + subscription_id = provider_config['subscription_id'] + resource_group = provider_config['resource_group'] + compute_client = azure.get_client('compute', subscription_id) + filters = {TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud} + nodes = _filter_instances(compute_client, filters, resource_group) + statuses = {} + + def _fetch_and_map_status( + compute_client: 'azure_compute.ComputeManagementClient', node, + resource_group: str): + if node.provisioning_state in provisioning_state_map: + status = provisioning_state_map[node.provisioning_state] + else: + original_status = _get_vm_status(compute_client, node.name, + resource_group) + if original_status not in status_map: + with ux_utils.print_exception_no_traceback(): + raise exceptions.ClusterStatusFetchingError( + f'Failed to parse status from Azure response: {status}') + status = status_map[original_status] + if status is None and non_terminated_only: + return + statuses[node.name] = status + + with pool.ThreadPool() as p: + p.starmap(_fetch_and_map_status, + [(compute_client, node, resource_group) for node in nodes]) + + return statuses diff --git a/sky/provision/common.py b/sky/provision/common.py index 7c1bcb32652..e5df26a4c09 100644 --- a/sky/provision/common.py +++ b/sky/provision/common.py @@ -1,9 +1,11 @@ """Common data structures for provisioning""" import abc import dataclasses +import functools import os from typing import Any, Dict, List, Optional, Tuple +from sky import sky_logging from sky.utils import resources_utils # NOTE: we can use pydantic instead of dataclasses or namedtuples, because @@ -14,6 +16,10 @@ # -------------------- input data model -------------------- # InstanceId = str +_START_TITLE = '\n' + '-' * 20 + 'Start: {} ' + '-' * 20 +_END_TITLE = '-' * 20 + 'End: {} ' + '-' * 20 + '\n' + +logger = sky_logging.init_logger(__name__) class ProvisionerError(RuntimeError): @@ -268,3 +274,16 @@ def query_ports_passthrough( for port in ports: result[port] = [SocketEndpoint(port=port, host=head_ip)] return result + + +def log_function_start_end(func): + + @functools.wraps(func) + def wrapper(*args, **kwargs): + logger.info(_START_TITLE.format(func.__name__)) + try: + return func(*args, **kwargs) + finally: + logger.info(_END_TITLE.format(func.__name__)) + + return wrapper diff --git a/sky/provision/instance_setup.py b/sky/provision/instance_setup.py index 1fb80ba542a..2d9ead3dc01 100644 --- a/sky/provision/instance_setup.py +++ b/sky/provision/instance_setup.py @@ -23,8 +23,6 @@ from sky.utils import ux_utils logger = sky_logging.init_logger(__name__) -_START_TITLE = '\n' + '-' * 20 + 'Start: {} ' + '-' * 20 -_END_TITLE = '-' * 20 + 'End: {} ' + '-' * 20 + '\n' _MAX_RETRY = 6 @@ -99,19 +97,6 @@ def retry(*args, **kwargs): return decorator -def _log_start_end(func): - - @functools.wraps(func) - def wrapper(*args, **kwargs): - logger.info(_START_TITLE.format(func.__name__)) - try: - return func(*args, **kwargs) - finally: - logger.info(_END_TITLE.format(func.__name__)) - - return wrapper - - def _hint_worker_log_path(cluster_name: str, cluster_info: common.ClusterInfo, stage_name: str): if cluster_info.num_instances > 1: @@ -153,7 +138,7 @@ def _parallel_ssh_with_cache(func, return [future.result() for future in results] -@_log_start_end +@common.log_function_start_end def initialize_docker(cluster_name: str, docker_config: Dict[str, Any], cluster_info: common.ClusterInfo, ssh_credentials: Dict[str, Any]) -> Optional[str]: @@ -184,7 +169,7 @@ def _initialize_docker(runner: command_runner.CommandRunner, log_path: str): return docker_users[0] -@_log_start_end +@common.log_function_start_end def setup_runtime_on_cluster(cluster_name: str, setup_commands: List[str], cluster_info: common.ClusterInfo, ssh_credentials: Dict[str, Any]) -> None: @@ -260,7 +245,7 @@ def _ray_gpu_options(custom_resource: str) -> str: return f' --num-gpus={acc_count}' -@_log_start_end +@common.log_function_start_end @_auto_retry() def start_ray_on_head_node(cluster_name: str, custom_resource: Optional[str], cluster_info: common.ClusterInfo, @@ -320,7 +305,7 @@ def start_ray_on_head_node(cluster_name: str, custom_resource: Optional[str], f'===== stderr ====={stderr}') -@_log_start_end +@common.log_function_start_end @_auto_retry() def start_ray_on_worker_nodes(cluster_name: str, no_restart: bool, custom_resource: Optional[str], ray_port: int, @@ -417,7 +402,7 @@ def _setup_ray_worker(runner_and_id: Tuple[command_runner.CommandRunner, f'===== stderr ====={stderr}') -@_log_start_end +@common.log_function_start_end @_auto_retry() def start_skylet_on_head_node(cluster_name: str, cluster_info: common.ClusterInfo, @@ -501,7 +486,7 @@ def _max_workers_for_file_mounts(common_file_mounts: Dict[str, str]) -> int: return max_workers -@_log_start_end +@common.log_function_start_end def internal_file_mounts(cluster_name: str, common_file_mounts: Dict[str, str], cluster_info: common.ClusterInfo, ssh_credentials: Dict[str, str]) -> None: diff --git a/sky/skylet/providers/azure/config.py b/sky/skylet/providers/azure/config.py index 35008ef13d7..13ecd64a987 100644 --- a/sky/skylet/providers/azure/config.py +++ b/sky/skylet/providers/azure/config.py @@ -14,6 +14,7 @@ from sky.adaptors import azure from sky.utils import common_utils +from sky.provision import common UNIQUE_ID_LEN = 4 _WAIT_NSG_CREATION_NUM_TIMEOUT_SECONDS = 600 @@ -47,6 +48,7 @@ def bootstrap_azure(config): return config +@common.log_function_start_end def _configure_resource_group(config): # TODO: look at availability sets # https://docs.microsoft.com/en-us/azure/virtual-machines/windows/tutorial-availability-sets From 24faf70a67b038cccdf5703e829d761e6d57c9a8 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 1 Jul 2024 01:43:42 -0700 Subject: [PATCH 07/23] [Azure] Use SkyPilot provisioner to handle stop and termination for Azure (#3700) * Use SkyPilot for status query * format * Avoid reconfig * Add todo * Add termination and stopping * add stop and termination into __init__ * get rid of azure special handling in backend * format * Fix filtering for autodown clusters * More detailed error message * typing --- sky/backends/cloud_vm_ray_backend.py | 26 ++--------- sky/clouds/azure.py | 2 +- sky/provision/azure/__init__.py | 2 + sky/provision/azure/instance.py | 64 ++++++++++++++++++++++++++-- 4 files changed, 67 insertions(+), 27 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index a92d13fd214..89f9dcdc695 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -3888,22 +3888,8 @@ def teardown_no_lock(self, self.post_teardown_cleanup(handle, terminate, purge) return - if terminate and isinstance(cloud, clouds.Azure): - # Here we handle termination of Azure by ourselves instead of Ray - # autoscaler. - resource_group = config['provider']['resource_group'] - terminate_cmd = f'az group delete -y --name {resource_group}' - with rich_utils.safe_status(f'[bold cyan]Terminating ' - f'[green]{cluster_name}'): - returncode, stdout, stderr = log_lib.run_with_log( - terminate_cmd, - log_abs_path, - shell=True, - stream_logs=False, - require_outputs=True) - - elif (isinstance(cloud, clouds.IBM) and terminate and - prev_cluster_status == status_lib.ClusterStatus.STOPPED): + if (isinstance(cloud, clouds.IBM) and terminate and + prev_cluster_status == status_lib.ClusterStatus.STOPPED): # pylint: disable= W0622 W0703 C0415 from sky.adaptors import ibm from sky.skylet.providers.ibm.vpc_provider import IBMVPCProvider @@ -4021,14 +4007,8 @@ def teardown_no_lock(self, # never launched and the errors are related to pre-launch # configurations (such as VPC not found). So it's safe & good UX # to not print a failure message. - # - # '(ResourceGroupNotFound)': this indicates the resource group on - # Azure is not found. That means the cluster is already deleted - # on the cloud. So it's safe & good UX to not print a failure - # message. elif ('TPU must be specified.' not in stderr and - 'SKYPILOT_ERROR_NO_NODES_LAUNCHED: ' not in stderr and - '(ResourceGroupNotFound)' not in stderr): + 'SKYPILOT_ERROR_NO_NODES_LAUNCHED: ' not in stderr): raise RuntimeError( _TEARDOWN_FAILURE_MESSAGE.format( extra_reason='', diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py index 852af5c0c77..b75f9207856 100644 --- a/sky/clouds/azure.py +++ b/sky/clouds/azure.py @@ -67,7 +67,7 @@ class Azure(clouds.Cloud): _INDENT_PREFIX = ' ' * 4 - PROVISIONER_VERSION = clouds.ProvisionerVersion.RAY_AUTOSCALER + PROVISIONER_VERSION = clouds.ProvisionerVersion.RAY_PROVISIONER_SKYPILOT_TERMINATOR STATUS_VERSION = clouds.StatusVersion.SKYPILOT @classmethod diff --git a/sky/provision/azure/__init__.py b/sky/provision/azure/__init__.py index b28c161a866..2152728ba6e 100644 --- a/sky/provision/azure/__init__.py +++ b/sky/provision/azure/__init__.py @@ -3,3 +3,5 @@ from sky.provision.azure.instance import cleanup_ports from sky.provision.azure.instance import open_ports from sky.provision.azure.instance import query_instances +from sky.provision.azure.instance import stop_instances +from sky.provision.azure.instance import terminate_instances diff --git a/sky/provision/azure/instance.py b/sky/provision/azure/instance.py index 6693427d8ff..19c1ba3f3da 100644 --- a/sky/provision/azure/instance.py +++ b/sky/provision/azure/instance.py @@ -105,6 +105,63 @@ def cleanup_ports( del cluster_name_on_cloud, ports, provider_config # Unused. +def stop_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + """See sky/provision/__init__.py""" + assert provider_config is not None, (cluster_name_on_cloud, provider_config) + + subscription_id = provider_config['subscription_id'] + resource_group = provider_config['resource_group'] + compute_client = azure.get_client('compute', subscription_id) + tag_filters = {TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud} + if worker_only: + tag_filters[TAG_RAY_NODE_KIND] = 'worker' + + nodes = _filter_instances(compute_client, tag_filters, resource_group) + stop_virtual_machine = get_azure_sdk_function( + client=compute_client.virtual_machines, function_name='deallocate') + with pool.ThreadPool() as p: + p.starmap(stop_virtual_machine, + [(resource_group, node.name) for node in nodes]) + + +def terminate_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + """See sky/provision/__init__.py""" + assert provider_config is not None, (cluster_name_on_cloud, provider_config) + # TODO(zhwu): check the following. Also, seems we can directly force + # delete a resource group. + subscription_id = provider_config['subscription_id'] + resource_group = provider_config['resource_group'] + if worker_only: + compute_client = azure.get_client('compute', subscription_id) + delete_virtual_machine = get_azure_sdk_function( + client=compute_client.virtual_machines, function_name='delete') + filters = { + TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud, + TAG_RAY_NODE_KIND: 'worker' + } + nodes = _filter_instances(compute_client, filters, resource_group) + with pool.ThreadPool() as p: + p.starmap(delete_virtual_machine, + [(resource_group, node.name) for node in nodes]) + return + + assert provider_config is not None, cluster_name_on_cloud + + resource_group_client = azure.get_client('resource', subscription_id) + delete_resource_group = get_azure_sdk_function( + client=resource_group_client.resource_groups, function_name='delete') + + delete_resource_group(resource_group, force_deletion_types=None) + + def _get_vm_status(compute_client: 'azure_compute.ComputeManagementClient', vm_name: str, resource_group: str) -> str: instance = compute_client.virtual_machines.instance_view( @@ -119,7 +176,7 @@ def _get_vm_status(compute_client: 'azure_compute.ComputeManagementClient', # skip provisioning status if code == 'PowerState': return state - raise ValueError(f'Failed to get status for VM {vm_name}') + raise ValueError(f'Failed to get power state for VM {vm_name}: {instance}') def _filter_instances( @@ -185,8 +242,9 @@ def query_instances( statuses = {} def _fetch_and_map_status( - compute_client: 'azure_compute.ComputeManagementClient', node, - resource_group: str): + compute_client: 'azure_compute.ComputeManagementClient', + node: 'azure_compute.models.VirtualMachine', + resource_group: str) -> None: if node.provisioning_state in provisioning_state_map: status = provisioning_state_map[node.provisioning_state] else: From 0a4b0efb827eadbb7959fa3c1c3b81af5f094c92 Mon Sep 17 00:00:00 2001 From: Sean Date: Mon, 1 Jul 2024 21:24:33 +0100 Subject: [PATCH 08/23] [Cudo] Update and bugfixes (#3256) * bug fixes and improvements * moved shared function to helper, added error message * moved catalog helper to utils * small fixes * fetch cudo fix * id fix for vms.csv file * format fix --- .../data_fetchers/fetch_cudo.py | 125 ++---------------- sky/provision/cudo/cudo_utils.py | 112 ++++++++++++++++ sky/provision/cudo/cudo_wrapper.py | 53 +++++--- sky/provision/cudo/instance.py | 23 ++-- 4 files changed, 172 insertions(+), 141 deletions(-) create mode 100644 sky/provision/cudo/cudo_utils.py diff --git a/sky/clouds/service_catalog/data_fetchers/fetch_cudo.py b/sky/clouds/service_catalog/data_fetchers/fetch_cudo.py index b15570ddcbc..617751d865a 100644 --- a/sky/clouds/service_catalog/data_fetchers/fetch_cudo.py +++ b/sky/clouds/service_catalog/data_fetchers/fetch_cudo.py @@ -9,98 +9,9 @@ import cudo_compute -VMS_CSV = 'cudo/vms.csv' +import sky.provision.cudo.cudo_utils as utils -cudo_gpu_model = { - 'NVIDIA V100': 'V100', - 'NVIDIA A40': 'A40', - 'RTX 3080': 'RTX3080', - 'RTX A4000': 'RTXA4000', - 'RTX A4500': 'RTXA4500', - 'RTX A5000': 'RTXA5000', - 'RTX A6000': 'RTXA6000', -} - -cudo_gpu_mem = { - 'RTX3080': 12, - 'A40': 48, - 'RTXA4000': 16, - 'RTXA4500': 20, - 'RTXA5000': 24, - 'RTXA6000': 48, - 'V100': 16, -} - -machine_specs = [ - # Low - { - 'vcpu': 2, - 'mem': 4, - 'gpu': 1, - }, - { - 'vcpu': 4, - 'mem': 8, - 'gpu': 1, - }, - { - 'vcpu': 8, - 'mem': 16, - 'gpu': 2, - }, - { - 'vcpu': 16, - 'mem': 32, - 'gpu': 2, - }, - { - 'vcpu': 32, - 'mem': 64, - 'gpu': 4, - }, - { - 'vcpu': 64, - 'mem': 128, - 'gpu': 8, - }, - # Mid - { - 'vcpu': 96, - 'mem': 192, - 'gpu': 8 - }, - { - 'vcpu': 48, - 'mem': 96, - 'gpu': 4 - }, - { - 'vcpu': 24, - 'mem': 48, - 'gpu': 2 - }, - { - 'vcpu': 12, - 'mem': 24, - 'gpu': 1 - }, - # Hi - { - 'vcpu': 96, - 'mem': 192, - 'gpu': 4 - }, - { - 'vcpu': 48, - 'mem': 96, - 'gpu': 2 - }, - { - 'vcpu': 24, - 'mem': 48, - 'gpu': 1 - }, -] +VMS_CSV = 'cudo/vms.csv' def cudo_api(): @@ -110,28 +21,8 @@ def cudo_api(): return cudo_compute.VirtualMachinesApi(client) -def cudo_gpu_to_skypilot_gpu(model): - if model in cudo_gpu_model: - return cudo_gpu_model[model] - else: - return model - - -def skypilot_gpu_to_cudo_gpu(model): - for key, value in cudo_gpu_model.items(): - if value == model: - return key - return model - - -def gpu_exists(model): - if model in cudo_gpu_model: - return True - return False - - def get_gpu_info(count, model): - mem = cudo_gpu_mem[model] + mem = utils.cudo_gpu_mem[model] # pylint: disable=line-too-long # {'Name': 'A4000', 'Manufacturer': 'NVIDIA', 'Count': 1.0, 'MemoryInfo': {'SizeInMiB': 16384}}], 'TotalGpuMemoryInMiB': 16384}" info = { @@ -168,16 +59,16 @@ def machine_types(gpu_model, mem_gib, vcpu_count, gpu_count): def update_prices(): rows = [] - for spec in machine_specs: + for spec in utils.machine_specs: mts = machine_types('', spec['mem'], spec['vcpu'], spec['gpu']) for hc in mts['host_configs']: - if not gpu_exists(hc['gpu_model']): + if not utils.gpu_exists(hc['gpu_model']): continue - accelerator_name = cudo_gpu_to_skypilot_gpu(hc['gpu_model']) + accelerator_name = utils.cudo_gpu_to_skypilot_gpu(hc['gpu_model']) row = { 'instance_type': get_instance_type(hc['machine_type'], - spec['gpu'], spec['vcpu'], - spec['mem']), + spec['vcpu'], spec['mem'], + spec['gpu']), 'accelerator_name': accelerator_name, 'accelerator_count': str(spec['gpu']) + '.0', 'vcpus': str(spec['vcpu']), diff --git a/sky/provision/cudo/cudo_utils.py b/sky/provision/cudo/cudo_utils.py new file mode 100644 index 00000000000..d4ef7f9e415 --- /dev/null +++ b/sky/provision/cudo/cudo_utils.py @@ -0,0 +1,112 @@ +"""Cudo catalog helper.""" + +cudo_gpu_model = { + 'NVIDIA V100': 'V100', + 'NVIDIA A40': 'A40', + 'RTX 3080': 'RTX3080', + 'RTX A4000': 'RTXA4000', + 'RTX A4500': 'RTXA4500', + 'RTX A5000': 'RTXA5000', + 'RTX A6000': 'RTXA6000', +} + +cudo_gpu_mem = { + 'RTX3080': 12, + 'A40': 48, + 'RTXA4000': 16, + 'RTXA4500': 20, + 'RTXA5000': 24, + 'RTXA6000': 48, + 'V100': 16, +} + +machine_specs = [ + # Low + { + 'vcpu': 2, + 'mem': 4, + 'gpu': 1, + }, + { + 'vcpu': 4, + 'mem': 8, + 'gpu': 1, + }, + { + 'vcpu': 8, + 'mem': 16, + 'gpu': 2, + }, + { + 'vcpu': 16, + 'mem': 32, + 'gpu': 2, + }, + { + 'vcpu': 32, + 'mem': 64, + 'gpu': 4, + }, + { + 'vcpu': 64, + 'mem': 128, + 'gpu': 8, + }, + # Mid + { + 'vcpu': 96, + 'mem': 192, + 'gpu': 8 + }, + { + 'vcpu': 48, + 'mem': 96, + 'gpu': 4 + }, + { + 'vcpu': 24, + 'mem': 48, + 'gpu': 2 + }, + { + 'vcpu': 12, + 'mem': 24, + 'gpu': 1 + }, + # Hi + { + 'vcpu': 96, + 'mem': 192, + 'gpu': 4 + }, + { + 'vcpu': 48, + 'mem': 96, + 'gpu': 2 + }, + { + 'vcpu': 24, + 'mem': 48, + 'gpu': 1 + }, +] + + +def cudo_gpu_to_skypilot_gpu(model): + if model in cudo_gpu_model: + return cudo_gpu_model[model] + else: + return model + + +def skypilot_gpu_to_cudo_gpu(model): + for key, value in cudo_gpu_model.items(): + if value == model: + return key + return model + + +def gpu_exists(model): + if model in cudo_gpu_model: + return True + return False diff --git a/sky/provision/cudo/cudo_wrapper.py b/sky/provision/cudo/cudo_wrapper.py index 691c69bda8c..eac39d9faed 100644 --- a/sky/provision/cudo/cudo_wrapper.py +++ b/sky/provision/cudo/cudo_wrapper.py @@ -4,29 +4,29 @@ from sky import sky_logging from sky.adaptors import cudo +import sky.provision.cudo.cudo_utils as utils logger = sky_logging.init_logger(__name__) def launch(name: str, data_center_id: str, ssh_key: str, machine_type: str, - memory_gib: int, vcpu_count: int, gpu_count: int, gpu_model: str, + memory_gib: int, vcpu_count: int, gpu_count: int, tags: Dict[str, str], disk_size: int): """Launches an instance with the given parameters.""" - disk = cudo.cudo.Disk(storage_class='STORAGE_CLASS_NETWORK', - size_gib=disk_size) - - request = cudo.cudo.CreateVMBody(ssh_key_source='SSH_KEY_SOURCE_NONE', - custom_ssh_keys=[ssh_key], - vm_id=name, - machine_type=machine_type, - data_center_id=data_center_id, - boot_disk_image_id='ubuntu-nvidia-docker', - memory_gib=memory_gib, - vcpus=vcpu_count, - gpus=gpu_count, - gpu_model=gpu_model, - boot_disk=disk, - metadata=tags) + + request = cudo.cudo.CreateVMBody( + ssh_key_source='SSH_KEY_SOURCE_NONE', + custom_ssh_keys=[ssh_key], + vm_id=name, + machine_type=machine_type, + data_center_id=data_center_id, + boot_disk_image_id='ubuntu-2204-nvidia-535-docker-v20240214', + memory_gib=memory_gib, + vcpus=vcpu_count, + gpus=gpu_count, + boot_disk=cudo.cudo.Disk(storage_class='STORAGE_CLASS_NETWORK', + size_gib=disk_size), + metadata=tags) try: api = cudo.cudo.cudo_api.virtual_machines() @@ -121,3 +121,24 @@ def list_instances(): return instances except cudo.cudo.rest.ApiException as e: raise e + + +def vm_available(to_start_count, gpu_count, gpu_model, data_center_id, mem, + cpus): + try: + gpu_model = utils.skypilot_gpu_to_cudo_gpu(gpu_model) + api = cudo.cudo.cudo_api.virtual_machines() + types = api.list_vm_machine_types(mem, + cpus, + gpu=gpu_count, + gpu_model=gpu_model, + data_center_id=data_center_id) + types_dict = types.to_dict() + hc = types_dict['host_configs'] + total_count = sum(item['count_vm_available'] for item in hc) + if total_count < to_start_count: + raise Exception( + 'Too many VMs requested, try another gpu type or region') + return total_count + except cudo.cudo.rest.ApiException as e: + raise e diff --git a/sky/provision/cudo/instance.py b/sky/provision/cudo/instance.py index 39d4bc6b3d1..71ada577e53 100644 --- a/sky/provision/cudo/instance.py +++ b/sky/provision/cudo/instance.py @@ -16,7 +16,6 @@ def _filter_instances(cluster_name_on_cloud: str, status_filters: Optional[List[str]]) -> Dict[str, Any]: - instances = cudo_wrapper.list_instances() possible_names = [ f'{cluster_name_on_cloud}-head', f'{cluster_name_on_cloud}-worker' @@ -77,10 +76,19 @@ def run_instances(region: str, cluster_name_on_cloud: str, created_instance_ids = [] public_key = config.node_config['AuthorizedKey'] - + instance_type = config.node_config['InstanceType'] + spec = cudo_machine_type.get_spec_from_instance(instance_type, region) + gpu_count = int(float(spec['gpu_count'])) + vcpu_count = int(spec['vcpu_count']) + memory_gib = int(spec['mem_gb']) + gpu_model = spec['gpu_model'] + try: + cudo_wrapper.vm_available(to_start_count, gpu_count, gpu_model, region, + memory_gib, vcpu_count) + except Exception as e: + logger.warning(f'run_instances: {e}') + raise for _ in range(to_start_count): - instance_type = config.node_config['InstanceType'] - spec = cudo_machine_type.get_spec_from_instance(instance_type, region) node_type = 'head' if head_instance_id is None else 'worker' try: @@ -89,10 +97,9 @@ def run_instances(region: str, cluster_name_on_cloud: str, ssh_key=public_key, data_center_id=region, machine_type=spec['machine_type'], - memory_gib=int(spec['mem_gb']), - vcpu_count=int(spec['vcpu_count']), - gpu_count=int(float(spec['gpu_count'])), - gpu_model=spec['gpu_model'], + memory_gib=memory_gib, + vcpu_count=vcpu_count, + gpu_count=gpu_count, tags={}, disk_size=config.node_config['DiskSize']) except Exception as e: # pylint: disable=broad-except From b03c617eea731b265ddaf5da1ab526a3fe970876 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Mon, 1 Jul 2024 14:16:07 -0700 Subject: [PATCH 09/23] [Storage] Add storage translation for newly created buckets (#3671) * Fix storage external validation to skip if running on controller * fix smoke tests * fix smoke tests * Add storage translation for mount mode storages * Add storage translation for mount mode storages * Add storage translation for mount mode storages * revert some changes * revert dashboard changes * Add force_delete * typo * Update sky/utils/controller_utils.py Co-authored-by: Zhanghao Wu * lint --------- Co-authored-by: Zhanghao Wu --- examples/managed_job_with_storage.yaml | 11 ++++++++++- sky/utils/controller_utils.py | 22 +++++++++++++++++++++ tests/test_smoke.py | 27 +++++++++++++++++++++++--- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/examples/managed_job_with_storage.yaml b/examples/managed_job_with_storage.yaml index ecefccd8b3d..61244c16ba0 100644 --- a/examples/managed_job_with_storage.yaml +++ b/examples/managed_job_with_storage.yaml @@ -15,11 +15,17 @@ workdir: ./examples file_mounts: ~/bucket_workdir: - # Change this to the your own globally unique bucket name. + # Change this to your own globally unique bucket name. name: sky-workdir-zhwu source: ./examples persistent: false mode: COPY + + /output_path: + # Change this to your own globally unique bucket name. + name: sky-output-bucket + mode: MOUNT + /imagenet-image: source: s3://sky-imagenet-data @@ -55,3 +61,6 @@ run: | cat ~/tmpfile cat ~/a/b/c/tmpfile + + # Write to a file in the mounted bucket + echo "hello world!" > /output_path/output.txt diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index c1859d52663..ba65d4b664a 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -742,3 +742,25 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task', store_prefix = store_type.store_prefix() storage_obj.source = f'{store_prefix}{storage_obj.name}' storage_obj.force_delete = True + + # Step 7: Convert all `MOUNT` mode storages which don't specify a source + # to specifying a source. If the source is specified with a local path, + # it was handled in step 6. + updated_mount_storages = {} + for storage_path, storage_obj in task.storage_mounts.items(): + if (storage_obj.mode == storage_lib.StorageMode.MOUNT and + not storage_obj.source): + # Construct source URL with first store type and storage name + # E.g., s3://my-storage-name + source = list( + storage_obj.stores.keys())[0].store_prefix() + storage_obj.name + new_storage = storage_lib.Storage.from_yaml_config({ + 'source': source, + 'persistent': storage_obj.persistent, + 'mode': storage_lib.StorageMode.MOUNT.value, + # We enable force delete to allow the controller to delete + # the object store in case persistent is set to False. + '_force_delete': True + }) + updated_mount_storages[storage_path] = new_storage + task.update_storage_mounts(updated_mount_storages) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index e0c71add85d..d692169730e 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -2859,7 +2859,9 @@ def test_managed_jobs_storage(generic_cloud: str): name = _get_cluster_name() yaml_str = pathlib.Path( 'examples/managed_job_with_storage.yaml').read_text() - storage_name = f'sky-test-{int(time.time())}' + timestamp = int(time.time()) + storage_name = f'sky-test-{timestamp}' + output_storage_name = f'sky-test-output-{timestamp}' # Also perform region testing for bucket creation to validate if buckets are # created in the correct region and correctly mounted in managed jobs. @@ -2874,16 +2876,32 @@ def test_managed_jobs_storage(generic_cloud: str): region_cmd = TestStorageWithCredentials.cli_region_cmd( storage_lib.StoreType.S3, storage_name) region_validation_cmd = f'{region_cmd} | grep {region}' + s3_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket( + storage_lib.StoreType.S3, output_storage_name, 'output.txt') + output_check_cmd = f'{s3_check_file_count} | grep 1' elif generic_cloud == 'gcp': region = 'us-west2' region_flag = f' --region {region}' region_cmd = TestStorageWithCredentials.cli_region_cmd( storage_lib.StoreType.GCS, storage_name) region_validation_cmd = f'{region_cmd} | grep {region}' + gcs_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket( + storage_lib.StoreType.GCS, output_storage_name, 'output.txt') + output_check_cmd = f'{gcs_check_file_count} | grep 1' elif generic_cloud == 'kubernetes': + # With Kubernetes, we don't know which object storage provider is used. + # Check both S3 and GCS if bucket exists in either. + s3_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket( + storage_lib.StoreType.S3, output_storage_name, 'output.txt') + s3_output_check_cmd = f'{s3_check_file_count} | grep 1' + gcs_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket( + storage_lib.StoreType.GCS, output_storage_name, 'output.txt') + gcs_output_check_cmd = f'{gcs_check_file_count} | grep 1' + output_check_cmd = f'{s3_output_check_cmd} || {gcs_output_check_cmd}' use_spot = ' --no-use-spot' yaml_str = yaml_str.replace('sky-workdir-zhwu', storage_name) + yaml_str = yaml_str.replace('sky-output-bucket', output_storage_name) with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: f.write(yaml_str) f.flush() @@ -2896,9 +2914,12 @@ def test_managed_jobs_storage(generic_cloud: str): region_validation_cmd, # Check if the bucket is created in the correct region 'sleep 60', # Wait the spot queue to be updated f'{_JOB_QUEUE_WAIT}| grep {name} | grep SUCCEEDED', - f'[ $(aws s3api list-buckets --query "Buckets[?contains(Name, \'{storage_name}\')].Name" --output text | wc -l) -eq 0 ]' + f'[ $(aws s3api list-buckets --query "Buckets[?contains(Name, \'{storage_name}\')].Name" --output text | wc -l) -eq 0 ]', + # Check if file was written to the mounted output bucket + output_check_cmd ], - _JOB_CANCEL_WAIT.format(job_name=name), + (_JOB_CANCEL_WAIT.format(job_name=name), + f'; sky storage delete {output_storage_name} || true'), # Increase timeout since sky jobs queue -r can be blocked by other spot tests. timeout=20 * 60, ) From d40081aa7d53441d17ddc595c7aec04a93e1c46b Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 1 Jul 2024 21:32:06 -0700 Subject: [PATCH 10/23] [Azure] Wait Azure resource group to be deleted instead of error out (#3712) * Use SkyPilot for status query * format * Avoid reconfig * Add todo * Add termination and stopping * add stop and termination into __init__ * get rid of azure special handling in backend * format * Fix filtering for autodown clusters * More detailed error message * typing * Add wait for resource group deleting * Fix logging * Add comment and better logging * format * change timeout method * address comments --- sky/skylet/providers/azure/config.py | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/sky/skylet/providers/azure/config.py b/sky/skylet/providers/azure/config.py index 13ecd64a987..4c6322f00e5 100644 --- a/sky/skylet/providers/azure/config.py +++ b/sky/skylet/providers/azure/config.py @@ -18,6 +18,8 @@ UNIQUE_ID_LEN = 4 _WAIT_NSG_CREATION_NUM_TIMEOUT_SECONDS = 600 +_WAIT_FOR_RESOURCE_GROUP_DELETION_TIMEOUT_SECONDS = 480 # 8 minutes + logger = logging.getLogger(__name__) @@ -80,7 +82,31 @@ def _configure_resource_group(config): rg_create_or_update = get_azure_sdk_function( client=resource_client.resource_groups, function_name="create_or_update" ) - rg_create_or_update(resource_group_name=resource_group, parameters=params) + rg_creation_start = time.time() + retry = 0 + while ( + time.time() - rg_creation_start + < _WAIT_FOR_RESOURCE_GROUP_DELETION_TIMEOUT_SECONDS + ): + try: + rg_create_or_update(resource_group_name=resource_group, parameters=params) + break + except azure.exceptions().ResourceExistsError as e: + if "ResourceGroupBeingDeleted" in str(e): + if retry % 5 == 0: + # TODO(zhwu): This should be shown in terminal for better + # UX, which will be achieved after we move Azure to use + # SkyPilot provisioner. + logger.warning( + f"Azure resource group {resource_group} of a recent " + "terminated cluster {config['cluster_name']} is being " + "deleted. It can only be provisioned after it is fully" + "deleted. Waiting..." + ) + time.sleep(1) + retry += 1 + continue + raise # load the template file current_path = Path(__file__).parent From 47d3dc0067e92b3676268353dae67cbda247dfc5 Mon Sep 17 00:00:00 2001 From: Andrew Aikawa Date: Tue, 2 Jul 2024 11:09:32 -0700 Subject: [PATCH 11/23] map gke h100 megas to 'H100' (#3691) * map gke h100 megas to 'H100' * patch comment about H100 vs H100-mega * format --- sky/provision/kubernetes/utils.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index fbf79130424..cfa3581fb02 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -193,8 +193,13 @@ def get_accelerator_from_label_value(cls, value: str) -> str: return value.replace('nvidia-tesla-', '').upper() elif value.startswith('nvidia-'): acc = value.replace('nvidia-', '').upper() - if acc == 'H100-80GB': - # H100 is named as H100-80GB in GKE. + if acc in ['H100-80GB', 'H100-MEGA-80GB']: + # H100 is named H100-80GB or H100-MEGA-80GB in GKE, + # where the latter has improved bandwidth. + # See a3-mega instances on GCP. + # TODO: we do not distinguish the two GPUs for simplicity, + # but we can evaluate whether we should distinguish + # them based on users' requests. return 'H100' return acc else: From ad5966dbe2cc39cfddb070e52aae43770c6907e2 Mon Sep 17 00:00:00 2001 From: Colin Campbell Date: Tue, 2 Jul 2024 16:19:20 -0400 Subject: [PATCH 12/23] Don't require create namespace permission in cluster launch flow (#3714) --- sky/provision/kubernetes/utils.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index cfa3581fb02..41b43b82c2c 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -1529,6 +1529,14 @@ def create_namespace(namespace: str) -> None: namespace: Name of the namespace to create """ kubernetes_client = kubernetes.kubernetes.client + try: + kubernetes.core_api().read_namespace(namespace) + except kubernetes.api_exception() as e: + if e.status != 404: + raise + else: + return + ns_metadata = dict(name=namespace, labels={'parent': 'skypilot'}) merge_custom_metadata(ns_metadata) namespace_obj = kubernetes_client.V1Namespace(metadata=ns_metadata) From f0f4de8d310032e467627c999918281a100bbbb3 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Wed, 3 Jul 2024 13:31:47 -0700 Subject: [PATCH 13/23] [AWS] Fix opening ports (#3719) * Fix opening ports for AWS * fix comment * format --- sky/provision/aws/instance.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sky/provision/aws/instance.py b/sky/provision/aws/instance.py index 25a9a770732..f3b727d7c21 100644 --- a/sky/provision/aws/instance.py +++ b/sky/provision/aws/instance.py @@ -726,7 +726,15 @@ def open_ports( range(existing_rule['FromPort'], existing_rule['ToPort'] + 1)) elif existing_rule['IpProtocol'] == '-1': # For AWS, IpProtocol = -1 means all traffic - existing_ports.add(-1) + for group_pairs in existing_rule['UserIdGroupPairs']: + if group_pairs['GroupId'] != sg.id: + # We skip the port opening when the rule allows access from + # other security groups, as that is likely added by a user + # manually and satisfy their requirement. + # The security group created by SkyPilot allows all traffic + # from the same security group, which should not be skipped. + existing_ports.add(-1) + break break ports_to_open = [] From f7cd5ad7e63c5077519212a96d80c58795935cba Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Thu, 4 Jul 2024 00:04:22 -0700 Subject: [PATCH 14/23] [Cudo] Allow opening ports for cudo (#3717) * Allow opening ports for cudo * fix logging * format * Avoid host controller for cudo * install cudoctl on controller * fix cudoctl installation * update cudo controller message --- sky/clouds/cudo.py | 4 ++++ sky/provision/cudo/__init__.py | 3 ++- sky/provision/cudo/instance.py | 15 ++++++++++++--- sky/utils/controller_utils.py | 13 +++++++------ 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/sky/clouds/cudo.py b/sky/clouds/cudo.py index 1a32bb0bd2c..3ad66306517 100644 --- a/sky/clouds/cudo.py +++ b/sky/clouds/cudo.py @@ -66,6 +66,10 @@ class Cudo(clouds.Cloud): clouds.CloudImplementationFeatures.DOCKER_IMAGE: ('Docker image is currently not supported on Cudo. You can try ' 'running docker command inside the `run` section in task.yaml.'), + clouds.CloudImplementationFeatures.HOST_CONTROLLERS: ( + 'Cudo Compute cannot host a controller as it does not ' + 'autostopping, which will leave the controller to run indefinitely.' + ), } _MAX_CLUSTER_NAME_LEN_LIMIT = 60 diff --git a/sky/provision/cudo/__init__.py b/sky/provision/cudo/__init__.py index bbdc96413a8..c4587bfdfa7 100644 --- a/sky/provision/cudo/__init__.py +++ b/sky/provision/cudo/__init__.py @@ -3,6 +3,7 @@ from sky.provision.cudo.config import bootstrap_instances from sky.provision.cudo.instance import cleanup_ports from sky.provision.cudo.instance import get_cluster_info +from sky.provision.cudo.instance import open_ports from sky.provision.cudo.instance import query_instances from sky.provision.cudo.instance import run_instances from sky.provision.cudo.instance import stop_instances @@ -11,4 +12,4 @@ __all__ = ('bootstrap_instances', 'run_instances', 'stop_instances', 'terminate_instances', 'wait_instances', 'get_cluster_info', - 'cleanup_ports', 'query_instances') + 'cleanup_ports', 'query_instances', 'open_ports') diff --git a/sky/provision/cudo/instance.py b/sky/provision/cudo/instance.py index 71ada577e53..5f7473a4d93 100644 --- a/sky/provision/cudo/instance.py +++ b/sky/provision/cudo/instance.py @@ -157,11 +157,10 @@ def terminate_instances( del provider_config instances = _filter_instances(cluster_name_on_cloud, None) for inst_id, inst in instances.items(): - logger.info(f'Terminating instance {inst_id}.' - f'{inst}') if worker_only and inst['name'].endswith('-head'): continue - logger.info(f'Removing {inst_id}: {inst}') + logger.debug(f'Terminating Cudo instance {inst_id}.' + f'{inst}') cudo_wrapper.remove(inst_id) @@ -220,6 +219,16 @@ def query_instances( return statuses +def open_ports( + cluster_name_on_cloud: str, + ports: List[str], + provider_config: Optional[Dict[str, Any]] = None, +) -> None: + del cluster_name_on_cloud, ports, provider_config + # Cudo has all ports open by default. Nothing to do here. + return + + def cleanup_ports( cluster_name_on_cloud: str, ports: List[str], diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index ba65d4b664a..5a44e318985 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -247,6 +247,13 @@ def _get_cloud_dependencies_installation_commands( '/bin/linux/amd64/kubectl" && ' 'sudo install -o root -g root -m 0755 ' 'kubectl /usr/local/bin/kubectl))') + elif isinstance(cloud, clouds.Cudo): + commands.append( + f'echo -en "\\r{prefix_str}Cudo{empty_str}" && ' + 'pip list | grep cudo-compute > /dev/null 2>&1 || ' + 'pip install "cudo-compute>=0.1.10" > /dev/null 2>&1 && ' + 'wget https://download.cudo.org/compute/cudoctl-0.3.2-amd64.deb -O ~/cudoctl.deb > /dev/null 2>&1 && ' # pylint: disable=line-too-long + 'sudo dpkg -i ~/cudoctl.deb > /dev/null 2>&1') if controller == Controllers.JOBS_CONTROLLER: if isinstance(cloud, clouds.IBM): commands.append( @@ -263,12 +270,6 @@ def _get_cloud_dependencies_installation_commands( f'echo -en "\\r{prefix_str}RunPod{empty_str}" && ' 'pip list | grep runpod > /dev/null 2>&1 || ' 'pip install "runpod>=1.5.1" > /dev/null 2>&1') - elif isinstance(cloud, clouds.Cudo): - # cudo doesn't support open port - commands.append( - f'echo -en "\\r{prefix_str}Cudo{empty_str}" && ' - 'pip list | grep cudo-compute > /dev/null 2>&1 || ' - 'pip install "cudo-compute>=0.1.8" > /dev/null 2>&1') if (cloudflare.NAME in storage_lib.get_cached_enabled_storage_clouds_or_refresh()): commands.append(f'echo -en "\\r{prefix_str}Cloudflare{empty_str}" && ' + From 92f55a4458634dd3ef20f6c71d3a55e269186fc0 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Thu, 4 Jul 2024 00:49:59 -0700 Subject: [PATCH 15/23] [K8s] Wait until endpoint to be ready for `--endpoint` call (#3634) * Wait until endpoint to be ready for k8s * fix * Less debug output * ux * fix * address comments * Add rich status for endpoint fetching * Add rich status for waiting for the endpoint --- sky/core.py | 5 +++- sky/provision/__init__.py | 4 +++ sky/provision/kubernetes/network.py | 10 +++++++- sky/provision/kubernetes/network_utils.py | 31 +++++++++++++++++------ sky/utils/controller_utils.py | 2 +- 5 files changed, 41 insertions(+), 11 deletions(-) diff --git a/sky/core.py b/sky/core.py index b1006fe19ab..6b18fd2c190 100644 --- a/sky/core.py +++ b/sky/core.py @@ -19,6 +19,7 @@ from sky.skylet import job_lib from sky.usage import usage_lib from sky.utils import controller_utils +from sky.utils import rich_utils from sky.utils import subprocess_utils if typing.TYPE_CHECKING: @@ -126,7 +127,9 @@ def endpoints(cluster: str, RuntimeError: if the cluster has no ports to be exposed or no endpoints are exposed yet. """ - return backend_utils.get_endpoints(cluster=cluster, port=port) + with rich_utils.safe_status('[bold cyan]Fetching endpoints for cluster ' + f'{cluster}...[/]'): + return backend_utils.get_endpoints(cluster=cluster, port=port) @usage_lib.entrypoint diff --git a/sky/provision/__init__.py b/sky/provision/__init__.py index 8371fb8ad83..0fe4ab614ce 100644 --- a/sky/provision/__init__.py +++ b/sky/provision/__init__.py @@ -155,6 +155,10 @@ def query_ports( return the endpoint without querying the cloud provider. If head_ip is not provided, the cloud provider will be queried to get the endpoint info. + The underlying implementation is responsible for retries and timeout, e.g. + kubernetes will wait for the service that expose the ports to be ready + before returning the endpoint info. + Returns a dict with port as the key and a list of common.Endpoint. """ del provider_name, provider_config, cluster_name_on_cloud # unused diff --git a/sky/provision/kubernetes/network.py b/sky/provision/kubernetes/network.py index 875547e7677..e4b267e8ab3 100644 --- a/sky/provision/kubernetes/network.py +++ b/sky/provision/kubernetes/network.py @@ -1,6 +1,7 @@ """Kubernetes network provisioning.""" from typing import Any, Dict, List, Optional +from sky import sky_logging from sky.adaptors import kubernetes from sky.provision import common from sky.provision.kubernetes import network_utils @@ -8,6 +9,8 @@ from sky.utils import kubernetes_enums from sky.utils.resources_utils import port_ranges_to_set +logger = sky_logging.init_logger(__name__) + _PATH_PREFIX = '/skypilot/{namespace}/{cluster_name_on_cloud}/{port}' _LOADBALANCER_SERVICE_NAME = '{cluster_name_on_cloud}--skypilot-lb' @@ -218,12 +221,17 @@ def _query_ports_for_loadbalancer( ports: List[int], provider_config: Dict[str, Any], ) -> Dict[int, List[common.Endpoint]]: + logger.debug(f'Getting loadbalancer IP for cluster {cluster_name_on_cloud}') result: Dict[int, List[common.Endpoint]] = {} service_name = _LOADBALANCER_SERVICE_NAME.format( cluster_name_on_cloud=cluster_name_on_cloud) external_ip = network_utils.get_loadbalancer_ip( namespace=provider_config.get('namespace', 'default'), - service_name=service_name) + service_name=service_name, + # Timeout is set so that we can retry the query when the + # cluster is firstly created and the load balancer is not ready yet. + timeout=60, + ) if external_ip is None: return {} diff --git a/sky/provision/kubernetes/network_utils.py b/sky/provision/kubernetes/network_utils.py index c42ffee2f1c..844f84a04f5 100644 --- a/sky/provision/kubernetes/network_utils.py +++ b/sky/provision/kubernetes/network_utils.py @@ -1,5 +1,6 @@ """Kubernetes network provisioning utils.""" import os +import time from typing import Dict, List, Optional, Tuple, Union import jinja2 @@ -7,12 +8,15 @@ import sky from sky import exceptions +from sky import sky_logging from sky import skypilot_config from sky.adaptors import kubernetes from sky.provision.kubernetes import utils as kubernetes_utils from sky.utils import kubernetes_enums from sky.utils import ux_utils +logger = sky_logging.init_logger(__name__) + _INGRESS_TEMPLATE_NAME = 'kubernetes-ingress.yml.j2' _LOADBALANCER_TEMPLATE_NAME = 'kubernetes-loadbalancer.yml.j2' @@ -239,18 +243,29 @@ def get_ingress_external_ip_and_ports( return external_ip, None -def get_loadbalancer_ip(namespace: str, service_name: str) -> Optional[str]: +def get_loadbalancer_ip(namespace: str, + service_name: str, + timeout: int = 0) -> Optional[str]: """Returns the IP address of the load balancer.""" core_api = kubernetes.core_api() - service = core_api.read_namespaced_service( - service_name, namespace, _request_timeout=kubernetes.API_TIMEOUT) - if service.status.load_balancer.ingress is None: - return None + ip = None - ip = service.status.load_balancer.ingress[ - 0].ip or service.status.load_balancer.ingress[0].hostname - return ip if ip is not None else None + start_time = time.time() + retry_cnt = 0 + while ip is None and (retry_cnt == 0 or time.time() - start_time < timeout): + service = core_api.read_namespaced_service( + service_name, namespace, _request_timeout=kubernetes.API_TIMEOUT) + if service.status.load_balancer.ingress is not None: + ip = (service.status.load_balancer.ingress[0].ip or + service.status.load_balancer.ingress[0].hostname) + if ip is None: + retry_cnt += 1 + if retry_cnt % 5 == 0: + logger.debug('Waiting for load balancer IP to be assigned' + '...') + time.sleep(1) + return ip def get_pod_ip(namespace: str, pod_name: str) -> Optional[str]: diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index 5a44e318985..477ebe8d1ba 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -191,7 +191,7 @@ def _get_cloud_dependencies_installation_commands( prefix_str = 'Check & install cloud dependencies on controller: ' # This is to make sure the shorter checking message does not have junk # characters from the previous message. - empty_str = ' ' * 5 + empty_str = ' ' * 10 aws_dependencies_installation = ( 'pip list | grep boto3 > /dev/null 2>&1 || pip install ' 'botocore>=1.29.10 boto3>=1.26.1; ' From 4c6abac8a7d38f24ce43d2bff7273b656fc24468 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Thu, 4 Jul 2024 06:34:34 -0700 Subject: [PATCH 16/23] [Docs] Clean up cudo installation docs (#3724) * lint * lint * update docs --- docs/source/getting-started/installation.rst | 31 ++++++++++--------- .../kubernetes/kubernetes-troubleshooting.rst | 14 ++++++++- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/docs/source/getting-started/installation.rst b/docs/source/getting-started/installation.rst index e5b318d4f87..d7770f079ec 100644 --- a/docs/source/getting-started/installation.rst +++ b/docs/source/getting-started/installation.rst @@ -311,25 +311,26 @@ Fluidstack Cudo Compute ~~~~~~~~~~~~~~~~~~ -`Cudo Compute `__ GPU cloud provides low cost GPUs powered with green energy. -1. Create a billing account by following `this guide `__. -2. Create a project ``__. -3. Create an API Key by following `this guide `__. -3. Download and install the `cudoctl `__ command line tool -3. Run :code:`cudoctl init`: +`Cudo Compute `__ provides low cost GPUs powered by green energy. -.. code-block:: shell +1. Create a `billing account `__. +2. Create a `project `__. +3. Create an `API Key `__. +4. Download and install the `cudoctl `__ command line tool +5. Run :code:`cudoctl init`: + + .. code-block:: shell - cudoctl init - ✔ api key: my-api-key - ✔ project: my-project - ✔ billing account: my-billing-account - ✔ context: default - config file saved ~/.config/cudo/cudo.yml + cudoctl init + ✔ api key: my-api-key + ✔ project: my-project + ✔ billing account: my-billing-account + ✔ context: default + config file saved ~/.config/cudo/cudo.yml - pip install "cudo-compute>=0.1.10" + pip install "cudo-compute>=0.1.10" -If you want to want to use skypilot with a different Cudo Compute account or project, just run :code:`cudoctl init`: again. +If you want to want to use SkyPilot with a different Cudo Compute account or project, run :code:`cudoctl init` again. diff --git a/docs/source/reference/kubernetes/kubernetes-troubleshooting.rst b/docs/source/reference/kubernetes/kubernetes-troubleshooting.rst index bb0befc602a..ee940422314 100644 --- a/docs/source/reference/kubernetes/kubernetes-troubleshooting.rst +++ b/docs/source/reference/kubernetes/kubernetes-troubleshooting.rst @@ -68,7 +68,19 @@ Run :code:`sky check` to verify that SkyPilot can access your cluster. If you see an error, ensure that your kubeconfig file at :code:`~/.kube/config` is correctly set up. -Step A3 - Can you launch a SkyPilot task? +Step A3 - Do your nodes have enough disk space? +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If your nodes are out of disk space, pulling the SkyPilot images may fail with :code:`rpc error: code = Canceled desc = failed to pull and unpack image: context canceled`. +Make sure your nodes are not under disk pressure by checking :code:`Conditions` in :code:`kubectl describe nodes`, or by running: + +.. code-block:: bash + + $ kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{range .status.conditions[?(@.type=="DiskPressure")]}{.type}={.status}{"\n"}{end}{"\n"}{end}' + # Should not show DiskPressure=True for any node + + +Step A4 - Can you launch a SkyPilot task? ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Next, try running a simple hello world task to verify that SkyPilot can launch tasks on your cluster. From d6ce1bac7a53744b427e25ee2f40e463e5b81408 Mon Sep 17 00:00:00 2001 From: Tian Xia Date: Fri, 5 Jul 2024 00:33:21 +0800 Subject: [PATCH 17/23] [Catalog] Remove fractional A10 instance types in catalog (#3722) * fix * Update sky/clouds/service_catalog/data_fetchers/fetch_azure.py Co-authored-by: Zhanghao Wu * change todo name --------- Co-authored-by: Zhanghao Wu --- .../service_catalog/data_fetchers/fetch_azure.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sky/clouds/service_catalog/data_fetchers/fetch_azure.py b/sky/clouds/service_catalog/data_fetchers/fetch_azure.py index cc5e4597748..9a7b2a90bee 100644 --- a/sky/clouds/service_catalog/data_fetchers/fetch_azure.py +++ b/sky/clouds/service_catalog/data_fetchers/fetch_azure.py @@ -93,6 +93,15 @@ def get_regions() -> List[str]: # We have to manually remove it. DEPRECATED_FAMILIES = ['standardNVSv2Family'] +# Some A10 instance types only contains a fractional of GPU. We temporarily +# filter them out here to avoid using it as a whole A10 GPU. +# TODO(zhwu,tian): support fractional GPUs, which can be done on +# kubernetes as well. +# Ref: https://learn.microsoft.com/en-us/azure/virtual-machines/nva10v5-series +FILTERED_A10_INSTANCE_TYPES = [ + f'Standard_NV{vcpu}ads_A10_v5' for vcpu in [6, 12, 18] +] + USEFUL_COLUMNS = [ 'InstanceType', 'AcceleratorName', 'AcceleratorCount', 'vCPUs', 'MemoryGiB', 'GpuInfo', 'Price', 'SpotPrice', 'Region', 'Generation' @@ -286,6 +295,10 @@ def get_additional_columns(row): after_drop_len = len(df_ret) print(f'Dropped {before_drop_len - after_drop_len} duplicated rows') + # Filter out instance types that only contain a fractional of GPU. + df_ret = df_ret.loc[~df_ret['InstanceType'].isin(FILTERED_A10_INSTANCE_TYPES + )] + # Filter out deprecated families df_ret = df_ret.loc[~df_ret['family'].isin(DEPRECATED_FAMILIES)] df_ret = df_ret[USEFUL_COLUMNS] From 8674520859c856d83d5796010b28cef2a3ede46e Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Thu, 4 Jul 2024 10:58:06 -0700 Subject: [PATCH 18/23] [Docs] Revamp docs for secrets and distributed jobs (#3715) * Add docs for secrets and env var * add * revamp docs * partially * Update docs/source/running-jobs/distributed-jobs.rst Co-authored-by: Zongheng Yang * Update docs/source/running-jobs/environment-variables.rst Co-authored-by: Zongheng Yang * Update docs/source/running-jobs/distributed-jobs.rst Co-authored-by: Zongheng Yang * revert multi-node instruction * Update docs/source/running-jobs/environment-variables.rst Co-authored-by: Zongheng Yang * Update docs/source/running-jobs/environment-variables.rst Co-authored-by: Zongheng Yang * Update docs/source/running-jobs/environment-variables.rst Co-authored-by: Zongheng Yang * Update docs/source/running-jobs/environment-variables.rst Co-authored-by: Zongheng Yang * Update docs/source/running-jobs/environment-variables.rst Co-authored-by: Zongheng Yang * Update docs/source/running-jobs/environment-variables.rst Co-authored-by: Zongheng Yang * Update docs/source/getting-started/quickstart.rst Co-authored-by: Zongheng Yang * address * Rephrase * Update docs/source/running-jobs/environment-variables.rst Co-authored-by: Zongheng Yang * fix * move * Update docs/source/running-jobs/environment-variables.rst Co-authored-by: Zongheng Yang --------- Co-authored-by: Zongheng Yang --- docs/source/docs/index.rst | 4 +- docs/source/getting-started/quickstart.rst | 4 +- docs/source/running-jobs/distributed-jobs.rst | 44 +++--- .../running-jobs/environment-variables.rst | 129 ++++++++++++------ docs/source/running-jobs/index.rst | 7 - 5 files changed, 109 insertions(+), 79 deletions(-) delete mode 100644 docs/source/running-jobs/index.rst diff --git a/docs/source/docs/index.rst b/docs/source/docs/index.rst index 47c98d7bef7..5a648dbcda4 100644 --- a/docs/source/docs/index.rst +++ b/docs/source/docs/index.rst @@ -126,7 +126,7 @@ Contents ../reference/job-queue ../examples/auto-failover ../reference/kubernetes/index - ../running-jobs/index + ../running-jobs/distributed-jobs .. toctree:: :maxdepth: 1 @@ -155,12 +155,14 @@ Contents :maxdepth: 1 :caption: User Guides + ../running-jobs/environment-variables ../examples/docker-containers ../examples/ports ../reference/tpu ../reference/logging ../reference/faq + .. toctree:: :maxdepth: 1 :caption: Developer Guides diff --git a/docs/source/getting-started/quickstart.rst b/docs/source/getting-started/quickstart.rst index bb281087736..bfc6fd17e05 100644 --- a/docs/source/getting-started/quickstart.rst +++ b/docs/source/getting-started/quickstart.rst @@ -72,6 +72,8 @@ To launch a cluster and run a task, use :code:`sky launch`: You can use the ``-c`` flag to give the cluster an easy-to-remember name. If not specified, a name is autogenerated. + If the cluster name is an existing cluster shown in ``sky status``, the cluster will be reused. + The ``sky launch`` command performs much heavy-lifting: - selects an appropriate cloud and VM based on the specified resource constraints; @@ -208,7 +210,7 @@ Managed spot jobs run on much cheaper spot instances, with automatic preemption .. code-block:: console - $ sky spot launch hello_sky.yaml + $ sky jobs launch --use-spot hello_sky.yaml Next steps ----------- diff --git a/docs/source/running-jobs/distributed-jobs.rst b/docs/source/running-jobs/distributed-jobs.rst index fb20b7ca988..9eb590c10bc 100644 --- a/docs/source/running-jobs/distributed-jobs.rst +++ b/docs/source/running-jobs/distributed-jobs.rst @@ -1,15 +1,15 @@ .. _dist-jobs: -Distributed Jobs on Many VMs +Distributed Jobs on Many Nodes ================================================ SkyPilot supports multi-node cluster -provisioning and distributed execution on many VMs. +provisioning and distributed execution on many nodes. For example, here is a simple PyTorch Distributed training example: .. code-block:: yaml - :emphasize-lines: 6-6,21-22,24-25 + :emphasize-lines: 6-6,21-21,23-26 name: resnet-distributed-app @@ -31,14 +31,13 @@ For example, here is a simple PyTorch Distributed training example: run: | cd pytorch-distributed-resnet - num_nodes=`echo "$SKYPILOT_NODE_IPS" | wc -l` - master_addr=`echo "$SKYPILOT_NODE_IPS" | head -n1` - python3 -m torch.distributed.launch \ - --nproc_per_node=${SKYPILOT_NUM_GPUS_PER_NODE} \ - --node_rank=${SKYPILOT_NODE_RANK} \ - --nnodes=$num_nodes \ - --master_addr=$master_addr \ - --master_port=8008 \ + MASTER_ADDR=`echo "$SKYPILOT_NODE_IPS" | head -n1` + torchrun \ + --nnodes=$SKPILOT_NUM_NODES \ + --master_addr=$MASTER_ADDR \ + --nproc_per_node=$SKYPILOT_NUM_GPUS_PER_NODE \ + --node_rank=$SKYPILOT_NODE_RANK \ + --master_port=12375 \ resnet_ddp.py --num_epochs 20 In the above, @@ -66,16 +65,11 @@ SkyPilot exposes these environment variables that can be accessed in a task's `` the node executing the task. - :code:`SKYPILOT_NODE_IPS`: a string of IP addresses of the nodes reserved to execute the task, where each line contains one IP address. - - - You can retrieve the number of nodes by :code:`echo "$SKYPILOT_NODE_IPS" | wc -l` - and the IP address of the third node by :code:`echo "$SKYPILOT_NODE_IPS" | sed -n - 3p`. - - - To manipulate these IP addresses, you can also store them to a file in the - :code:`run` command with :code:`echo $SKYPILOT_NODE_IPS >> ~/sky_node_ips`. +- :code:`SKYPILOT_NUM_NODES`: number of nodes reserved for the task, which can be specified by ``num_nodes: ``. Same value as :code:`echo "$SKYPILOT_NODE_IPS" | wc -l`. - :code:`SKYPILOT_NUM_GPUS_PER_NODE`: number of GPUs reserved on each node to execute the task; the same as the count in ``accelerators: :`` (rounded up if a fraction). +See :ref:`sky-env-vars` for more details. Launching a multi-node task (new cluster) ------------------------------------------------- @@ -106,7 +100,7 @@ The following happens in sequence: and step 4). Executing a task on the head node only ------------------------------------------ +-------------------------------------- To execute a task on the head node only (a common scenario for tools like ``mpirun``), use the ``SKYPILOT_NODE_RANK`` environment variable as follows: @@ -141,7 +135,7 @@ This allows you directly to SSH into the worker nodes, if required. Executing a Distributed Ray Program ------------------------------------ -To execute a distributed Ray program on many VMs, you can download the `training script `_ and launch the `task yaml `_: +To execute a distributed Ray program on many nodes, you can download the `training script `_ and launch the `task yaml `_: .. code-block:: console @@ -171,19 +165,17 @@ To execute a distributed Ray program on many VMs, you can download the `training run: | sudo chmod 777 -R /var/tmp - head_ip=`echo "$SKYPILOT_NODE_IPS" | head -n1` - num_nodes=`echo "$SKYPILOT_NODE_IPS" | wc -l` + HEAD_IP=`echo "$SKYPILOT_NODE_IPS" | head -n1` if [ "$SKYPILOT_NODE_RANK" == "0" ]; then ps aux | grep ray | grep 6379 &> /dev/null || ray start --head --disable-usage-stats --port 6379 sleep 5 - python train.py --num-workers $num_nodes + python train.py --num-workers $SKYPILOT_NUM_NODES else sleep 5 - ps aux | grep ray | grep 6379 &> /dev/null || ray start --address $head_ip:6379 --disable-usage-stats + ps aux | grep ray | grep 6379 &> /dev/null || ray start --address $HEAD_IP:6379 --disable-usage-stats fi .. warning:: - **Avoid Installing Ray in Base Environment**: Before proceeding with the execution of a distributed Ray program, it is crucial to ensure that Ray is **not** installed in the *base* environment. Installing a different version of Ray in the base environment can lead to abnormal cluster status. - It is highly recommended to **create a dedicated virtual environment** (as above) for Ray and its dependencies, and avoid calling `ray stop` as that will also cause issue with the cluster. + When using Ray, avoid calling ``ray stop`` as that will also cause the SkyPilot runtime to be stopped. diff --git a/docs/source/running-jobs/environment-variables.rst b/docs/source/running-jobs/environment-variables.rst index 7f91720f9b5..f7138af95fa 100644 --- a/docs/source/running-jobs/environment-variables.rst +++ b/docs/source/running-jobs/environment-variables.rst @@ -1,23 +1,38 @@ .. _env-vars: -Using Environment Variables +Secrets and Environment Variables ================================================ +Environment variables are a powerful way to pass configuration and secrets to your tasks. There are two types of environment variables in SkyPilot: + +- :ref:`User-specified environment variables `: Passed by users to tasks, useful for secrets and configurations. +- :ref:`SkyPilot environment variables `: Predefined by SkyPilot with information about the current cluster and task. + +.. _user-specified-env-vars: + User-specified environment variables ------------------------------------------------------------------ +User-specified environment variables are useful for passing secrets and any arguments or configurations needed for your tasks. They are made available in ``file_mounts``, ``setup``, and ``run``. + You can specify environment variables to be made available to a task in two ways: -- The ``envs`` field (dict) in a :ref:`task YAML ` -- The ``--env`` flag in the ``sky launch/exec`` :ref:`CLI ` (takes precedence over the above) +- ``envs`` field (dict) in a :ref:`task YAML `: + + .. code-block:: yaml + + envs: + MYVAR: val + +- ``--env`` flag in ``sky launch/exec`` :ref:`CLI ` (takes precedence over the above) .. tip:: - If an environment variable is required to be specified with `--env` during - ``sky launch/exec``, you can set it to ``null`` in task YAML to raise an - error when it is forgotten to be specified. For example, the ``WANDB_API_KEY`` - and ``HF_TOKEN`` in the following task YAML: + To mark an environment variable as required and make SkyPilot forcefully check + its existence (errors out if not specified), set it to an empty string or + ``null`` in the task YAML. For example, ``WANDB_API_KEY`` and ``HF_TOKEN`` in + the following task YAML are marked as required: .. code-block:: yaml @@ -28,6 +43,26 @@ You can specify environment variables to be made available to a task in two ways The ``file_mounts``, ``setup``, and ``run`` sections of a task YAML can access the variables via the ``${MYVAR}`` syntax. +.. _passing-secrets: + +Passing secrets +~~~~~~~~~~~~~~~ + +We recommend passing secrets to any node(s) executing your task by first making +it available in your current shell, then using ``--env SECRET`` to pass it to SkyPilot: + +.. code-block:: console + + $ sky launch -c mycluster --env HF_TOKEN --env WANDB_API_KEY task.yaml + $ sky exec mycluster --env WANDB_API_KEY task.yaml + +.. tip:: + + You do not need to pass the value directly such as ``--env + WANDB_API_KEY=1234``. When the value is not specified (e.g., ``--env WANDB_API_KEY``), + SkyPilot reads it from local environment variables. + + Using in ``file_mounts`` ~~~~~~~~~~~~~~~~~~~~~~~~ @@ -77,40 +112,29 @@ For example, this is useful for passing secrets (see below) or passing configura See complete examples at `llm/vllm/serve.yaml `_ and `llm/vicuna/train.yaml `_. -.. _passing-secrets: - -Passing secrets -~~~~~~~~~~~~~~~~~~~~~~~~ - -We recommend passing secrets to any node(s) executing your task by first making -it available in your current shell, then using ``--env`` to pass it to SkyPilot: - -.. code-block:: console - - $ sky launch -c mycluster --env WANDB_API_KEY task.yaml - $ sky exec mycluster --env WANDB_API_KEY task.yaml - -.. tip:: - - In other words, you do not need to pass the value directly such as ``--env - WANDB_API_KEY=1234``. +.. _sky-env-vars: +SkyPilot environment variables +------------------------------------------------------------------ +SkyPilot exports several predefined environment variables made available during a task's execution. These variables contain information about the current cluster or task, which can be useful for distributed frameworks such as +torch.distributed, OpenMPI, etc. See examples in :ref:`dist-jobs` and :ref:`managed-jobs`. +The values of these variables are filled in by SkyPilot at task execution time. +You can access these variables in the following ways: -SkyPilot environment variables ------------------------------------------------------------------- +* In the task YAML's ``setup``/``run`` commands (a Bash script), access them using the ``${MYVAR}`` syntax; +* In the program(s) launched in ``setup``/``run``, access them using the language's standard method (e.g., ``os.environ`` for Python). -SkyPilot exports these environment variables for a task's execution. ``setup`` -and ``run`` stages have different environment variables available. +The ``setup`` and ``run`` stages can access different sets of SkyPilot environment variables: Environment variables for ``setup`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. list-table:: - :widths: 20 60 10 + :widths: 20 40 10 :header-rows: 1 * - Name @@ -120,9 +144,15 @@ Environment variables for ``setup`` - Rank (an integer ID from 0 to :code:`num_nodes-1`) of the node being set up. - 0 * - ``SKYPILOT_SETUP_NODE_IPS`` - - A string of IP addresses of the nodes in the cluster with the same order as the node ranks, where each line contains one IP address. Note that this is not necessarily the same as the nodes in ``run`` stage, as the ``setup`` stage runs on all nodes of the cluster, while the ``run`` stage can run on a subset of nodes. - - 1.2.3.4 - 3.4.5.6 + - A string of IP addresses of the nodes in the cluster with the same order as the node ranks, where each line contains one IP address. + + Note that this is not necessarily the same as the nodes in ``run`` stage: the ``setup`` stage runs on all nodes of the cluster, while the ``run`` stage can run on a subset of nodes. + - + .. code-block:: text + + 1.2.3.4 + 3.4.5.6 + * - ``SKYPILOT_NUM_NODES`` - Number of nodes in the cluster. Same value as ``$(echo "$SKYPILOT_NODE_IPS" | wc -l)``. - 2 @@ -137,7 +167,15 @@ Environment variables for ``setup`` For managed spot jobs: sky-managed-2023-07-06-21-18-31-563597_my-job-name_1-0 * - ``SKYPILOT_CLUSTER_INFO`` - - A JSON string containing information about the cluster. To access the information, you could parse the JSON string in bash ``echo $SKYPILOT_CLUSTER_INFO | jq .cloud`` or in Python ``json.loads(os.environ['SKYPILOT_CLUSTER_INFO'])['cloud']``. + - A JSON string containing information about the cluster. To access the information, you could parse the JSON string in bash ``echo $SKYPILOT_CLUSTER_INFO | jq .cloud`` or in Python : + + .. code-block:: python + + import json + json.loads( + os.environ['SKYPILOT_CLUSTER_INFO'] + )['cloud'] + - {"cluster_name": "my-cluster-name", "cloud": "GCP", "region": "us-central1", "zone": "us-central1-a"} * - ``SKYPILOT_SERVE_REPLICA_ID`` - The ID of a replica within the service (starting from 1). Available only for a :ref:`service `'s replica task. @@ -151,7 +189,7 @@ Environment variables for ``run`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. list-table:: - :widths: 20 60 10 + :widths: 20 40 10 :header-rows: 1 * - Name @@ -162,7 +200,11 @@ Environment variables for ``run`` - 0 * - ``SKYPILOT_NODE_IPS`` - A string of IP addresses of the nodes reserved to execute the task, where each line contains one IP address. Read more :ref:`here `. - - 1.2.3.4 + - + .. code-block:: text + + 1.2.3.4 + * - ``SKYPILOT_NUM_NODES`` - Number of nodes assigned to execute the current task. Same value as ``$(echo "$SKYPILOT_NODE_IPS" | wc -l)``. Read more :ref:`here `. - 1 @@ -182,16 +224,15 @@ Environment variables for ``run`` For managed spot jobs: sky-managed-2023-07-06-21-18-31-563597_my-job-name_1-0 * - ``SKYPILOT_CLUSTER_INFO`` - - A JSON string containing information about the cluster. To access the information, you could parse the JSON string in bash ``echo $SKYPILOT_CLUSTER_INFO | jq .cloud`` or in Python ``json.loads(os.environ['SKYPILOT_CLUSTER_INFO'])['cloud']``. + - A JSON string containing information about the cluster. To access the information, you could parse the JSON string in bash ``echo $SKYPILOT_CLUSTER_INFO | jq .cloud`` or in Python : + + .. code-block:: python + + import json + json.loads( + os.environ['SKYPILOT_CLUSTER_INFO'] + )['cloud'] - {"cluster_name": "my-cluster-name", "cloud": "GCP", "region": "us-central1", "zone": "us-central1-a"} * - ``SKYPILOT_SERVE_REPLICA_ID`` - The ID of a replica within the service (starting from 1). Available only for a :ref:`service `'s replica task. - 1 - -The values of these variables are filled in by SkyPilot at task execution time. - -You can access these variables in the following ways: - -* In the task YAML's ``setup``/``run`` commands (a Bash script), access them using the ``${MYVAR}`` syntax; -* In the program(s) launched in ``setup``/``run``, access them using the - language's standard method (e.g., ``os.environ`` for Python). diff --git a/docs/source/running-jobs/index.rst b/docs/source/running-jobs/index.rst deleted file mode 100644 index 04c921d1022..00000000000 --- a/docs/source/running-jobs/index.rst +++ /dev/null @@ -1,7 +0,0 @@ -More User Guides -================================================ - -.. toctree:: - - distributed-jobs - environment-variables From 49af5c9b5915faa86923b5c9aedf7aaaf71ffa18 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Thu, 4 Jul 2024 11:03:40 -0700 Subject: [PATCH 19/23] [Docs] Add out of disk to k8s troubleshooting docs (#3721) * lint * lint * comments --- docs/source/reference/kubernetes/kubernetes-troubleshooting.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/reference/kubernetes/kubernetes-troubleshooting.rst b/docs/source/reference/kubernetes/kubernetes-troubleshooting.rst index ee940422314..258c3e9eb55 100644 --- a/docs/source/reference/kubernetes/kubernetes-troubleshooting.rst +++ b/docs/source/reference/kubernetes/kubernetes-troubleshooting.rst @@ -71,7 +71,7 @@ If you see an error, ensure that your kubeconfig file at :code:`~/.kube/config` Step A3 - Do your nodes have enough disk space? ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -If your nodes are out of disk space, pulling the SkyPilot images may fail with :code:`rpc error: code = Canceled desc = failed to pull and unpack image: context canceled`. +If your nodes are out of disk space, pulling the SkyPilot images may fail with :code:`rpc error: code = Canceled desc = failed to pull and unpack image: context canceled` error in the terminal during provisioning. Make sure your nodes are not under disk pressure by checking :code:`Conditions` in :code:`kubectl describe nodes`, or by running: .. code-block:: bash From c4457557c7d8de007bf459fbf6db2c6d6884056b Mon Sep 17 00:00:00 2001 From: Andrew Aikawa Date: Thu, 4 Jul 2024 15:44:20 -0700 Subject: [PATCH 20/23] add nccl test example (#3217) * add nccl test example * use pytorch nccl test instead * fix docstring * nit newline --- examples/nccl_test.yaml | 42 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 examples/nccl_test.yaml diff --git a/examples/nccl_test.yaml b/examples/nccl_test.yaml new file mode 100644 index 00000000000..046e72cc00f --- /dev/null +++ b/examples/nccl_test.yaml @@ -0,0 +1,42 @@ +# This measures NCCL all reduce performance with Torch. + +# Usage: +# $ sky launch -c nccl --use-spot nccl_test.yaml + +# Example output +# (head, rank=0, pid=17654) [nccl-ebd1-head-8x3wqw6d-compute:0]:1 +# (head, rank=0, pid=17654) [nccl-ebd1-head-8x3wqw6d-compute:0]:2 +# (head, rank=0, pid=17654) [nccl-ebd1-head-8x3wqw6d-compute:0]:3 +# (head, rank=0, pid=17654) [nccl-ebd1-head-8x3wqw6d-compute:0]:4 +# (head, rank=0, pid=17654) [nccl-ebd1-head-8x3wqw6d-compute:0]:5 +# (head, rank=0, pid=17654) [nccl-ebd1-head-8x3wqw6d-compute:0]:The average bandwidth of all_reduce with a 4.0GB payload (5 trials, 16 ranks): +# (head, rank=0, pid=17654) [nccl-ebd1-head-8x3wqw6d-compute:0]: algbw: 2.053 GBps (16.4 Gbps) +# (head, rank=0, pid=17654) [nccl-ebd1-head-8x3wqw6d-compute:0]: busbw: 3.850 GBps (30.8 Gbps) +# (head, rank=0, pid=17654) [nccl-ebd1-head-8x3wqw6d-compute:0]: + +name: torch-nccl-allreduce + +num_nodes: 2 + +resources: + accelerators: A100:8 + use_spot: True + +setup: | + pip install torch + git clone https://github.com/stas00/ml-engineering.git + +run: | + cd ml-engineering/network/benchmarks + NNODES=`echo "$SKYPILOT_NODE_IPS" | wc -l` + MASTER_ADDR=`echo "$SKYPILOT_NODE_IPS" | head -n1` + python -u -m torch.distributed.run \ + --nproc_per_node $SKYPILOT_NUM_GPUS_PER_NODE \ + --nnodes $NNODES \ + --rdzv_endpoint $MASTER_ADDR:8888 \ + --rdzv_backend c10d \ + --max_restarts 0 \ + --role `hostname -s`: \ + --tee 3 \ + all_reduce_bench.py + \ No newline at end of file From 05ce5e999a5c4218d267481ebddac7967dce1897 Mon Sep 17 00:00:00 2001 From: Ziming Mao Date: Thu, 4 Jul 2024 19:51:14 -0400 Subject: [PATCH 21/23] [Docs] Clarify spot policy docs (#3725) update spot doc --- docs/source/serving/spot-policy.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/serving/spot-policy.rst b/docs/source/serving/spot-policy.rst index 1c03dbe7ba4..ff23b328705 100644 --- a/docs/source/serving/spot-policy.rst +++ b/docs/source/serving/spot-policy.rst @@ -3,7 +3,7 @@ Using Spot Instances for Serving ================================ -SkyServe supports serving models on a mixture of spot and on-demand replicas with two options: :code:`base_ondemand_fallback_replicas` and :code:`dynamic_ondemand_fallback`. +SkyServe supports serving models on a mixture of spot and on-demand replicas with two options: :code:`base_ondemand_fallback_replicas` and :code:`dynamic_ondemand_fallback`. Currently, SkyServe relies on the user side to retry in the event of spot instance preemptions. Base on-demand Fallback From 994d35ae3bf0962e1d21e4db006c2ba1dddcaa0f Mon Sep 17 00:00:00 2001 From: Tian Xia Date: Sat, 6 Jul 2024 01:10:32 +0800 Subject: [PATCH 22/23] [Core] Fix A10 GPU on Azure (#3707) * init * works. todo: only do this for A10 VMs * only install for A10 instances * merge into one template * Update sky/skylet/providers/azure/node_provider.py Co-authored-by: Zhanghao Wu * add warning * apply suggestions from code review * Update sky/clouds/azure.py Co-authored-by: Zhanghao Wu --------- Co-authored-by: Zhanghao Wu --- sky/backends/cloud_vm_ray_backend.py | 8 ++++++ sky/clouds/azure.py | 20 +++++++++------ sky/skylet/providers/azure/node_provider.py | 27 +++++++++++++++++++++ sky/templates/azure-ray.yml.j2 | 2 ++ 4 files changed, 49 insertions(+), 8 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 89f9dcdc695..6fe4211f102 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2020,8 +2020,16 @@ def provision_with_retries( failover_history: List[Exception] = list() style = colorama.Style + fore = colorama.Fore # Retrying launchable resources. while True: + if (isinstance(to_provision.cloud, clouds.Azure) and + to_provision.accelerators is not None and + 'A10' in to_provision.accelerators): + logger.warning(f'{style.BRIGHT}{fore.YELLOW}Trying to launch ' + 'an A10 cluster on Azure. This may take ~20 ' + 'minutes due to driver installation.' + f'{style.RESET_ALL}') try: # Recheck cluster name as the 'except:' block below may # change the cloud assignment. diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py index b75f9207856..916a1c01c7d 100644 --- a/sky/clouds/azure.py +++ b/sky/clouds/azure.py @@ -7,7 +7,7 @@ import subprocess import textwrap import typing -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Any, Dict, Iterator, List, Optional, Tuple import colorama @@ -269,13 +269,12 @@ def get_vcpus_mem_from_instance_type( def get_zone_shell_cmd(cls) -> Optional[str]: return None - def make_deploy_resources_variables( - self, - resources: 'resources.Resources', - cluster_name_on_cloud: str, - region: 'clouds.Region', - zones: Optional[List['clouds.Zone']], - dryrun: bool = False) -> Dict[str, Optional[str]]: + def make_deploy_resources_variables(self, + resources: 'resources.Resources', + cluster_name_on_cloud: str, + region: 'clouds.Region', + zones: Optional[List['clouds.Zone']], + dryrun: bool = False) -> Dict[str, Any]: assert zones is None, ('Azure does not support zones', zones) region_name = region.name @@ -315,6 +314,10 @@ def make_deploy_resources_variables( 'image_version': version, } + # Setup the A10 nvidia driver. + need_nvidia_driver_extension = (acc_dict is not None and + 'A10' in acc_dict) + # Setup commands to eliminate the banner and restart sshd. # This script will modify /etc/ssh/sshd_config and add a bash script # into .bashrc. The bash script will restart sshd if it has not been @@ -367,6 +370,7 @@ def _failover_disk_tier() -> Optional[resources_utils.DiskTier]: # Azure does not support specific zones. 'zones': None, **image_config, + 'need_nvidia_driver_extension': need_nvidia_driver_extension, 'disk_tier': Azure._get_disk_type(_failover_disk_tier()), 'cloud_init_setup_commands': cloud_init_setup_commands, 'azure_subscription_id': self.get_project_id(dryrun), diff --git a/sky/skylet/providers/azure/node_provider.py b/sky/skylet/providers/azure/node_provider.py index b4a1c656688..5f87e57245e 100644 --- a/sky/skylet/providers/azure/node_provider.py +++ b/sky/skylet/providers/azure/node_provider.py @@ -303,6 +303,33 @@ def _create_node(self, node_config, tags, count): template_params["nsg"] = self.provider_config["nsg"] template_params["subnet"] = self.provider_config["subnet"] + if node_config.get("need_nvidia_driver_extension", False): + # Configure driver extension for A10 GPUs. A10 GPUs requires a + # special type of drivers which is available at Microsoft HPC + # extension. Reference: https://forums.developer.nvidia.com/t/ubuntu-22-04-installation-driver-error-nvidia-a10/285195/2 + for r in template["resources"]: + if r["type"] == "Microsoft.Compute/virtualMachines": + # Add a nested extension resource for A10 GPUs + r["resources"] = [ + { + "type": "extensions", + "apiVersion": "2015-06-15", + "location": "[variables('location')]", + "dependsOn": [ + "[concat('Microsoft.Compute/virtualMachines/', parameters('vmName'), copyIndex())]" + ], + "name": "NvidiaGpuDriverLinux", + "properties": { + "publisher": "Microsoft.HpcCompute", + "type": "NvidiaGpuDriverLinux", + "typeHandlerVersion": "1.9", + "autoUpgradeMinorVersion": True, + "settings": {}, + }, + }, + ] + break + parameters = { "properties": { "mode": DeploymentMode.incremental, diff --git a/sky/templates/azure-ray.yml.j2 b/sky/templates/azure-ray.yml.j2 index 66eac439453..e8c388e1879 100644 --- a/sky/templates/azure-ray.yml.j2 +++ b/sky/templates/azure-ray.yml.j2 @@ -80,6 +80,7 @@ available_node_types: # billingProfile: # maxPrice: -1 {%- endif %} + need_nvidia_driver_extension: {{need_nvidia_driver_extension}} # TODO: attach disk {% if num_nodes > 1 %} ray.worker.default: @@ -108,6 +109,7 @@ available_node_types: # billingProfile: # maxPrice: -1 {%- endif %} + need_nvidia_driver_extension: {{need_nvidia_driver_extension}} {%- endif %} head_node_type: ray.head.default From 6acaa75f39c433bb643b40cc5469dc17b39c076d Mon Sep 17 00:00:00 2001 From: Doyoung Kim <34902420+landscapepainter@users.noreply.github.com> Date: Sat, 6 Jul 2024 23:43:41 -0700 Subject: [PATCH 23/23] [Storage] Make s3 bucket creation log visible to the users (#3730) * refactor and make log visible * nit * Update sky/data/storage.py Co-authored-by: Tian Xia --------- Co-authored-by: Tian Xia --- sky/data/storage.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/sky/data/storage.py b/sky/data/storage.py index e43406c3951..f909df45dd5 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -1368,24 +1368,22 @@ def _create_s3_bucket(self, """ s3_client = self.client try: - if region is None: - s3_client.create_bucket(Bucket=bucket_name) - else: - if region == 'us-east-1': - # If default us-east-1 region is used, the - # LocationConstraint must not be specified. - # https://stackoverflow.com/a/51912090 - s3_client.create_bucket(Bucket=bucket_name) - else: - location = {'LocationConstraint': region} - s3_client.create_bucket(Bucket=bucket_name, - CreateBucketConfiguration=location) - logger.info(f'Created S3 bucket {bucket_name} in {region}') + create_bucket_config: Dict[str, Any] = {'Bucket': bucket_name} + # If default us-east-1 region of create_bucket API is used, + # the LocationConstraint must not be specified. + # Reference: https://stackoverflow.com/a/51912090 + if region is not None and region != 'us-east-1': + create_bucket_config['CreateBucketConfiguration'] = { + 'LocationConstraint': region + } + s3_client.create_bucket(**create_bucket_config) + logger.info( + f'Created S3 bucket {bucket_name!r} in {region or "us-east-1"}') except aws.botocore_exceptions().ClientError as e: with ux_utils.print_exception_no_traceback(): raise exceptions.StorageBucketCreateError( - f'Attempted to create a bucket ' - f'{self.name} but failed.') from e + f'Attempted to create a bucket {self.name} but failed.' + ) from e return aws.resource('s3').Bucket(bucket_name) def _delete_s3_bucket(self, bucket_name: str) -> bool: