[FLINK-10687] [table] Move format descriptors and validators to flink-table-common

This commit makes the flink-formats module Scala-free by introducing a
flink-table-common module that is implemented in Java. This module contains
all classes that are required across different Maven modules.

Additionally, all classes in this module have been annotated with @Internal
or @PublicEvolving, as appropriate.

Since the methods in Descriptor were declared with private[flink] visibility,
they have been replaced by a new public toProperties() method.
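
As a rough illustration (not part of this change set), the new entry point can be
exercised from plain Java roughly as follows. DemoConnector and its property key are
made up for this sketch; the base keys contributed by ConnectorDescriptor, such as
connector.type, are assumed rather than shown in this diff:

import org.apache.flink.table.descriptors.ConnectorDescriptor;

import java.util.HashMap;
import java.util.Map;

public class ToPropertiesExample {

    // Made-up descriptor following the same pattern as the documentation example below.
    static final class DemoConnector extends ConnectorDescriptor {
        DemoConnector() {
            super("demo", 1, false); // connector type, property version, no format needed
        }

        @Override
        protected Map<String, String> toConnectorProperties() {
            final Map<String, String> properties = new HashMap<>();
            properties.put("connector.demo-flag", "true");
            return properties;
        }
    }

    public static void main(String[] args) {
        // Previously this conversion was only reachable through private[flink] methods.
        final Map<String, String> properties = new DemoConnector().toProperties();
        // Expected to include "connector.demo-flag" plus base keys such as "connector.type".
        properties.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}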

This closes apache#6958.
twalthr committed Oct 30, 2018
1 parent 95b4086 commit 9e5a427
Showing 72 changed files with 1,064 additions and 691 deletions.
20 changes: 13 additions & 7 deletions docs/dev/table/sourceSinks.md
@@ -672,7 +672,8 @@ A connector for `MySystem` in our example can extend `ConnectorDescriptor` as sh
<div data-lang="java" markdown="1">
{% highlight java %}
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.DescriptorProperties;
import java.util.HashMap;
import java.util.Map;

/**
* Connector to MySystem with debug mode.
@@ -687,8 +688,10 @@ public class MySystemConnector extends ConnectorDescriptor {
}

@Override
public void addConnectorProperties(DescriptorProperties properties) {
properties.putString("connector.debug", Boolean.toString(isDebug));
protected Map<String, String> toConnectorProperties() {
Map<String, String> properties = new HashMap<>();
properties.put("connector.debug", Boolean.toString(isDebug));
return properties;
}
}
{% endhighlight %}
@@ -697,15 +700,18 @@ public class MySystemConnector extends ConnectorDescriptor {
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.flink.table.descriptors.ConnectorDescriptor
import org.apache.flink.table.descriptors.DescriptorProperties
import java.util.HashMap
import java.util.Map

/**
* Connector to MySystem with debug mode.
*/
class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, formatNeeded = false) {
class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, false) {

override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
properties.putString("connector.debug", isDebug.toString)
override protected def toConnectorProperties(): Map[String, String] = {
val properties = new HashMap[String, String]
properties.put("connector.debug", isDebug.toString)
properties
}
}
{% endhighlight %}
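
A hedged usage sketch (an editorial aside, not part of the diffed documentation page): once
the connector above is on the classpath, its now-public toProperties() output can be handed
to the factory service, mirroring the updated tests later in this commit. Schema and
update-mode keys are omitted here, and a StreamTableSourceFactory that matches "my-system"
would have to be registered for the lookup to actually succeed:

import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sources.TableSource;

import java.util.HashMap;
import java.util.Map;

public class MySystemDiscoveryExample {

    public static void main(String[] args) {
        // No Scala conversion utilities are needed anymore to obtain the property map.
        final Map<String, String> properties =
            new HashMap<>(new MySystemConnector(true).toProperties());
        // A real descriptor chain would also contribute schema and update-mode keys here.

        final TableSource<?> source = TableFactoryService
            .find(StreamTableSourceFactory.class, properties)
            .createStreamTableSource(properties);
        System.out.println(source.getTableSchema());
    }
}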
9 changes: 9 additions & 0 deletions flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -114,6 +114,15 @@ under the License.
<scope>test</scope>
</dependency>

<!-- Elasticsearch table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Elasticsearch table sink factory testing -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -102,9 +102,9 @@ public abstract class ElasticsearchUpsertTableSinkFactoryBase implements StreamT
@Override
public Map<String, String> requiredContext() {
final Map<String, String> context = new HashMap<>();
context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_ELASTICSEARCH);
context.put(CONNECTOR_VERSION(), elasticsearchVersion());
context.put(CONNECTOR_PROPERTY_VERSION(), "1");
context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_ELASTICSEARCH);
context.put(CONNECTOR_VERSION, elasticsearchVersion());
context.put(CONNECTOR_PROPERTY_VERSION, "1");
return context;
}

@@ -140,7 +140,7 @@ public List<String> supportedProperties() {
properties.add(SCHEMA() + ".#." + SCHEMA_NAME());

// format wildcard
properties.add(FORMAT() + ".*");
properties.add(FORMAT + ".*");

return properties;
}
@@ -210,7 +210,7 @@ private List<Host> getHosts(DescriptorProperties descriptorProperties) {
}

private SerializationSchema<Row> getSerializationSchema(Map<String, String> properties) {
final String formatType = properties.get(FORMAT_TYPE());
final String formatType = properties.get(FORMAT_TYPE);
// we could have added this check to the table factory context
// but this approach allows to throw more helpful error messages
// if the supported format has not been added
@@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
@@ -71,7 +72,7 @@ public Elasticsearch() {
* @param version Elasticsearch version. E.g., "6".
*/
public Elasticsearch version(String version) {
internalProperties.putString(CONNECTOR_VERSION(), version);
internalProperties.putString(CONNECTOR_VERSION, version);
return this;
}

@@ -301,8 +302,9 @@ public Elasticsearch connectionPathPrefix(String pathPrefix) {
}

@Override
public void addConnectorProperties(DescriptorProperties properties) {
properties.putProperties(internalProperties.asMap());
protected Map<String, String> toConnectorProperties() {
final DescriptorProperties properties = new DescriptorProperties();
properties.putProperties(internalProperties);

final List<List<String>> hostValues = hosts.stream()
.map(host -> Arrays.asList(host.hostname, String.valueOf(host.port), host.protocol))
@@ -311,5 +313,7 @@ public void addConnectorProperties(DescriptorProperties properties) {
CONNECTOR_HOSTS,
Arrays.asList(CONNECTOR_HOSTS_HOSTNAME, CONNECTOR_HOSTS_PORT, CONNECTOR_HOSTS_PROTOCOL),
hostValues);

return properties.asMap();
}
}
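
For reference, a hedged sketch (editorial, not part of this commit) of how the descriptor's
builder methods above end up in the new toConnectorProperties()/toProperties() path. The
host(), index() and documentType() calls are assumptions about the Elasticsearch
descriptor's fluent API; only version() and connectionPathPrefix() are visible in this diff:

import org.apache.flink.table.descriptors.Elasticsearch;

import java.util.Map;

public class ElasticsearchDescriptorExample {

    public static void main(String[] args) {
        // Builder calls populate internalProperties and the host list merged above.
        final Map<String, String> properties = new Elasticsearch()
            .version("6")
            .host("localhost", 9200, "http")   // assumed builder method
            .index("my-index")                 // assumed builder method
            .documentType("my-type")           // assumed builder method
            .toProperties();

        // Expected to contain connector.version plus indexed connector.hosts.* keys.
        properties.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}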
@@ -18,6 +18,8 @@

package org.apache.flink.table.descriptors;

import org.apache.flink.annotation.Internal;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -29,6 +31,7 @@
/**
* The validator for {@link Elasticsearch}.
*/
@Internal
public class ElasticsearchValidator extends ConnectorDescriptorValidator {

public static final String CONNECTOR_TYPE_VALUE_ELASTICSEARCH = "elasticsearch";
@@ -63,7 +66,7 @@ public class ElasticsearchValidator extends ConnectorDescriptorValidator {
@Override
public void validate(DescriptorProperties properties) {
super.validate(properties);
properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_ELASTICSEARCH, false);
properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_ELASTICSEARCH, false);
validateVersion(properties);
validateHosts(properties);
validateGeneralProperties(properties);
@@ -74,7 +77,7 @@ public void validate(DescriptorProperties properties) {

private void validateVersion(DescriptorProperties properties) {
properties.validateEnumValues(
CONNECTOR_VERSION(),
CONNECTOR_VERSION,
false,
Collections.singletonList(CONNECTOR_VERSION_VALUE_6));
}
@@ -31,7 +31,6 @@
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.util.JavaScalaConversionUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;

@@ -113,7 +112,7 @@ public void testTableSink() {
.field(FIELD_TS, Types.SQL_TIMESTAMP()))
.inUpsertMode();

final Map<String, String> propertiesMap = JavaScalaConversionUtil.toJavaMap(testDesc);
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);

9 changes: 9 additions & 0 deletions flink-connectors/flink-connector-kafka-base/pom.xml
@@ -206,6 +206,15 @@ under the License.
<scope>test</scope>
</dependency>

<!-- Kafka table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
@@ -93,9 +93,9 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode
context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_KAFKA); // kafka
context.put(CONNECTOR_VERSION, kafkaVersion()); // version
context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility
return context;
}

@@ -131,7 +131,7 @@ public List<String> supportedProperties() {
properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());

// format wildcard
properties.add(FORMAT() + ".*");
properties.add(FORMAT + ".*");

return properties;
}
@@ -246,13 +246,12 @@ public Kafka sinkPartitionerCustom(Class<? extends FlinkKafkaPartitioner> partit
return this;
}

/**
* Internal method for connector properties conversion.
*/
@Override
public void addConnectorProperties(DescriptorProperties properties) {
protected Map<String, String> toConnectorProperties() {
final DescriptorProperties properties = new DescriptorProperties();

if (version != null) {
properties.putString(CONNECTOR_VERSION(), version);
properties.putString(CONNECTOR_VERSION, version);
}

if (topic != null) {
@@ -290,5 +289,7 @@ public void addConnectorProperties(DescriptorProperties properties) {
properties.putClass(CONNECTOR_SINK_PARTITIONER_CLASS, sinkPartitionerClass);
}
}

return properties.asMap();
}
}
@@ -18,6 +18,7 @@

package org.apache.flink.table.descriptors;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;

import java.util.Arrays;
@@ -31,6 +32,7 @@
/**
* The validator for {@link Kafka}.
*/
@Internal
public class KafkaValidator extends ConnectorDescriptorValidator {

public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
@@ -60,7 +62,7 @@ public class KafkaValidator extends ConnectorDescriptorValidator {
@Override
public void validate(DescriptorProperties properties) {
super.validate(properties);
properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA, false);
properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_KAFKA, false);

validateVersion(properties);

@@ -78,7 +80,7 @@ private void validateVersion(DescriptorProperties properties) {
CONNECTOR_VERSION_VALUE_010,
CONNECTOR_VERSION_VALUE_011,
CONNECTOR_VERSION_VALUE_20);
properties.validateEnumValues(CONNECTOR_VERSION(), false, versions);
properties.validateEnumValues(CONNECTOR_VERSION, false, versions);
properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
}

@@ -22,7 +22,6 @@
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
@@ -156,11 +155,10 @@ private void testTableSource(FormatDescriptor format) {
.field("proc-time", Types.SQL_TIMESTAMP).proctime())
.inAppendMode();

DescriptorProperties properties = new DescriptorProperties(true);
testDesc.addProperties(properties);
final Map<String, String> properties = testDesc.toProperties();
final TableSource<?> factorySource =
TableFactoryService.find(StreamTableSourceFactory.class, testDesc)
.createStreamTableSource(properties.asMap());
TableFactoryService.find(StreamTableSourceFactory.class, properties)
.createStreamTableSource(properties);

assertEquals(builderSource, factorySource);
}
@@ -52,7 +52,6 @@
import org.apache.flink.table.sources.TableSourceUtil;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.table.util.JavaScalaConversionUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;

@@ -165,7 +164,7 @@ public void testTableSource() {
.field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime())
.inAppendMode();

final Map<String, String> propertiesMap = JavaScalaConversionUtil.toJavaMap(testDesc);
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
.createStreamTableSource(propertiesMap);

@@ -215,7 +214,7 @@ public void testTableSink() {
.field(EVENT_TIME, Types.SQL_TIMESTAMP()))
.inAppendMode();

final Map<String, String> propertiesMap = JavaScalaConversionUtil.toJavaMap(testDesc);
final Map<String, String> propertiesMap = testDesc.toProperties();
final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);

22 changes: 18 additions & 4 deletions flink-formats/flink-avro/pom.xml
@@ -64,16 +64,30 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<!-- use a dedicated Scala version to not depend on it -->
<artifactId>flink-table_2.11</artifactId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<!-- Projects depending on this project, won't depend on flink-table. -->
<optional>true</optional>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<!-- TODO This could be dropped if we move the Table Avro IT Case -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!-- TODO This could be dropped if we move the Table Avro IT Case -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>