Skip to content

Commit

Permalink
[core] GcsClient: No GIL for C++ pb serialization (ray-project#46926)
Browse files Browse the repository at this point in the history
Currently we hold the GIL for the whole conversion `C++ pb message` ->
`serialized string` -> `Python pb message`. However, only the second
step needs the GIL; the first is a pure C++ call, so we should not hold
the GIL while it runs.

Adds a `c_vector[c_string]` to hold the serialized strings and does the
serialization in a `nogil` block. Then, with the GIL held, we convert the
strings to Python objects.

Not doing per-object locking because re-acquiring the GIL for every
object is expensive.

Also makes the conversion functions and the future assignment function
`with gil`, so the C++ wrapper code `PyCallback` no longer needs to
acquire the GIL itself, except for error checking.

Signed-off-by: Ruiyang Wang <[email protected]>
  • Loading branch information
rynewang committed Aug 6, 2024
1 parent 779ee56 commit 492cc1b
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 37 deletions.
15 changes: 9 additions & 6 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -370,18 +370,21 @@ cdef extern from "ray/core_worker/common.h" nogil:
cdef extern from "ray/gcs/gcs_client/python_callbacks.h" namespace "ray::gcs":
cdef cppclass MultiItemPyCallback[T]:
MultiItemPyCallback(
object (*)(CRayStatus, c_vector[T] &&),
void (object, void*), void*) nogil
object (*)(CRayStatus, c_vector[T] &&) nogil,
void (object, void*) nogil,
void*) nogil

cdef cppclass OptionalItemPyCallback[T]:
OptionalItemPyCallback(
object (*)(CRayStatus, const optional[T]&),
void (object, void*), void*) nogil
object (*)(CRayStatus, const optional[T]&) nogil,
void (object, void*) nogil,
void*) nogil

cdef cppclass StatusPyCallback:
StatusPyCallback(
object (*)(CRayStatus),
void (object, void*), void*) nogil
object (*)(CRayStatus) nogil,
void (object, void*) nogil,
void*) nogil

cdef extern from "ray/gcs/gcs_client/accessor.h" nogil:
cdef cppclass CActorInfoAccessor "ray::gcs::ActorInfoAccessor":
Expand Down
75 changes: 45 additions & 30 deletions python/ray/includes/gcs_client.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ cdef class NewGcsClient:
self.inner.get().InternalKV().AsyncInternalKVGet(
ns, key, timeout_ms,
OptionalItemPyCallback[c_string](
convert_optional_str_none_for_not_found,
&convert_optional_str_none_for_not_found,
assign_and_decrement_fut,
fut_ptr)))
return asyncio.wrap_future(fut)
Expand All @@ -203,7 +203,7 @@ cdef class NewGcsClient:
self.inner.get().InternalKV().AsyncInternalKVMultiGet(
ns, c_keys, timeout_ms,
OptionalItemPyCallback[unordered_map[c_string, c_string]](
convert_optional_multi_get,
&convert_optional_multi_get,
assign_and_decrement_fut,
fut_ptr)))
return asyncio.wrap_future(fut)
Expand All @@ -224,7 +224,7 @@ cdef class NewGcsClient:
self.inner.get().InternalKV().AsyncInternalKVPut(
ns, key, value, overwrite, timeout_ms,
OptionalItemPyCallback[int](
convert_optional_int,
&convert_optional_int,
assign_and_decrement_fut,
fut_ptr)))
return asyncio.wrap_future(fut)
Expand All @@ -241,7 +241,7 @@ cdef class NewGcsClient:
self.inner.get().InternalKV().AsyncInternalKVDel(
ns, key, del_by_prefix, timeout_ms,
OptionalItemPyCallback[int](
convert_optional_int,
&convert_optional_int,
assign_and_decrement_fut,
fut_ptr)))
return asyncio.wrap_future(fut)
Expand All @@ -258,7 +258,7 @@ cdef class NewGcsClient:
self.inner.get().InternalKV().AsyncInternalKVKeys(
ns, prefix, timeout_ms,
OptionalItemPyCallback[c_vector[c_string]](
convert_optional_vector_str,
&convert_optional_vector_str,
assign_and_decrement_fut,
fut_ptr)))
return asyncio.wrap_future(fut)
Expand All @@ -275,7 +275,7 @@ cdef class NewGcsClient:
self.inner.get().InternalKV().AsyncInternalKVExists(
ns, key, timeout_ms,
OptionalItemPyCallback[c_bool](
convert_optional_bool,
&convert_optional_bool,
assign_and_decrement_fut,
fut_ptr)))
return asyncio.wrap_future(fut)
Expand Down Expand Up @@ -408,7 +408,7 @@ cdef class NewGcsClient:
self.inner.get().Actors().AsyncGetAllByFilter(
c_actor_id, c_job_id, c_actor_state_name,
MultiItemPyCallback[CActorTableData](
convert_get_all_actor_info,
&convert_get_all_actor_info,
assign_and_decrement_fut,
fut_ptr),
timeout_ms))
Expand Down Expand Up @@ -449,7 +449,6 @@ cdef class NewGcsClient:
cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1
cdef CRayStatus status
cdef c_vector[CJobTableData] reply
cdef c_vector[c_string] serialized_reply
with nogil:
status = self.inner.get().Jobs().GetAll(reply, timeout_ms)
return raise_or_return((convert_get_all_job_info(status, move(reply))))
Expand All @@ -466,7 +465,7 @@ cdef class NewGcsClient:
check_status_timeout_as_rpc_error(
self.inner.get().Jobs().AsyncGetAll(
MultiItemPyCallback[CJobTableData](
convert_get_all_job_info,
&convert_get_all_job_info,
assign_and_decrement_fut,
fut_ptr),
timeout_ms))
Expand Down Expand Up @@ -578,7 +577,7 @@ cdef incremented_fut():
cpython.Py_INCREF(fut)
return fut

