KAFKA-8325: Remove batch from in-flight requests when handling MESSAGE_TOO_LARGE error #7176
This patch fixes a bug in the handling of MESSAGE_TOO_LARGE errors. The large batch is split, the smaller batches are re-added to the accumulator, and the batch is deallocated, but it was not removed from the list of in-flight batches. When the batch was eventually expired from the in-flight batches, the producer would try to deallocate it a second time, causing an error. This patch changes the behavior to correctly remove the batch from the list of in-flight requests.
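The failure mode described above can be reproduced in a small, self-contained model. This is only a sketch under stated assumptions: `Batch`, `Accumulator`, `Sender`, the `removeOnSplit` flag and the exception message are hypothetical stand-ins, not the producer's real classes, and splitting and re-enqueueing are left out because they do not affect the bookkeeping.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class InFlightSplitSketch {
    // Hypothetical stand-in for a producer batch.
    static final class Batch {
        final String name;
        Batch(String name) { this.name = name; }
    }

    // Hypothetical accumulator that rejects a second deallocation of the same batch.
    static final class Accumulator {
        private final Set<Batch> incomplete = new HashSet<>();

        void track(Batch batch) { incomplete.add(batch); }

        void deallocate(Batch batch) {
            if (!incomplete.remove(batch))
                throw new IllegalStateException("batch " + batch.name + " was already deallocated");
        }
    }

    // Hypothetical sender that keeps its own list of in-flight batches.
    static final class Sender {
        final Accumulator accumulator = new Accumulator();
        final List<Batch> inFlight = new ArrayList<>();
        final boolean removeOnSplit; // true models the behavior after the patch

        Sender(boolean removeOnSplit) { this.removeOnSplit = removeOnSplit; }

        void send(Batch batch) {
            accumulator.track(batch);
            inFlight.add(batch);
        }

        // Models handling of a MESSAGE_TOO_LARGE response (split/re-enqueue omitted).
        void onMessageTooLarge(Batch batch) {
            if (removeOnSplit)
                inFlight.remove(batch);
            accumulator.deallocate(batch);
        }

        // Models expiry: every batch still listed as in-flight is deallocated.
        void expireInFlight() {
            for (Batch batch : new ArrayList<>(inFlight)) {
                inFlight.remove(batch);
                accumulator.deallocate(batch);
            }
        }
    }

    // Returns true if expiring in-flight batches hits the double deallocation.
    static boolean failsOnExpiry(boolean removeOnSplit) {
        Sender sender = new Sender(removeOnSplit);
        Batch large = new Batch("large");
        sender.send(large);
        sender.onMessageTooLarge(large);
        try {
            sender.expireInFlight();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("without removal, expiry fails: " + failsOnExpiry(false));
        System.out.println("with removal, expiry fails: " + failsOnExpiry(true));
    }
}
```

In this model the batch stays in the in-flight list when `removeOnSplit` is false, so expiry reaches it after it has already been deallocated. Removing it before deallocation leaves nothing for expiry to find.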
Thanks. LGTM
@@ -625,6 +625,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
    if (transactionManager != null)
        transactionManager.removeInFlightBatch(batch);
    this.accumulator.splitAndReenqueue(batch);
    maybeRemoveFromInflightBatches(batch);
    this.accumulator.deallocate(batch);
Would it make sense to have a method that deallocates and removes from in flight batches?
Yeah, I think that's reasonable. Added.
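A combined helper along the lines suggested here might look like the sketch below. It is an illustration only: the `removeAndDeallocate` name, the `Batch` type, the per-partition map and the list of deallocated batches are assumptions made for a self-contained example, not the code that was added to the patch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RemoveAndDeallocateSketch {
    // Hypothetical stand-in for a producer batch, keyed by partition name.
    static final class Batch {
        final String partition;
        Batch(String partition) { this.partition = partition; }
    }

    // Assumed bookkeeping: in-flight batches grouped per partition.
    private final Map<String, List<Batch>> inFlightBatches = new HashMap<>();
    // Stands in for returning the batch's memory to the accumulator.
    private final List<Batch> deallocated = new ArrayList<>();

    void addInFlight(Batch batch) {
        inFlightBatches.computeIfAbsent(batch.partition, k -> new ArrayList<>()).add(batch);
    }

    // Removes the batch from the in-flight bookkeeping if it is present.
    private void maybeRemoveFromInflightBatches(Batch batch) {
        List<Batch> batches = inFlightBatches.get(batch.partition);
        if (batches != null) {
            batches.remove(batch);
            if (batches.isEmpty())
                inFlightBatches.remove(batch.partition);
        }
    }

    // Single entry point, so callers cannot deallocate without also removing.
    void removeAndDeallocate(Batch batch) {
        maybeRemoveFromInflightBatches(batch);
        deallocated.add(batch);
    }

    int inFlightCount() {
        return inFlightBatches.values().stream().mapToInt(List::size).sum();
    }

    int deallocatedCount() {
        return deallocated.size();
    }
}
```

Pairing the two steps in one method keeps every code path that releases a batch from skipping the in-flight cleanup, which is the omission this patch fixes.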
LGTM, just a couple of nits.
client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE)));
sender.runOnce();

// create a producer batch with more than one record so it is eligible to split
Nit: eligible for splitting?
Fixed
accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
        MAX_BLOCK_TIMEOUT, false).future;

sender.runOnce(); // send request
Should this comment be before the method for consistency with other comments?
Fixed
retest this please
LGTM
… errors (#7176) This patch fixes a bug in the handling of MESSAGE_TOO_LARGE errors. The large batch is split, the smaller batches are re-added to the accumulator, and the batch is deallocated, but it was not removed from the list of in-flight batches. When the batch was eventually expired from the in-flight batches, the producer would try to deallocate it a second time, causing an error. This patch changes the behavior to correctly remove the batch from the list of in-flight requests. Reviewers: Luke Stephenson, Ismael Juma, Jason Gustafson
…in-flight requests on MESSAGE_TOO_LARGE errors (apache#7176) TICKET = KAFKA-8052, KAFKA-8202 LI_DESCRIPTION = This patch fixes memory leaks in producer when batch split happens. EXIT_CRITERIA = HASH [e4215c1] ORIGINAL_DESCRIPTION = This patch fixes a bug in the handling of MESSAGE_TOO_LARGE errors. The large batch is split, the smaller batches are re-added to the accumulator, and the batch is deallocated, but it was not removed from the list of in-flight batches. When the batch was eventually expired from the in-flight batches, the producer would try to deallocate it a second time, causing an error. This patch changes the behavior to correctly remove the batch from the list of in-flight requests. Reviewers: Luke Stephenson, Ismael Juma, Jason Gustafson
…quests on MESSAGE_TOO_LARGE errors (apache#7176) TICKET = KAFKA-8325 LI_DESCRIPTION = EXIT_CRITERIA = HASH [db8cb96] ORIGINAL_DESCRIPTION = This patch fixes a bug in the handling of MESSAGE_TOO_LARGE errors. The large batch is split, the smaller batches are re-added to the accumulator, and the batch is deallocated, but it was not removed from the list of in-flight batches. When the batch was eventually expired from the in-flight batches, the producer would try to deallocate it a second time, causing an error. This patch changes the behavior to correctly remove the batch from the list of in-flight requests. Reviewers: Luke Stephenson, Ismael Juma, Jason Gustafson (cherry picked from commit db8cb96)