[FLINK-24277][connector/kafka] Remove auto-generated group id in Kafka table source
PatrickRen authored and becketqin committed Sep 15, 2021
1 parent ca8bff2 commit f3ef860
Showing 3 changed files with 39 additions and 17 deletions.
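The removed fallback used to assign a generated "KafkaSource-<tableIdentifier>" consumer group whenever 'properties.group.id' was absent from the table options; after this commit the group id stays unset and offset commits on checkpoints are disabled instead (see the new test below). The following minimal sketch is illustrative only and not part of the commit: the bootstrap servers, topic, and group name are made-up placeholders. It shows how a job that still needs committed offsets would name the consumer group explicitly on the DataStream builder; in SQL the equivalent is the 'properties.group.id' table option.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class ExplicitGroupIdExample {
    public static void main(String[] args) {
        // Placeholder servers, topic and group id; only the builder calls matter here.
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("user_behavior")
                        // Previously the table source injected "KafkaSource-<tableIdentifier>"
                        // when no group id was configured; it must now be set explicitly if
                        // committed offsets are needed.
                        .setGroupId("my-table-group")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();
    }
}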
KafkaDynamicSource.java
@@ -49,12 +49,9 @@
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;

-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;
@@ -77,8 +74,6 @@
 public class KafkaDynamicSource
         implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {

-    private static final Logger LOG = LoggerFactory.getLogger(KafkaDynamicSource.class);
-
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
     // --------------------------------------------------------------------------------------------
@@ -389,17 +384,6 @@ protected KafkaSource<RowData> createKafkaSource(
             kafkaSourceBuilder.setTopicPattern(topicPattern);
         }

-        // For compatibility with legacy source that is not validating group id
-        if (!properties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-            String generatedGroupId = "KafkaSource-" + tableIdentifier;
-            LOG.warn(
-                    "Property \"{}\" is required for offset commit but not set in table options. "
-                            + "Assigning \"{}\" as consumer group id",
-                    ConsumerConfig.GROUP_ID_CONFIG,
-                    generatedGroupId);
-            kafkaSourceBuilder.setGroupId(generatedGroupId);
-        }
-
         switch (startupMode) {
             case EARLIEST:
                 kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
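For the table connector itself, the consumer group is supplied through the 'properties.group.id' option, the same key the new test removes to exercise the fallback-free path. A hypothetical options map for a Kafka source table that keeps committing offsets on checkpoints could look like the sketch below; every value is a placeholder, and omitting 'properties.group.id' now merely disables offset commits instead of generating a group id.

import java.util.HashMap;
import java.util.Map;

public class KafkaTableOptionsExample {
    /** Hypothetical options for a Kafka source table; all values are placeholders. */
    static Map<String, String> kafkaTableOptions() {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "user_behavior");
        options.put("properties.bootstrap.servers", "localhost:9092");
        // Must be set explicitly when committed offsets are wanted;
        // no longer auto-generated as of this commit.
        options.put("properties.group.id", "my-table-group");
        options.put("scan.startup.mode", "earliest-offset");
        options.put("format", "json");
        return options;
    }
}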
KafkaSourceTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.kafka.source;

 import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;

 import java.util.Collection;
@@ -44,4 +45,9 @@ public static <T> KafkaSourceReader<T> createReaderWithFinishedSplitHook(
         return ((KafkaSourceReader<T>)
                 kafkaSource.createReader(sourceReaderContext, splitFinishedHook));
     }
+
+    /** Get configuration of KafkaSource. */
+    public static Configuration getKafkaSourceConfiguration(KafkaSource<?> kafkaSource) {
+        return kafkaSource.getConfiguration();
+    }
 }
Kafka dynamic table factory test
@@ -22,9 +22,13 @@
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
@@ -68,6 +72,7 @@

 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;

+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -93,6 +98,7 @@
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

@@ -349,6 +355,31 @@ public void testTableSourceWithKeyValueAndMetadata() {
         assertEquals(actualSource, expectedKafkaSource);
     }

+    @Test
+    public void testTableSourceCommitOnCheckpointDisabled() {
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getBasicSourceOptions(), options -> options.remove("properties.group.id"));
+        final DynamicTableSource tableSource = createTableSource(SCHEMA, modifiedOptions);
+
+        assertThat(tableSource, instanceOf(KafkaDynamicSource.class));
+        ScanTableSource.ScanRuntimeProvider providerWithoutGroupId =
+                ((KafkaDynamicSource) tableSource)
+                        .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertThat(providerWithoutGroupId, instanceOf(DataStreamScanProvider.class));
+        final KafkaSource<?> kafkaSource = assertKafkaSource(providerWithoutGroupId);
+        final Configuration configuration =
+                KafkaSourceTestUtils.getKafkaSourceConfiguration(kafkaSource);
+
+        // Offset commit on checkpoint should be disabled when the consumer group is not set.
+        assertFalse(configuration.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
+        assertFalse(
+                configuration.get(
+                        ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                                .booleanType()
+                                .noDefaultValue()));
+    }
+
     @Test
     public void testTableSink() {
         final Map<String, String> modifiedOptions =
@@ -997,7 +1028,7 @@ private static Map<String, String> getKeyValueOptions() {
         return tableOptions;
     }

-    private void assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) {
+    private KafkaSource<?> assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) {
         assertThat(provider, instanceOf(DataStreamScanProvider.class));
         final DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) provider;
         final Transformation<RowData> transformation =
@@ -1010,5 +1041,6 @@ private void assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) {
                 (SourceTransformation<RowData, KafkaPartitionSplit, KafkaSourceEnumState>)
                         transformation;
         assertThat(sourceTransformation.getSource(), instanceOf(KafkaSource.class));
+        return (KafkaSource<?>) sourceTransformation.getSource();
     }
 }