cdef void assign_and_decrement_fut(result, void* fut_ptr):
cdef void assign_and_decrement_fut(result, void* fut_ptr) with gil:
cdef fut = <object>fut_ptr
assert isinstance(fut, concurrent.futures.Future)

Expand Down Expand Up @@ -609,14 +608,20 @@ cdef raise_or_return(tup):
#############################################################

cdef convert_get_all_node_info(
CRayStatus status, c_vector[CGcsNodeInfo]&& c_data):
CRayStatus status, c_vector[CGcsNodeInfo]&& c_data) with gil:
# -> Dict[NodeID, gcs_pb2.GcsNodeInfo]
cdef c_string b
# No GIL block for C++ looping && serialization.
# GIL block for Python deserialization and dict building.
# Not doing per-object GIL lock because it's expensive.
cdef c_vector[c_string] serialized_reply
try:
check_status_timeout_as_rpc_error(status)
with nogil:
serialized_reply.reserve(c_data.size())
for c_proto in c_data:
serialized_reply.push_back(c_proto.SerializeAsString())
node_table_data = {}
for c_proto in c_data:
b = c_proto.SerializeAsString()
for b in serialized_reply:
proto = gcs_pb2.GcsNodeInfo()
proto.ParseFromString(b)
node_table_data[NodeID.from_binary(proto.node_id)] = proto
Expand All @@ -625,14 +630,20 @@ cdef convert_get_all_node_info(
return None, e

cdef convert_get_all_job_info(
CRayStatus status, c_vector[CJobTableData]&& c_data):
CRayStatus status, c_vector[CJobTableData]&& c_data) with gil:
# -> Dict[JobID, gcs_pb2.JobTableData]
cdef c_string b
# No GIL block for C++ looping && serialization.
# GIL block for Python deserialization and dict building.
# Not doing per-object GIL lock because it's expensive.
cdef c_vector[c_string] serialized_reply
try:
check_status_timeout_as_rpc_error(status)
with nogil:
serialized_reply.reserve(c_data.size())
for c_proto in c_data:
serialized_reply.push_back(c_proto.SerializeAsString())
job_table_data = {}
for c_proto in c_data:
b = c_proto.SerializeAsString()
for b in serialized_reply:
proto = gcs_pb2.JobTableData()
proto.ParseFromString(b)
job_table_data[JobID.from_binary(proto.job_id)] = proto
Expand All @@ -641,30 +652,33 @@ cdef convert_get_all_job_info(
return None, e

cdef convert_get_all_actor_info(
CRayStatus status, c_vector[CActorTableData]&& c_data):
CRayStatus status, c_vector[CActorTableData]&& c_data) with gil:
# -> Dict[ActorID, gcs_pb2.ActorTableData]
cdef c_string b
cdef c_vector[c_string] serialized_reply
try:
check_status_timeout_as_rpc_error(status)
with nogil:
serialized_reply.reserve(c_data.size())
for c_proto in c_data:
serialized_reply.push_back(c_proto.SerializeAsString())
actor_table_data = {}
for c_proto in c_data:
b = c_proto.SerializeAsString()
for b in serialized_reply:
proto = gcs_pb2.ActorTableData()
proto.ParseFromString(b)
actor_table_data[ActorID.from_binary(proto.actor_id)] = proto
return actor_table_data, None
except Exception as e:
return None, e

