Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][dashboard] Add repr_name as part of actor state #33555

Merged
merged 7 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
init
Signed-off-by: rickyyx <[email protected]>
  • Loading branch information
rickyyx committed Mar 21, 2023
commit 4e5a51f76f64b4a5c394e5e435e3137f6e2a455c
7 changes: 7 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,10 @@ cdef void execute_task(
print(actor_magic_token, end="")
print(actor_magic_token, file=sys.stderr, end="")

# Sets the actor repr name for the actor so other components
# like GCS has such info.
core_worker.set_actor_repr_name(repr(actor))
rickyyx marked this conversation as resolved.
Show resolved Hide resolved

if (returns[0].size() > 0 and
not inspect.isgenerator(outputs) and
len(outputs) != int(returns[0].size())):
Expand Down Expand Up @@ -1620,6 +1624,9 @@ cdef class CoreWorker:
def set_actor_title(self, title):
CCoreWorkerProcess.GetCoreWorker().SetActorTitle(title)

def set_actor_repr_name(self, repr_name):
CCoreWorkerProcess.GetCoreWorker().SetActorReprName(repr_name)

def get_plasma_event_handler(self):
return self.plasma_event_handler

Expand Down
2 changes: 2 additions & 0 deletions python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ class ActorState(StateSchema):
is_detached: bool = state_column(filterable=False, detail=True)
#: The placement group id that's associated with this actor.
placement_group_id: str = state_column(detail=True, filterable=True)
#: Actor's repr name if a customized __repr__ method exists, else empty string.
repr_name: str = state_column(detail=True, filterable=True)


@dataclass(init=True)
Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
c_bool ShouldCaptureChildTasksInPlacementGroup()
const CActorID &GetActorId()
void SetActorTitle(const c_string &title)
void SetActorReprName(const c_string &repr_name)
void SetWebuiDisplay(const c_string &key, const c_string &message)
CTaskID GetCallerId()
const ResourceMappingType &GetResourceIDs() const
Expand Down
85 changes: 84 additions & 1 deletion python/ray/tests/test_state_api_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
import pytest

from ray._private.profiling import chrome_tracing_dump
from ray.experimental.state.api import list_tasks, list_actors, list_workers, list_nodes
from ray.experimental.state.api import (
get_actor,
list_tasks,
list_actors,
list_workers,
list_nodes,
)
from ray._private.test_utils import wait_for_condition


Expand Down Expand Up @@ -169,6 +175,83 @@ def verify():
wait_for_condition(verify, timeout=10)


def test_actor_repr_name(shutdown_only):
def _verify_repr_name(id, name):
actor = get_actor(id=id)
assert actor is not None
assert actor["repr_name"] == name
return True

# Assert simple actor repr name
@ray.remote
class ReprActor:
def __init__(self, x) -> None:
self.x = x

def __repr__(self) -> str:
return self.x

def ready(self):
pass

a = ReprActor.remote(x="repr-name-a")
b = ReprActor.remote(x="repr-name-b")

wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name-a")
wait_for_condition(_verify_repr_name, id=b._actor_id.hex(), name="repr-name-b")

# Assert when no __repr__ defined. repr_name should be empty
@ray.remote
class Actor:
pass

a = Actor.remote()
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="")

# Assert special actors (async actor, threaded actor, detached actor, named actor)
@ray.remote
class AsyncActor:
def __init__(self, x) -> None:
self.x = x

def __repr__(self) -> str:
return self.x

async def ready(self):
pass

a = AsyncActor.remote(x="async-x")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="async-x")

a = ReprActor.options(max_concurrency=3).remote(x="x")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="x")

a = ReprActor.options(name="named-actor").remote(x="repr-name")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name")

a = ReprActor.options(name="detached-actor", lifetime="detached").remote(
x="repr-name"
)
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name")
ray.kill(a)

# Assert nested actor class.
class OutClass:
@ray.remote
class InnerActor:
def __init__(self, name) -> None:
self.name = name

def __repr__(self) -> str:
return self.name

def get_actor(self, name):
return OutClass.InnerActor.remote(name=name)

a = OutClass().get_actor(name="inner")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="inner")


if __name__ == "__main__":
import sys

