Skip to content

Commit

Permalink
[autoscaler v2][1/n] introducing storage interface for instance provi…
Browse files Browse the repository at this point in the history
…der (#34976)

Why are these changes needed?
This is part of a stack of PRs that introduces a new node_provider for autoscaler v2. For the overall design, please refer to #34009.

Stack of PRs
#34976 <- this PR
#34977
#34979
#34983
#34985

Introduce a versioned storage interface for node_provider in autoscaler v2.

Interface for a storage backend that stores the state of nodes in the cluster.
The storage is versioned, which means that each successful state change to the
storage will bump the version number. The version number can be used to
implement optimistic concurrency control.
Each entry in the storage table is also versioned. The version number of an entry
is the last version number when the entry is updated.

The storage will be used to store the autoscaler's scaling decisions (instances to be started), as well as the instance state from the cloud provider.
  • Loading branch information
scv119 committed May 3, 2023
1 parent 9a36f01 commit 8f80516
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .buildkite/pipeline.build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@
--test_env=DOCKER_CERT_PATH=/certs/client
--test_env=DOCKER_TLS_CERTDIR=/certs
-- python/ray/tests/...
- bazel test --config=ci $(./ci/run/bazel_export_options)
-- python/ray/autoscaler/v2/...

- label: ":python: (Large)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
Expand Down
14 changes: 14 additions & 0 deletions python/ray/autoscaler/v2/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# --------------------------------------------------------------------
# Tests from the python/ray/autoscaler/v2/tests directory.
# Covers all tests starting with `test_`.
# Please keep these sorted alphabetically.
# --------------------------------------------------------------------
load("//bazel:python.bzl", "py_test_module_list")

py_test(
name = "test_storage",
size = "small",
srcs = ["tests/test_storage.py"],
tags = ["team:core"],
deps = ["//:ray_lib",],
)
180 changes: 180 additions & 0 deletions python/ray/autoscaler/v2/instance_manager/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import copy
from abc import ABCMeta, abstractmethod
from collections import defaultdict, namedtuple
from threading import Lock
from typing import Dict, List, Optional, Tuple

# Result of a write: `success` tells whether the write was applied and
# `version` carries the storage version reported alongside it.
StoreStatus = namedtuple("StoreStatus", ["success", "version"])
# A stored value paired with the version number recorded for that entry.
VersionedValue = namedtuple("VersionedValue", ["value", "version"])


class Storage(metaclass=ABCMeta):
    """Interface for a storage backend that stores the state of nodes in the cluster.

    The storage is thread-safe.

    The storage is versioned, which means that each successful state change to
    the storage will bump the version number. The version number can be used to
    implement optimistic concurrency control.

    Each entry in the storage table is also versioned. The version number of an
    entry is the last version number when the entry is updated.
    """

    @abstractmethod
    def batch_update(
        self,
        table: str,
        mutation: Optional[Dict[str, str]] = None,
        deletion: Optional[List[str]] = None,
        expected_storage_version: Optional[int] = None,
    ) -> StoreStatus:
        """Batch update the storage table. This method is atomic.

        Args:
            table: The name of the table.
            mutation: A dictionary of key-value pairs to be updated.
            deletion: A list of keys to be deleted.
            expected_storage_version: The expected storage version. The
                update will fail if the version does not match the
                current storage version.

        Returns:
            StoreStatus: A tuple of (success, version). If the update is
                successful, returns (True, new_version).
                Otherwise, returns (False, current_version).
        """
        raise NotImplementedError("batch_update() has to be implemented")

    @abstractmethod
    def update(
        self,
        table: str,
        key: str,
        value: str,
        expected_entry_version: Optional[int] = None,
        insert_only: bool = False,
    ) -> StoreStatus:
        """Update a single entry in the storage table.

        Args:
            table: The name of the table.
            key: The key of the entry.
            value: The value of the entry.
            expected_entry_version: The expected version of the entry.
                The update will fail if the version does not match the current
                version of the entry.
            insert_only: If True, the update will
                fail if the entry already exists.

        Returns:
            StoreStatus: A tuple of (success, version). If the update is
                successful, returns (True, new_version). Otherwise,
                returns (False, current_version).
        """
        raise NotImplementedError("update() has to be implemented")

    @abstractmethod
    def get_all(self, table: str) -> Tuple[Dict[str, Tuple[str, int]], int]:
        """Get every entry of the storage table.

        Args:
            table: The name of the table.

        Returns:
            Tuple[Dict[str, VersionedValue], int]: A tuple of
                (entries, storage_version). The entries is a dictionary of
                (key, (value, entry_version)) pairs. The storage_version is
                the current storage version.
        """
        raise NotImplementedError("get_all() has to be implemented")

    @abstractmethod
    def get(
        self, table: str, keys: List[str]
    ) -> Tuple[Dict[str, Tuple[str, int]], int]:
        """Get a list of entries from the storage table.

        Args:
            table: The name of the table.
            keys: A list of keys to be retrieved. If the list is empty,
                all entries in the table will be returned.

        Returns:
            Tuple[Dict[str, VersionedValue], int]: A tuple of
                (entries, storage_version). The entries is a dictionary of
                (key, (value, entry_version)) pairs. The entry_version is the
                version of the entry when it was last updated. The
                storage_version is the current storage version.
        """
        raise NotImplementedError("get() has to be implemented")

    @abstractmethod
    def get_version(self) -> int:
        """Get the current storage version.

        Returns:
            int: The current storage version.
        """
        raise NotImplementedError("get_version() has to be implemented")


class InMemoryStorage(Storage):
    """An in-memory implementation of the Storage interface.

    All state is held in process memory and guarded by a single lock.
    This implementation is not durable.
    """

    def __init__(self):
        # Storage-wide version; incremented once per successful write.
        self._version = 0
        # table name -> {key -> VersionedValue(value, entry_version)}
        self._tables = defaultdict(dict)
        # Held for the whole duration of every operation below.
        self._lock = Lock()

    def batch_update(
        self,
        table: str,
        mutation: Optional[Dict[str, str]] = None,
        deletion: Optional[List[str]] = None,
        expected_version: Optional[int] = None,
        expected_storage_version: Optional[int] = None,
    ) -> StoreStatus:
        """Apply a set of writes and deletions to one table under the lock.

        Mutations are applied before deletions, so a key listed in both ends
        up deleted. A successful call bumps the storage version even when
        both ``mutation`` and ``deletion`` are empty.

        Args:
            table: The name of the table.
            mutation: Key-value pairs to write; ``None`` means no writes.
            deletion: Keys to remove; missing keys are ignored. ``None``
                means no deletions.
            expected_version: If given, the update is rejected unless it
                equals the current storage version.
            expected_storage_version: Same check as ``expected_version``,
                accepted under the keyword name used by the ``Storage``
                interface. If both are given, both must match.

        Returns:
            StoreStatus: (True, new_version) on success, otherwise
                (False, current_version).
        """
        mutation = mutation or {}
        deletion = deletion or []
        with self._lock:
            for expected in (expected_version, expected_storage_version):
                if expected is not None and expected != self._version:
                    return StoreStatus(False, self._version)
            self._version += 1
            rows = self._tables[table]
            rows.update(
                {
                    key: VersionedValue(value, self._version)
                    for key, value in mutation.items()
                }
            )
            for deleted_key in deletion:
                rows.pop(deleted_key, None)
            return StoreStatus(True, self._version)

    def update(
        self,
        table: str,
        key: str,
        value: str,
        expected_entry_version: Optional[int] = None,
        expected_storage_version: Optional[int] = None,
        insert_only: bool = False,
    ) -> StoreStatus:
        """Write a single entry, optionally guarded by version checks.

        Args:
            table: The name of the table.
            key: The key of the entry.
            value: The value of the entry.
            expected_entry_version: If given, the update is rejected unless
                it equals the entry's current version. A missing entry is
                treated as having version -1.
            expected_storage_version: If given, the update is rejected unless
                it equals the current storage version.
            insert_only: If True, the update is rejected when the key
                already exists in the table.

        Returns:
            StoreStatus: (True, new_version) on success, otherwise
                (False, current_version).
        """
        with self._lock:
            if (
                expected_storage_version is not None
                and expected_storage_version != self._version
            ):
                return StoreStatus(False, self._version)
            rows = self._tables[table]
            if insert_only and key in rows:
                return StoreStatus(False, self._version)
            _, entry_version = rows.get(key, (None, -1))
            if (
                expected_entry_version is not None
                and expected_entry_version != entry_version
            ):
                return StoreStatus(False, self._version)
            self._version += 1
            rows[key] = VersionedValue(value, self._version)
            return StoreStatus(True, self._version)

    def get_all(self, table: str) -> Tuple[Dict[str, VersionedValue], int]:
        """Return a deep copy of every entry in ``table`` and the storage version.

        Returns:
            A plain tuple (entries, storage_version) where entries maps each
            key to its VersionedValue.
        """
        with self._lock:
            return (copy.deepcopy(self._tables[table]), self._version)

    def get(self, table: str, keys: List[str]) -> Tuple[Dict[str, VersionedValue], int]:
        """Return the requested entries of ``table`` and the storage version.

        Args:
            table: The name of the table.
            keys: Keys to look up. Keys that are not present are omitted
                from the result. An empty list returns the whole table.

        Returns:
            A plain tuple (entries, storage_version) where entries maps each
            found key to its VersionedValue.
        """
        if not keys:
            return self.get_all(table)
        with self._lock:
            rows = self._tables.get(table, {})
            # Copy the entries, as get_all() does, so callers never hold
            # references into the internal table.
            result = {key: copy.deepcopy(rows[key]) for key in keys if key in rows}
            # A plain tuple, not StoreStatus: this is a read result, not a
            # write status.
            return (result, self._version)

    def get_version(self) -> int:
        """Return the current storage version."""
        # Read under the same lock the writers use.
        with self._lock:
            return self._version
87 changes: 87 additions & 0 deletions python/ray/autoscaler/v2/tests/test_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# coding: utf-8
import os
import sys

import pytest # noqa

from ray.autoscaler.v2.instance_manager.storage import (
InMemoryStorage,
StoreStatus,
VersionedValue,
)


@pytest.mark.parametrize("storage", [InMemoryStorage()])
def test_storage(storage):
    """Walk one storage instance through reads, batch updates and single updates.

    The steps are order-dependent: each assertion relies on the storage
    version and table contents left behind by the previous steps.
    """
    table = "test_table"

    # A fresh storage starts at version 0 with no entries.
    assert storage.get_version() == 0
    assert storage.get_all(table=table) == ({}, 0)
    assert storage.get(table=table, keys=[]) == ({}, 0)
    assert storage.get(table=table, keys=["key1"]) == ({}, 0)

    # First unconditional batch write bumps the version to 1.
    status = storage.batch_update(table=table, mutation={"key1": "value1"})
    assert status == StoreStatus(True, 1)

    assert storage.get_version() == 1

    after_first_write = ({"key1": VersionedValue("value1", 1)}, 1)
    assert storage.get_all(table=table) == after_first_write
    assert storage.get(table=table, keys=[]) == after_first_write

    # A stale expected version is rejected and leaves the version untouched.
    status = storage.batch_update(
        table=table, mutation={"key1": "value2"}, expected_version=0
    )
    assert status == StoreStatus(False, 1)

    # The matching expected version is accepted.
    status = storage.batch_update(
        table=table, mutation={"key1": "value2"}, expected_version=1
    )
    assert status == StoreStatus(True, 2)

    assert storage.get_all(table=table) == (
        {"key1": VersionedValue("value2", 2)},
        2,
    )

    # Mutations and a deletion applied in the same batch.
    status = storage.batch_update(
        table=table,
        mutation={"key2": "value3", "key3": "value4"},
        deletion=["key1"],
        expected_version=2,
    )
    assert status == StoreStatus(True, 3)

    assert storage.get_all(table=table) == (
        {"key2": VersionedValue("value3", 3), "key3": VersionedValue("value4", 3)},
        3,
    )

    # Requesting a deleted key simply omits it from the result.
    assert storage.get(table=table, keys=["key2", "key1"]) == (
        {"key2": VersionedValue("value3", 3)},
        3,
    )

    # Single-entry updates: plain, insert-only, stale and matching versions.
    status = storage.update(table=table, key="key2", value="value5")
    assert status == StoreStatus(True, 4)

    status = storage.update(
        table=table, key="key2", value="value5", insert_only=True
    )
    assert status == StoreStatus(False, 4)

    status = storage.update(
        table=table, key="key2", value="value5", expected_entry_version=3
    )
    assert status == StoreStatus(False, 4)

    status = storage.update(
        table=table, key="key2", value="value6", expected_entry_version=4
    )
    assert status == StoreStatus(True, 5)


if __name__ == "__main__":
    # Pick the pytest arguments first: the PARALLEL_CI environment variable
    # selects the "-n auto --boxed" invocation, otherwise a plain verbose run.
    pytest_args = (
        ["-n", "auto", "--boxed", "-vs", __file__]
        if os.environ.get("PARALLEL_CI")
        else ["-sv", __file__]
    )
    sys.exit(pytest.main(pytest_args))

0 comments on commit 8f80516

Please sign in to comment.