cdef convert_status(CRayStatus status):
cdef convert_status(CRayStatus status) with gil:
# -> None
try:
check_status_timeout_as_rpc_error(status)
return None, None
except Exception as e:
return None, e
cdef convert_optional_str_none_for_not_found(
CRayStatus status, const optional[c_string]& c_str):
CRayStatus status, const optional[c_string]& c_str) with gil:
# If status is NotFound, return None.
# If status is OK, return the value.
# Else, raise exception.
Expand All @@ -679,7 +693,8 @@ cdef convert_optional_str_none_for_not_found(
return None, e

cdef convert_optional_multi_get(
CRayStatus status, const optional[unordered_map[c_string, c_string]]& c_map):
CRayStatus status,
const optional[unordered_map[c_string, c_string]]& c_map) with gil:
# -> Dict[str, str]
cdef unordered_map[c_string, c_string].const_iterator it
try:
Expand All @@ -697,7 +712,7 @@ cdef convert_optional_multi_get(
except Exception as e:
return None, e

cdef convert_optional_int(CRayStatus status, const optional[int]& c_int):
cdef convert_optional_int(CRayStatus status, const optional[int]& c_int) with gil:
# -> int
try:
check_status_timeout_as_rpc_error(status)
Expand All @@ -707,7 +722,7 @@ cdef convert_optional_int(CRayStatus status, const optional[int]& c_int):
return None, e

cdef convert_optional_vector_str(
CRayStatus status, const optional[c_vector[c_string]]& c_vec):
CRayStatus status, const optional[c_vector[c_string]]& c_vec) with gil:
# -> Dict[str, str]
cdef const c_vector[c_string]* vec
cdef c_vector[c_string].const_iterator it
Expand All @@ -726,7 +741,7 @@ cdef convert_optional_vector_str(
return None, e


cdef convert_optional_bool(CRayStatus status, const optional[c_bool]& b):
cdef convert_optional_bool(CRayStatus status, const optional[c_bool]& b) with gil:
# -> bool
try:
check_status_timeout_as_rpc_error(status)
Expand All @@ -735,15 +750,15 @@ cdef convert_optional_bool(CRayStatus status, const optional[c_bool]& b):
except Exception as e:
return None, e

cdef convert_multi_bool(CRayStatus status, c_vector[c_bool]&& c_data):
cdef convert_multi_bool(CRayStatus status, c_vector[c_bool]&& c_data) with gil:
# -> List[bool]
try:
check_status_timeout_as_rpc_error(status)
return [b for b in c_data], None
except Exception as e:
return None, e

cdef convert_multi_str(CRayStatus status, c_vector[c_string]&& c_data):
cdef convert_multi_str(CRayStatus status, c_vector[c_string]&& c_data) with gil:
# -> List[bytes]
try:
check_status_timeout_as_rpc_error(status)
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/python_callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ class PyCallback {
: converter(converter), assigner(assigner), context(context) {}

void operator()(Args &&...args) {
PythonGilHolder gil;
PyObject *result = converter(std::forward<Args>(args)...);
CheckNoException();

Expand All @@ -85,6 +84,7 @@ class PyCallback {
}

void CheckNoException() {
PythonGilHolder gil;
if (PyErr_Occurred() != nullptr) {
PyErr_Print();
PyErr_Clear();
Expand Down

0 comments on commit 492cc1b

Please sign in to comment.