[FLINK-8190] [kafka] Add constructors to expose topic pattern-based subscription

This closes apache#5117.
tzulitai committed Dec 4, 2017
1 parent 7c5a694 commit c40b84d
Showing 5 changed files with 265 additions and 6 deletions.
55 changes: 54 additions & 1 deletion docs/dev/connectors/kafka.md
@@ -294,7 +294,9 @@ Flink on YARN supports automatic restart of lost YARN containers.

If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.

-### Kafka Consumers Partition Discovery
+### Kafka Consumers Topic and Partition Discovery

#### Partition discovery

The Flink Kafka Consumer supports discovering dynamically created Kafka partitions, and consumes them with
exactly-once guarantees. All partitions discovered after the initial retrieval of partition metadata (i.e., when the
@@ -309,6 +311,57 @@ prior to Flink 1.3.x, partition discovery cannot be enabled on the restore run.
with an exception. In this case, in order to use partition discovery, please first take a savepoint in Flink 1.3.x and
then restore again from that.
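
For reference, partition discovery is switched on through the consumer properties. A minimal
sketch (the broker address and the 10-second interval are placeholder choices):

{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// any non-negative value enables partition discovery, at this interval in milliseconds
properties.setProperty("flink.partition-discovery.interval-millis", "10000");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    "test-topic", new SimpleStringSchema(), properties);
{% endhighlight %}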

#### Topic discovery

At a higher level, the Flink Kafka Consumer is also capable of discovering topics, by matching
topic names against a regular expression. See the example below:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema(),
properties);

DataStream<String> stream = env.addSource(myConsumer);
...
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String](
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema,
properties)

val stream = env.addSource(myConsumer)
...
{% endhighlight %}
</div>
</div>

In the above example, all topics with names that match the specified regular expression
(starting with `test-topic-` and ending with a single digit) will be subscribed to by the consumer
when the job starts running.
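
Note that this implies the full topic name has to match. A quick sanity check of the semantics
with plain `java.util.regex` (a sketch, assuming full-name matching as described above):

{% highlight java %}
java.util.regex.Pattern pattern = java.util.regex.Pattern.compile("test-topic-[0-9]");

boolean a = pattern.matcher("test-topic-5").matches();    // true: single trailing digit, subscribed
boolean b = pattern.matcher("test-topic-42").matches();   // false: two trailing digits
boolean c = pattern.matcher("my-test-topic-5").matches(); // false: extra prefix
{% endhighlight %}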

To allow the consumer to discover dynamically created topics after the job has started running,
set a non-negative value for `flink.partition-discovery.interval-millis`. This allows
the consumer to also discover partitions of new topics whose names match the specified
pattern.
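
To illustrate (the interval value is an arbitrary choice), this only adds one property on top
of the pattern-based example above:

{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// also pick up topics created later whose names match the pattern
properties.setProperty("flink.partition-discovery.interval-millis", "10000");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);
{% endhighlight %}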

### Kafka Consumers Offset Committing Behaviour Configuration

The Flink Kafka Consumer allows configuring the behaviour of how offsets
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -39,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

/**
* The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -126,6 +128,49 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
super(topics, deserializer, props);
}

/**
* Creates a new Kafka streaming source consumer for Kafka 0.10.x. Use this constructor to
* subscribe to multiple topics based on a regular expression pattern.
*
* <p>If partition discovery is enabled (by setting a non-negative value for
* {@link FlinkKafkaConsumer010#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* @param subscriptionPattern
* The regular expression for a pattern of topic names to subscribe to.
* @param valueDeserializer
* The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
}

/**
* Creates a new Kafka streaming source consumer for Kafka 0.10.x. Use this constructor to
* subscribe to multiple topics based on a regular expression pattern.
*
* <p>If partition discovery is enabled (by setting a non-negative value for
* {@link FlinkKafkaConsumer010#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param subscriptionPattern
* The regular expression for a pattern of topic names to subscribe to.
* @param deserializer
* The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
super(subscriptionPattern, deserializer, props);
}

@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
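
For readers new to the `KeyedDeserializationSchema` variant accepted by the constructors above,
a minimal sketch of an implementation (the class name is hypothetical): it prefixes each record
with the topic it was read from, which is handy when one pattern matches several topics.

{% highlight java %}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

import java.nio.charset.StandardCharsets;

// Hypothetical example class, for illustration only.
public class TopicPrefixingSchema implements KeyedDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        // unlike a plain DeserializationSchema, the topic, partition, and offset are exposed here
        return topic + ": " + new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // never signal end of stream; the source runs until the job is cancelled
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
{% endhighlight %}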
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
@@ -17,13 +17,15 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

/**
* The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -110,4 +112,47 @@ public FlinkKafkaConsumer011(List<String> topics, DeserializationSchema<T> deser
public FlinkKafkaConsumer011(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
super(topics, deserializer, props);
}

/**
* Creates a new Kafka streaming source consumer for Kafka 0.11.x. Use this constructor to
* subscribe to multiple topics based on a regular expression pattern.
*
* <p>If partition discovery is enabled (by setting a non-negative value for
* {@link FlinkKafkaConsumer011#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* @param subscriptionPattern
* The regular expression for a pattern of topic names to subscribe to.
* @param valueDeserializer
* The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
}

/**
* Creates a new Kafka streaming source consumer for Kafka 0.11.x. Use this constructor to
* subscribe to multiple topics based on a regular expression pattern.
*
* <p>If partition discovery is enabled (by setting a non-negative value for
* {@link FlinkKafkaConsumer011#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param subscriptionPattern
* The regular expression for a pattern of topic names to subscribe to.
* @param deserializer
* The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer011(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
super(subscriptionPattern, deserializer, props);
}
}
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -39,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.PropertiesUtil.getLong;
@@ -156,13 +158,67 @@ public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deseri
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
this(topics, null, deserializer, props);
}

/**
* Creates a new Kafka streaming source consumer for Kafka 0.8.x. Use this constructor to
* subscribe to multiple topics based on a regular expression pattern.
*
* <p>If partition discovery is enabled (by setting a non-negative value for
* {@link FlinkKafkaConsumer08#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* @param subscriptionPattern
* The regular expression for a pattern of topic names to subscribe to.
* @param valueDeserializer
* The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
}

/**
* Creates a new Kafka streaming source consumer for Kafka 0.8.x. Use this constructor to
* subscribe to multiple topics based on a regular expression pattern.
*
* <p>If partition discovery is enabled (by setting a non-negative value for
* {@link FlinkKafkaConsumer08#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param subscriptionPattern
* The regular expression for a pattern of topic names to subscribe to.
* @param deserializer
* The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
this(null, subscriptionPattern, deserializer, props);
}

private FlinkKafkaConsumer08(
List<String> topics,
Pattern subscriptionPattern,
KeyedDeserializationSchema<T> deserializer,
Properties props) {

super(
topics,
-null,
+subscriptionPattern,
deserializer,
-getLong(props, KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+getLong(
+checkNotNull(props, "props"),
+KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));

-this.kafkaProperties = checkNotNull(props, "props");
+this.kafkaProperties = props;

// validate the zookeeper properties
validateZooKeeperConfig(props);
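
A usage sketch for the new 0.8 pattern constructor (addresses are placeholders). Unlike the
newer consumers, 0.8 also needs the ZooKeeper connection that the Javadoc and the
validateZooKeeperConfig call above refer to:

{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// the 0.8 consumer requires a ZooKeeper quorum in addition to the brokers
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);
{% endhighlight %}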
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -42,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.PropertiesUtil.getLong;
@@ -147,9 +149,67 @@ public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deseri
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
-super(topics, null, deserializer, getLong(props, KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+this(topics, null, deserializer, props);
}

/**
* Creates a new Kafka streaming source consumer for Kafka 0.9.x. Use this constructor to
* subscribe to multiple topics based on a regular expression pattern.
*
* <p>If partition discovery is enabled (by setting a non-negative value for
* {@link FlinkKafkaConsumer09#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* @param subscriptionPattern
* The regular expression for a pattern of topic names to subscribe to.
* @param valueDeserializer
* The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
}

/**
* Creates a new Kafka streaming source consumer for Kafka 0.9.x. Use this constructor to
* subscribe to multiple topics based on a regular expression pattern.
*
* <p>If partition discovery is enabled (by setting a non-negative value for
* {@link FlinkKafkaConsumer09#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param subscriptionPattern
* The regular expression for a pattern of topic names to subscribe to.
* @param deserializer
* The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer09(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
this(null, subscriptionPattern, deserializer, props);
}

private FlinkKafkaConsumer09(
List<String> topics,
Pattern subscriptionPattern,
KeyedDeserializationSchema<T> deserializer,
Properties props) {

super(
topics,
subscriptionPattern,
deserializer,
getLong(
checkNotNull(props, "props"),
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));

-this.properties = checkNotNull(props, "props");
+this.properties = props;
setDeserializer(this.properties);

// configure the polling timeout