Expand Down
5 changes: 5 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3745,6 +3745,11 @@ void CoreWorker::SetActorTitle(const std::string &title) {
actor_title_ = title;
}

void CoreWorker::SetActorReprName(const std::string &repr_name) {
RAY_CHECK(direct_task_receiver_ != nullptr);
direct_task_receiver_->SetActorReprName(repr_name);
}

rpc::JobConfig CoreWorker::GetJobConfig() const {
return worker_context_.GetCurrentJobConfig();
}
Expand Down
10 changes: 10 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {

void SetActorTitle(const std::string &title);

/// Sets the actor's repr name.
///
/// This is set explicitly rather than included as part of actor creation task spec
/// because it's only available after running the creation task as it might depend on
/// fields to be be initialized during actor creation task. The repr name will be
/// included as part of actor creation task reply (PushTaskReply) to GCS.
///
/// \param repr_name Actor repr name.
void SetActorReprName(const std::string &repr_name);

void SetCallerCreationTimestamp();

/// Increase the reference count for this object ID.
Expand Down
11 changes: 10 additions & 1 deletion src/ray/core_worker/transport/direct_actor_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,13 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
<< ", actor_id: " << task_spec.ActorCreationId()
<< ", status: " << status;
} else {
// Set the actor repr name if it's customized by the actor.
if (!actor_repr_name_.empty()) {
reply->set_actor_repr_name(actor_repr_name_);
}
RAY_LOG(INFO) << "Actor creation task finished, task_id: " << task_spec.TaskId()
<< ", actor_id: " << task_spec.ActorCreationId();
<< ", actor_id: " << task_spec.ActorCreationId()
<< ", actor_repr_name: " << actor_repr_name_;
}
}
}
Expand Down Expand Up @@ -311,5 +316,9 @@ void CoreWorkerDirectTaskReceiver::Stop() {
}
}

void CoreWorkerDirectTaskReceiver::SetActorReprName(const std::string &repr_name) {
actor_repr_name_ = repr_name;
}

} // namespace core
} // namespace ray
9 changes: 9 additions & 0 deletions src/ray/core_worker/transport/direct_actor_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ class CoreWorkerDirectTaskReceiver {

void Stop();

/// Set the actor repr name for an actor.
///
/// The actor repr name is only available after actor creation task has been run since
/// the repr name could include data only initialized during the creation task.
void SetActorReprName(const std::string &repr_name);

private:
/// Set up the configs for an actor.
/// This should be called once for the actor creation task.
Expand Down Expand Up @@ -135,6 +141,9 @@ class CoreWorkerDirectTaskReceiver {
/// Whether this actor executes tasks out of order with respect to client submission
/// order.
bool execute_out_of_order_ = false;
/// The repr name of the actor instance for an anonymous actor.
/// This is only available after the actor creation task.
std::string actor_repr_name_ = "";
};

} // namespace core
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,7 @@ void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &ac
auto worker_id = actor->GetWorkerID();
auto node_id = actor->GetNodeID();
mutable_actor_table_data->set_node_id(node_id.Binary());
mutable_actor_table_data->set_repr_name(reply.actor_repr_name());
RAY_CHECK(!worker_id.IsNil());
RAY_CHECK(!node_id.IsNil());
RAY_CHECK(created_actors_[node_id].emplace(worker_id, actor_id).second);
Expand Down
4 changes: 4 additions & 0 deletions src/ray/protobuf/core_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ message PushTaskReply {
bool is_application_error = 6;
// Whether the task was cancelled before it started running (i.e. while queued).
bool was_cancelled_before_running = 7;
// If the task was an actor creation task, and the actor class has a customized
// repr defined for the anonymous actor (not a named actor), the repr name of the
// actor will be piggybacked to GCS to be included as part of ActorTableData.
optional string actor_repr_name = 8;
}

message DirectActorCallArgWaitCompleteRequest {
Expand Down
3 changes: 3 additions & 0 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ message ActorTableData {
optional bytes node_id = 29;
// Placement group ID if the actor requires a placement group.
optional bytes placement_group_id = 30;
// The repr name of the actor if specified with a customized repr method, e.g. __repr__
// Default to empty string if no customized repr is defined.
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
string repr_name = 31;
}

message ErrorTableData {
Expand Down