KafkaIO reader should set consumedOffset even before reading the first record #1071
Conversation
I tested locally and it makes sense, but there is still an edge case where this fails, see comments.
```diff
 for (PartitionState p : partitionStates) {
-  if (p.consumedOffset >= 0) {
+  if (p.consumedOffset >=  0) {
     LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1);
```
You got two white spaces after >=
fixed
```java
LOG.info("{}: resuming {} at default offset", name, p.topicPartition);
// set consumed offset to (next offset - 1), otherwise checkpoint would contain invalid
// offset until we read first record from this partition.
p.consumedOffset = consumer.position(p.topicPartition) - 1;
```
Edge case, but still: if this is a new/empty partition, the first micro-batch read will be skipped and the latest offset (which was actually 0) will advance. Maybe use `null` or `-2` as a "never tried to read yet" flag, or add one.
Good point. I updated the patch. Now we have two negative offsets ('UNINITIALIZED' and 'EMPTY_PARTITION').
We wouldn't need two of these if we tracked the next offset rather than consumedOffset. Maybe we should do that.
… for an empty partition. Would have been simpler to store next offset rather than 'consumed offset'.
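The trade-off discussed above can be sketched in isolation. Everything below (class name, method names, the specific sentinel values) is a hypothetical model, not the actual KafkaIO code:

```java
// Hypothetical model (names and sentinels assumed) of the offset-tracking trade-off.
public class OffsetTracking {
    // Tracking the last *consumed* offset needs two sentinels:
    static final long UNINITIALIZED = -1L;   // reader never positioned
    static final long EMPTY_PARTITION = -2L; // positioned at 0, nothing to consume yet

    // (position - 1) for an empty partition at position 0 would be -1, colliding
    // with UNINITIALIZED, hence the second sentinel.
    static long consumedOffsetFor(long position) {
        return position == 0 ? EMPTY_PARTITION : position - 1;
    }

    // Tracking the *next* offset needs no special case: an empty partition at
    // position 0 simply has nextOffset == 0.
    static long nextOffsetFor(long position) {
        return position;
    }

    public static void main(String[] args) {
        System.out.println(consumedOffsetFor(0));   // -2 (sentinel required)
        System.out.println(nextOffsetFor(0));       // 0 (plain value)
        System.out.println(consumedOffsetFor(100)); // 99
        System.out.println(nextOffsetFor(100));     // 100
    }
}
```

This is why tracking the next offset, as the thread eventually settles on, removes the need for the EMPTY_PARTITION sentinel entirely.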
```java
}
LOG.info("{}: reading from {} at offset {}", name, p.topicPartition, p.consumedOffset + 1);
```
This will log `${readerName}: reading from ${topic} at offset -99` for a read attempt on an empty partition. Maybe use `nextOffset` instead of `p.consumedOffset + 1`, or log different messages for resuming and initialising.
doh! Yeah, it is getting cumbersome. I changed the implementation to track nextOffset instead (see comment below).
+1 on the …
I will look into adding a unit test. It is getting cumbersome to track 'consumedOffset'. Changed the implementation to store nextOffset. I changed it in CheckpointMark also. That implies an update could be off by one. I don't think there are any Dataflow apps in prod that depend heavily on update(). I think it's worth the small incompatibility at this stage.
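The off-by-one concern across an update can be illustrated with a hypothetical model (all names assumed, not from KafkaIO): the same stored long means "last consumed" to the old reader and "next to read" to the new one.

```java
// Hypothetical illustration of the checkpoint incompatibility discussed above.
public class CheckpointCompat {
    // Old semantics: checkpoint stored the last consumed offset,
    // so a restored reader seeks one past it.
    static long resumePositionOld(long storedConsumedOffset) {
        return storedConsumedOffset + 1;
    }

    // New semantics: checkpoint stores the next offset directly.
    static long resumePositionNew(long storedNextOffset) {
        return storedNextOffset;
    }

    public static void main(String[] args) {
        long checkpointValue = 41; // written by an old reader: record 41 was consumed
        System.out.println(resumePositionOld(checkpointValue)); // 42: correct resume point
        // A new reader interpreting the same value as nextOffset starts one early:
        System.out.println(resumePositionNew(checkpointValue)); // 41: record 41 re-read
    }
}
```

In this model the mismatch re-reads one record per partition after an update, rather than losing one.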
What is the advantage of storing latest_offset in Checkpoint? We are tracking it to be able to report backlog to the runner.
I don't think there is an advantage to storing latest_offset now that you're tracking nextOffset; I'll have a look.
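A hypothetical sketch of the backlog estimate that latest-offset tracking enables (the formula and names are assumptions for illustration, not KafkaIO's actual code):

```java
// Hypothetical per-partition backlog estimate reported to the runner.
public class Backlog {
    // latestOffset: offset of the last record currently in the partition;
    // nextOffset: the next offset this reader will consume.
    static long backlog(long latestOffset, long nextOffset) {
        // Records published but not yet read; clamped at zero when caught up.
        return Math.max(0L, latestOffset - nextOffset + 1);
    }

    public static void main(String[] args) {
        System.out.println(backlog(99, 50));  // 50 records remaining
        System.out.println(backlog(99, 100)); // 0: fully caught up
    }
}
```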
Fix how we were using MockConsumer. The checkpoint test wasn't actually doing what it was supposed to.
@amitsela, added a new test. Overall there are quite a few changes to KafkaIOTest.java, much larger than I expected. Because of this problem, the existing checkpoint test was wrong (it passed by a fluke; the reader ignores records before the expected offset with a warning). This is also fixed now.
The Jenkins error doesn't seem to be related, though I can't figure out exactly what it is; looks like some Google service issue?
```java
// Similar to testUnboundedSourceCheckpointMark(), but verifies that source resumes
// properly from empty partitions, without missing messages added since checkpoint.

// Initialize consumer with than number of partitions so that some of them are empty.
```
I guess you meant `the` instead of `than`?
fixed. I wanted to say 'Initialize consumer with fewer elements than number of partitions...'
Tests seem to cover the simple case of reading all from …
@dhalperi wanna take a look, or should I merge?
@rangadi another quick question, which might require a separate PR, but as long as this thread is alive:
These timeouts don't affect KafkaIO itself in any way. This was mainly for the local runner (and to some extent the Dataflow runner). There is quite a big ambiguity around how long `UnboundedSourceReader.advance()` should wait. Ideally a runner should work fine even if `advance()` is always non-blocking. There is minimal guidance from the SDK on this. E.g. originally I had both timeouts set to just 10 millis. It caused CPU thrashing on the local runner (it kept churning through too many empty bundles, as `start()` often returned false). The 5-second timeout for `start()` was mainly to address this problem. Maybe this is less of an issue now with DoFn reuse etc. (cc @tgroh). I think 5 seconds is too long too. I am open to making these configurable as a workaround.
For both advance and start timeouts, it has to be less than the time dedicated for a read, which is less than the …
I see. This seems pretty runner specific. It would be quite simple for KafkaIO to obey a 'max_wait' hint. Please open an issue or JIRA to decide how we want to pass the hint from the runner to the source. Maybe in the PipelineOptions passed to …
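The start/advance timeout pattern described above can be sketched with a plain blocking queue standing in for the Kafka consumer. All names and the timeout values are assumptions mirroring the discussion, not the actual KafkaIO implementation:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedPollReader {
    // Assumed values: a long wait in start() avoids churning through empty
    // bundles; a short wait in advance() keeps each call cheap for the runner.
    static final long START_TIMEOUT_MS = 5000;
    static final long ADVANCE_TIMEOUT_MS = 10;

    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    private String current;

    public boolean start() {
        return advanceWithTimeout(START_TIMEOUT_MS);
    }

    public boolean advance() {
        return advanceWithTimeout(ADVANCE_TIMEOUT_MS);
    }

    // Bounded blocking poll: returns false when no record arrives in time,
    // letting the runner decide whether to retry or finish the bundle.
    private boolean advanceWithTimeout(long millis) {
        try {
            current = incoming.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return current != null;
    }

    public void offer(String record) {
        incoming.add(record);
    }

    public String getCurrent() {
        return current;
    }

    public static void main(String[] args) {
        BoundedPollReader reader = new BoundedPollReader();
        reader.offer("record-0");
        System.out.println(reader.start());      // true: a record was available
        System.out.println(reader.getCurrent()); // record-0
        System.out.println(reader.advance());    // false after ~10 ms: queue empty
    }
}
```

A purely non-blocking poll (timeout 0) would make the local runner spin through empty bundles, which is exactly the CPU-thrash problem the longer `start()` timeout addresses.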
Notes to @dhalperi (if you are committing):
Filed BEAM-744
Basic feedback on style with a few questions.
```diff
-if (p.consumedOffset >= 0) {
-  LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1);
-  consumer.seek(p.topicPartition, p.consumedOffset + 1);
+if (p.nextOffset >= 0) {
```
Can you make the swap from -1 to a constant more clear? E.g., this should be `if (p.nextOffset != UNINITIALIZED_OFFSET)`.
Done. This is better.
```java
} else {
  LOG.info("{}: resuming {} at default offset", name, p.topicPartition);
  // Set nextOffset to current position, otherwise checkpoint would contain invalid
```
Possible wording change:

```java
// nextOffset is currently uninitialized, meaning "start from latest" record. Ask the consumer
// for that offset and start there, and this information will be persisted across checkpoints
// if this reader is closed.
```

Sanity check: what does `consumer.position` return if there has never been a record published? Hopefully, it returns `0`?
Yes, `consumer.position()` should either return a valid offset or throw an error. These exceptions will reach the runner.
Rephrased it:

```java
// nextOffset is uninitialized here, meaning start reading from latest record as of now
// ('latest' is the default, and is configurable). Remember the current position without
// waiting until the first record read. This ensures checkpoint is accurate even if the
// reader is closed before reading any records.
```
```diff
@@ -989,10 +989,10 @@ public boolean advance() throws IOException {
   }

   ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
-  long consumed = pState.consumedOffset;
+  long next = pState.nextOffset;
```
claim: none of these offsets are allowed to be `-1` now?
Yes. I renamed 'next' to 'expected'.
```diff
 records.get(tp).add(
-    new ConsumerRecord<byte[], byte[]>(
+    new ConsumerRecord<>(
         tp.topic(),
         tp.partition(),
         offsets[pIdx]++,
         ByteBuffer.wrap(new byte[8]).putInt(i).array(), // key is 4 byte record id
```
8 vs 4 here?
is this intentional?
Oops! fixed. It does not affect the test results.
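The 8-vs-4 question above comes down to the backing array size: `putInt()` always writes exactly four bytes, so wrapping an 8-byte array leaves four trailing zero bytes in the key. A self-contained check (the helper name is made up for illustration):

```java
import java.nio.ByteBuffer;

public class KeyEncoding {
    // putInt() writes four bytes at the buffer's position; array() returns the
    // whole backing array, so the key length equals the allocated buffer size.
    static byte[] encodeKey(int recordId, int bufferSize) {
        return ByteBuffer.wrap(new byte[bufferSize]).putInt(recordId).array();
    }

    public static void main(String[] args) {
        System.out.println(encodeKey(42, 8).length); // 8: four trailing zero bytes
        System.out.println(encodeKey(42, 4).length); // 4: matches "4 byte record id"
    }
}
```

This also shows why the bug didn't affect the test results: the record id still round-trips from the first four bytes either way.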
Jenkins was a flake, but expected: the DirectRunner does not retry failed bundles.
LGTM, thanks.
KafkaIO reader should set consumedOffset even before reading the first record. This matters in cases where one of the partitions might not have any records since the start. Without this fix, when the job is 'updated' (in the Spark runner this happens frequently, for each micro-batch), the reader for such a partition could lose messages that are sent during the update.
Another option discussed is fetching these offsets inside `generateInitialSplits()` for better consistency. I don't think we need to do that yet.
R: @amitsela, @dhalperi