Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kgo: add TopicID to the FetchTopic type #794

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

kgo: add TopicID to the FetchTopic type #794

wants to merge 1 commit into from

Conversation

twmb
Copy link
Owner

@twmb twmb commented Jul 29, 2024

Closes #790.

@twmb twmb added the minor label Jul 29, 2024
@@ -1041,6 +1041,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe

fetchTopic := FetchTopic{
Topic: topic,
TopicID: rt.TopicID,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, so if version >= 13, I should be assured that this is set?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's set if the broker returns it -- it's a new field in fetch response v13, yes -- and this will be all 0s if it is unset.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, I assume all 0s is an invalid topic id.

@@ -274,6 +274,9 @@ func (p *FetchPartition) EachRecord(fn func(*Record)) {
type FetchTopic struct {
// Topic is the topic this is for.
Topic string
// TopicID is the ID of the topic, if your cluster supports returning
// topic IDs in fetch responses (Kafka 3.1+).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe add fetch request version? Will make it easier for someone using the API, I think.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

End users generally don't actually know what version of Kafka introduced what version of a request. It's undocumented anywhere -- a person has to first know the version a field was introduced in, then figure out which KIP introduced that version, then figure out which version of Kafka the KIP was actually released in.

Mentioning the version of Kafka that introduced topic IDs in the fetch response circumvents that process and gets directly to the answer, IMO.

@twmb
Copy link
Owner Author

twmb commented Jul 31, 2024

Topics do not have all 0 topic IDs, if the cluster supports topic IDs. The user can be sure that a topic ID is present based on the fact it is not all 0s. I use this same comparison myself:

var noID [16]byte
if newTP.cursor.topicID == noID && oldTP.cursor.topicID != noID {
cl.cfg.logger.Log(LogLevelWarn, "metadata update is missing the topic ID when we previously had one, ignoring update",
"topic", topic,
"partition", part,
)
retryWhy.add(topic, int32(part), errMissingTopicID)
continue
}

franz-go/pkg/kgo/source.go

Lines 1895 to 1898 in 6b61d17

var noID [16]byte
if c.topicID == noID {
f.disableIDs = true
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

fetch using topic id
2 participants