
KafkaIO reader should set consumedOffset even before reading the first record #1071

Closed
wants to merge 8 commits

Conversation

@rangadi (Contributor) commented Oct 8, 2016

KafkaIO reader should set consumedOffset even before reading the first record. This matters when one of the partitions has not had any records since the start. Without this fix, when the job is 'updated' (in the Spark runner this happens frequently, for each micro-batch), the reader for such a partition could lose messages sent during the update.
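
In rough terms, the fix amounts to the following sketch. It uses the nextOffset/UNINITIALIZED_OFFSET naming the review below converges on; the class and field names are assumptions based on this discussion, not the verbatim KafkaIO code.

    import java.util.List;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    // Sketch only: PartitionState, nextOffset and UNINITIALIZED_OFFSET are assumed
    // names based on this discussion, not the actual KafkaIO source.
    class ReaderStartSketch {
      static final long UNINITIALIZED_OFFSET = -1;

      static class PartitionState {
        final TopicPartition topicPartition;
        long nextOffset = UNINITIALIZED_OFFSET;
        PartitionState(TopicPartition topicPartition) { this.topicPartition = topicPartition; }
      }

      static void initOffsets(Consumer<byte[], byte[]> consumer, List<PartitionState> partitions) {
        for (PartitionState p : partitions) {
          if (p.nextOffset != UNINITIALIZED_OFFSET) {
            // Resuming from a checkpoint: continue from the stored next offset.
            consumer.seek(p.topicPartition, p.nextOffset);
          } else {
            // Nothing read yet: record the consumer's current position immediately, so a
            // checkpoint taken before the first record still carries a valid offset.
            p.nextOffset = consumer.position(p.topicPartition);
          }
        }
      }
    }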

Another option discussed is fetching these offsets inside generateInitialSplits() for better consistency. I don't think we need to do that yet:

  • I don't think it provides better consistency. AFAIK, there is no concept of a 'consistent set of offsets' across different partitions.
  • It increases start-up latency.
  • If we are starting a streaming job from scratch (and using the default 'latest offset'), the expectation about the 'starting point' is fuzzy anyway.
  • This policy does not matter for updates, since the offset comes from the stored checkpoint.
  • Readers will take on even more responsibility for managing splits when we add support for handling changes in Kafka partitions; this does not help there.

R: @amitsela, @dhalperi

TODO:

  • Add a unit test.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@amitsela (Member) left a comment

I tested locally and it makes sense, but there is still an edge case where this fails, see comments.

for (PartitionState p : partitionStates) {
if (p.consumedOffset >= 0) {
LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1);
if (p.consumedOffset >= 0) {
Member:

You got two white spaces after >=

Contributor Author:

fixed

LOG.info("{}: resuming {} at default offset", name, p.topicPartition);
// set consumed offset to (next offset - 1), otherwise checkpoint would contain invalid
// offset until we read first record from this partition.
p.consumedOffset = consumer.position(p.topicPartition) - 1;
Member:

Edge case, but still: if this is a new/empty partition, the first micro-batch read will be skipped and the latest offset (which was actually 0) will advance.
Maybe use null or -2 as a "never tried to read yet" flag. Or add one.

Contributor Author:

Good point. I updated the patch. Now we have two negative offset values ('UNINITIALIZED' and 'EMPTY_PARTITION').

We wouldn't need two of these if we tracked the next offset rather than consumedOffset. Maybe we should do that.

… for an empty partition. Would have been simpler to store next offset rather than 'consumed offset'.
}

LOG.info("{}: reading from {} at offset {}", name, p.topicPartition, p.consumedOffset + 1);
Member:

This will log: ${readerName}: reading from ${topic} at offset -99 for a read attempt of an empty partition.
Maybe use nextOffset instead of p.consumedOffset + 1 or log different messages for resuming and initialising.

Contributor Author:

doh! Yeah, it is getting cumbersome. I changed the implementation to track nextOffset instead (see comment below).

@amitsela (Member):

+1 on the TODO for a unit test. This is becoming tricky.
BTW I see that PartitionState has a latestOffset field; why not add that to the CheckpointMark? We won't need different flags if we have another state, and we seem to track it anyway.

@rangadi (Contributor Author) commented Oct 10, 2016

I will look into adding a unit test.

It is getting cumbersome to track 'consumedOffset'. Changed the implementation to store next offset instead.

I changed it in CheckpointMark also. That implies an update could be off by one. I don't think there are any Dataflow apps in prod that depend heavily on update(), and I think it is worth the small incompatibility at this stage.
We are going to change checkpoint serialization to Avro to handle future updates better.

cc: @dhalperi, @amitsela
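
For reference, the checkpoint now roughly carries the following per-partition state. This is a sketch under assumed names, not the real KafkaCheckpointMark; the point is that each partition stores the offset of the next record to read rather than the last consumed offset.

    import java.io.Serializable;
    import java.util.List;

    // Sketch of the checkpoint contents after this change (names assumed, not copied
    // from KafkaCheckpointMark): each partition stores the offset of the next record
    // to read, rather than the last consumed offset as before.
    class CheckpointMarkSketch implements Serializable {
      static class PartitionMark implements Serializable {
        final String topic;
        final int partition;
        final long nextOffset; // on resume, the reader seeks directly to this offset

        PartitionMark(String topic, int partition, long nextOffset) {
          this.topic = topic;
          this.partition = partition;
          this.nextOffset = nextOffset;
        }
      }

      final List<PartitionMark> partitions;

      CheckpointMarkSketch(List<PartitionMark> partitions) {
        this.partitions = partitions;
      }
    }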

@rangadi (Contributor Author) commented Oct 10, 2016

What is the advantage of storing latest_offset in Checkpoint? We are tracking it to be able to report backlog to the runner.

@amitsela (Member):

I don't think there is an advantage to storing latest_offset now that you're tracking nextOffset; I'll have a look.

Fix how we were using MockConsumer. checkpoint test was actually doing what it was supposed to.
@rangadi (Contributor Author) commented Oct 12, 2016

@amitsela, I added a new test (testUnboundedSourceCheckpointMarkWithEmptyPartitions). The new test fails without this fix.

Overall there are quite a few changes to KafkaIOTest.java, much larger than I expected.
While working on this I found an important problem with how I was using MockConsumer: when we seek() on a topic, it just changes the topic metadata, not what it serves. If we have 10 records and we seek to the end, the next poll() still returns those 10 records. It is our responsibility to feed the correct records based on the partition position. Now fixed.

Because of this problem, the existing checkpoint test was wrong (it passed by a fluke: the reader ignores records before the expected offset with a warning). This is also fixed now.
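
A rough illustration of the fixed approach, not the KafkaIOTest code itself: serve records according to the consumer's current position instead of handing back everything that was added. It assumes MockConsumer's schedulePollTask/position/addRecord API and an already-assigned partition.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Sketch only: feed MockConsumer based on its current position, since poll()
    // returns whatever was added regardless of where we seek()'ed to.
    class MockConsumerFeeder {
      static void feedOnEachPoll(
          final MockConsumer<byte[], byte[]> consumer,
          final TopicPartition tp,
          final byte[][] values) {
        consumer.schedulePollTask(new Runnable() {
          @Override
          public void run() {
            long pos = consumer.position(tp); // the offset the reader actually asked for
            if (pos < values.length) {
              consumer.addRecord(new ConsumerRecord<byte[], byte[]>(
                  tp.topic(), tp.partition(), pos, null, values[(int) pos]));
            }
            feedOnEachPoll(consumer, tp, values); // re-arm for the next poll
          }
        });
      }
    }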

@amitsela (Member) commented Oct 12, 2016

The Jenkins error doesn't seem to be related, though I can't figure out exactly what it is; looks like some Google service issue?

// Similar to testUnboundedSourceCheckpointMark(), but verifies that source resumes
// properly from empty partitions, without missing messages added since checkpoint.

// Initialize consumer with than number of partitions so that some of them are empty.
Member:

I guess you meant 'the' instead of 'than'?

Contributor Author:

fixed. I wanted to say 'Initialize consumer with fewer elements than number of partitions...'

@amitsela (Member):

Tests seem to cover the simple case of reading everything from earliest, and a read from latest with some partitions empty.
This seems more robust now, and I can't come up with anything else, so LGTM from me.

@dhalperi wanna take a look, or should I merge?

@amitsela (Member):

@rangadi another quick question, which might require a separate PR, but as long as this thread is alive:
I understand that KAFKA_POLL_TIMEOUT and START_NEW_RECORDS_POLL_TIMEOUT are set to 1 and 5 seconds (respectively) for throughput reasons (right?), and I was wondering if they could be made configurable so that Spark could replace them if the micro-batch (or the portion of it dedicated to reading) is smaller than those values.
I'm not sure what's correct here: propagating a configuration through the Source, or letting the runner's worker configure the Reader.

@rangadi (Contributor Author) commented Oct 12, 2016

These timeouts don't affect KafkaIO itself in any way. This was mainly for the local runner (and to some extent the Dataflow runner). There is quite a bit of ambiguity around how long UnboundedSourceReader.advance() should wait. Ideally a runner should work fine even if advance() is always non-blocking; there is minimal guidance from the SDK on this. E.g. originally I had both timeouts set to just 10 millis. That caused CPU thrashing on the local runner (it kept churning through too many empty bundles, as start() often returned false). The 5-second timeout for start() was mainly to address this problem. Maybe this is less of an issue now with DoFn reuse etc. (cc @tgroh).

I think 5 seconds is too long too. I am open to making these configurable as a workaround.
From the Spark runner's perspective, how long should advance() ideally wait when there is no input?

@amitsela (Member):

For both the advance and start timeouts, it has to be less than the time dedicated to a read, which is less than the batch-interval time.
AFAIK Spark doesn't recommend less than a 0.5-second interval, but the rest is really up to configuration.
The Spark runner UnboundedSource plans to have a minimum of 200 msec for a read, and when testing locally I've set both to 100 msec.

@rangadi (Contributor Author) commented Oct 12, 2016

I see. This seems pretty runner specific. It would be quite simple for KafkaIO to obey a 'max_wait' hint. Please open an issue or JIRA to decide how we want to pass the hint from the runner to the source. Maybe in the PipelineOptions passed to createReader(). cc @dhalperi.

@rangadi (Contributor Author) commented Oct 12, 2016

Notes to @dhalperi (if you are committing):

  • The crux of the fix is line 932, where we set nextOffset.
  • There is a small incompatibility in the checkpoint mark: the offset stored is now 'nextOffset' where it used to be 'consumed offset', i.e. off by one. We don't think it will affect many users, and it is worth the simplification. The worst case when a user runs new code with an old snapshot is that it will miss one record from each partition.

@amitsela (Member):

Filed BEAM-744

@dhalperi (Contributor) left a comment

Basic feedback on style with a few questions.

if (p.consumedOffset >= 0) {
LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1);
consumer.seek(p.topicPartition, p.consumedOffset + 1);
if (p.nextOffset >= 0) {
Contributor:

Can you make the swap from -1 to a constant clearer? E.g., this should be if (p.nextOffset != UNINITIALIZED_OFFSET).

Contributor Author:

Done. This is better.

} else {
LOG.info("{}: resuming {} at default offset", name, p.topicPartition);
// Set nextOffset to current position, otherwise checkpoint would contain invalid
Contributor:

Possible wording change:

// nextOffset is currently uninitialized, meaning "start from latest" record. Ask the consumer for that offset
// and start there, and this information will be persisted across checkpoints if this reader is closed.

Sanity check: what does consumer.position return if there has never been a record published? Hopefully, it returns 0?

Contributor Author:

Yes, consumer.position() should either return a valid offset or throw an error. These exceptions will reach the runner.

Rephrased it:

          // nextOffset is uninitialized here, meaning start reading from latest record as of now
          // ('latest' is the default, and is configurable). Remember the current position without
          // waiting until the first record read. This ensures checkpoint is accurate even if the
          // reader is closed before reading any records.

@@ -989,10 +989,10 @@ public boolean advance() throws IOException {
}

ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
long consumed = pState.consumedOffset;
long next = pState.nextOffset;
Contributor:

claim: none of these offsets are allowed to be -1 now?

Contributor Author:

Yes. I renamed 'next' to 'expected'.

records.get(tp).add(
new ConsumerRecord<byte[], byte[]>(
new ConsumerRecord<>(
tp.topic(),
tp.partition(),
offsets[pIdx]++,
ByteBuffer.wrap(new byte[8]).putInt(i).array(), // key is 4 byte record id
Contributor:

8 vs 4 here?

Contributor:

is this intentional?

Contributor Author:

Oops! fixed. It does not affect the test results.
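
Presumably the fixed key construction is just the 4-byte version of the same expression (shown here for clarity; 'i' is the record id from the surrounding test loop):

    import java.nio.ByteBuffer;

    class RecordKeySketch {
      // 4-byte key holding the record id, matching the "key is 4 byte record id" comment
      // (the original line accidentally allocated an 8-byte buffer).
      static byte[] keyFor(int i) {
        return ByteBuffer.wrap(new byte[4]).putInt(i).array();
      }
    }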

@dhalperi (Contributor):

Jenkins was a flake, but expected: the DirectRunner does not retry failed bundles.

@dhalperi (Contributor):

LGTM, thanks.

asfgit closed this in 93d2e37 on Oct 14, 2016