[core] The New GcsClient binding #46186

Merged Jul 22, 2024 (42 commits)

rynewang (Contributor) commented Jun 21, 2024

Creates a direct Cython binding for ray::gcs::GcsClient and replaces the existing PythonGcsClient binding. The new binding is enabled by default; one can switch back with RAY_USE_OLD_GCS_CLIENT=1.
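
For illustration only, a switch like this could be read as sketched below. Only the variable name RAY_USE_OLD_GCS_CLIENT comes from this PR; the helper `use_old_gcs_client` and the rule that exactly "1" means "on" are assumptions, not Ray's actual parsing code.

```python
import os


def use_old_gcs_client(environ=os.environ) -> bool:
    """Return True when the old PythonGcsClient binding is requested.

    Hypothetical helper: treating exactly "1" as "on" is an assumption.
    """
    return environ.get("RAY_USE_OLD_GCS_CLIENT", "0") == "1"
```

Passing the environment as a parameter keeps the check easy to test without mutating the real process environment.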

The new binding is in its own file gcs_client.pxi included by _raylet.pyx.

This PR is a proof of concept that we can write bindings for async C++ APIs. Specifically, we can wrap the callback-based async APIs of the C++ ray::gcs::GcsClient into Python async APIs (async/await).

Why?

On the Python side, our gRPC usage is currently complicated and inconsistent. We have:

  • Python grpcio based Client
  • Python grpcio.aio based Client and Servers
  • Python GcsClient -> C++ PythonGcsClient -> C++ grpc::Channel
  • Python GcsAioClient -> thread pool executor -> Python GcsClient -> C++ PythonGcsClient -> C++ grpc::Channel
  • Python Gcs.*Subscriber (sync)
  • Python GcsAio.*Subscriber (async)

All of them talk to the GCS through similar but subtly different APIs. This introduces maintenance overhead, makes debugging harder, and makes it harder to add new features.

Beyond Python, all these APIs also have slightly different semantics from the C++ GcsClient itself, as used by the core_worker C++ code. For example:

  1. In _raylet.pyx we liberally added @_auto_reconnect to many APIs. This applies to the Python GcsClient and GcsAioClient, but not to the C++ GcsClient or the Python subscribers. If we tweak retry counts until they "happen to work", that only helps the Python code, not the core worker code.
  2. In PythonGcsClient::Connect we retry several times, each time recreating a GcsChannel. This is supposed to make reconnection faster by not waiting out the gRPC-internal backoff. But this is not applied in the C++ GcsClient or the Python subscribers. In fact, the C++ GcsClient doesn't manage the channel at all; it uses the Ray-wide GcsRpcClient to manage it. If we want to "fast recreate" channels, we should apply that consistently to all clients.
  3. In the Python GcsClient, we have a method get_all_node_info that forwards the RPC. However, we also have a self._gcs_node_info_stub.GetAllNodeInfo call in node_head.py, because that caller wants the full reply, whereas the Python GcsClient method returns only the partial data its original caller wanted.
  4. ...and more.
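
To make point 1 concrete, here is a minimal sketch of a decorator-style retry. It is not Ray's actual @_auto_reconnect; `RpcUnavailableError`, `auto_reconnect`, and `FlakyClient` are hypothetical names for illustration. The point is that the retry policy lives on the decorated Python methods, so any client that does not go through them (the C++ GcsClient, the subscribers) never sees it.

```python
import functools
import time


class RpcUnavailableError(Exception):
    """Hypothetical stand-in for an RPC error raised while the GCS is down."""


def auto_reconnect(max_retries=3, delay_s=0.0):
    """Retry the wrapped call up to max_retries extra times (illustrative only)."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            remaining = max_retries
            while True:
                try:
                    return fn(*args, **kwargs)
                except RpcUnavailableError:
                    if remaining <= 0:
                        raise  # retries exhausted: surface the error
                    remaining -= 1
                    time.sleep(delay_s)

        return wrapper

    return decorator


class FlakyClient:
    """Fails twice, then succeeds, to exercise the decorator."""

    def __init__(self):
        self.calls = 0

    @auto_reconnect(max_retries=3)
    def get_value(self):
        self.calls += 1
        if self.calls < 3:
            raise RpcUnavailableError("GCS unavailable")
        return "ok"
```

Here `FlakyClient().get_value()` succeeds on the third attempt, but only because the caller went through the decorated method.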

What's blocking us?

Async. Cython is not known to be good at binding async APIs. We have a few challenges:

  1. We need to invoke Python functions in C++ callbacks. This requires a C++ callback class that holds a Python object, with all that implies. Today's Ray C++ code base is largely CPython-free.
    1. Cython reference counting. While experimenting, I accidentally decreased a PyObject's refcount to -9.
    2. GIL. We need to acquire and release the lock properly, which requires careful coding and testing.
  2. Event loops. The callback arrives on the C++ (asio) loop, but the Python awaiter is waiting on the Python event loop. We need to bridge the two with asyncio futures and event loops.
  3. (Still risky) Types. C++ callbacks receive C++ Protobuf messages, while Python callbacks receive Python Protobuf messages or Python dicts. We can either:
    1. Serialize the C++ Protobuf to a string and deserialize it in Python. This is the approach I chose in this PR.
    2. Bind all the C++ Protobuf types' fields as .pxd methods (read-only is fine).
What's in this PR?

This PR introduces a NewGcsClient covering all sync methods of the GcsClient, and a flag to switch back to the old PythonGcsClient implementation. The NewGcsClient is used by default; it assumes the GCS is HA and always tries to reconnect if the connection breaks, for up to gcs_rpc_server_reconnect_timeout_s (default = 60s), and then exits. The Python-side @auto_reconnect retries are removed, though the retry in node.py initialization (_init_gcs_client) is left for follow-up PRs.
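
The reconnect-until-timeout behavior can be pictured roughly as follows. This is a sketch under stated assumptions, not the C++ implementation: `GcsUnavailable` and `call_with_reconnect` are hypothetical, the fixed backoff is invented for illustration, and the sketch raises `TimeoutError` where the real client exits.

```python
import time


class GcsUnavailable(Exception):
    """Hypothetical error meaning the server could not be reached."""


def call_with_reconnect(rpc, timeout_s=60.0, backoff_s=1.0,
                        clock=time.monotonic, sleep=time.sleep):
    """Keep retrying rpc() while it is unavailable, until timeout_s elapses.

    Assumption: the real client exits once the timeout is reached; this
    sketch raises TimeoutError so the behavior can be observed in a test.
    """
    deadline = clock() + timeout_s
    while True:
        try:
            return rpc()
        except GcsUnavailable:
            if clock() >= deadline:
                raise TimeoutError("GCS did not come back before the deadline")
            sleep(backoff_s)
```

Unlike a fixed retry count, a deadline bounds the total time spent waiting regardless of how quickly each attempt fails.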

GcsAioClient is left intact; it now uses a thread pool executor to delegate calls to the NewGcsClient. In a later PR we will remove it for good and use real async calls (see #45289).
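
The thread pool delegation pattern looks roughly like this. The classes `SyncClient` and `AioClient` and the method `kv_get` are hypothetical stand-ins, not the actual GcsAioClient API; the sketch only shows how a blocking call can be exposed as a coroutine.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class SyncClient:
    """Hypothetical blocking client standing in for the sync binding."""

    def kv_get(self, key):
        return f"value-for-{key}"


class AioClient:
    """Async facade that runs each blocking call on a worker thread."""

    def __init__(self, sync_client, max_workers=4):
        self._client = sync_client
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    async def kv_get(self, key):
        loop = asyncio.get_running_loop()
        # The event loop stays responsive while the worker thread blocks.
        return await loop.run_in_executor(self._executor, self._client.kv_get, key)

    def close(self):
        self._executor.shutdown(wait=True)
```

The cost of this approach is one blocked thread per in-flight call, which is part of why real async calls are the eventual goal.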

Detailed Changes:

  • Adds a Cython binding for GcsClient and uses it by default, replacing PythonGcsClient.
  • TaskEventBufferImpl: moves the gcs_client_ connection to after thread start.
  • Moves check_status from _raylet.pyx to common.pxi for reuse.
  • GcsClient now retries only on gRPC UNAVAILABLE and not on other codes, since we only want to retry when the GCS is down and not in other cases (e.g. RESOURCE_EXHAUSTED). See python/ray/tune/tests/test_tune_restore.py::ResourceExhaustedTest::test_resource_exhausted_info.
  • Fixes test fixtures: env vars were set after the modules were loaded, so a module reload is added.
  • Better test message.

Signed-off-by: Ruiyang Wang <[email protected]>
@rynewang rynewang self-assigned this Jun 25, 2024
@rynewang rynewang added the "go" label (add ONLY when ready to merge, run all tests) Jun 25, 2024
@rynewang rynewang marked this pull request as ready for review June 27, 2024 17:54
@@ -241,7 +241,7 @@ class GcsRpcClient {
         callback(status, reply);
       }
       delete executor;
-    } else if (!status.IsRpcError()) {
+    } else if (!IsGrpcRetryableStatus(status)) {
Collaborator

why this change?

Contributor Author

The GCS can return errors that are non-OK and also non-retryable. Excerpt from the PR description:

  • GcsClient now retries only on gRPC UNAVAILABLE and not on other codes, since we only want to retry when the GCS is down and not in other cases (e.g. RESOURCE_EXHAUSTED). See python/ray/tune/tests/test_tune_restore.py::ResourceExhaustedTest::test_resource_exhausted_info

In this example, Ray Train sent an extra-large message to the GCS that exceeds the gRPC size limit. It can never be sent, so we should not retry.

Collaborator

except RpcError as e:
    if e.rpc_code in [
        GRPC_STATUS_CODE_UNAVAILABLE,
        GRPC_STATUS_CODE_UNKNOWN,
    ]:
        if remaining_retry <= 0:

Python gcs client is retrying these two. Let's do the same for now.
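
A sketch of the classification being discussed, written in Python for brevity. The numeric values are the standard gRPC status codes; `is_retryable` and `GRPC_STATUS_CODE_RESOURCE_EXHAUSTED` are hypothetical names, and the real IsGrpcRetryableStatus in C++ may be implemented differently.

```python
# Standard gRPC status code values.
GRPC_STATUS_CODE_OK = 0
GRPC_STATUS_CODE_UNKNOWN = 2
GRPC_STATUS_CODE_RESOURCE_EXHAUSTED = 8  # hypothetical constant name
GRPC_STATUS_CODE_UNAVAILABLE = 14

# Assumption: retry the same two codes the Python client retries.
RETRYABLE_CODES = frozenset({
    GRPC_STATUS_CODE_UNAVAILABLE,
    GRPC_STATUS_CODE_UNKNOWN,
})


def is_retryable(code: int) -> bool:
    """Return True only for codes that suggest the server may come back."""
    return code in RETRYABLE_CODES
```

Under this rule an oversized message (RESOURCE_EXHAUSTED) fails immediately instead of being retried until the reconnect timeout.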

jjyao (Collaborator) commented Jul 15, 2024

Let's also double check the thread safety as we discussed.

rynewang (Contributor Author) commented:

A comment on thread safety:

GcsClient is thread safe once it has been Connect()ed and before it is Disconnect()ed. Concurrent RPC calls from different threads are OK. The only problem arises when an RPC call races with a Connect() call, because Connect() resets client_call_manager_, which RPC calls use.

For the Python bindings, however, this won't happen, because we don't expose Disconnect() at all. It's only called when the last reference to the Python binding expires, and hence when the C++ GcsClient is destructed. At that point no concurrent RPC call can be in flight.

It is possible, though, for Python to call Connect() a second time. To guard against that I added a return-if-already-connected check in Connect().

Note that in C++ race conditions are still possible, because one can call Disconnect(). One example is GlobalStateAccessor, which uses its own mutex.
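
The return-if-already-connected idea can be sketched as below. This is an illustration in Python, not the C++ change itself: the `Client` class is hypothetical, and the lock is an addition of this sketch so that the check itself is safe under concurrent callers.

```python
import threading


class Client:
    """Hypothetical client showing an idempotent connect()."""

    def __init__(self):
        self._lock = threading.Lock()
        self._connected = False
        self.setup_count = 0  # how many times the real setup actually ran

    def connect(self):
        with self._lock:
            if self._connected:
                return  # already connected: do not reset shared state
            self._setup()
            self._connected = True

    def _setup(self):
        # Stand-in for creating the state that in-flight RPC calls depend on.
        self.setup_count += 1
```

However many threads call `connect()`, the setup runs once, so state used by in-flight calls is never reset underneath them.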

jjyao (Collaborator) left a comment:

lg

jjyao (Collaborator) commented Jul 16, 2024

> It is possible, though, for Python to call Connect() a second time.

How come? I thought Connect() is only called inside the constructor?

rynewang (Contributor Author) commented:

> > It is possible, though, for Python to call Connect() a second time.
>
> How come? I thought Connect() is only called inside the constructor?

Yeah, I meant that the Connect() binding is exposed to the Python side. Nobody actually calls it a second time.

rynewang and others added 4 commits July 16, 2024 09:47
Co-authored-by: Jiajun Yao <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
@jjyao jjyao enabled auto-merge (squash) July 22, 2024 18:59
@jjyao jjyao merged commit 8205fb9 into ray-project:master Jul 22, 2024
6 checks passed
@rynewang rynewang deleted the new-gcs-client-sync branch July 23, 2024 18:01
3 participants