Skip to content

Commit

Permalink
[FLINK-23391][connector/kafka] Fix flaky Kafka source metric test by …
Browse files Browse the repository at this point in the history
…retrying notifyCheckpointComplete until success or timeout

When calling KafkaSourceReader.notifyCheckpointComplete(), KafkaConsumer.commitAsync() might fail because of coordinator movement or network instability during the test. Retrying offset commit until success will increase the stability of the case and has no side effect since this operation is idempotent.
  • Loading branch information
PatrickRen authored and AHeise committed Oct 28, 2021
1 parent 8106e27 commit c461338
Showing 1 changed file with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,28 +329,40 @@ public void testKafkaSourceMetrics() throws Exception {
assertEquals(INITIAL_OFFSET, getCommittedOffsetMetric(tp1, metricListener));

// Trigger offset commit
reader.snapshotState(15213L);
reader.notifyCheckpointComplete(15213L);
final long checkpointId = 15213L;
reader.snapshotState(checkpointId);
waitUtil(
() -> reader.getOffsetsToCommit().isEmpty(),
() -> {
try {
reader.notifyCheckpointComplete(checkpointId);
} catch (Exception e) {
throw new RuntimeException(
"Failed to notify checkpoint complete to reader", e);
}
return reader.getOffsetsToCommit().isEmpty();
},
Duration.ofSeconds(60),
Duration.ofSeconds(1),
String.format(
"Offsets are not committed successfully. Dangling offsets: %s",
reader.getOffsetsToCommit()));

// Metric "commit-total" of KafkaConsumer should be 1
assertEquals(1, getKafkaConsumerMetric("commit-total", metricListener));
// Metric "commit-total" of KafkaConsumer should be greater than 0
// It's hard to know the exactly number of commit because of the retry
MatcherAssert.assertThat(
getKafkaConsumerMetric("commit-total", metricListener),
Matchers.greaterThan(0L));

// Committed offset should be NUM_RECORD_PER_SPLIT
assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp0, metricListener));
assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp1, metricListener));

// Number of successful commits should be 1
// Number of successful commits should be greater than 0
final Optional<Counter> commitsSucceeded =
metricListener.getCounter(
KAFKA_SOURCE_READER_METRIC_GROUP, COMMITS_SUCCEEDED_METRIC_COUNTER);
assertTrue(commitsSucceeded.isPresent());
assertEquals(1L, commitsSucceeded.get().getCount());
MatcherAssert.assertThat(commitsSucceeded.get().getCount(), Matchers.greaterThan(0L));
}
}

Expand Down

0 comments on commit c461338

Please sign in to comment.