Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
scv119 committed May 2, 2023
1 parent 48972e5 commit 1fd70b0
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 5 deletions.
9 changes: 7 additions & 2 deletions python/ray/autoscaler/v2/instance_manager/instance_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ def upsert_instances(
if expected_storage_version and expected_storage_version != version:
return False, version

for instance in updates.values():
for instance in updates:
# the instance version is set to 0, it will be
# populated by the storage entry's verion on read
instance.version = 0
mutations[instance.instance_id] = instance.SerializeToString()

result, version = self._storage.batch_update(
Expand Down Expand Up @@ -112,7 +115,9 @@ def update_instance(
Returns:
Tuple[bool, int]: A tuple of (success, storage_version).
"""

# the instance version is set to 0, it will be
# populated by the storage entry's verion on read
instance.version = 0
result, version = self._storage.update(
self._table_name,
key=instance.instance_id,
Expand Down
41 changes: 38 additions & 3 deletions python/ray/autoscaler/v2/tests/test_instance_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,54 @@ def create_instance(instance_id, status=Instance.INSTANCE_STATUS_UNSPECIFIED):

def test_upsert_storage():
subscriber = DummySubscriber()
import pdb

pdb.set_trace()
storage = InstanceStorage(
cluster_id="test_cluster",
storage=InMemoryStorage(),
status_change_subscriber=subscriber,
)
instance1 = create_instance("instance1")
instance2 = create_instance("instance2")
instance3 = create_instance("instance3")

assert (True, 1) == storage.upsert_instances(
[create_instance("instance1"), create_instance("instance2")],
[instance1, instance2],
expected_storage_version=None,
)

instance1.version = 1
instance2.version = 1
entries, storage_version = storage.get_instances()

assert storage_version == 1
assert entries == {
"instance1": instance1,
"instance2": instance2,
}

assert (False, 1) == storage.upsert_instances(
[create_instance("instance1"), create_instance("instance2")],
expected_storage_version=0,
)

instance2.status = Instance.IDLE
assert (True, 2) == storage.upsert_instances(
[instance3, instance2],
expected_storage_version=1,
)

instance1.version = 1
instance2.version = 2
instance3.version = 2
entries, storage_version = storage.get_instances()

assert storage_version == 2
assert entries == {
"instance1": instance1,
"instance2": instance2,
"instance3": instance3,
}


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
Expand Down

0 comments on commit 1fd70b0

Please sign in to comment.