Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15080; Fetcher's lag never set when partition is idle #13843

Merged
merged 3 commits into from
Jun 13, 2023

Conversation

dajac
Copy link
Contributor

@dajac dajac commented Jun 12, 2023

The PartitionFetchState's lag field is set to None when the state is created and it is updated when bytes are received for a partition. For idle partitions (newly created or not), the lag is never updated because validBytes > 0 is never true. As a side effect, the partition is considered out-of-sync and could be incorrectly throttled.

This patch really makes me wonder if keeping the lag in the PartitionFetchState is really necessary. At the end of the day, we only use is to compute isReplicaInSync because the metrics rely on the the FetcherLagMetrics. We could just maintain a boolean for this purpose. I chose the easy path in this patch.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Contributor

@divijvaidya divijvaidya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes looks good to me but I am afraid that the absence of lag field in InitialFetchState is causing problems at other places as well such as

val initialFetchState = InitialFetchState(currentFetchState.topicId, thread.leader.brokerEndPoint(),
where we ideally should be copying the lag of existing partition states to the new one when migrating partitions from one thread to another.

This makes me wonder if we should either include lag in InitialState or follow the suggestion you provided to not rely on lag.

@dajac
Copy link
Contributor Author

dajac commented Jun 12, 2023

This is exactly why I check currentFetchState.lag.isEmpty to ensure that it gets initialised in all cases on the first fetch. I would not include lag in InitialState because this is also what we use when a partition is added by the replica manager and the lag does not have any sense in this context. In the case you mentioned, the lag will be set on the first fetch.

@dajac
Copy link
Contributor Author

dajac commented Jun 12, 2023

I am playing around with my suggestion but this is way more invasive than the current proposed change so I wonder if we should start by merging a simple fix while I work on a long term solution...

@divijvaidya
Copy link
Contributor

LGTM! Let's merge this PR and continue discussion on the other fix.

In the case you mentioned, the lag will be set on the first fetch.
Yes, but we already have that information. We should be reusing it from older state. Why wait for next fetch?

@dajac
Copy link
Contributor Author

dajac commented Jun 12, 2023

Yeah, I agree but InitialFetchState is not the right way here. Instead, we should refactor resizeThreadPool to not use it but I am not sure that it is worth given than resizing is a rare event.

Copy link

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@dajac dajac merged commit dfe050c into apache:trunk Jun 13, 2023
@dajac dajac deleted the KAFKA-15080 branch June 13, 2023 13:19
dajac added a commit that referenced this pull request Jun 13, 2023
The PartitionFetchState's lag field is set to None when the state is created and it is updated when bytes are received for a partition. For idle partitions (newly created or not), the lag is never updated because `validBytes > 0` is never true. As a side effect, the partition is considered out-of-sync and could be incorrectly throttled.

Reviewers: Divij Vaidya <[email protected]>, Jason Gustafson <[email protected]>
chernyih added a commit to chernyih/kafka that referenced this pull request Sep 27, 2023
…no record

A regression is introduced by apache#13843.
When a fetch response has no record for a partition, validBytes is 0. We shouldn't
set the last fetched epoch to logAppendInfo.lastLeaderEpoch.asScala since there
is no record and it is Optional.empty. We should use currentFetchState.lastFetchedEpoch
instead.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants