Skip to content

Commit

Permalink
Merge branch 'master' into tf
Browse files Browse the repository at this point in the history
  • Loading branch information
c21 committed Feb 7, 2024
2 parents e96d8ba + d8b0fe9 commit d81563d
Show file tree
Hide file tree
Showing 58 changed files with 1,373 additions and 420 deletions.
6 changes: 4 additions & 2 deletions .buildkite/release-automation/config.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
name: ray-release-automation
artifacts_bucket: ray-ci-artifact-branch-public
ci_temp: s3:https://ray-ci-artifact-branch-public/ci-temp/
artifacts_bucket: ray-ci-artifact-pr-public
ci_temp: s3:https://ray-ci-artifact-pr-public/ci-temp/
ci_work_repo: 029272617770.dkr.ecr.us-west-2.amazonaws.com/rayproject/citemp
forge_prefix: cr.ray.io/rayproject/
builder_queues:
builder: builder_queue_pr
builder-arm64: builder_queue_arm64_pr
runner_queues:
default: runner_queue_small_pr
medium-arm64: runner_queue_arm64_medium_pr
buildkite_dirs:
- .buildkite/release-automation
env:
Expand Down
23 changes: 14 additions & 9 deletions ci/ray_ci/automation/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,41 @@ py_binary(
py_binary(
name = "filter_tests",
srcs = ["filter_tests.py"],
deps = ["//ci/ray_ci:ray_ci_lib"],
exec_compatible_with = ["//:hermetic_python"],
deps = [
ci_require("click"),
"//ci/ray_ci:ray_ci_lib",
"//release:ray_release",
],
)

py_library(
name = "upgrade_version_lib",
srcs = ["upgrade_version_lib.py"],
name = "update_version_lib",
srcs = ["update_version_lib.py"],
visibility = ["//ci/ray_ci/automation:__subpackages__"],
deps = [],
)

py_test(
name = "test_upgrade_version_lib",
name = "test_update_version_lib",
size = "small",
srcs = ["test_upgrade_version_lib.py"],
srcs = ["test_update_version_lib.py"],
tags = [
"ci_unit",
"team:ci",
],
deps = [
ci_require("pytest"),
":upgrade_version_lib",
":update_version_lib",
],
)

py_binary(
name = "upgrade_version",
srcs = ["upgrade_version.py"],
name = "update_version",
srcs = ["update_version.py"],
exec_compatible_with = ["//:hermetic_python"],
deps = [
ci_require("click"),
":upgrade_version_lib",
":update_version_lib",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

import pytest

from ci.ray_ci.automation.upgrade_version_lib import (
from ci.ray_ci.automation.update_version_lib import (
list_java_files,
get_current_version,
upgrade_file_version,
update_file_version,
)


Expand Down Expand Up @@ -40,7 +40,7 @@ def test_list_java_files():
)


@mock.patch("ci.ray_ci.automation.upgrade_version_lib.get_check_output")
@mock.patch("ci.ray_ci.automation.update_version_lib.get_check_output")
def test_get_current_version_from_master_branch_version(mock_check_output):
mock_check_output.return_value = (
"3.0.0.dev0 a123456dc1d2egd345a6789f1e23d45b678c90ed"
Expand All @@ -51,7 +51,7 @@ def test_get_current_version_from_master_branch_version(mock_check_output):
)


@mock.patch("ci.ray_ci.automation.upgrade_version_lib.get_check_output")
@mock.patch("ci.ray_ci.automation.update_version_lib.get_check_output")
def test_get_current_version_from_changed_version(mock_check_output):
mock_check_output.return_value = "2.2.0 a123456dc1d2egd345a6789f1e23d45b678c90ed"

Expand Down Expand Up @@ -98,7 +98,7 @@ def _make_tmp_directories(tmp_dir):
("2.3.2", "2.3.2", "2.3.3"),
],
)
def test_upgrade_file_version(main_version, java_version, new_version):
def test_update_file_version(main_version, java_version, new_version):
with tempfile.TemporaryDirectory() as tmp_dir:
_make_tmp_directories(tmp_dir)
non_java_file_paths = [
Expand Down Expand Up @@ -129,7 +129,7 @@ def test_upgrade_file_version(main_version, java_version, new_version):
os.path.join(tmp_dir, file_path), version=main_version, java=False
)

upgrade_file_version(
update_file_version(
main_version=main_version,
java_version=java_version,
new_version=new_version,
Expand All @@ -149,7 +149,7 @@ def test_upgrade_file_version(main_version, java_version, new_version):
assert f.read() == f"<version>{java_version}</version>"


def test_upgrade_file_version_fail_no_non_java_file():
def test_update_file_version_fail_no_non_java_file():
"""
Test for failure when there's no file to be found.
"""
Expand All @@ -172,15 +172,15 @@ def test_upgrade_file_version_fail_no_non_java_file():
os.path.join(tmp_dir, file_path), version=java_version, java=True
)
with pytest.raises(ValueError):
upgrade_file_version(
update_file_version(
main_version=main_version,
java_version=java_version,
new_version=new_version,
root_dir=tmp_dir,
)


def test_upgrade_file_version_fail_no_java_file():
def test_update_file_version_fail_no_java_file():
"""
Test for failure when there's no java file to be found.
"""
Expand All @@ -201,7 +201,7 @@ def test_upgrade_file_version_fail_no_java_file():
)

with pytest.raises(AssertionError):
upgrade_file_version(
update_file_version(
main_version=main_version,
java_version=java_version,
new_version=new_version,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os

import click
from ci.ray_ci.automation.upgrade_version_lib import (

from ci.ray_ci.automation.update_version_lib import (
get_current_version,
upgrade_file_version,
update_file_version,
)
import os

bazel_workspace_dir = os.environ.get("BUILD_WORKSPACE_DIRECTORY", "")

Expand All @@ -16,7 +18,7 @@ def main(new_version: str):
"""
main_version, java_version = get_current_version(bazel_workspace_dir)

upgrade_file_version(
update_file_version(
main_version,
java_version,
new_version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def get_current_version(root_dir: str):
return MASTER_BRANCH_VERSION, MASTER_BRANCH_JAVA_VERSION


def upgrade_file_version(
def update_file_version(
main_version: str,
java_version: str,
new_version: str,
Expand Down
4 changes: 2 additions & 2 deletions doc/source/tune/doc_code/pytorch_optuna.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
TEST_SIZE = 256


def train(model, optimizer, train_loader, device=None):
def train_epoch(model, optimizer, train_loader, device=None):
device = device or torch.device("cpu")
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
Expand Down Expand Up @@ -91,7 +91,7 @@ def objective(config): # <1>
)

while True:
train(model, optimizer, train_loader) # Train the model
train_epoch(model, optimizer, train_loader) # Train the model
acc = test(model, test_loader) # Compute test accuracy
train.report({"mean_accuracy": acc}) # Report to Tune

Expand Down
10 changes: 7 additions & 3 deletions python/ray/_private/state.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import logging
from collections import defaultdict
from typing import Set
from typing import Dict

from google.protobuf.json_format import MessageToDict

Expand Down Expand Up @@ -807,8 +807,12 @@ def get_node_to_connect_for_driver(self, node_ip_address):
node_ip_address
)

def get_draining_nodes(self) -> Set[str]:
"""Get all the hex ids of nodes that are being drained."""
def get_draining_nodes(self) -> Dict[str, int]:
"""Get all the hex ids of nodes that are being drained
and the corresponding draining deadline timestamps in ms.
There is no deadline if the timestamp is 0.
"""
self._check_connected()
return self.global_state_accessor.get_draining_nodes()

Expand Down
8 changes: 5 additions & 3 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ cdef int check_status(const CRayStatus& status) nogil except -1:

if status.IsObjectStoreFull():
raise ObjectStoreFullError(message)
if status.IsInvalidArgument():
elif status.IsInvalidArgument():
raise ValueError(message)
elif status.IsOutOfDisk():
raise OutOfDiskError(message)
Expand Down Expand Up @@ -2869,7 +2869,8 @@ cdef class GcsClient:
self,
node_id: c_string,
reason: int32_t,
reason_message: c_string):
reason_message: c_string,
deadline_timestamp_ms: int64_t):
"""Send the DrainNode request to GCS.
This is only for testing.
Expand All @@ -2879,7 +2880,8 @@ cdef class GcsClient:
c_bool is_accepted = False
with nogil:
check_status(self.inner.get().DrainNode(
node_id, reason, reason_message, timeout_ms, is_accepted))
node_id, reason, reason_message,
deadline_timestamp_ms, timeout_ms, is_accepted))

return is_accepted

Expand Down
1 change: 1 addition & 0 deletions python/ray/air/tests/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ def launch_training():
node_id,
autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
"preemption",
0,
)
assert is_accepted
print("Killing node...")
Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
const c_string &node_id,
int32_t reason,
const c_string &reason_message,
int64_t deadline_timestamp_ms,
int64_t timeout_ms,
c_bool &is_accepted)
CRayStatus DrainNodes(
Expand Down
9 changes: 7 additions & 2 deletions python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from libcpp.string cimport string as c_string
from libcpp cimport bool as c_bool
from libcpp.vector cimport vector as c_vector
from libcpp.unordered_map cimport unordered_map
from libcpp.memory cimport unique_ptr
from libc.stdint cimport int32_t as c_int32_t, uint32_t as c_uint32_t
from libc.stdint cimport (
int32_t as c_int32_t,
uint32_t as c_uint32_t,
int64_t as c_int64_t,
)
from ray.includes.unique_ids cimport (
CActorID,
CJobID,
Expand All @@ -28,7 +33,7 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
CJobID GetNextJobID()
c_vector[c_string] GetAllNodeInfo()
c_vector[c_string] GetAllAvailableResources()
c_vector[CNodeID] GetDrainingNodes()
unordered_map[CNodeID, c_int64_t] GetDrainingNodes()
c_vector[c_string] GetAllTaskEvents()
unique_ptr[c_string] GetObjectInfo(const CObjectID &object_id)
unique_ptr[c_string] GetAllResourceUsage()
Expand Down
16 changes: 12 additions & 4 deletions python/ray/includes/global_state_accessor.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,20 @@ cdef class GlobalStateAccessor:
return results

def get_draining_nodes(self):
cdef c_vector[CNodeID] draining_nodes
cdef:
unordered_map[CNodeID, int64_t] draining_nodes
unordered_map[CNodeID, int64_t].iterator draining_nodes_it

with nogil:
draining_nodes = self.inner.get().GetDrainingNodes()
results = set()
for draining_node in draining_nodes:
results.add(ray._private.utils.binary_to_hex(draining_node.Binary()))
draining_nodes_it = draining_nodes.begin()
results = {}
while draining_nodes_it != draining_nodes.end():
draining_node_id = dereference(draining_nodes_it).first
results[ray._private.utils.binary_to_hex(
draining_node_id.Binary())] = dereference(draining_nodes_it).second
postincrement(draining_nodes_it)

return results

def get_all_available_resources(self):
Expand Down
6 changes: 6 additions & 0 deletions python/ray/serve/_private/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,3 +713,9 @@ class TargetCapacityDirection(str, Enum):

UP = "UP"
DOWN = "DOWN"


@dataclass(frozen=True)
class ReplicaQueueLengthInfo:
accepted: bool
num_ongoing_requests: int
18 changes: 18 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,24 @@
os.environ.get("RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S", 1.0)
)

# Feature flag for caching queue lengths for faster routing in each handle.
RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE = (
os.environ.get("RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE", "0") == "1"
)

# Feature flag for strictly enforcing max_concurrent_queries (replicas will reject
# requests).
RAY_SERVE_ENABLE_STRICT_MAX_CONCURRENT_QUERIES = (
os.environ.get("RAY_SERVE_ENABLE_STRICT_MAX_CONCURRENT_QUERIES", "0") == "1"
# Strict enforcement path must be enabled for the queue length cache.
or RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE
)

# Length of time to respect entries in the queue length cache when scheduling requests.
RAY_SERVE_QUEUE_LENGTH_CACHE_TIMEOUT_S = float(
os.environ.get("RAY_SERVE_QUEUE_LENGTH_CACHE_TIMEOUT_S", 10.0)
)

# The default autoscaling policy to use if none is specified.
DEFAULT_AUTOSCALING_POLICY = "ray.serve.autoscaling_policy:default_autoscaling_policy"

Expand Down
Loading

0 comments on commit d81563d

Please sign in to comment.