Skip to content

Commit

Permalink
[FLINK-8866] [table] Merge table source/sink/format factories
Browse files Browse the repository at this point in the history
- Rename to TableFactory and move it to org.apache.flink.table.factories package
- Unify source/sink/format factories with same logic and exceptions
  • Loading branch information
twalthr committed Jul 15, 2018
1 parent 9597248 commit abbb890
Show file tree
Hide file tree
Showing 50 changed files with 621 additions and 741 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connectors.DiscoverableTableFactory;
import org.apache.flink.table.connectors.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.KafkaValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.formats.DeserializationSchemaFactory;
import org.apache.flink.table.formats.TableFormatFactoryService;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -72,7 +72,7 @@
/**
* Factory for creating configured instances of {@link KafkaTableSource}.
*/
public abstract class KafkaTableSourceFactory implements TableSourceFactory<Row>, DiscoverableTableFactory {
public abstract class KafkaTableSourceFactory implements TableSourceFactory<Row>, TableFactory {

@Override
public Map<String, String> requiredContext() {
Expand Down Expand Up @@ -129,7 +129,7 @@ public TableSource<Row> createTableSource(Map<String, String> properties) {
new KafkaValidator().validate(params);

// deserialization schema using format discovery
final DeserializationSchemaFactory<?> formatFactory = TableFormatFactoryService.find(
final DeserializationSchemaFactory<?> formatFactory = TableFactoryService.find(
DeserializationSchemaFactory.class,
properties,
this.getClass().getClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connectors.TableFactoryService;
import org.apache.flink.table.connectors.TableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Json;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
import org.apache.flink.table.formats.utils.TestDeserializationSchema;
import org.apache.flink.table.formats.utils.TestTableFormat;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.factories.utils.TestDeserializationSchema;
import org.apache.flink.table.factories.utils.TestTableFormat;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceFactoryService;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -141,8 +143,12 @@ public void testTableSource() {
.field(EVENT_TIME, Types.SQL_TIMESTAMP()).rowtime(
new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending())
.field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime());
final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
testDesc.addProperties(descriptorProperties);
final Map<String, String> propertiesMap = descriptorProperties.asMap();

final TableSource<?> actualSource = TableSourceFactoryService.findAndCreateTableSource(testDesc);
final TableSource<?> actualSource = TableFactoryService.find(TableSourceFactory.class, testDesc)
.createTableSource(propertiesMap);

assertEquals(expected, actualSource);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.flink.table.descriptors.AvroValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.formats.DeserializationSchemaFactory;
import org.apache.flink.table.formats.SerializationSchemaFactory;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.types.Row;

import org.apache.avro.specific.SpecificRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import org.apache.flink.table.descriptors.Avro;
import org.apache.flink.table.descriptors.Descriptor;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.formats.DeserializationSchemaFactory;
import org.apache.flink.table.formats.SerializationSchemaFactory;
import org.apache.flink.table.formats.TableFormatFactoryService;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand Down Expand Up @@ -63,31 +63,31 @@ public void testAvroSchema() {
}

private void testRecordClassSerializationSchema(Map<String, String> properties) {
final DeserializationSchema<?> actual2 = TableFormatFactoryService
final DeserializationSchema<?> actual2 = TableFactoryService
.find(DeserializationSchemaFactory.class, properties)
.createDeserializationSchema(properties);
final AvroRowDeserializationSchema expected2 = new AvroRowDeserializationSchema(AVRO_SPECIFIC_RECORD);
assertEquals(expected2, actual2);
}

private void testRecordClassDeserializationSchema(Map<String, String> properties) {
final SerializationSchema<?> actual1 = TableFormatFactoryService
final SerializationSchema<?> actual1 = TableFactoryService
.find(SerializationSchemaFactory.class, properties)
.createSerializationSchema(properties);
final SerializationSchema<?> expected1 = new AvroRowSerializationSchema(AVRO_SPECIFIC_RECORD);
assertEquals(expected1, actual1);
}

private void testAvroSchemaDeserializationSchema(Map<String, String> properties) {
final DeserializationSchema<?> actual2 = TableFormatFactoryService
final DeserializationSchema<?> actual2 = TableFactoryService
.find(DeserializationSchemaFactory.class, properties)
.createDeserializationSchema(properties);
final AvroRowDeserializationSchema expected2 = new AvroRowDeserializationSchema(AVRO_SCHEMA);
assertEquals(expected2, actual2);
}

private void testAvroSchemaSerializationSchema(Map<String, String> properties) {
final SerializationSchema<?> actual1 = TableFormatFactoryService
final SerializationSchema<?> actual1 = TableFactoryService
.find(SerializationSchemaFactory.class, properties)
.createSerializationSchema(properties);
final SerializationSchema<?> expected1 = new AvroRowSerializationSchema(AVRO_SCHEMA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.descriptors.JsonValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.formats.DeserializationSchemaFactory;
import org.apache.flink.table.formats.SerializationSchemaFactory;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.types.Row;

import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.formats.DeserializationSchemaFactory;
import org.apache.flink.table.formats.SerializationSchemaFactory;
import org.apache.flink.table.formats.TableFormatFactoryService;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -107,7 +107,7 @@ public void testSchemaDerivation() {
}

private void testSchemaDeserializationSchema(Map<String, String> properties) {
final DeserializationSchema<?> actual2 = TableFormatFactoryService
final DeserializationSchema<?> actual2 = TableFactoryService
.find(DeserializationSchemaFactory.class, properties)
.createDeserializationSchema(properties);
final JsonRowDeserializationSchema expected2 = new JsonRowDeserializationSchema(SCHEMA);
Expand All @@ -116,15 +116,15 @@ private void testSchemaDeserializationSchema(Map<String, String> properties) {
}

private void testSchemaSerializationSchema(Map<String, String> properties) {
final SerializationSchema<?> actual1 = TableFormatFactoryService
final SerializationSchema<?> actual1 = TableFactoryService
.find(SerializationSchemaFactory.class, properties)
.createSerializationSchema(properties);
final SerializationSchema expected1 = new JsonRowSerializationSchema(SCHEMA);
assertEquals(expected1, actual1);
}

private void testJsonSchemaDeserializationSchema(Map<String, String> properties) {
final DeserializationSchema<?> actual2 = TableFormatFactoryService
final DeserializationSchema<?> actual2 = TableFactoryService
.find(DeserializationSchemaFactory.class, properties)
.createDeserializationSchema(properties);
final JsonRowDeserializationSchema expected2 = new JsonRowDeserializationSchema(JSON_SCHEMA);
Expand All @@ -133,7 +133,7 @@ private void testJsonSchemaDeserializationSchema(Map<String, String> properties)
}

private void testJsonSchemaSerializationSchema(Map<String, String> properties) {
final SerializationSchema<?> actual1 = TableFormatFactoryService
final SerializationSchema<?> actual1 = TableFactoryService
.find(SerializationSchemaFactory.class, properties)
.createSerializationSchema(properties);
final SerializationSchema<?> expected1 = new JsonRowSerializationSchema(JSON_SCHEMA);
Expand Down
2 changes: 1 addition & 1 deletion flink-libraries/flink-sql-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ under the License.
<include>org.codehaus.commons.compiler.properties</include>
<include>org/codehaus/janino/**</include>
<include>org/codehaus/commons/**</include>
<include>META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory</include>
<include>META-INF/services/org.apache.flink.table.factories.TableFactory</include>
<!-- flink-sql-client -->
<include>org/jline/**</include>
<include>com/fasterxml/jackson/**</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class Environment {

private Deployment deployment;

private static final String NAME = "name";
private static final String TABLE_NAME = "name";
private static final String TABLE_TYPE = "type";

public Environment() {
this.tables = Collections.emptyMap();
Expand All @@ -64,16 +65,16 @@ public Map<String, TableDescriptor> getTables() {
public void setTables(List<Map<String, Object>> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
if (!config.containsKey(NAME)) {
if (!config.containsKey(TABLE_NAME)) {
throw new SqlClientException("The 'name' attribute of a table is missing.");
}
final Object nameObject = config.get(NAME);
final Object nameObject = config.get(TABLE_NAME);
if (nameObject == null || !(nameObject instanceof String) || ((String) nameObject).length() <= 0) {
throw new SqlClientException("Invalid table name '" + nameObject + "'.");
}
final String name = (String) nameObject;
final Map<String, Object> properties = new HashMap<>(config);
properties.remove(NAME);
properties.remove(TABLE_NAME);

if (this.tables.containsKey(name)) {
throw new SqlClientException("Duplicate table name '" + name + "'.");
Expand Down Expand Up @@ -209,11 +210,12 @@ public static Environment enrich(Environment env, Map<String, String> properties
* @return table descriptor describing a source, sink, or both
*/
private static TableDescriptor createTableDescriptor(String name, Map<String, Object> config) {
final Object typeObject = config.get(TableDescriptorValidator.TABLE_TYPE());
final Object typeObject = config.get(TABLE_TYPE);
if (typeObject == null || !(typeObject instanceof String)) {
throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'.");
}
final String type = (String) config.get(TableDescriptorValidator.TABLE_TYPE());
final String type = (String) config.get(TABLE_TYPE);
config.remove(TABLE_TYPE);
final Map<String, String> normalizedConfig = ConfigUtil.normalizeYaml(config);
if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
return new Source(name, normalizedConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import org.apache.flink.table.client.config.SourceSink;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.connectors.TableFactoryService;
import org.apache.flink.table.connectors.TableSinkFactory;
import org.apache.flink.table.connectors.TableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ under the License.
<file>
<source>src/test/resources/test-factory-services-file</source>
<outputDirectory>META-INF/services</outputDirectory>
<destName>org.apache.flink.table.connectors.DiscoverableTableFactory</destName>
<destName>org.apache.flink.table.factories.TableFactory</destName>
<fileMode>0755</fileMode>
</file>
</files>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.client.gateway.local.DependencyTest;
import org.apache.flink.table.connectors.DiscoverableTableFactory;
import org.apache.flink.table.connectors.TableSourceFactory;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
Expand All @@ -51,7 +51,7 @@
/**
* Table source factory for testing the classloading in {@link DependencyTest}.
*/
public class TestTableSourceFactory implements TableSourceFactory<Row>, DiscoverableTableFactory {
public class TestTableSourceFactory implements TableSourceFactory<Row>, TableFactory {

@Override
public Map<String, String> requiredContext() {
Expand Down
Loading

0 comments on commit abbb890

Please sign in to comment.