forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-15670][connector] Kafka Shuffle API Part
KafkaShuffle provides a transparent Kafka source and sink pair, through which the network traffic of a shuffle step is persisted and redirected.
- Loading branch information
Showing
2 changed files
with
421 additions
and
0 deletions.
There are no files selected for viewing
378 changes: 378 additions & 0 deletions
378
.../src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,378 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.streaming.connectors.kafka.shuffle; | ||
|
||
import org.apache.flink.annotation.Experimental; | ||
import org.apache.flink.api.common.operators.Keys; | ||
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; | ||
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; | ||
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; | ||
import org.apache.flink.api.common.typeinfo.TypeInformation; | ||
import org.apache.flink.api.common.typeutils.TypeSerializer; | ||
import org.apache.flink.api.java.functions.KeySelector; | ||
import org.apache.flink.api.java.tuple.Tuple; | ||
import org.apache.flink.runtime.state.KeyGroupRangeAssignment; | ||
import org.apache.flink.streaming.api.TimeCharacteristic; | ||
import org.apache.flink.streaming.api.datastream.DataStream; | ||
import org.apache.flink.streaming.api.datastream.DataStreamUtils; | ||
import org.apache.flink.streaming.api.datastream.KeyedStream; | ||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | ||
import org.apache.flink.streaming.api.functions.source.SourceFunction; | ||
import org.apache.flink.streaming.api.transformations.SinkTransformation; | ||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; | ||
import org.apache.flink.streaming.util.keys.KeySelectorUtil; | ||
import org.apache.flink.util.Preconditions; | ||
import org.apache.flink.util.PropertiesUtil; | ||
|
||
import java.util.Properties; | ||
|
||
/** | ||
* {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and persist data at the same time. | ||
* | ||
* <p>Persisting shuffle data is useful when | ||
* - you would like to reuse the shuffle data and/or, | ||
* - you would like to avoid a full restart of a pipeline during failure recovery | ||
* | ||
* <p>Persisting shuffle is achieved by wrapping a {@link FlinkKafkaShuffleProducer} and | ||
* a {@link FlinkKafkaShuffleConsumer} together into a {@link FlinkKafkaShuffle}. | ||
* Here is an example how to use a {@link FlinkKafkaShuffle}. | ||
* | ||
* <p><pre>{@code | ||
* StreamExecutionEnvironment env = ... // create execution environment | ||
* DataStream<X> source = env.addSource(...) // add data stream source | ||
* DataStream<Y> dataStream = ... // some transformation(s) based on source | ||
* | ||
* KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle | ||
* .persistentKeyBy( // keyBy shuffle through kafka | ||
* dataStream, // data stream to be shuffled | ||
* topic, // Kafka topic written to | ||
* producerParallelism, // the number of tasks of a Kafka Producer | ||
* numberOfPartitions, // the number of partitions of the Kafka topic written to | ||
* kafkaProperties, // kafka properties for Kafka Producer and Consumer | ||
* keySelector<Y, KEY>); // key selector to retrieve key from `dataStream' | ||
* | ||
* keyedStream.transform... // some other transformation(s) | ||
* | ||
* KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle | ||
* .readKeyBy( // Read the Kafka shuffle data again for other usages | ||
* topic, // the topic of Kafka where data is persisted | ||
* env, // execution environment, and it can be a new environment | ||
* typeInformation<Y>, // type information of the data persisted in Kafka | ||
* kafkaProperties, // kafka properties for Kafka Consumer | ||
* keySelector<Y, KEY>); // key selector to retrieve key | ||
* | ||
* keyedStreamReuse.transform... // some other transformation(s) | ||
* }</pre> | ||
* | ||
* <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link DataStream#keyBy(KeySelector)}. | ||
* The differences are: | ||
* | ||
* <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}. {@link FlinkKafkaShuffleProducer} decides | ||
* which partition a key goes when writing to Kafka | ||
* | ||
* <p>2). Shuffle data can be reused through {@link FlinkKafkaShuffle#readKeyBy}, as shown in the example above. | ||
* | ||
* <p>3). Job execution is decoupled by the persistent Kafka message bus. In the example, the job execution graph is | ||
* decoupled to three regions: `KafkaShuffleProducer', `KafkaShuffleConsumer' and `KafkaShuffleConsumerReuse' | ||
* through `PERSISTENT DATA` as shown below. If any region fails the execution, the other two keep progressing. | ||
* | ||
* <p><pre> | ||
* source -> ... KafkaShuffleProducer -> PERSISTENT DATA -> KafkaShuffleConsumer -> ... | ||
* | | ||
* | ----------> KafkaShuffleConsumerReuse -> ... | ||
* </pre> | ||
*/ | ||
@Experimental | ||
public class FlinkKafkaShuffle { | ||
static final String PRODUCER_PARALLELISM = "producer parallelism"; | ||
static final String PARTITION_NUMBER = "partition number"; | ||
|
||
/** | ||
* Uses Kafka as a message bus to persist keyBy shuffle. | ||
* | ||
* <p>Persisting keyBy shuffle is achieved by wrapping a {@link FlinkKafkaShuffleProducer} and | ||
* {@link FlinkKafkaShuffleConsumer} together. | ||
* | ||
* <p>On the producer side, {@link FlinkKafkaShuffleProducer} | ||
* is similar to {@link DataStream#keyBy(KeySelector)}. They use the same key group assignment function | ||
* {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to decide which partition a key goes. | ||
* Hence, each producer task can potentially write to each Kafka partition based on where the key goes. | ||
* Here, `numberOfPartitions` equals to the key group size. | ||
* In the case of using {@link TimeCharacteristic#EventTime}, each producer task broadcasts its watermark | ||
* to ALL of the Kafka partitions to make sure watermark information is propagated correctly. | ||
* | ||
* <p>On the consumer side, each consumer task should read partitions equal to the key group indices | ||
* it is assigned. `numberOfPartitions` is the maximum parallelism of the consumer. This version only | ||
* supports numberOfPartitions = consumerParallelism. | ||
* In the case of using {@link TimeCharacteristic#EventTime}, a consumer task is responsible to emit | ||
* watermarks. Watermarks are read from the corresponding Kafka partitions. Notice that a consumer task only starts | ||
* to emit a watermark after reading at least one watermark from each producer task to make sure watermarks | ||
* are monotonically increasing. Hence a consumer task needs to know `producerParallelism` as well. | ||
* | ||
* @see FlinkKafkaShuffle#writeKeyBy | ||
* @see FlinkKafkaShuffle#readKeyBy | ||
* | ||
* @param dataStream Data stream to be shuffled | ||
* @param topic Kafka topic written to | ||
* @param producerParallelism Parallelism of producer | ||
* @param numberOfPartitions Number of partitions | ||
* @param properties Kafka properties | ||
* @param keySelector Key selector to retrieve key from `dataStream' | ||
* @param <T> Type of the input data stream | ||
* @param <K> Type of key | ||
*/ | ||
public static <T, K> KeyedStream<T, K> persistentKeyBy( | ||
DataStream<T> dataStream, | ||
String topic, | ||
int producerParallelism, | ||
int numberOfPartitions, | ||
Properties properties, | ||
KeySelector<T, K> keySelector) { | ||
// KafkaProducer#propsToMap uses Properties purely as a HashMap without considering the default properties | ||
// So we have to flatten the default property to first level elements. | ||
Properties kafkaProperties = PropertiesUtil.flatten(properties); | ||
kafkaProperties.setProperty(PRODUCER_PARALLELISM, String.valueOf(producerParallelism)); | ||
kafkaProperties.setProperty(PARTITION_NUMBER, String.valueOf(numberOfPartitions)); | ||
|
||
StreamExecutionEnvironment env = dataStream.getExecutionEnvironment(); | ||
|
||
writeKeyBy(dataStream, topic, kafkaProperties, keySelector); | ||
return readKeyBy(topic, env, dataStream.getType(), kafkaProperties, keySelector); | ||
} | ||
|
||
/** | ||
* Uses Kafka as a message bus to persist keyBy shuffle. | ||
* | ||
* <p>Persisting keyBy shuffle is achieved by wrapping a {@link FlinkKafkaShuffleProducer} and | ||
* {@link FlinkKafkaShuffleConsumer} together. | ||
* | ||
* <p>On the producer side, {@link FlinkKafkaShuffleProducer} | ||
* is similar to {@link DataStream#keyBy(KeySelector)}. They use the same key group assignment function | ||
* {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to decide which partition a key goes. | ||
* Hence, each producer task can potentially write to each Kafka partition based on where the key goes. | ||
* Here, `numberOfPartitions` equals to the key group size. | ||
* In the case of using {@link TimeCharacteristic#EventTime}, each producer task broadcasts its watermark | ||
* to ALL of the Kafka partitions to make sure watermark information is propagated correctly. | ||
* | ||
* <p>On the consumer side, each consumer task should read partitions equal to the key group indices | ||
* it is assigned. `numberOfPartitions` is the maximum parallelism of the consumer. This version only | ||
* supports numberOfPartitions = consumerParallelism. | ||
* In the case of using {@link TimeCharacteristic#EventTime}, a consumer task is responsible to emit | ||
* watermarks. Watermarks are read from the corresponding Kafka partitions. Notice that a consumer task only starts | ||
* to emit a watermark after reading at least one watermark from each producer task to make sure watermarks | ||
* are monotonically increasing. Hence a consumer task needs to know `producerParallelism` as well. | ||
* | ||
* @param dataStream Data stream to be shuffled | ||
* @param topic Kafka topic written to | ||
* @param producerParallelism Parallelism of producer | ||
* @param numberOfPartitions Number of partitions | ||
* @param properties Kafka properties | ||
* @param fields Key positions from the input data stream | ||
* @param <T> Type of the input data stream | ||
*/ | ||
public static <T> KeyedStream<T, Tuple> persistentKeyBy( | ||
DataStream<T> dataStream, | ||
String topic, | ||
int producerParallelism, | ||
int numberOfPartitions, | ||
Properties properties, | ||
int... fields) { | ||
return persistentKeyBy( | ||
dataStream, | ||
topic, | ||
producerParallelism, | ||
numberOfPartitions, | ||
properties, | ||
keySelector(dataStream, fields)); | ||
} | ||
|
||
/** | ||
* The write side of {@link FlinkKafkaShuffle#persistentKeyBy}. | ||
* | ||
* <p>This method contains a {@link FlinkKafkaShuffleProducer} to shuffle and persist data in Kafka. | ||
* {@link FlinkKafkaShuffleProducer} uses the same key group assignment function | ||
* {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to decide which partition a key goes. | ||
* Hence, each producer task can potentially write to each Kafka partition based on the key. | ||
* Here, the number of partitions equals to the key group size. | ||
* In the case of using {@link TimeCharacteristic#EventTime}, each producer task broadcasts each watermark | ||
* to all of the Kafka partitions to make sure watermark information is propagated properly. | ||
* | ||
* <p>Attention: make sure kafkaProperties include | ||
* {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} and {@link FlinkKafkaShuffle#PARTITION_NUMBER} explicitly. | ||
* {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} is the parallelism of the producer. | ||
* {@link FlinkKafkaShuffle#PARTITION_NUMBER} is the number of partitions. | ||
* They are not necessarily the same and allowed to be set independently. | ||
* | ||
* @see FlinkKafkaShuffle#persistentKeyBy | ||
* @see FlinkKafkaShuffle#readKeyBy | ||
* | ||
* @param dataStream Data stream to be shuffled | ||
* @param topic Kafka topic written to | ||
* @param kafkaProperties Kafka properties for Kafka Producer | ||
* @param keySelector Key selector to retrieve key from `dataStream' | ||
* @param <T> Type of the input data stream | ||
* @param <K> Type of key | ||
*/ | ||
public static <T, K> void writeKeyBy( | ||
DataStream<T> dataStream, | ||
String topic, | ||
Properties kafkaProperties, | ||
KeySelector<T, K> keySelector) { | ||
|
||
StreamExecutionEnvironment env = dataStream.getExecutionEnvironment(); | ||
TypeSerializer<T> typeSerializer = dataStream.getType().createSerializer(env.getConfig()); | ||
|
||
// write data to Kafka | ||
FlinkKafkaShuffleProducer<T, K> kafkaProducer = new FlinkKafkaShuffleProducer<>( | ||
topic, | ||
typeSerializer, | ||
kafkaProperties, | ||
env.clean(keySelector), | ||
FlinkKafkaProducer.Semantic.EXACTLY_ONCE, | ||
FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); | ||
|
||
// make sure the sink parallelism is set to producerParallelism | ||
Preconditions.checkArgument( | ||
kafkaProperties.getProperty(PRODUCER_PARALLELISM) != null, | ||
"Missing producer parallelism for Kafka Shuffle"); | ||
int producerParallelism = PropertiesUtil.getInt(kafkaProperties, PRODUCER_PARALLELISM, Integer.MIN_VALUE); | ||
|
||
addKafkaShuffle(dataStream, kafkaProducer, producerParallelism); | ||
} | ||
|
||
/** | ||
* The write side of {@link FlinkKafkaShuffle#persistentKeyBy}. | ||
* | ||
* <p>This method contains a {@link FlinkKafkaShuffleProducer} to shuffle and persist data in Kafka. | ||
* {@link FlinkKafkaShuffleProducer} uses the same key group assignment function | ||
* {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to decide which partition a key goes. | ||
* Hence, each producer task can potentially write to each Kafka partition based on the key. | ||
* Here, the number of partitions equals to the key group size. | ||
* In the case of using {@link TimeCharacteristic#EventTime}, each producer task broadcasts each watermark | ||
* to all of the Kafka partitions to make sure watermark information is propagated properly. | ||
* | ||
* @param dataStream Data stream to be shuffled | ||
* @param topic Kafka topic written to | ||
* @param kafkaProperties Kafka properties for Kafka Producer | ||
* @param fields Key positions from the input data stream | ||
* @param <T> Type of the input data stream | ||
*/ | ||
public static <T> void writeKeyBy( | ||
DataStream<T> dataStream, | ||
String topic, | ||
Properties kafkaProperties, | ||
int... fields) { | ||
writeKeyBy(dataStream, topic, kafkaProperties, keySelector(dataStream, fields)); | ||
} | ||
|
||
/** | ||
* The read side of {@link FlinkKafkaShuffle#persistentKeyBy}. | ||
* | ||
* <p>Each consumer task should read kafka partitions equal to the key group indices it is assigned. | ||
* The number of kafka partitions is the maximum parallelism of the consumer. | ||
* This version only supports numberOfPartitions = consumerParallelism. | ||
* In the case of using {@link TimeCharacteristic#EventTime}, a consumer task is responsible to emit | ||
* watermarks. Watermarks are read from the corresponding Kafka partitions. Notice that a consumer task only starts | ||
* to emit a watermark after receiving at least one watermark from each producer task to make sure watermarks | ||
* are monotonically increasing. Hence a consumer task needs to know `producerParallelism` as well. | ||
* | ||
* <p>Attention: make sure kafkaProperties include | ||
* {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} and {@link FlinkKafkaShuffle#PARTITION_NUMBER} explicitly. | ||
* {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} is the parallelism of the producer. | ||
* {@link FlinkKafkaShuffle#PARTITION_NUMBER} is the number of partitions. | ||
* They are not necessarily the same and allowed to be set independently. | ||
* | ||
* @see FlinkKafkaShuffle#persistentKeyBy | ||
* @see FlinkKafkaShuffle#writeKeyBy | ||
* | ||
* @param topic The topic of Kafka where data is persisted | ||
* @param env Execution environment. readKeyBy's environment can be different from writeKeyBy's | ||
* @param typeInformation Type information of the data persisted in Kafka | ||
* @param kafkaProperties kafka properties for Kafka Consumer | ||
* @param keySelector key selector to retrieve key | ||
* @param <T> Schema type | ||
* @param <K> Key type | ||
* @return Keyed data stream | ||
*/ | ||
public static <T, K> KeyedStream<T, K> readKeyBy( | ||
String topic, | ||
StreamExecutionEnvironment env, | ||
TypeInformation<T> typeInformation, | ||
Properties kafkaProperties, | ||
KeySelector<T, K> keySelector) { | ||
|
||
TypeSerializer<T> typeSerializer = typeInformation.createSerializer(env.getConfig()); | ||
TypeInformationSerializationSchema<T> schema = | ||
new TypeInformationSerializationSchema<>(typeInformation, typeSerializer); | ||
|
||
SourceFunction<T> kafkaConsumer = | ||
new FlinkKafkaShuffleConsumer<>(topic, schema, typeSerializer, kafkaProperties); | ||
|
||
// TODO: consider situations where numberOfPartitions != consumerParallelism | ||
Preconditions.checkArgument( | ||
kafkaProperties.getProperty(PARTITION_NUMBER) != null, | ||
"Missing partition number for Kafka Shuffle"); | ||
int numberOfPartitions = PropertiesUtil.getInt(kafkaProperties, PARTITION_NUMBER, Integer.MIN_VALUE); | ||
DataStream<T> outputDataStream = env.addSource(kafkaConsumer).setParallelism(numberOfPartitions); | ||
|
||
return DataStreamUtils.reinterpretAsKeyedStream(outputDataStream, keySelector); | ||
} | ||
|
||
/** | ||
* Adds a {@link StreamKafkaShuffleSink} to {@link DataStream}. | ||
* | ||
* <p>{@link StreamKafkaShuffleSink} is associated a {@link FlinkKafkaShuffleProducer}. | ||
* | ||
* @param inputStream Input data stream connected to the shuffle | ||
* @param kafkaShuffleProducer Kafka shuffle sink function that can handle both records and watermark | ||
* @param producerParallelism The number of tasks writing to the kafka shuffle | ||
*/ | ||
private static <T, K> void addKafkaShuffle( | ||
DataStream<T> inputStream, | ||
FlinkKafkaShuffleProducer<T, K> kafkaShuffleProducer, | ||
int producerParallelism) { | ||
|
||
// read the output type of the input Transform to coax out errors about MissingTypeInfo | ||
inputStream.getTransformation().getOutputType(); | ||
|
||
StreamKafkaShuffleSink<T> shuffleSinkOperator = new StreamKafkaShuffleSink<>(kafkaShuffleProducer); | ||
SinkTransformation<T> transformation = new SinkTransformation<>( | ||
inputStream.getTransformation(), | ||
"kafka_shuffle", | ||
shuffleSinkOperator, | ||
inputStream.getExecutionEnvironment().getParallelism()); | ||
inputStream.getExecutionEnvironment().addOperator(transformation); | ||
transformation.setParallelism(producerParallelism); | ||
} | ||
|
||
// A better place to put this function is DataStream; but put it here for now to avoid changing DataStream | ||
private static <T> KeySelector<T, Tuple> keySelector(DataStream<T> source, int... fields) { | ||
KeySelector<T, Tuple> keySelector; | ||
if (source.getType() instanceof BasicArrayTypeInfo || source.getType() instanceof PrimitiveArrayTypeInfo) { | ||
keySelector = KeySelectorUtil.getSelectorForArray(fields, source.getType()); | ||
} else { | ||
Keys<T> keys = new Keys.ExpressionKeys<>(fields, source.getType()); | ||
keySelector = KeySelectorUtil.getSelectorForKeys( | ||
keys, | ||
source.getType(), | ||
source.getExecutionEnvironment().getConfig()); | ||
} | ||
|
||
return keySelector; | ||
} | ||
} |
Oops, something went wrong.