specificStartupOffsets) {
+
+ return new Kafka09TableSource(
+ schema,
+ proctimeAttribute,
+ rowtimeAttributeDescriptors,
+ Optional.of(fieldMapping),
+ topic,
+ properties,
+ deserializationSchema,
+ startupMode,
+ specificStartupOffsets
+ );
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 8c8ce324ba997..3e9f2b03e344f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -21,16 +21,14 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.sources.DefinedFieldMapping;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
/**
@@ -38,13 +36,15 @@
*
* The version-specific Kafka consumers need to extend this class and
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+@Deprecated
@Internal
-public abstract class KafkaAvroTableSource extends KafkaTableSource implements DefinedFieldMapping {
-
- private final Class<? extends SpecificRecordBase> avroRecordClass;
-
- private Map<String, String> fieldMapping;
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
/**
* Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
@@ -53,7 +53,12 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource implements D
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param avroRecordClass Class of the Avro record that is read from the Kafka topic.
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
protected KafkaAvroTableSource(
String topic,
Properties properties,
@@ -61,59 +66,15 @@ protected KafkaAvroTableSource(
Class<? extends SpecificRecordBase> avroRecordClass) {
super(
+ schema,
topic,
properties,
- schema,
- AvroSchemaConverter.convertToTypeInfo(avroRecordClass));
-
- this.avroRecordClass = avroRecordClass;
- }
-
- @Override
- public Map<String, String> getFieldMapping() {
- return fieldMapping;
+ new AvroRowDeserializationSchema(avroRecordClass));
}
@Override
public String explainSource() {
- return "KafkaAvroTableSource(" + this.avroRecordClass.getSimpleName() + ")";
- }
-
- @Override
- protected AvroRowDeserializationSchema getDeserializationSchema() {
- return new AvroRowDeserializationSchema(avroRecordClass);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof KafkaAvroTableSource)) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- final KafkaAvroTableSource that = (KafkaAvroTableSource) o;
- return Objects.equals(avroRecordClass, that.avroRecordClass) &&
- Objects.equals(fieldMapping, that.fieldMapping);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), avroRecordClass, fieldMapping);
- }
-
- //////// SETTERS FOR OPTIONAL PARAMETERS
-
- /**
- * Configures a field mapping for this TableSource.
- *
- * @param fieldMapping The field mapping.
- */
- protected void setFieldMapping(Map<String, String> fieldMapping) {
- this.fieldMapping = fieldMapping;
+ return "KafkaAvroTableSource";
}
//////// HELPER METHODS
@@ -124,7 +85,13 @@ protected void setFieldMapping(Map fieldMapping) {
*
* @param <T> Type of the KafkaAvroTableSource produced by the builder.
* @param <B> Type of the KafkaAvroTableSource.Builder subclass.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
protected abstract static class Builder<T extends KafkaAvroTableSource, B extends KafkaAvroTableSource.Builder>
extends KafkaTableSource.Builder<T, B> {
@@ -137,7 +104,9 @@ protected abstract static class Builder avroClass) {
this.avroClass = avroClass;
return builder();
@@ -153,7 +122,9 @@ public B forAvroRecordClass(Class extends SpecificRecordBase> avroClass) {
*
* @param schemaToAvroMapping A mapping from schema fields to Avro fields.
* @return The builder.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B withTableToAvroMapping(Map<String, String> schemaToAvroMapping) {
this.fieldMapping = schemaToAvroMapping;
return builder();
@@ -163,7 +134,9 @@ public B withTableToAvroMapping(Map schemaToAvroMapping) {
* Returns the configured Avro class.
*
* @return The configured Avro class.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected Class<? extends SpecificRecordBase> getAvroRecordClass() {
return this.avroClass;
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
deleted file mode 100644
index 8ef7270ccae5a..0000000000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.AvroValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.FormatDescriptorValidator;
-import org.apache.flink.table.descriptors.SchemaValidator;
-
-import org.apache.avro.specific.SpecificRecordBase;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * Factory for creating configured instances of {@link KafkaAvroTableSource}.
- */
-public abstract class KafkaAvroTableSourceFactory extends KafkaTableSourceFactory {
-
- @Override
- protected String formatType() {
- return AvroValidator.FORMAT_TYPE_VALUE;
- }
-
- @Override
- protected int formatPropertyVersion() {
- return 1;
- }
-
- @Override
- protected List<String> formatProperties() {
- return Collections.singletonList(AvroValidator.FORMAT_RECORD_CLASS);
- }
-
- @Override
- protected FormatDescriptorValidator formatValidator() {
- return new AvroValidator();
- }
-
- @Override
- protected KafkaTableSource.Builder createBuilderWithFormat(DescriptorProperties params) {
- KafkaAvroTableSource.Builder builder = createKafkaAvroBuilder();
-
- // Avro format schema
- final Class<? extends SpecificRecordBase> avroRecordClass =
- params.getClass(AvroValidator.FORMAT_RECORD_CLASS, SpecificRecordBase.class);
- builder.forAvroRecordClass(avroRecordClass);
- final TableSchema formatSchema = TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo(avroRecordClass));
-
- // field mapping
- final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));
- builder.withTableToAvroMapping(mapping);
-
- return builder;
- }
-
- /**
- * Creates a version specific {@link KafkaAvroTableSource.Builder}.
- */
- protected abstract KafkaAvroTableSource.Builder createKafkaAvroBuilder();
-}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index b2bb8ff773bf4..bd0d0dedf22e7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -20,14 +20,12 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.sources.DefinedFieldMapping;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
/**
@@ -37,15 +35,15 @@
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
*
* The field names are used to parse the JSON file and so are the types.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+@Deprecated
@Internal
-public abstract class KafkaJsonTableSource extends KafkaTableSource implements DefinedFieldMapping {
-
- private TableSchema jsonSchema;
-
- private Map<String, String> fieldMapping;
-
- private boolean failOnMissingField;
+public abstract class KafkaJsonTableSource extends KafkaTableSource {
/**
* Creates a generic Kafka JSON {@link StreamTableSource}.
@@ -54,7 +52,9 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource implements D
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected KafkaJsonTableSource(
String topic,
Properties properties,
@@ -62,24 +62,10 @@ protected KafkaJsonTableSource(
TableSchema jsonSchema) {
super(
+ tableSchema,
topic,
properties,
- tableSchema,
- jsonSchemaToReturnType(jsonSchema));
-
- this.jsonSchema = jsonSchema;
- }
-
- @Override
- public Map<String, String> getFieldMapping() {
- return fieldMapping;
- }
-
- @Override
- protected JsonRowDeserializationSchema getDeserializationSchema() {
- JsonRowDeserializationSchema deserSchema = new JsonRowDeserializationSchema(jsonSchemaToReturnType(jsonSchema));
- deserSchema.setFailOnMissingField(failOnMissingField);
- return deserSchema;
+ new JsonRowDeserializationSchema(jsonSchema.toRowType()));
}
@Override
@@ -87,28 +73,6 @@ public String explainSource() {
return "KafkaJsonTableSource";
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof KafkaJsonTableSource)) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- KafkaJsonTableSource that = (KafkaJsonTableSource) o;
- return failOnMissingField == that.failOnMissingField &&
- Objects.equals(jsonSchema, that.jsonSchema) &&
- Objects.equals(fieldMapping, that.fieldMapping);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), jsonSchema, fieldMapping, failOnMissingField);
- }
-
//////// SETTERS FOR OPTIONAL PARAMETERS
/**
@@ -116,34 +80,27 @@ public int hashCode() {
* TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected void setFailOnMissingField(boolean failOnMissingField) {
- this.failOnMissingField = failOnMissingField;
- }
-
- /**
- * Sets the mapping from table schema fields to JSON schema fields.
- *
- * @param fieldMapping The mapping from table schema fields to JSON schema fields.
- */
- protected void setFieldMapping(Map<String, String> fieldMapping) {
- this.fieldMapping = fieldMapping;
+ ((JsonRowDeserializationSchema) getDeserializationSchema()).setFailOnMissingField(failOnMissingField);
}
//////// HELPER METHODS
- /** Converts the JSON schema into into the return type. */
- private static RowTypeInfo jsonSchemaToReturnType(TableSchema jsonSchema) {
- return new RowTypeInfo(jsonSchema.getTypes(), jsonSchema.getColumnNames());
- }
-
/**
* Abstract builder for a {@link KafkaJsonTableSource} to be extended by builders of subclasses of
* KafkaJsonTableSource.
*
* @param <T> Type of the KafkaJsonTableSource produced by the builder.
* @param <B> Type of the KafkaJsonTableSource.Builder subclass.
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ * with descriptors for schema and format instead. Descriptors allow for
+ * implementation-agnostic definition of tables. See also
+ * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
+ @Deprecated
protected abstract static class Builder<T extends KafkaJsonTableSource, B extends KafkaJsonTableSource.Builder>
extends KafkaTableSource.Builder<T, B> {
@@ -159,7 +116,9 @@ protected abstract static class Builder tableToJsonMapping) {
this.fieldMapping = tableToJsonMapping;
return builder();
@@ -188,7 +149,9 @@ public B withTableToJsonMapping(Map tableToJsonMapping) {
* field.
* If set to false, a missing field is set to null.
* @return The builder.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
public B failOnMissingField(boolean failOnMissingField) {
this.failOnMissingField = failOnMissingField;
return builder();
@@ -199,7 +162,9 @@ public B failOnMissingField(boolean failOnMissingField) {
* is returned.
*
* @return The JSON schema for the TableSource.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
*/
+ @Deprecated
protected TableSchema getJsonSchema() {
if (jsonSchema != null) {
return this.jsonSchema;
@@ -208,7 +173,13 @@ protected TableSchema getJsonSchema() {
}
}
- @Override
+ /**
+ * Configures a TableSource with optional parameters.
+ *
+ * @param source The TableSource to configure.
+ * @deprecated Use table descriptors instead of implementation-specific builders.
+ */
+ @Deprecated
protected void configureTableSource(T source) {
super.configureTableSource(source);
// configure field mapping
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
deleted file mode 100644
index 98985694ab9a2..0000000000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.json.JsonSchemaConverter;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.FormatDescriptorValidator;
-import org.apache.flink.table.descriptors.JsonValidator;
-import org.apache.flink.table.descriptors.SchemaValidator;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * Factory for creating configured instances of {@link KafkaJsonTableSource}.
- */
-public abstract class KafkaJsonTableSourceFactory extends KafkaTableSourceFactory {
-
- @Override
- protected String formatType() {
- return JsonValidator.FORMAT_TYPE_VALUE;
- }
-
- @Override
- protected int formatPropertyVersion() {
- return 1;
- }
-
- @Override
- protected List<String> formatProperties() {
- return Arrays.asList(
- JsonValidator.FORMAT_JSON_SCHEMA,
- JsonValidator.FORMAT_SCHEMA,
- JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD,
- FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA());
- }
-
- @Override
- protected FormatDescriptorValidator formatValidator() {
- return new JsonValidator();
- }
-
- @Override
- protected KafkaTableSource.Builder createBuilderWithFormat(DescriptorProperties params) {
- final KafkaJsonTableSource.Builder builder = createKafkaJsonBuilder();
-
- // missing field
- params.getOptionalBoolean(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD)
- .ifPresent(builder::failOnMissingField);
-
- // json schema
- final TableSchema formatSchema;
- if (params.containsKey(JsonValidator.FORMAT_SCHEMA)) {
- final TypeInformation<?> info = params.getType(JsonValidator.FORMAT_SCHEMA);
- formatSchema = TableSchema.fromTypeInfo(info);
- } else if (params.containsKey(JsonValidator.FORMAT_JSON_SCHEMA)) {
- final TypeInformation<?> info = JsonSchemaConverter.convert(params.getString(JsonValidator.FORMAT_JSON_SCHEMA));
- formatSchema = TableSchema.fromTypeInfo(info);
- } else {
- formatSchema = SchemaValidator.deriveFormatFields(params);
- }
- builder.forJsonSchema(formatSchema);
-
- // field mapping
- final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));
- builder.withTableToJsonMapping(mapping);
-
- return builder;
- }
-
- /**
- * Creates a version specific {@link KafkaJsonTableSource.Builder}.
- */
- protected abstract KafkaJsonTableSource.Builder createKafkaJsonBuilder();
-
-}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 29654b061c790..78b373bc2c486 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -28,6 +28,8 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -43,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import scala.Option;
@@ -54,29 +57,40 @@
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
*/
@Internal
-public abstract class KafkaTableSource
- implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
+public abstract class KafkaTableSource implements
+ StreamTableSource<Row>,
+ DefinedProctimeAttribute,
+ DefinedRowtimeAttributes,
+ DefinedFieldMapping {
+
+ // common table source attributes
+ // TODO make all attributes final once we drop support for format-specific table sources
/** The schema of the table. */
private final TableSchema schema;
+ /** Field name of the processing time attribute, null if no processing time field is defined. */
+ private Optional<String> proctimeAttribute;
+
+ /** Descriptor for a rowtime attribute. */
+ private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
+
+ /** Mapping for the fields of the table schema to fields of the physical returned type. */
+ private Optional