[FLINK-8558] [table] Add unified format interfaces and separate formats from connectors

This PR introduces a format discovery mechanism based on Java Service Providers. The general `TableFormatFactory` is similar to the existing table source discovery mechanism. However, it allows for arbitrary format interfaces that might be introduced in the future. At the moment, a connector can request configured instances of `DeserializationSchema` and `SerializationSchema`. In the future, we can add interfaces such as a `Writer` or `KeyedSerializationSchema` without breaking backwards compatibility.
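As a rough illustration of the discovery mechanism described above, the sketch below shows how a format factory can be located through Java's `ServiceLoader`. The interface `MyTableFormatFactory`, its method names, and the `FormatDiscovery` helper are illustrative assumptions, not the exact `TableFormatFactory` API added by this commit.

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.types.Row;

import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical format factory interface; the real TableFormatFactory introduced by this
// commit may differ in naming and method signatures.
interface MyTableFormatFactory {

    // Properties that identify this format, e.g. format.type=json.
    Map<String, String> requiredContext();

    // All property keys this format understands.
    List<String> supportedProperties();

    // Creates a configured runtime object from the normalized string properties.
    DeserializationSchema<Row> createDeserializationSchema(Map<String, String> properties);
}

// Discovery via the standard Java Service Provider mechanism: implementations list their
// fully qualified class name in a META-INF/services file named after the factory interface
// and are found at runtime with ServiceLoader, so connectors and formats can live in
// separate jars and be combined freely.
class FormatDiscovery {

    static MyTableFormatFactory find(Map<String, String> properties) {
        for (MyTableFormatFactory factory : ServiceLoader.load(MyTableFormatFactory.class)) {
            if (properties.entrySet().containsAll(factory.requiredContext().entrySet())) {
                return factory;
            }
        }
        throw new IllegalStateException("No matching format factory found.");
    }
}
```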

This PR deprecates the existing strong coupling of connector and format for the Kafka table sources and table source factories. It introduces descriptor-based alternatives.
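For a sense of what a descriptor-based definition looks like in place of the deprecated builders, here is a minimal sketch. The commit's Javadoc points readers to `TableEnvironment#from(ConnectorDescriptor)`; the chained `connect()/withFormat()/withSchema()` style below follows later Flink documentation, so treat the method names as well as the topic and field names as assumptions rather than verbatim API from this commit.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class DescriptorExample {

    // Registers a Kafka 0.10 / JSON backed table purely through descriptors.
    public static void register(StreamTableEnvironment tableEnv) {
        tableEnv
            .connect(
                new Kafka()
                    .version("0.10")
                    .topic("sensor-readings")                        // hypothetical topic
                    .property("bootstrap.servers", "localhost:9092"))
            .withFormat(
                new Json()
                    .failOnMissingField(false))                      // the format is now a separate, pluggable descriptor
            .withSchema(
                new Schema()
                    .field("id", Types.LONG)
                    .field("temperature", Types.DOUBLE))
            .inAppendMode()
            .registerTableSource("SensorReadings");
    }
}
```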
twalthr committed Jul 15, 2018
1 parent 1632681 commit ee40335
Showing 107 changed files with 3,641 additions and 1,182 deletions.
2 changes: 2 additions & 0 deletions docs/dev/table/sqlClient.md
@@ -229,6 +229,8 @@ The SQL Client does not require to setup a Java project using Maven or SBT. Inst
| Name | Version | Download |
| :---------------- | :------------ | :--------------------- |
| Filesystem | | Built-in |
| Apache Kafka | 0.9 | [Download](https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.10 | [Download](https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.11 | [Download](https://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |

#### Formats
53 changes: 53 additions & 0 deletions flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -202,6 +202,59 @@ under the License.

</dependencies>

<profiles>
  <profile>
    <!-- Create SQL Client uber jars for releases -->
    <id>release</id>
    <activation>
      <property>
        <name>release</name>
      </property>
    </activation>
    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <shadedArtifactAttached>true</shadedArtifactAttached>
                <shadedClassifierName>sql-jar</shadedClassifierName>
                <artifactSet>
                  <includes combine.children="append">
                    <include>org.apache.kafka:*</include>
                    <include>org.apache.flink:flink-connector-kafka-base_${scala.binary.version}</include>
                    <include>org.apache.flink:flink-connector-kafka-0.9_${scala.binary.version}</include>
                  </includes>
                </artifactSet>
                <filters>
                  <filter>
                    <artifact>*:*</artifact>
                    <excludes>
                      <exclude>kafka/kafka-version.properties</exclude>
                    </excludes>
                  </filter>
                </filters>
                <relocations>
                  <relocation>
                    <pattern>org.apache.kafka</pattern>
                    <shadedPattern>org.apache.flink.kafka010.shaded.org.apache.kafka</shadedPattern>
                  </relocation>
                </relocations>
              </configuration>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
  </profile>
</profiles>
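This profile only takes effect when the `release` property is defined for the build (for example via something like `mvn clean package -Drelease`; the exact invocation used by Flink's release tooling is not shown here). It attaches an additional artifact with the `sql-jar` classifier in which the Kafka client classes are relocated under `org.apache.flink.kafka010.shaded`, so that SQL JARs for several Kafka connector versions can sit on the SQL Client classpath without conflicting.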

<build>
<plugins>
<plugin>
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -18,9 +18,9 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -35,8 +35,13 @@

/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@PublicEvolving
@Deprecated
public class Kafka010AvroTableSource extends KafkaAvroTableSource {

/**
@@ -46,7 +51,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param record Avro specific record.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public Kafka010AvroTableSource(
String topic,
Properties properties,
@@ -69,7 +76,9 @@ public Kafka010AvroTableSource(
* the value to the field of the Avro record.</p>
*
* @param fieldMapping A mapping from schema fields to Avro fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
@@ -79,7 +88,9 @@ public void setFieldMapping(Map<String, String> fieldMapping) {
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
@@ -89,7 +100,9 @@ public void setProctimeAttribute(String proctimeAttribute) {
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -102,15 +115,27 @@ protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properti

/**
* Returns a builder to configure and create a {@link Kafka010AvroTableSource}.
*
* @return A builder to configure and create a {@link Kafka010AvroTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@Deprecated
public static Builder builder() {
return new Builder();
}

/**
* A builder to configure and create a {@link Kafka010AvroTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaAvroTableSource.Builder<Kafka010AvroTableSource, Kafka010AvroTableSource.Builder> {

@Override
@@ -127,7 +152,9 @@ protected Kafka010AvroTableSource.Builder builder() {
* Builds and configures a {@link Kafka010AvroTableSource}.
*
* @return A configured {@link Kafka010AvroTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public Kafka010AvroTableSource build() {
Kafka010AvroTableSource tableSource = new Kafka010AvroTableSource(
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -18,9 +18,9 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -32,8 +32,13 @@

/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@PublicEvolving
@Deprecated
public class Kafka010JsonTableSource extends KafkaJsonTableSource {

/**
@@ -43,7 +48,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public Kafka010JsonTableSource(
String topic,
Properties properties,
@@ -58,7 +65,9 @@ public Kafka010JsonTableSource(
* TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFailOnMissingField(boolean failOnMissingField) {
super.setFailOnMissingField(failOnMissingField);
@@ -68,7 +77,9 @@ public void setFailOnMissingField(boolean failOnMissingField) {
* Sets the mapping from table schema fields to JSON schema fields.
*
* @param fieldMapping The mapping from table schema fields to JSON schema fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
@@ -78,7 +89,9 @@ public void setFieldMapping(Map<String, String> fieldMapping) {
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
@@ -88,7 +101,9 @@ public void setProctimeAttribute(String proctimeAttribute) {
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -101,15 +116,27 @@ protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properti

/**
* Returns a builder to configure and create a {@link Kafka010JsonTableSource}.
*
* @return A builder to configure and create a {@link Kafka010JsonTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@Deprecated
public static Kafka010JsonTableSource.Builder builder() {
return new Kafka010JsonTableSource.Builder();
}

/**
* A builder to configure and create a {@link Kafka010JsonTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaJsonTableSource.Builder<Kafka010JsonTableSource, Kafka010JsonTableSource.Builder> {

@Override
@@ -126,6 +153,7 @@ protected Kafka010JsonTableSource.Builder builder() {
* Builds and configures a {@link Kafka010JsonTableSource}.
*
* @return A configured {@link Kafka010JsonTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Override
public Kafka010JsonTableSource build() {
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -18,48 +18,79 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
@PublicEvolving
public abstract class Kafka010TableSource extends KafkaTableSource {

// The deserialization schema for the Kafka records
private final DeserializationSchema<Row> deserializationSchema;
@Internal
public class Kafka010TableSource extends KafkaTableSource {

/**
* Creates a Kafka 0.10 {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param typeInfo Type information describing the result type. The field names are used
* to parse the JSON file and so are the types.
* @param schema Schema of the produced table.
* @param proctimeAttribute Field name of the processing time attribute.
* @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
* @param fieldMapping Mapping for the fields of the table schema to
* fields of the physical returned type.
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema for decoding records from Kafka.
* @param startupMode Startup mode for the contained consumer.
* @param specificStartupOffsets Specific startup offsets; only relevant when startup
* mode is {@link StartupMode#SPECIFIC_OFFSETS}.
*/
public Kafka010TableSource(
TableSchema schema,
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Optional<Map<String, String>> fieldMapping,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
TableSchema schema,
TypeInformation<Row> typeInfo) {
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets) {

super(topic, properties, schema, typeInfo);

this.deserializationSchema = deserializationSchema;
super(
schema,
proctimeAttribute,
rowtimeAttributeDescriptors,
fieldMapping,
topic,
properties,
deserializationSchema,
startupMode,
specificStartupOffsets);
}

@Override
public DeserializationSchema<Row> getDeserializationSchema() {
return this.deserializationSchema;
/**
* Creates a Kafka 0.10 {@link StreamTableSource}.
*
* @param schema Schema of the produced table.
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema for decoding records from Kafka.
*/
public Kafka010TableSource(
TableSchema schema,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema) {

super(schema, topic, properties, deserializationSchema);
}

@Override
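Based on the constructors shown above, the reworked, non-abstract `Kafka010TableSource` can now be instantiated directly with a format-produced `DeserializationSchema` instead of being subclassed. In the sketch below, the field names, topic, consumer properties, and the use of `JsonRowDeserializationSchema` are illustrative assumptions; in practice the table factory creates the instance from discovered connector and format properties.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka010TableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

import java.util.Properties;

public class Kafka010TableSourceExample {

    public static Kafka010TableSource create() {
        String[] fieldNames = {"id", "temperature"};
        TypeInformation<?>[] fieldTypes = {Types.LONG, Types.DOUBLE};

        // Logical schema of the produced table.
        TableSchema schema = new TableSchema(fieldNames, fieldTypes);

        // Kafka consumer configuration (values are placeholders).
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");

        // Row type for the JSON format; a DeserializationSchema produced by any
        // discovered format factory could be passed in the same way.
        TypeInformation<Row> rowType = Types.ROW_NAMED(fieldNames, Types.LONG, Types.DOUBLE);

        // Uses the simpler of the two constructors added by this commit.
        return new Kafka010TableSource(
            schema,
            "sensor-readings",
            props,
            new JsonRowDeserializationSchema(rowType));
    }
}
```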