Skip to content

Commit

Permalink
[FLINK-8866] [table] Move table type out of descriptors
Browse files Browse the repository at this point in the history
The declaration of a table type is SQL Client/context specific and should not be part of a descriptor.
  • Loading branch information
twalthr committed Jul 15, 2018
1 parent abbb890 commit 09fbfdf
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.TableDescriptor;
import org.apache.flink.table.descriptors.TableDescriptorValidator;

import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -50,6 +49,9 @@ public class Environment {

private static final String TABLE_NAME = "name";
private static final String TABLE_TYPE = "type";
private static final String TABLE_TYPE_VALUE_SOURCE = "source";
private static final String TABLE_TYPE_VALUE_SINK = "sink";
private static final String TABLE_TYPE_VALUE_BOTH = "both";

public Environment() {
this.tables = Collections.emptyMap();
Expand Down Expand Up @@ -214,17 +216,21 @@ private static TableDescriptor createTableDescriptor(String name, Map<String, Ob
if (typeObject == null || !(typeObject instanceof String)) {
throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'.");
}
final String type = (String) config.get(TABLE_TYPE);
config.remove(TABLE_TYPE);
final Map<String, String> normalizedConfig = ConfigUtil.normalizeYaml(config);
if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
return new Source(name, normalizedConfig);
} else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
return new Sink(name, normalizedConfig);
} else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
return new SourceSink(name, normalizedConfig);
final String type = (String) typeObject;
final Map<String, Object> configCopy = new HashMap<>(config);
configCopy.remove(TABLE_TYPE);

final Map<String, String> normalizedConfig = ConfigUtil.normalizeYaml(configCopy);
switch (type) {
case TABLE_TYPE_VALUE_SOURCE:
return new Source(name, normalizedConfig);
case TABLE_TYPE_VALUE_SINK:
return new Sink(name, normalizedConfig);
case TABLE_TYPE_VALUE_BOTH:
return new SourceSink(name, normalizedConfig);
default:
throw new SqlClientException(String.format("Invalid 'type' attribute for table '%s'. " +
"Only 'source', 'sink', and 'both' are supported. But was '%s'.", name, type));
}
throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'. " +
"Only 'source', 'sink', and 'both' are supported.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,10 @@ public void addProperties(DescriptorProperties properties) {
}

public Source toSource() {
final Map<String, String> newProperties = new HashMap<>(properties);
newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
return new Source(name, newProperties);
return new Source(name, properties);
}

public Sink toSink() {
final Map<String, String> newProperties = new HashMap<>(properties);
newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
TableDescriptorValidator.TABLE_TYPE_VALUE_SINK());
return new Sink(name, newProperties);
return new Sink(name, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,3 @@ class TableDescriptorValidator extends DescriptorValidator {
// nothing to do
}
}

object TableDescriptorValidator {

/**
* Key for describing the type of this table, valid values are ('source', 'sink', 'both').
*/
val TABLE_TYPE = "type"

/**
* Valid TABLE_TYPE value.
*/
val TABLE_TYPE_VALUE_SOURCE = "source"
val TABLE_TYPE_VALUE_SINK = "sink"
val TABLE_TYPE_VALUE_SOURCE_SINK = "both"
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ package org.apache.flink.table.descriptors
* Common class for all descriptors describing a table sink.
*/
abstract class TableSinkDescriptor extends TableDescriptor {

/**
* Internal method for properties conversion.
*/
override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
super.addProperties(properties)
properties.putString(TableDescriptorValidator.TABLE_TYPE,
TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ abstract class TableSourceDescriptor extends TableDescriptor {
*/
override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
super.addProperties(properties)
properties.putString(TableDescriptorValidator.TABLE_TYPE,
TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE)
statisticsDescriptor.foreach(_.addProperties(properties))
}

Expand Down

0 comments on commit 09fbfdf

Please sign in to comment.