Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][dashboard] Add repr_name as part of actor state #33555

Merged
merged 7 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions dashboard/client/src/components/ActorTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ const ActorTable = ({
</Typography>
),
},
{
label: "Repr",
helpInfo: (
<Typography>
The repr name of the actor instance defined by __repr__. For example,
this actor will have repr "Actor1"
<br />
<br />
@ray.remote
<br />
class Actor:
<br />
&emsp;def __repr__(self):
<br />
&emsp;&emsp;return "Actor1"
<br />
</Typography>
),
},
{
label: "State",
helpInfo: (
Expand Down Expand Up @@ -306,6 +325,7 @@ const ActorTable = ({
({
actorId,
actorClass,
reprName,
jobId,
placementGroupId,
pid,
Expand Down Expand Up @@ -359,6 +379,9 @@ const ActorTable = ({
</TableCell>
<TableCell align="center">{actorClass}</TableCell>
<TableCell align="center">{name ? name : "-"}</TableCell>
<TableCell align="center">
{reprName ? reprName : "-"}
</TableCell>
<TableCell align="center">
<StatusChip type="actor" status={state} />
</TableCell>
Expand Down
8 changes: 8 additions & 0 deletions dashboard/client/src/pages/actor/ActorDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ const ActorDetailPage = () => {
}
: { value: "-" },
},
{
label: "Repr",
content: actorDetail.reprName
? {
value: actorDetail.reprName,
}
: { value: "-" },
},
{
label: "Job ID",
content: actorDetail.jobId
Expand Down
1 change: 1 addition & 0 deletions dashboard/client/src/type/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export type Actor = {
[key: string]: number;
};
exitDetail: string;
reprName: string;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this supposed to be optional? Or does it always exist?

Copy link
Contributor Author

@rickyyx rickyyx Mar 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will always exist, as an empty string if no repr is defined on the actor.

};

export type ActorDetail = {
Expand Down
2 changes: 2 additions & 0 deletions dashboard/modules/actor/actor_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def actor_table_data_to_dict(message):
"className",
"startTime",
"endTime",
"reprName",
}
light_message = {k: v for (k, v) in orig_message.items() if k in fields}
light_message["actorClass"] = orig_message["className"]
Expand Down Expand Up @@ -141,6 +142,7 @@ async def _update_actors(self):
"exitDetail",
"startTime",
"endTime",
"reprName",
)

def process_actor_data_from_pubsub(actor_id, actor_table_data):
Expand Down
4 changes: 4 additions & 0 deletions dashboard/modules/actor/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def get_pid(self):

return os.getpid()

def __repr__(self) -> str:
return "Foo1"

@ray.remote(num_cpus=0, resources={"infeasible_actor": 1})
class InfeasibleActor:
pass
Expand Down Expand Up @@ -76,6 +79,7 @@ class InfeasibleActor:
assert actor_response["requiredResources"] == {}
assert actor_response["endTime"] == 0
assert actor_response["exitDetail"] == "-"
assert actor_response["reprName"] == "Foo1"
for a in actors.values():
# "exitDetail always exits from the response"
assert "exitDetail" in a
Expand Down
13 changes: 12 additions & 1 deletion dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,21 @@ async def summarize_tasks(self, option: SummaryApiOptions) -> SummaryApiResponse
detail=summary_by == "lineage",
)
)

if summary_by == "func_name":
summary_results = TaskSummaries.to_summary_by_func_name(tasks=result.result)
else:
summary_results = TaskSummaries.to_summary_by_lineage(tasks=result.result)
# We will need the actors info for actor tasks.
actors = await self.list_actors(
option=ListApiOptions(
timeout=option.timeout,
limit=RAY_MAX_LIMIT_FROM_API_SERVER,
detail=True,
)
)
summary_results = TaskSummaries.to_summary_by_lineage(
tasks=result.result, actors=actors.result
)
summary = StateSummary(node_id_to_summary={"cluster": summary_results})
warnings = result.warnings
if (
Expand Down
10 changes: 9 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -922,12 +922,17 @@ cdef void execute_task(
if (hasattr(actor_class, "__ray_actor_class__") and
(actor_class.__ray_actor_class__.__repr__
!= object.__repr__)):
actor_repr = repr(actor)
actor_magic_token = "{}{}\n".format(
ray_constants.LOG_PREFIX_ACTOR_NAME, repr(actor))
ray_constants.LOG_PREFIX_ACTOR_NAME, actor_repr)
# Flush on both stdout and stderr.
print(actor_magic_token, end="")
print(actor_magic_token, file=sys.stderr, end="")

# Sets the actor repr name for the actor so other components
# like GCS has such info.
core_worker.set_actor_repr_name(actor_repr)

if (returns[0].size() > 0 and
not inspect.isgenerator(outputs) and
len(outputs) != int(returns[0].size())):
Expand Down Expand Up @@ -1620,6 +1625,9 @@ cdef class CoreWorker:
def set_actor_title(self, title):
CCoreWorkerProcess.GetCoreWorker().SetActorTitle(title)

def set_actor_repr_name(self, repr_name):
CCoreWorkerProcess.GetCoreWorker().SetActorReprName(repr_name)

def get_plasma_event_handler(self):
return self.plasma_event_handler

Expand Down
26 changes: 23 additions & 3 deletions python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ class ActorState(StateSchema):
is_detached: bool = state_column(filterable=False, detail=True)
#: The placement group id that's associated with this actor.
placement_group_id: str = state_column(detail=True, filterable=True)
#: Actor's repr name if a customized __repr__ method exists, else empty string.
repr_name: str = state_column(detail=True, filterable=True)


@dataclass(init=True)
Expand Down Expand Up @@ -835,7 +837,9 @@ def to_summary_by_func_name(cls, *, tasks: List[Dict]) -> "TaskSummaries":
)

@classmethod
def to_summary_by_lineage(cls, *, tasks: List[Dict]) -> "TaskSummaries":
def to_summary_by_lineage(
cls, *, tasks: List[Dict], actors: List[Dict]
) -> "TaskSummaries":
"""
This summarizes tasks by lineage.
i.e. A task will be grouped with another task if they have the
Expand Down Expand Up @@ -875,6 +879,8 @@ def to_summary_by_lineage(cls, *, tasks: List[Dict]) -> "TaskSummaries":
if type_enum == TaskType.ACTOR_CREATION_TASK:
actor_creation_task_id_for_actor_id[task["actor_id"]] = task["task_id"]

actor_dict = {actor["actor_id"]: actor for actor in actors}

def get_or_create_task_group(task_id: str) -> Optional[NestedTaskSummary]:
"""
Gets an already created task_group
Expand Down Expand Up @@ -946,6 +952,7 @@ def get_or_create_actor_task_group(
Returns None if there is missing data about the actor or one of its parents.
"""
key = f"actor:{actor_id}"
actor = actor_dict.get(actor_id)
if key not in task_group_by_id:
creation_task_id = actor_creation_task_id_for_actor_id.get(actor_id)
creation_task = tasks_by_id.get(creation_task_id)
Expand All @@ -956,8 +963,21 @@ def get_or_create_actor_task_group(
# tree at that node.
return None

# TODO(aguo): Get actor name from actors state-api.
[actor_name, *rest] = creation_task["func_or_class_name"].split(".")
# TODO(rickyx)
# We are using repr name for grouping actors if exists,
# else use class name. We should be using some group_name in the future.
if actor is None:
logger.debug(
f"We are missing actor info for actor {actor_id}, "
f"even though creation task exists: {creation_task}"
)
[actor_name, *rest] = creation_task["func_or_class_name"].split(".")
else:
actor_name = (
actor["repr_name"]
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
if actor["repr_name"]
else actor["class_name"]
)

task_group_by_id[key] = NestedTaskSummary(
name=actor_name,
Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
c_bool ShouldCaptureChildTasksInPlacementGroup()
const CActorID &GetActorId()
void SetActorTitle(const c_string &title)
void SetActorReprName(const c_string &repr_name)
void SetWebuiDisplay(const c_string &key, const c_string &message)
CTaskID GetCallerId()
const ResourceMappingType &GetResourceIDs() const
Expand Down
85 changes: 84 additions & 1 deletion python/ray/tests/test_state_api_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
import pytest

from ray._private.profiling import chrome_tracing_dump
from ray.experimental.state.api import list_tasks, list_actors, list_workers, list_nodes
from ray.experimental.state.api import (
get_actor,
list_tasks,
list_actors,
list_workers,
list_nodes,
)
from ray._private.test_utils import wait_for_condition


Expand Down Expand Up @@ -169,6 +175,83 @@ def verify():
wait_for_condition(verify, timeout=10)


def test_actor_repr_name(shutdown_only):
def _verify_repr_name(id, name):
actor = get_actor(id=id)
assert actor is not None
assert actor["repr_name"] == name
return True

# Assert simple actor repr name
@ray.remote
class ReprActor:
def __init__(self, x) -> None:
self.x = x

def __repr__(self) -> str:
return self.x

def ready(self):
pass

a = ReprActor.remote(x="repr-name-a")
b = ReprActor.remote(x="repr-name-b")

wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name-a")
wait_for_condition(_verify_repr_name, id=b._actor_id.hex(), name="repr-name-b")

# Assert when no __repr__ defined. repr_name should be empty
@ray.remote
class Actor:
pass

a = Actor.remote()
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="")

# Assert special actors (async actor, threaded actor, detached actor, named actor)
@ray.remote
class AsyncActor:
def __init__(self, x) -> None:
self.x = x

def __repr__(self) -> str:
return self.x

async def ready(self):
pass

a = AsyncActor.remote(x="async-x")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="async-x")

a = ReprActor.options(max_concurrency=3).remote(x="x")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="x")

a = ReprActor.options(name="named-actor").remote(x="repr-name")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name")

a = ReprActor.options(name="detached-actor", lifetime="detached").remote(
x="repr-name"
)
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name")
ray.kill(a)

# Assert nested actor class.
class OutClass:
@ray.remote
class InnerActor:
def __init__(self, name) -> None:
self.name = name

def __repr__(self) -> str:
return self.name

def get_actor(self, name):
return OutClass.InnerActor.remote(name=name)

a = OutClass().get_actor(name="inner")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="inner")


if __name__ == "__main__":
import sys

Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_state_api_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ def grab_tasks_from_task_group(

random.shuffle(tasks)

summary = TaskSummaries.to_summary_by_lineage(tasks=tasks)
summary = TaskSummaries.to_summary_by_lineage(tasks=tasks, actors=[])

assert summary.total_tasks == 20
assert summary.total_actor_tasks == 110
Expand Down
5 changes: 5 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3745,6 +3745,11 @@ void CoreWorker::SetActorTitle(const std::string &title) {
actor_title_ = title;
}

void CoreWorker::SetActorReprName(const std::string &repr_name) {
RAY_CHECK(direct_task_receiver_ != nullptr);
direct_task_receiver_->SetActorReprName(repr_name);
}

rpc::JobConfig CoreWorker::GetJobConfig() const {
return worker_context_.GetCurrentJobConfig();
}
Expand Down
10 changes: 10 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {

void SetActorTitle(const std::string &title);

/// Sets the actor's repr name.
///
/// This is set explicitly rather than included as part of actor creation task spec
/// because it's only available after running the creation task as it might depend on
/// fields to be be initialized during actor creation task. The repr name will be
/// included as part of actor creation task reply (PushTaskReply) to GCS.
///
/// \param repr_name Actor repr name.
void SetActorReprName(const std::string &repr_name);

void SetCallerCreationTimestamp();

/// Increase the reference count for this object ID.
Expand Down
11 changes: 10 additions & 1 deletion src/ray/core_worker/transport/direct_actor_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,13 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
<< ", actor_id: " << task_spec.ActorCreationId()
<< ", status: " << status;
} else {
// Set the actor repr name if it's customized by the actor.
if (!actor_repr_name_.empty()) {
reply->set_actor_repr_name(actor_repr_name_);
}
RAY_LOG(INFO) << "Actor creation task finished, task_id: " << task_spec.TaskId()
<< ", actor_id: " << task_spec.ActorCreationId();
<< ", actor_id: " << task_spec.ActorCreationId()
<< ", actor_repr_name: " << actor_repr_name_;
}
}
}
Expand Down Expand Up @@ -311,5 +316,9 @@ void CoreWorkerDirectTaskReceiver::Stop() {
}
}

void CoreWorkerDirectTaskReceiver::SetActorReprName(const std::string &repr_name) {
actor_repr_name_ = repr_name;
}

} // namespace core
} // namespace ray
Loading