Skip to content

Commit

Permalink
[hotfix] [kafka] Remove stale comment on publishing procedures of Abs…
Browse files Browse the repository at this point in the history
…tractFetcher

The previous comment mentioned "only now will the fetcher return at
least the restored offsets when calling snapshotCurrentState()". This is
a remnant of the previous fetcher initialization behaviour, where in the
past the fetcher wasn't directly seeded with restored offsets on
instantiation.

Since this is no longer true, this commit fixes the stale comment to
avoid confusion.
  • Loading branch information
tzulitai committed Jan 12, 2018
1 parent 69fff74 commit ac0facc
Showing 1 changed file with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,21 +547,19 @@ public void onException(Throwable cause) {
sourceContext.markAsTemporarilyIdle();
}

// create the fetcher that will communicate with the Kafka brokers
final AbstractFetcher<T, ?> fetcher = createFetcher(
// from this point forward:
// - 'snapshotState' will draw offsets from the fetcher,
// instead of being built from `subscribedPartitionsToStartOffsets`
// - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
// Kafka through the fetcher, if configured to do so)
this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode);

// publish the reference, for snapshot-, commit-, and cancel calls
// IMPORTANT: We can only do that now, because only now will calls to
// the fetchers 'snapshotCurrentState()' method return at least
// the restored offsets
this.kafkaFetcher = fetcher;

if (!running) {
return;
}
Expand Down Expand Up @@ -600,7 +598,7 @@ public void run() {

// no need to add the discovered partitions if we were closed during the meantime
if (running && !discoveredPartitions.isEmpty()) {
fetcher.addDiscoveredPartitions(discoveredPartitions);
kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
}

// do not waste any time sleeping if we're not running anymore
Expand All @@ -623,7 +621,7 @@ public void run() {
});

discoveryLoopThread.start();
fetcher.runFetchLoop();
kafkaFetcher.runFetchLoop();

// --------------------------------------------------------------------

Expand All @@ -640,7 +638,7 @@ public void run() {
// won't be using the discoverer
partitionDiscoverer.close();

fetcher.runFetchLoop();
kafkaFetcher.runFetchLoop();
}
}

Expand Down

0 comments on commit ac0facc

Please sign in to comment.