[FLINK-18323][connector/kafka] Add Kafka Source based on FLIP-27.
becketqin committed Nov 8, 2020
1 parent 84eb280 commit 80c040f
Showing 34 changed files with 4,764 additions and 2 deletions.
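For orientation before the diff: the sketch below shows how a job might consume from the new FLIP-27 source end to end. It is illustrative only; the broker address, group id, and topic name are made up, MyStringDeserializer is a hypothetical deserializer (sketched at the bottom of this page), and env.fromSource(...) is assumed to be the FLIP-27 entry point on the DataStream API rather than anything added by this commit.

import java.util.Arrays;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Builder calls mirror the class-level javadoc of KafkaSource in this commit.
        KafkaSource<String> source = KafkaSource
                .<String>builder()
                .setBootstrapServers("localhost:9092")                  // assumed broker address
                .setGroupId("MyGroup")
                .setTopics(Arrays.asList("input-topic"))                // assumed topic name
                .setDeserializer(new MyStringDeserializer())            // hypothetical deserializer
                .setStartingOffsetInitializer(OffsetsInitializer.earliest())
                .build();

        // Hand the source to the runtime; noWatermarks() keeps the example minimal.
        DataStream<String> stream = env.fromSource(
                source, WatermarkStrategy.noWatermarks(), "kafka-source");
        stream.print();
        env.execute("kafka-source-example");
    }
}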
17 changes: 15 additions & 2 deletions flink-connectors/flink-connector-kafka/pom.xml
@@ -83,6 +83,13 @@ under the License.
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- test dependencies -->

<dependency>
@@ -130,6 +137,13 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
@@ -189,8 +203,7 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
171 changes: 171 additions & 0 deletions flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.kafka.source;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Properties;
import java.util.function.Supplier;

/**
* The Source implementation for Kafka. Please use a {@link KafkaSourceBuilder} to construct a {@link KafkaSource}.
* The following example shows how to create a KafkaSource that emits records of <code>String</code> type.
*
* <pre>{@code
* KafkaSource<String> source = KafkaSource
*     .<String>builder()
*     .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
*     .setGroupId("MyGroup")
*     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
*     .setDeserializer(new TestingKafkaRecordDeserializer())
*     .setStartingOffsetInitializer(OffsetsInitializer.earliest())
*     .build();
* }</pre>
*
* <p>See {@link KafkaSourceBuilder} for more details.
*
* @param <OUT> the output type of the source.
*/
public class KafkaSource<OUT> implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState> {
private static final long serialVersionUID = -8755372893283732098L;
// The subscriber captures the user's chosen subscription mode; only one way of specifying the topics to consume from can be used.
private final KafkaSubscriber subscriber;
// Users can specify the starting / stopping offset initializer.
private final OffsetsInitializer startingOffsetsInitializer;
private final OffsetsInitializer stoppingOffsetsInitializer;
// Boundedness
private final Boundedness boundedness;
private final KafkaRecordDeserializer<OUT> deserializationSchema;
// The configurations.
private final Properties props;

KafkaSource(
KafkaSubscriber subscriber,
OffsetsInitializer startingOffsetsInitializer,
@Nullable OffsetsInitializer stoppingOffsetsInitializer,
Boundedness boundedness,
KafkaRecordDeserializer<OUT> deserializationSchema,
Properties props) {
this.subscriber = subscriber;
this.startingOffsetsInitializer = startingOffsetsInitializer;
this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
this.boundedness = boundedness;
this.deserializationSchema = deserializationSchema;
this.props = props;
}

/**
* Get a {@link KafkaSourceBuilder} to build a {@link KafkaSource}.
*
* @return a Kafka source builder.
*/
public static <OUT> KafkaSourceBuilder<OUT> builder() {
return new KafkaSourceBuilder<>();
}

@Override
public Boundedness getBoundedness() {
return this.boundedness;
}

@Override
public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext) {
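        // The queue decouples the fetcher threads polling Kafka from the reader thread emitting
        // records; each element carries the deserialized record plus two longs, presumably its
        // offset and timestamp.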
FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<OUT, Long, Long>>> elementsQueue =
new FutureCompletingBlockingQueue<>();
Supplier<KafkaPartitionSplitReader<OUT>> splitReaderSupplier =
() -> new KafkaPartitionSplitReader<>(
props,
deserializationSchema,
readerContext.getIndexOfSubtask());
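        // The record emitter converts elements taken from the queue into records sent downstream.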
KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>();

return new KafkaSourceReader<>(
elementsQueue,
splitReaderSupplier,
recordEmitter,
toConfiguration(props),
readerContext);
}

@Override
public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(
SplitEnumeratorContext<KafkaPartitionSplit> enumContext) {
return new KafkaSourceEnumerator(
subscriber,
startingOffsetsInitializer,
stoppingOffsetsInitializer,
props,
enumContext);
}

@Override
public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator(
SplitEnumeratorContext<KafkaPartitionSplit> enumContext,
KafkaSourceEnumState checkpoint) throws IOException {
return new KafkaSourceEnumerator(
subscriber,
startingOffsetsInitializer,
stoppingOffsetsInitializer,
props,
enumContext,
checkpoint.getCurrentAssignment());
}

@Override
public SimpleVersionedSerializer<KafkaPartitionSplit> getSplitSerializer() {
return new KafkaPartitionSplitSerializer();
}

@Override
public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSerializer() {
return new KafkaSourceEnumStateSerializer();
}

// ----------- private helper methods ---------------

private Configuration toConfiguration(Properties props) {
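        // Copy every string property into a Flink Configuration for the reader created in createReader().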
Configuration config = new Configuration();
props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key)));
return config;
}
}
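
The class-level javadoc above references a TestingKafkaRecordDeserializer from the test sources. As a rough sketch of what a user-supplied deserializer could look like: the interface shape below (a deserialize method taking a raw ConsumerRecord plus a Collector, and getProducedType() via ResultTypeQueryable) is an assumption inferred from the KafkaRecordDeserializer import, since the interface itself is not part of this excerpt.

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.util.Collector;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical implementation; the exact KafkaRecordDeserializer methods are assumed, not shown in this diff.
public class MyStringDeserializer implements KafkaRecordDeserializer<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> collector) {
        // Emit the record value as a UTF-8 string; keys and headers are ignored here.
        collector.collect(new String(record.value(), StandardCharsets.UTF_8));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}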
