[Core][pubsub] handle failures when publish failed. #33115
Conversation
Force-pushed from 44a7e27 to 883e2a3
Nice
/// it has processed beyond the message's sequence_id.
///
/// Note:
/// - a valide sequence_id starts from 1.
s/valide/valid/
Generally, lgtm
Did I understand the high level behavior change correctly?
- We now guarantee at least once semantics
- The subscribe side processing is idempotent (same as status quo)
Also, some of the tests seem to fail (maybe we have a bug somewhere).
Lastly, this should work only when we do "resubscribe". When I wrote the code last time, I didn't add a resubscribe mechanism to the module (but @mwtian probably added this mechanism to the higher-level layer?). At that time, if the publish failed (meaning long polling failed), the publisher was considered dead. Do you happen to know if we do resubscription in this scenario? I am 100% sure we don't do resubscription for non-GCS channels, but I am not sure about GCS.
It's almost correct. We guarantee that the subscriber will receive every published message at least once after the subscription succeeds, on non-lossy channels (there are channels capped by a mailbox queue size, which can drop messages). We also guarantee exactly-once semantics if only network errors happen (no application errors).
I spoke with @iycheng and confirmed that we do have resubscribe logic. I'll add an integration test to verify that.
Synced with Sang; I'll add integration tests to verify it works e2e.
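The at-least-once behavior discussed above can be modeled in a few lines. This is a sketch with made-up names, not Ray's actual publisher code: a message is removed from the mailbox only after its push is acknowledged, so a lost acknowledgment causes a resend (and a duplicate on the subscriber side) rather than a loss.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Illustrative model (not Ray's API) of the at-least-once guarantee:
// the publisher deletes a message only after the push is acknowledged,
// so a lost ack leads to a resend rather than a dropped message.
std::vector<int64_t> PushAll(std::deque<int64_t> mailbox, int lost_acks) {
  std::vector<int64_t> received;  // what the subscriber sees, in order
  while (!mailbox.empty()) {
    int64_t seq = mailbox.front();
    received.push_back(seq);      // the push itself reaches the subscriber
    if (lost_acks > 0) {
      --lost_acks;                // ack lost: publisher must resend
      continue;                   // message stays in the mailbox
    }
    mailbox.pop_front();          // ack received: now safe to delete
  }
  return received;
}
```

With one lost ack, the subscriber sees sequence 1 twice; deduplicating by sequence_id on the receiver restores exactly-once processing.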
I will review it today.
LGTM, but I have one last question... I might be missing some invariants you assumed here though.
// clean up messages that have already been processed.
while (!mailbox_.empty() &&
       mailbox_.front()->sequence_id() <= max_processed_sequence_id) {
I might be missing something, but isn't it going to cause issues if this happens?
- New GCS starts. It sends messages to other nodes (so the global seq_no > 0).
- The publisher finds the subscriber doesn't match, and sets max_processed_sequence_id for this subscriber to 0.
- Publish is called to this subscriber. Since seq_no > 0, it will never be sent (because max_processed_sequence_id == 0)?
hmm
Publish is called to this subscriber. Since seq_no > 0, it will never be sent (because max_processed_sequence_id == 0)?
On the sender: max_processed_sequence_id is only used for garbage collection, not for preventing messages from being sent.
On the receiver: max_processed_sequence_id will also be reset to 0, so any message with sequence_id > 0 will be received.
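The two roles of the watermark described above can be sketched as follows (illustrative names only, not Ray's actual API): on the sender, max_processed_sequence_id only drives garbage collection of already-acknowledged messages, while on the receiver it is a duplicate filter that can safely be reset to 0.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

struct Message {
  int64_t sequence_id;  // valid ids start from 1
};

// Sender side: GC only. Messages stay queued until acknowledged, so a
// watermark of 0 (fresh subscriber) simply keeps everything buffered;
// it never blocks a send.
void GarbageCollect(std::deque<Message> &mailbox,
                    int64_t max_processed_sequence_id) {
  while (!mailbox.empty() &&
         mailbox.front().sequence_id <= max_processed_sequence_id) {
    mailbox.pop_front();
  }
}

// Receiver side: duplicate filtering. After a reset the watermark is 0,
// so any message with sequence_id > 0 is accepted.
bool ShouldProcess(int64_t sequence_id, int64_t &max_processed_sequence_id) {
  if (sequence_id <= max_processed_sequence_id) return false;  // duplicate
  max_processed_sequence_id = sequence_id;
  return true;
}
```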
Hmm, not sure the lint failure is related. Can you merge the latest master?
Why are these changes needed?
#32046 indicates that pubsub might lose data, especially when the subscriber is under load. After examining the protocol, it seems one bug is that the publisher fails to handle publish failures: when we push a message from the mailbox, we delete the message being sent regardless of RPC failures.
This PR addresses the problem by adding a monotonically increasing sequence_id to each message, and only deleting messages once the subscriber acknowledges that they have been received.
The sequence_id is generated per publisher, regardless of channel. This means that if multiple channels exist for the same publisher, each channel might not see contiguous sequences. This also assumes the invariant that a subscriber object only subscribes to one publisher.
We also rely on the pubsub protocol's guarantee that at most one outgoing push request is in flight at a time.
This also handles GCS failover. We do so by tracking the publisher_id on both the publisher and the subscriber. When GCS fails over, the publisher_id changes, so both the publisher and the subscriber forget their previous state.
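The failover handling described above can be sketched like this (hypothetical names, not the actual Ray implementation): the subscriber remembers the publisher_id it last saw, and a change in that id discards all per-publisher state, including the sequence watermark.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Illustrative sketch of detecting a GCS failover via publisher_id:
// when the id changes, per-publisher state is reset, so messages from
// the new publisher incarnation start fresh from sequence_id 1.
struct SubscriberState {
  std::string known_publisher_id;        // empty until first contact
  int64_t max_processed_sequence_id = 0;

  // Returns true if the message should be processed.
  bool OnPublish(const std::string &publisher_id, int64_t sequence_id) {
    if (publisher_id != known_publisher_id) {
      // New publisher incarnation (e.g. GCS restarted): forget old state.
      known_publisher_id = publisher_id;
      max_processed_sequence_id = 0;
    }
    if (sequence_id <= max_processed_sequence_id) return false;  // duplicate
    max_processed_sequence_id = sequence_id;
    return true;
  }
};
```

The design choice here mirrors the description: neither side tries to reconcile sequence numbers across a failover; a new publisher_id simply invalidates everything.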
Related issue number
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.