Skip to content

Commit

Permalink
[FLINK-10843] [connectors] Change Kafka table factory version '2.0' t…
Browse files Browse the repository at this point in the history
…o 'universal'

This closes apache#7087.
  • Loading branch information
twalthr committed Nov 16, 2018
1 parent 1680132 commit ad7e81a
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 34 deletions.
27 changes: 16 additions & 11 deletions docs/dev/table/connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ The following table list all available connectors and formats. Their mutual comp

### Connectors

| Name | Version | Maven dependency | SQL Client JAR |
| :---------------- | :------------ | :--------------------------- | :----------------------|
| Filesystem | | Built-in | Built-in |
| Elasticsearch | 6 | `flink-connector-elasticsearch6` | [Download](http:https://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available |
| Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](http:https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](http:https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](http:https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Name | Version | Maven dependency | SQL Client JAR |
| :---------------- | :------------------ | :--------------------------- | :----------------------|
| Filesystem | | Built-in | Built-in |
| Elasticsearch | 6 | `flink-connector-elasticsearch6` | [Download](http:https://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available |
| Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](http:https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](http:https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](http:https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.11+ (`universal`) | `flink-connector-kafka` | [Download](http:https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |

### Formats

Expand Down Expand Up @@ -524,7 +525,8 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t
{% highlight java %}
.connect(
new Kafka()
.version("0.11") // required: valid connector versions are "0.8", "0.9", "0.10", and "0.11"
.version("0.11") // required: valid connector versions are
// "0.8", "0.9", "0.10", "0.11", and "universal"
.topic("...") // required: topic name from which the table is read

// optional: connector specific properties
Expand All @@ -549,7 +551,8 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t
{% highlight yaml %}
connector:
type: kafka
version: "0.11" # required: valid connector versions are "0.8", "0.9", "0.10", and "0.11"
version: "0.11" # required: valid connector versions are
# "0.8", "0.9", "0.10", "0.11", and "universal"
topic: ... # required: topic name from which the table is read

properties: # optional: connector specific properties
Expand Down Expand Up @@ -583,7 +586,9 @@ connector:

**Consistency guarantees:** By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({{ site.baseurl }}/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing).

**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies when the record was written into the Kafka topic. These timestamps can be used for a [rowtime attribute](connect.html#defining-the-schema) by selecting `timestamps: from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively.
**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies when the record was written into the Kafka topic. These timestamps can be used for a [rowtime attribute](connect.html#defining-the-schema) by selecting `timestamps: from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively.

**Kafka 0.11+ Versioning:** Since Flink 1.7, the Kafka connector definition should be independent of a hard-coded Kafka version. Use the connector version `universal` as a wildcard for Flink's Kafka connector that is compatible with all Kafka versions starting from 0.11.

Make sure to add the version-specific Kafka dependency. In addition, a corresponding format needs to be specified for reading and writing rows from and to Kafka.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

Expand All @@ -40,7 +38,7 @@ public class KafkaValidator extends ConnectorDescriptorValidator {
public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
public static final String CONNECTOR_VERSION_VALUE_20 = "2.0";
public static final String CONNECTOR_VERSION_VALUE_UNIVERSAL = "universal";
public static final String CONNECTOR_TOPIC = "connector.topic";
public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
Expand All @@ -64,7 +62,7 @@ public void validate(DescriptorProperties properties) {
super.validate(properties);
properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_KAFKA, false);

validateVersion(properties);
properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);

validateStartupMode(properties);

Expand All @@ -73,17 +71,6 @@ public void validate(DescriptorProperties properties) {
validateSinkPartitioner(properties);
}

private void validateVersion(DescriptorProperties properties) {
final List<String> versions = Arrays.asList(
CONNECTOR_VERSION_VALUE_08,
CONNECTOR_VERSION_VALUE_09,
CONNECTOR_VERSION_VALUE_010,
CONNECTOR_VERSION_VALUE_011,
CONNECTOR_VERSION_VALUE_20);
properties.validateEnumValues(CONNECTOR_VERSION, false, versions);
properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
}

private void validateStartupMode(DescriptorProperties properties) {
final Map<String, Consumer<String>> specificOffsetValidators = new HashMap<>();
specificOffsetValidators.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class KafkaTableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase

@Override
protected String kafkaVersion() {
return KafkaValidator.CONNECTOR_VERSION_VALUE_20;
return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class KafkaTableSourceSinkFactoryTest extends KafkaTableSourceSinkFactory

@Override
protected String getKafkaVersion() {
return KafkaValidator.CONNECTOR_VERSION_VALUE_20;
return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ KAFKA_CONNECTOR_VERSION="$1"
KAFKA_VERSION="$2"
CONFLUENT_VERSION="$3"
CONFLUENT_MAJOR_VERSION="$4"
KAFKA_SQL_VERSION="$5"

source "$(dirname "$0")"/kafka-common.sh $2 $3 $4

Expand Down Expand Up @@ -64,7 +65,7 @@ function get_kafka_json_source_schema {
type: ROW<type VARCHAR, message VARCHAR>
connector:
type: kafka
version: "$KAFKA_CONNECTOR_VERSION"
version: "$KAFKA_SQL_VERSION"
topic: $topicName
startup-mode: earliest-offset
properties:
Expand Down
8 changes: 7 additions & 1 deletion flink-end-to-end-tests/test-scripts/test_sql_client.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ KAFKA_CONNECTOR_VERSION="2.0"
KAFKA_VERSION="2.0.0"
CONFLUENT_VERSION="5.0.0"
CONFLUENT_MAJOR_VERSION="5.0"
KAFKA_SQL_VERSION="universal"

source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/kafka_sql_common.sh $KAFKA_CONNECTOR_VERSION $KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION
source "$(dirname "$0")"/kafka_sql_common.sh \
$KAFKA_CONNECTOR_VERSION \
$KAFKA_VERSION \
$CONFLUENT_VERSION \
$CONFLUENT_MAJOR_VERSION \
$KAFKA_SQL_VERSION
source "$(dirname "$0")"/elasticsearch-common.sh

SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@

set -Eeuo pipefail

source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 "kafka"
source "$(dirname "$0")"/test_sql_client_kafka_common.sh 2.0 2.0.0 5.0.0 5.0 "kafka" "universal"
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@

set -Eeuo pipefail

source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 3.2 "kafka-0.10"
source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 3.2 "kafka-0.10" "0.10"
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@ KAFKA_VERSION="$2"
CONFLUENT_VERSION="$3"
CONFLUENT_MAJOR_VERSION="$4"
KAFKA_SQL_JAR="$5"
KAFKA_SQL_VERSION="$6"

source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/kafka_sql_common.sh $KAFKA_CONNECTOR_VERSION $KAFKA_VERSION $CONFLUENT_VERSION $CONFLUENT_MAJOR_VERSION
source "$(dirname "$0")"/kafka_sql_common.sh \
$KAFKA_CONNECTOR_VERSION \
$KAFKA_VERSION \
$CONFLUENT_VERSION \
$CONFLUENT_MAJOR_VERSION \
$KAFKA_SQL_VERSION

################################################################################
# Prepare connectors
Expand Down Expand Up @@ -98,7 +104,7 @@ cat >> $SQL_CONF << EOF
type: BIGINT
connector:
type: kafka
version: "$KAFKA_CONNECTOR_VERSION"
version: "$KAFKA_SQL_VERSION"
topic: test-avro
startup-mode: earliest-offset
properties:
Expand Down

0 comments on commit ad7e81a

Please sign in to comment.