Skip to content

Commit

Permalink
[hotfix][kafka] Make FlinkKafkaConsumerBase#setStartFromTimestamp to …
Browse files Browse the repository at this point in the history
…be public

All the Kafka versions are 0.10+ now, all the implementations have a public
setStartFromTimestamp(). We don't need to keep this method protected in the base class.
So that Kafka table source can call this method in kafka-base.
  • Loading branch information
wuchong committed May 17, 2020
1 parent 83ae463 commit 10a65ed
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,17 +294,6 @@ protected boolean getIsAutoCommitEnabled() {
PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}

// ------------------------------------------------------------------------
// Timestamp-based startup
// ------------------------------------------------------------------------

@Override
public FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
// the purpose of this override is just to publicly expose the method for Kafka 0.10+;
// the base class doesn't publicly expose it since not all Kafka versions support the functionality
return super.setStartFromTimestamp(startupOffsetsTimestamp);
}

@Override
protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
Collection<KafkaTopicPartition> partitions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,7 @@ public FlinkKafkaConsumerBase<T> setStartFromLatest() {
*
* @return The consumer object, to allow function chaining.
*/
// NOTE -
// This method is implemented in the base class because this is where the startup logging and verifications live.
// However, it is not publicly exposed since only newer Kafka versions support the functionality.
// Version-specific subclasses which can expose the functionality should override and allow public access.
protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
public FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
checkArgument(startupOffsetsTimestamp >= 0, "The provided value for the startup offsets timestamp is invalid.");

long currentTimestamp = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,17 +248,6 @@ protected AbstractPartitionDiscoverer createPartitionDiscoverer(
return new KafkaPartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
}

// ------------------------------------------------------------------------
// Timestamp-based startup
// ------------------------------------------------------------------------

@Override
public FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
// the purpose of this override is just to publicly expose the method for Kafka 0.10+;
// the base class doesn't publicly expose it since not all Kafka versions support the functionality
return super.setStartFromTimestamp(startupOffsetsTimestamp);
}

@Override
protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
Collection<KafkaTopicPartition> partitions,
Expand Down

0 comments on commit 10a65ed

Please sign in to comment.