From 492cc1bfe5283faf79dd93dd715bcdbd680d3466 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang <56065503+rynewang@users.noreply.github.com> Date: Mon, 5 Aug 2024 20:29:30 -0700 Subject: [PATCH] [core] GcsClient: No GIL for C++ pb serialization (#46926) Currently we hold the GIL when we do conversion `C++ pb message` -> `serialized string` -> `Python pb message`. However only the latter is needed; the first conversion is a pure C++ call and we should not hold the GIL. Adds a `c_vector[c_string]` to hold the strings and do the serialization in a `nogil` block. Then with gil, we convert it to Python objects. Not doing per-object locking because GIL is expensive to hold. Also makes the conversion functions and the future assignment function `with gil`, so the C++ wrapper code `PyCallback` needs not to do it, except for error checking. Signed-off-by: Ruiyang Wang --- python/ray/includes/common.pxd | 15 +++-- python/ray/includes/gcs_client.pxi | 75 ++++++++++++++--------- src/ray/gcs/gcs_client/python_callbacks.h | 2 +- 3 files changed, 55 insertions(+), 37 deletions(-) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 919ae71239b70..a7e873af4b8de 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -370,18 +370,21 @@ cdef extern from "ray/core_worker/common.h" nogil: cdef extern from "ray/gcs/gcs_client/python_callbacks.h" namespace "ray::gcs": cdef cppclass MultiItemPyCallback[T]: MultiItemPyCallback( - object (*)(CRayStatus, c_vector[T] &&), - void (object, void*), void*) nogil + object (*)(CRayStatus, c_vector[T] &&) nogil, + void (object, void*) nogil, + void*) nogil cdef cppclass OptionalItemPyCallback[T]: OptionalItemPyCallback( - object (*)(CRayStatus, const optional[T]&), - void (object, void*), void*) nogil + object (*)(CRayStatus, const optional[T]&) nogil, + void (object, void*) nogil, + void*) nogil cdef cppclass StatusPyCallback: StatusPyCallback( - object (*)(CRayStatus), - void (object, void*), void*) nogil + object (*)(CRayStatus) nogil, + void (object, void*) nogil, + void*) nogil cdef extern from "ray/gcs/gcs_client/accessor.h" nogil: cdef cppclass CActorInfoAccessor "ray::gcs::ActorInfoAccessor": diff --git a/python/ray/includes/gcs_client.pxi b/python/ray/includes/gcs_client.pxi index 71a41b94c5bf8..60061ab64d7f1 100644 --- a/python/ray/includes/gcs_client.pxi +++ b/python/ray/includes/gcs_client.pxi @@ -184,7 +184,7 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVGet( ns, key, timeout_ms, OptionalItemPyCallback[c_string]( - convert_optional_str_none_for_not_found, + &convert_optional_str_none_for_not_found, assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -203,7 +203,7 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVMultiGet( ns, c_keys, timeout_ms, OptionalItemPyCallback[unordered_map[c_string, c_string]]( - convert_optional_multi_get, + &convert_optional_multi_get, assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -224,7 +224,7 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVPut( ns, key, value, overwrite, timeout_ms, OptionalItemPyCallback[int]( - convert_optional_int, + &convert_optional_int, assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -241,7 +241,7 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVDel( ns, key, del_by_prefix, timeout_ms, OptionalItemPyCallback[int]( - convert_optional_int, + &convert_optional_int, assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -258,7 +258,7 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVKeys( ns, prefix, timeout_ms, OptionalItemPyCallback[c_vector[c_string]]( - convert_optional_vector_str, + &convert_optional_vector_str, assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -275,7 +275,7 @@ cdef class NewGcsClient: self.inner.get().InternalKV().AsyncInternalKVExists( ns, key, timeout_ms, OptionalItemPyCallback[c_bool]( - convert_optional_bool, + &convert_optional_bool, assign_and_decrement_fut, fut_ptr))) return asyncio.wrap_future(fut) @@ -408,7 +408,7 @@ cdef class NewGcsClient: self.inner.get().Actors().AsyncGetAllByFilter( c_actor_id, c_job_id, c_actor_state_name, MultiItemPyCallback[CActorTableData]( - convert_get_all_actor_info, + &convert_get_all_actor_info, assign_and_decrement_fut, fut_ptr), timeout_ms)) @@ -449,7 +449,6 @@ cdef class NewGcsClient: cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1 cdef CRayStatus status cdef c_vector[CJobTableData] reply - cdef c_vector[c_string] serialized_reply with nogil: status = self.inner.get().Jobs().GetAll(reply, timeout_ms) return raise_or_return((convert_get_all_job_info(status, move(reply)))) @@ -466,7 +465,7 @@ cdef class NewGcsClient: check_status_timeout_as_rpc_error( self.inner.get().Jobs().AsyncGetAll( MultiItemPyCallback[CJobTableData]( - convert_get_all_job_info, + &convert_get_all_job_info, assign_and_decrement_fut, fut_ptr), timeout_ms)) @@ -578,7 +577,7 @@ cdef incremented_fut(): cpython.Py_INCREF(fut) return fut -cdef void assign_and_decrement_fut(result, void* fut_ptr): +cdef void assign_and_decrement_fut(result, void* fut_ptr) with gil: cdef fut = fut_ptr assert isinstance(fut, concurrent.futures.Future) @@ -609,14 +608,20 @@ cdef raise_or_return(tup): ############################################################# cdef convert_get_all_node_info( - CRayStatus status, c_vector[CGcsNodeInfo]&& c_data): + CRayStatus status, c_vector[CGcsNodeInfo]&& c_data) with gil: # -> Dict[NodeID, gcs_pb2.GcsNodeInfo] - cdef c_string b + # No GIL block for C++ looping && serialization. + # GIL block for Pyhton deserialization and dict building. + # Not doing per-object GIL lock because it's expensive. + cdef c_vector[c_string] serialized_reply try: check_status_timeout_as_rpc_error(status) + with nogil: + serialized_reply.reserve(c_data.size()) + for c_proto in c_data: + serialized_reply.push_back(c_proto.SerializeAsString()) node_table_data = {} - for c_proto in c_data: - b = c_proto.SerializeAsString() + for b in serialized_reply: proto = gcs_pb2.GcsNodeInfo() proto.ParseFromString(b) node_table_data[NodeID.from_binary(proto.node_id)] = proto @@ -625,14 +630,20 @@ cdef convert_get_all_node_info( return None, e cdef convert_get_all_job_info( - CRayStatus status, c_vector[CJobTableData]&& c_data): + CRayStatus status, c_vector[CJobTableData]&& c_data) with gil: # -> Dict[JobID, gcs_pb2.JobTableData] - cdef c_string b + # No GIL block for C++ looping && serialization. + # GIL block for Pyhton deserialization and dict building. + # Not doing per-object GIL lock because it's expensive. + cdef c_vector[c_string] serialized_reply try: check_status_timeout_as_rpc_error(status) + with nogil: + serialized_reply.reserve(c_data.size()) + for c_proto in c_data: + serialized_reply.push_back(c_proto.SerializeAsString()) job_table_data = {} - for c_proto in c_data: - b = c_proto.SerializeAsString() + for b in serialized_reply: proto = gcs_pb2.JobTableData() proto.ParseFromString(b) job_table_data[JobID.from_binary(proto.job_id)] = proto @@ -641,14 +652,17 @@ cdef convert_get_all_job_info( return None, e cdef convert_get_all_actor_info( - CRayStatus status, c_vector[CActorTableData]&& c_data): + CRayStatus status, c_vector[CActorTableData]&& c_data) with gil: # -> Dict[ActorID, gcs_pb2.ActorTableData] - cdef c_string b + cdef c_vector[c_string] serialized_reply try: check_status_timeout_as_rpc_error(status) + with nogil: + serialized_reply.reserve(c_data.size()) + for c_proto in c_data: + serialized_reply.push_back(c_proto.SerializeAsString()) actor_table_data = {} - for c_proto in c_data: - b = c_proto.SerializeAsString() + for b in serialized_reply: proto = gcs_pb2.ActorTableData() proto.ParseFromString(b) actor_table_data[ActorID.from_binary(proto.actor_id)] = proto @@ -656,7 +670,7 @@ cdef convert_get_all_actor_info( except Exception as e: return None, e -cdef convert_status(CRayStatus status): +cdef convert_status(CRayStatus status) with gil: # -> None try: check_status_timeout_as_rpc_error(status) @@ -664,7 +678,7 @@ cdef convert_status(CRayStatus status): except Exception as e: return None, e cdef convert_optional_str_none_for_not_found( - CRayStatus status, const optional[c_string]& c_str): + CRayStatus status, const optional[c_string]& c_str) with gil: # If status is NotFound, return None. # If status is OK, return the value. # Else, raise exception. @@ -679,7 +693,8 @@ cdef convert_optional_str_none_for_not_found( return None, e cdef convert_optional_multi_get( - CRayStatus status, const optional[unordered_map[c_string, c_string]]& c_map): + CRayStatus status, + const optional[unordered_map[c_string, c_string]]& c_map) with gil: # -> Dict[str, str] cdef unordered_map[c_string, c_string].const_iterator it try: @@ -697,7 +712,7 @@ cdef convert_optional_multi_get( except Exception as e: return None, e -cdef convert_optional_int(CRayStatus status, const optional[int]& c_int): +cdef convert_optional_int(CRayStatus status, const optional[int]& c_int) with gil: # -> int try: check_status_timeout_as_rpc_error(status) @@ -707,7 +722,7 @@ cdef convert_optional_int(CRayStatus status, const optional[int]& c_int): return None, e cdef convert_optional_vector_str( - CRayStatus status, const optional[c_vector[c_string]]& c_vec): + CRayStatus status, const optional[c_vector[c_string]]& c_vec) with gil: # -> Dict[str, str] cdef const c_vector[c_string]* vec cdef c_vector[c_string].const_iterator it @@ -726,7 +741,7 @@ cdef convert_optional_vector_str( return None, e -cdef convert_optional_bool(CRayStatus status, const optional[c_bool]& b): +cdef convert_optional_bool(CRayStatus status, const optional[c_bool]& b) with gil: # -> bool try: check_status_timeout_as_rpc_error(status) @@ -735,7 +750,7 @@ cdef convert_optional_bool(CRayStatus status, const optional[c_bool]& b): except Exception as e: return None, e -cdef convert_multi_bool(CRayStatus status, c_vector[c_bool]&& c_data): +cdef convert_multi_bool(CRayStatus status, c_vector[c_bool]&& c_data) with gil: # -> List[bool] try: check_status_timeout_as_rpc_error(status) @@ -743,7 +758,7 @@ cdef convert_multi_bool(CRayStatus status, c_vector[c_bool]&& c_data): except Exception as e: return None, e -cdef convert_multi_str(CRayStatus status, c_vector[c_string]&& c_data): +cdef convert_multi_str(CRayStatus status, c_vector[c_string]&& c_data) with gil: # -> List[bytes] try: check_status_timeout_as_rpc_error(status) diff --git a/src/ray/gcs/gcs_client/python_callbacks.h b/src/ray/gcs/gcs_client/python_callbacks.h index c01709b16fed6..30a5ee8b42bb6 100644 --- a/src/ray/gcs/gcs_client/python_callbacks.h +++ b/src/ray/gcs/gcs_client/python_callbacks.h @@ -76,7 +76,6 @@ class PyCallback { : converter(converter), assigner(assigner), context(context) {} void operator()(Args &&...args) { - PythonGilHolder gil; PyObject *result = converter(std::forward(args)...); CheckNoException(); @@ -85,6 +84,7 @@ class PyCallback { } void CheckNoException() { + PythonGilHolder gil; if (PyErr_Occurred() != nullptr) { PyErr_Print(); PyErr_Clear();