Skip to content

Commit

Permalink
[serve] Refactor RayServeWrappedReplica to not be dynamically gener…
Browse files Browse the repository at this point in the history
…ated (ray-project#42014)

* fix

Signed-off-by: Edward Oakes <[email protected]>

* oops

Signed-off-by: Edward Oakes <[email protected]>

---------

Signed-off-by: Edward Oakes <[email protected]>
  • Loading branch information
edoakes authored and vickytsang committed Jan 12, 2024
1 parent aad14a3 commit 53575b3
Show file tree
Hide file tree
Showing 3 changed files with 379 additions and 399 deletions.
4 changes: 4 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
#: Max concurrency
ASYNC_CONCURRENCY = int(1e6)

# Concurrency group used for replica operations that cannot be blocked by user code
# (e.g., health checks and fetching queue length).
REPLICA_CONTROL_PLANE_CONCURRENCY_GROUP = "control_plane"

# How often to call the control loop on the controller.
CONTROL_LOOP_PERIOD_S = 0.1

Expand Down
18 changes: 12 additions & 6 deletions python/ray/serve/_private/deployment_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
from ray.serve._private.autoscaling_policy import BasicAutoscalingPolicy
from ray.serve._private.common import TargetCapacityDirection
from ray.serve._private.config import DeploymentConfig, ReplicaConfig
from ray.serve._private.constants import REPLICA_CONTROL_PLANE_CONCURRENCY_GROUP
from ray.serve.generated.serve_pb2 import DeploymentInfo as DeploymentInfoProto
from ray.serve.generated.serve_pb2 import (
TargetCapacityDirection as TargetCapacityDirectionProto,
)

# Concurrency group used for operations that cannot be blocked by user code
# (e.g., health checks and fetching queue length).
CONTROL_PLANE_CONCURRENCY_GROUP = "control_plane"
REPLICA_DEFAULT_ACTOR_OPTIONS = {
"concurrency_groups": {CONTROL_PLANE_CONCURRENCY_GROUP: 1}
"concurrency_groups": {REPLICA_CONTROL_PLANE_CONCURRENCY_GROUP: 1}
}


Expand Down Expand Up @@ -101,14 +101,20 @@ def set_target_capacity(

@property
def actor_def(self):
# Delayed import as replica depends on this file.
from ray.serve._private.replica import create_replica_wrapper

if self._cached_actor_def is None:
assert self.actor_name is not None

# Break circular import :(.
from ray.serve._private.replica import RayServeWrappedReplica

# Dynamically create a new class with custom name here so Ray picks it up
# correctly in actor metadata table and observability stack.
self._cached_actor_def = ray.remote(**REPLICA_DEFAULT_ACTOR_OPTIONS)(
create_replica_wrapper(self.actor_name)
type(
self.actor_name,
(RayServeWrappedReplica,),
dict(RayServeWrappedReplica.__dict__),
)
)

return self._cached_actor_def
Expand Down
Loading

0 comments on commit 53575b3

Please sign in to comment.