Skip to content

Commit

Permalink
[FLINK-17802][kafka] Set offset commit only if group id is configured…
Browse files Browse the repository at this point in the history
… for new Kafka Table source

This closes apache#12252
  • Loading branch information
leonardBang committed May 26, 2020
1 parent 66f2132 commit 6095131
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ LinkedMap getPendingOffsetsToCommit() {
}

@VisibleForTesting
boolean getEnableCommitOnCheckpoints() {
public boolean getEnableCommitOnCheckpoints() {
return enableCommitOnCheckpoints;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ protected FlinkKafkaConsumerBase<RowData> getKafkaConsumer(
kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
break;
}
kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
return kafkaConsumer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,24 +170,23 @@ public void testTableSource() {
final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
actualKafkaSource.getDataStream(mock);
assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
// Test commitOnCheckpoints flag should be true when set consumer group.
assertTrue(((FlinkKafkaConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
}

Properties propsWithoutGroupId = new Properties();
propsWithoutGroupId.setProperty("bootstrap.servers", "dummy");

final KafkaTableSourceBase sourceWithoutGroupId = getExpectedKafkaTableSource(
schema,
Optional.of(PROC_TIME),
rowtimeAttributeDescriptors,
fieldMapping,
TOPIC,
propsWithoutGroupId,
deserializationSchema,
StartupMode.LATEST,
new HashMap<>(),
0L);

sourceWithoutGroupId.getDataStream(mock);
@Test
public void testTableSourceCommitOnCheckpointsDisabled() {
Map<String, String> propertiesMap = new HashMap<>();
createKafkaSourceProperties().forEach((k, v) -> {
if (!k.equals("connector.properties.group.id")) {
propertiesMap.put(k, v);
}
});
final TableSource<?> tableSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
.createStreamTableSource(propertiesMap);
final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
// Test commitOnCheckpoints flag should be false when do not set consumer group.
((KafkaTableSourceBase) tableSource).getDataStream(mock);
assertTrue(mock.sourceFunction instanceof FlinkKafkaConsumerBase);
assertFalse(((FlinkKafkaConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
Expand Down Expand Up @@ -61,7 +62,9 @@
import static org.apache.flink.util.CoreMatchers.containsCause;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

/**
* Abstract test base for {@link KafkaDynamicTableFactoryBase}.
Expand Down Expand Up @@ -153,6 +156,34 @@ public void testTableSource() {
final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction();
assertThat(sourceFunction, instanceOf(getExpectedConsumerClass()));
// Test commitOnCheckpoints flag should be true when set consumer group
assertTrue(((FlinkKafkaConsumerBase) sourceFunction).getEnableCommitOnCheckpoints());
}

@Test
public void testTableSourceCommitOnCheckpointsDisabled() {
//Construct table source using options and table source factory
ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
"default",
"default",
"scanTable");
Map<String, String> tableOptions = getFullSourceOptions();
tableOptions.remove("properties.group.id");
CatalogTable catalogTable = createKafkaSourceCatalogTable(tableOptions);
final DynamicTableSource tableSource = FactoryUtil.createTableSource(null,
objectIdentifier,
catalogTable,
new Configuration(),
Thread.currentThread().getContextClassLoader());

// Test commitOnCheckpoints flag should be false when do not set consumer group.
assertThat(tableSource, instanceOf(KafkaDynamicSourceBase.class));
ScanTableSource.ScanRuntimeProvider providerWithoutGroupId = ((KafkaDynamicSourceBase) tableSource)
.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertThat(providerWithoutGroupId, instanceOf(SourceFunctionProvider.class));
final SourceFunctionProvider functionProviderWithoutGroupId = (SourceFunctionProvider) providerWithoutGroupId;
final SourceFunction<RowData> function = functionProviderWithoutGroupId.createSourceFunction();
assertFalse(((FlinkKafkaConsumerBase) function).getEnableCommitOnCheckpoints());
}

@Test
Expand Down

0 comments on commit 6095131

Please sign in to comment.