diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java index 2b59e06591956..0853afbb45efc 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java @@ -21,7 +21,6 @@ import org.apache.flink.table.client.SqlClientException; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.TableDescriptor; -import org.apache.flink.table.descriptors.TableDescriptorValidator; import java.io.IOException; import java.net.URL; @@ -50,6 +49,9 @@ public class Environment { private static final String TABLE_NAME = "name"; private static final String TABLE_TYPE = "type"; + private static final String TABLE_TYPE_VALUE_SOURCE = "source"; + private static final String TABLE_TYPE_VALUE_SINK = "sink"; + private static final String TABLE_TYPE_VALUE_BOTH = "both"; public Environment() { this.tables = Collections.emptyMap(); @@ -214,17 +216,21 @@ private static TableDescriptor createTableDescriptor(String name, Map normalizedConfig = ConfigUtil.normalizeYaml(config); - if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) { - return new Source(name, normalizedConfig); - } else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) { - return new Sink(name, normalizedConfig); - } else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) { - return new SourceSink(name, normalizedConfig); + final String type = (String) typeObject; + final Map configCopy = new HashMap<>(config); + configCopy.remove(TABLE_TYPE); + + final Map normalizedConfig = ConfigUtil.normalizeYaml(configCopy); + switch (type) { + case TABLE_TYPE_VALUE_SOURCE: + return new Source(name, normalizedConfig); + case TABLE_TYPE_VALUE_SINK: + return new Sink(name, normalizedConfig); + case TABLE_TYPE_VALUE_BOTH: + return new SourceSink(name, normalizedConfig); + default: + throw new SqlClientException(String.format("Invalid 'type' attribute for table '%s'. " + + "Only 'source', 'sink', and 'both' are supported. But was '%s'.", name, type)); } - throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'. " + - "Only 'source', 'sink', and 'both' are supported."); } } diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java index 33fb0f11f40fc..bfa3c4444629f 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java @@ -50,16 +50,10 @@ public void addProperties(DescriptorProperties properties) { } public Source toSource() { - final Map newProperties = new HashMap<>(properties); - newProperties.replace(TableDescriptorValidator.TABLE_TYPE(), - TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()); - return new Source(name, newProperties); + return new Source(name, properties); } public Sink toSink() { - final Map newProperties = new HashMap<>(properties); - newProperties.replace(TableDescriptorValidator.TABLE_TYPE(), - TableDescriptorValidator.TABLE_TYPE_VALUE_SINK()); - return new Sink(name, newProperties); + return new Sink(name, properties); } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala index c712e72602799..e0fa6025811a3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala @@ -27,18 +27,3 @@ class TableDescriptorValidator extends DescriptorValidator { // nothing to do } } - -object TableDescriptorValidator { - - /** - * Key for describing the type of this table, valid values are ('source', 'sink', 'both'). - */ - val TABLE_TYPE = "type" - - /** - * Valid TABLE_TYPE value. - */ - val TABLE_TYPE_VALUE_SOURCE = "source" - val TABLE_TYPE_VALUE_SINK = "sink" - val TABLE_TYPE_VALUE_SOURCE_SINK = "both" -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala index 97ed47d65ea54..0a4d5044ff8a5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala @@ -22,9 +22,11 @@ package org.apache.flink.table.descriptors * Common class for all descriptors describing a table sink. */ abstract class TableSinkDescriptor extends TableDescriptor { + + /** + * Internal method for properties conversion. + */ override private[flink] def addProperties(properties: DescriptorProperties): Unit = { super.addProperties(properties) - properties.putString(TableDescriptorValidator.TABLE_TYPE, - TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala index 2a0b67ce4f78e..3ca39c2bbfca2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala @@ -36,8 +36,6 @@ abstract class TableSourceDescriptor extends TableDescriptor { */ override private[flink] def addProperties(properties: DescriptorProperties): Unit = { super.addProperties(properties) - properties.putString(TableDescriptorValidator.TABLE_TYPE, - TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE) statisticsDescriptor.foreach(_.addProperties(properties)) }