KAFKA-15080; Fetcher's lag never set when partition is idle #13843
The changes look good to me, but I am afraid that the absence of a lag field in `InitialFetchState` is causing problems in other places as well, such as
val initialFetchState = InitialFetchState(currentFetchState.topicId, thread.leader.brokerEndPoint(),
This makes me wonder if we should either include lag in `InitialFetchState` or follow the suggestion you provided to not rely on lag.
This is exactly why I check
I am playing around with my suggestion, but it is far more invasive than the currently proposed change, so I wonder if we should start by merging a simple fix while I work on a long-term solution...
LGTM! Let's merge this PR and continue discussion on the other fix.
Yeah, I agree but
LGTM
The PartitionFetchState's lag field is set to None when the state is created and it is updated when bytes are received for a partition. For idle partitions (newly created or not), the lag is never updated because `validBytes > 0` is never true. As a side effect, the partition is considered out-of-sync and could be incorrectly throttled.

Reviewers: Divij Vaidya, Jason Gustafson
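To make the idle-partition problem concrete, here is a minimal sketch of the behaviour described in the commit message. It is a simplified model and not the actual Kafka code: `FetchState`, `updateOnlyOnData`, and `updateAlways` are hypothetical names, and the rule that an unknown lag counts as out-of-sync is an assumption taken from the description above.

```scala
object IdleLagSketch {
  // Simplified stand-in for the real fetch state: only the fields needed here.
  final case class FetchState(fetchOffset: Long, lag: Option[Long]) {
    // Assumption: a partition whose lag was never set is treated as out-of-sync.
    def isReplicaInSync: Boolean = lag.exists(_ <= 0)
  }

  // Behaviour described as problematic: the lag is refreshed only when bytes arrived.
  def updateOnlyOnData(state: FetchState, validBytes: Int,
                       nextOffset: Long, highWatermark: Long): FetchState =
    if (validBytes > 0)
      FetchState(nextOffset, Some(math.max(0L, highWatermark - nextOffset)))
    else
      state // idle partition: lag stays None forever

  // Behaviour after the fix, as described: the lag is refreshed on every response.
  def updateAlways(state: FetchState, nextOffset: Long, highWatermark: Long): FetchState =
    FetchState(nextOffset, Some(math.max(0L, highWatermark - nextOffset)))
}
```

With an idle partition (no bytes, follower already at the high watermark), the first variant leaves `lag` at `None` and the replica looks out-of-sync, while the second sets it to `Some(0)`.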
…no record
A regression was introduced by apache#13843. When a fetch response has no record for a partition, `validBytes` is 0. We shouldn't set the last fetched epoch to `logAppendInfo.lastLeaderEpoch.asScala`, since there is no record and it is `Optional.empty`. We should use `currentFetchState.lastFetchedEpoch` instead.
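The regression can be illustrated with a small sketch, assuming a helper that picks the epoch to store after a fetch response. `nextLastFetchedEpoch` and its parameters are hypothetical, and plain Scala `Option` values stand in for the converted `Optional` from the real code.

```scala
object LastFetchedEpochSketch {
  // Chooses the last fetched epoch to keep after processing a fetch response.
  // appendedLastLeaderEpoch models logAppendInfo.lastLeaderEpoch.asScala,
  // currentLastFetchedEpoch models currentFetchState.lastFetchedEpoch.
  def nextLastFetchedEpoch(validBytes: Int,
                           appendedLastLeaderEpoch: Option[Int],
                           currentLastFetchedEpoch: Option[Int]): Option[Int] =
    if (validBytes > 0) appendedLastLeaderEpoch // records were appended: trust their epoch
    else currentLastFetchedEpoch                // no record: keep what we already had
}
```

For an empty response, the previously known epoch survives instead of being overwritten with an empty value.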
The PartitionFetchState's lag field is set to None when the state is created and it is updated when bytes are received for a partition. For idle partitions (newly created or not), the lag is never updated because `validBytes > 0` is never true. As a side effect, the partition is considered out-of-sync and could be incorrectly throttled.

This patch really makes me wonder if keeping the lag in the `PartitionFetchState` is really necessary. At the end of the day, we only use it to compute `isReplicaInSync` because the metrics rely on the `FetcherLagMetrics`. We could just maintain a boolean for this purpose. I chose the easy path in this patch.
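For illustration only, the boolean alternative mentioned in the description might look roughly like the sketch below. This is not what the patch implements. `FetchState`, `inSync`, and `onFetchResponse` are hypothetical names, and deriving the flag by comparing the next fetch offset with the high watermark is an assumption.

```scala
object InSyncFlagSketch {
  // Hypothetical fetch state that tracks an in-sync flag instead of a numeric lag.
  final case class FetchState(fetchOffset: Long, inSync: Boolean = false) {
    def isReplicaInSync: Boolean = inSync
  }

  // Recompute the flag on every fetch response, whether or not bytes were received.
  def onFetchResponse(state: FetchState, nextOffset: Long, highWatermark: Long): FetchState =
    state.copy(fetchOffset = nextOffset, inSync = nextOffset >= highWatermark)
}
```

The numeric lag would then live only in the metrics, and the fetch state would carry just the flag needed for the throttling decision.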