
[Core][pubsub] handle failures when publish failed. #33115

Merged
merged 41 commits into ray-project:master on Apr 19, 2023

Conversation

scv119
Contributor

@scv119 scv119 commented Mar 7, 2023

Why are these changes needed?

#32046 indicates that pubsub might lose data, especially when the subscriber is under load. After examining the protocol, it seems one bug is that the publisher fails to handle publish failures: when we push a message from the mailbox, we delete the message being sent regardless of RPC failures.

This PR addresses the problem by adding a monotonically increasing sequence_id to each message, and only deleting a message once the subscriber has acknowledged receiving it.

The sequence_id sequence is generated per publisher, regardless of channel. This means that if multiple channels exist for the same publisher, each channel might not see contiguous sequence_ids. This also assumes the invariant that a subscriber object only subscribes to one publisher.
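
To illustrate, here is a minimal sketch of the publisher-side bookkeeping described above (illustrative names only, not the actual src/ray/pubsub/publisher.cc code): a single per-publisher counter shared by all channels, and deletion only on acknowledgment.

#include <cstdint>
#include <deque>
#include <string>
#include <utility>

struct Message {
  int64_t sequence_id;  // Valid sequence_ids start from 1.
  std::string payload;
};

class PublisherSketch {
 public:
  // Every published message gets the next id from one per-publisher
  // counter, regardless of channel, so a single channel may see gaps.
  void Publish(std::string payload) {
    mailbox_.push_back(Message{++next_sequence_id_, std::move(payload)});
  }

  // Deletion happens only here, once the subscriber has acknowledged
  // processing up to `max_processed_sequence_id`; a failed push RPC
  // therefore leaves the message in the mailbox for the next attempt.
  void HandleAck(int64_t max_processed_sequence_id) {
    while (!mailbox_.empty() &&
           mailbox_.front().sequence_id <= max_processed_sequence_id) {
      mailbox_.pop_front();
    }
  }

 private:
  int64_t next_sequence_id_ = 0;  // Per publisher, shared across channels.
  std::deque<Message> mailbox_;   // Messages not yet acknowledged.
};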

We also rely on the pubsub protocol's guarantee that at most one push request is in flight at any time.

This also handles the GCS failover case. We do so by tracking the publisher_id on both the publisher and the subscriber. When GCS fails over, the publisher_id will be different, so both the publisher and the subscriber will forget the state from the previous publisher.
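
As a hedged sketch of that failover rule (hypothetical names, not the actual Ray API), the subscriber side might look like this: a publisher_id mismatch drops all state from the previous publisher before the usual sequence_id check runs.

#include <cstdint>

// Subscriber-side state for a single publisher (illustrative only).
struct SubscriberStateSketch {
  uint64_t known_publisher_id = 0;        // 0 == no publisher seen yet.
  int64_t max_processed_sequence_id = 0;

  // Returns true if the pushed message should be processed.
  bool OnPushedMessage(uint64_t publisher_id, int64_t sequence_id) {
    if (publisher_id != known_publisher_id) {
      // A different publisher_id means GCS failed over: forget the old
      // publisher's state and accept the new sequence numbering afresh.
      known_publisher_id = publisher_id;
      max_processed_sequence_id = 0;
    }
    if (sequence_id <= max_processed_sequence_id) {
      return false;  // Already processed; a retried (duplicate) push.
    }
    max_processed_sequence_id = sequence_id;
    return true;
  }
};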

  • Unit tests
  • Integration tests

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@scv119 scv119 changed the title [Core][pubsub] handle failures when publish failed. [Core][pubsub][wip] handle failures when publish failed. Mar 7, 2023
@scv119 scv119 force-pushed the fix-try branch 3 times, most recently from 44a7e27 to 883e2a3 Compare March 8, 2023 09:37
@scv119 scv119 marked this pull request as ready for review March 8, 2023 15:33
@scv119 scv119 changed the title [Core][pubsub][wip] handle failures when publish failed. [Core][pubsub] handle failures when publish failed. Mar 8, 2023
python/ray/_private/gcs_pubsub.py
src/ray/pubsub/publisher.cc
src/ray/pubsub/subscriber.cc
Contributor

@clarng clarng left a comment


Nice

/// it has processed beyond the message's sequence_id.
///
/// Note:
/// - a valide sequence_id starts from 1.
Contributor


s/valide/valid/

Contributor

@rkooo567 rkooo567 left a comment


Generally, lgtm

Did I understand the high level behavior change correctly?

  1. We now guarantee at least once semantics
  2. The subscribe side processing is idempotent (same as status quo)

Also, some of the tests seem to fail (maybe we have a bug somewhere).

Lastly, this should work only when we do "resubscribe". When I wrote this code, I didn't add a resubscribe mechanism to the module (but @mwtian probably added this mechanism to the higher-level layer?). At that time, if the publish failed (meaning the long polling failed), the publisher was considered dead. Do you happen to know if we do resubscription in this scenario? I am 100% sure we don't do resubscription for non-GCS channels, but I am not sure about GCS.

src/ray/pubsub/publisher.cc
src/ray/pubsub/publisher.cc
@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 13, 2023
@scv119
Contributor Author

scv119 commented Mar 13, 2023

Did I understand the high level behavior change correctly?

We now guarantee at least once semantics
The subscribe side processing is idempotent (same as status quo)

It's almost correct. We guarantee the subscriber will receive all published messages at least once after the subscription succeeds, on the non-lossy channels (i.e., some channels are capped by a mailbox queue size and can lose messages). We also guarantee exactly-once semantics if only network errors happen (no application errors).
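
A self-contained toy run of that claim (illustrative, not Ray code): if the ack RPC is lost, the publisher re-pushes the same message, and the subscriber's sequence_id watermark turns the duplicate into a no-op, so delivery is at-least-once while processing stays exactly-once under network-only failures.

#include <cstdint>
#include <iostream>
#include <set>

int main() {
  int64_t watermark = 0;        // Subscriber's max processed sequence_id.
  std::set<int64_t> processed;  // Messages actually handled.

  auto receive = [&](int64_t seq) {
    if (seq <= watermark) return;  // Duplicate delivery: drop.
    processed.insert(seq);
    watermark = seq;
  };

  receive(1);  // First delivery succeeds; assume the ack is then lost.
  receive(1);  // Publisher retries message 1: deduplicated, not reprocessed.
  receive(2);  // Next message once the retry round-trip succeeds.

  std::cout << processed.size() << " unique messages processed\n";  // Prints 2.
}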

Lastly, this should work only when we do "resubscribe". When I wrote this code, I didn't add a resubscribe mechanism to the module (but @mwtian probably added this mechanism to the higher-level layer?). At that time, if the publish failed (meaning the long polling failed), the publisher was considered dead. Do you happen to know if we do resubscription in this scenario? I am 100% sure we don't do resubscription for non-GCS channels, but I am not sure about GCS.

I spoke with @iycheng and confirmed that we do have resubscribe logic. I'll add an integration test to verify that.

@scv119 scv119 added the do-not-merge Do not merge this PR! label Mar 13, 2023
@scv119
Contributor Author

scv119 commented Mar 13, 2023

Synced with Sang; I'll add integration tests to verify it works e2e.

@scv119 scv119 removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Apr 17, 2023
Contributor

@rkooo567 rkooo567 left a comment


I will review it today

Contributor

@rkooo567 rkooo567 left a comment


LGTM, but I have one last question... I might be missing some invariants you assumed here though.


// clean up messages that have already been processed.
while (!mailbox_.empty() &&
       mailbox_.front()->sequence_id() <= max_processed_sequence_id) {
Contributor


I might be missing something, but isn't it going to cause issues if this happens?

  1. New GCS starts. It sent messages to other nodes (so the global seq_no > 0)
  2. publisher finds the subscriber doesn't match. Set max_processed_sequence_id for this subscriber == 0
  3. Publish is called to this subscriber. Since seq_no > 0, it will never be sent (because max_processed_sequence_id == 0)?

Contributor Author


hmm

Publish is called to this subscriber. Since seq_no > 0, it will never be sent (because max_processed_sequence_id == 0)?

On the sender: max_processed_sequence_id is only used for garbage collection; it does not prevent messages from being sent.
On the receiver: max_processed_sequence_id will also be reset to 0, so any message with sequence_id > 0 will be received.
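
A tiny walk-through of the scenario in the question, under these two points (illustrative values, not Ray code):

#include <cassert>
#include <cstdint>

int main() {
  // 1. New GCS has already published to other nodes, so its counter > 0.
  int64_t sender_seq = 5;

  // 2. publisher_id mismatch resets this subscriber's watermark to 0.
  int64_t receiver_watermark = 0;

  // 3. Publish: the sender never filters outgoing messages by the
  //    watermark (it only gates garbage collection), so the message is
  //    sent, and 6 > 0 means the receiver accepts it.
  int64_t pushed = ++sender_seq;
  if (pushed > receiver_watermark) {
    receiver_watermark = pushed;
  }
  assert(receiver_watermark == 6);
  return 0;
}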

@rkooo567
Contributor

Hmm, not sure the lint failure is related. Can you merge the latest master?

@scv119 scv119 merged commit 897a282 into ray-project:master Apr 19, 2023
elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023
ProjectsByJackHe pushed a commit to ProjectsByJackHe/ray that referenced this pull request May 4, 2023
architkulkarni pushed a commit to architkulkarni/ray that referenced this pull request May 16, 2023
Development

Successfully merging this pull request may close these issues.

[Core] ObjectStore fail to pull object, possibly because node info is missing
6 participants