[FLINK-8866] [table] Make source/sink factories environment-dependent
It is very uncommon to define both a batch and a streaming source in the same factory. Separating by environment is a concept that can be found throughout the entire flink-table module, because both sources and sinks behave quite differently per environment.

This closes apache#6323.
twalthr committed Jul 15, 2018
1 parent 09fbfdf commit 0e5ac4d
Showing 34 changed files with 353 additions and 184 deletions.
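Before this change, a connector implemented the generic TableSourceFactory and each environment checked at runtime whether the created source was actually usable there. After it, a connector implements the factory for the environment it supports. As a minimal sketch (not part of this commit; the class names, the "my-system" connector type, the "connector.address" key, and the MySystemStreamTableSource helper are hypothetical), a streaming-only source factory now looks roughly like this:

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {

	@Override
	public Map<String, String> requiredContext() {
		// matched against the normalized table properties during factory discovery
		Map<String, String> context = new HashMap<>();
		context.put("connector.type", "my-system"); // hypothetical connector type
		return context;
	}

	@Override
	public List<String> supportedProperties() {
		// every property key this factory accepts; unknown keys fail validation
		return Collections.singletonList("connector.address"); // hypothetical key
	}

	@Override
	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
		// streaming-only; a batch variant would implement BatchTableSourceFactory instead
		return new MySystemStreamTableSource(properties.get("connector.address")); // hypothetical source
	}
}

Since the new environment-specific interfaces still extend TableFactory, discovery should be unchanged: implementations remain registered for Java's ServiceLoader under META-INF/services/org.apache.flink.table.factories.TableFactory.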
@@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
@@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
@@ -21,8 +21,12 @@
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;

/**
* Tests for {@link Kafka08JsonTableSourceFactory}.
* Tests for legacy Kafka08JsonTableSourceFactory.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka08JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {

@Override
@@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
@@ -27,11 +27,10 @@
import org.apache.flink.table.descriptors.KafkaValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

import java.util.ArrayList;
@@ -72,7 +71,7 @@
/**
* Factory for creating configured instances of {@link KafkaTableSource}.
*/
public abstract class KafkaTableSourceFactory implements TableSourceFactory<Row>, TableFactory {
public abstract class KafkaTableSourceFactory implements StreamTableSourceFactory<Row> {

@Override
public Map<String, String> requiredContext() {
Expand Down Expand Up @@ -119,7 +118,7 @@ public List<String> supportedProperties() {
}

@Override
public TableSource<Row> createTableSource(Map<String, String> properties) {
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties params = new DescriptorProperties(true);
params.putProperties(properties);

@@ -22,15 +22,15 @@
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
@@ -153,8 +153,8 @@ private void testTableSource(FormatDescriptor format) {
DescriptorProperties properties = new DescriptorProperties(true);
testDesc.addProperties(properties);
final TableSource<?> factorySource =
((TableSourceFactory) TableFactoryService.find(TableSourceFactory.class, testDesc))
.createTableSource(properties.asMap());
TableFactoryService.find(StreamTableSourceFactory.class, testDesc)
.createStreamTableSource(properties.asMap());

assertEquals(builderSource, factorySource);
}
@@ -32,8 +32,8 @@
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.factories.utils.TestDeserializationSchema;
import org.apache.flink.table.factories.utils.TestTableFormat;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -147,8 +147,8 @@ public void testTableSource() {
testDesc.addProperties(descriptorProperties);
final Map<String, String> propertiesMap = descriptorProperties.asMap();

final TableSource<?> actualSource = TableFactoryService.find(TableSourceFactory.class, testDesc)
.createTableSource(propertiesMap);
final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, testDesc)
.createStreamTableSource(propertiesMap);

assertEquals(expected, actualSource);

@@ -46,15 +46,18 @@
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.client.config.Deployment;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.Execution;
import org.apache.flink.table.client.config.Sink;
import org.apache.flink.table.client.config.Source;
import org.apache.flink.table.client.config.SourceSink;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.BatchTableSinkFactory;
import org.apache.flink.table.factories.BatchTableSourceFactory;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionService;
import org.apache.flink.table.functions.ScalarFunction;
@@ -114,15 +117,12 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo
final DescriptorProperties properties = new DescriptorProperties(true);
descriptor.addProperties(properties);
final Map<String, String> propertyMap = properties.asMap();

if (descriptor instanceof Source || descriptor instanceof SourceSink) {
final TableSourceFactory<?> factory = (TableSourceFactory<?>)
TableFactoryService.find(TableSourceFactory.class, descriptor, classLoader);
tableSources.put(name, factory.createTableSource(propertyMap));
tableSources.put(name, createTableSource(mergedEnv.getExecution(), propertyMap, classLoader));
}
if (descriptor instanceof Sink || descriptor instanceof SourceSink) {
final TableSinkFactory<?> factory = (TableSinkFactory<?>)
TableFactoryService.find(TableSinkFactory.class, descriptor, classLoader);
tableSinks.put(name, factory.createTableSink(propertyMap));
tableSinks.put(name, createTableSink(mergedEnv.getExecution(), propertyMap, classLoader));
}
});

@@ -205,6 +205,32 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine
}
}

private static TableSource<?> createTableSource(Execution execution, Map<String, String> sourceProperties, ClassLoader classLoader) {
if (execution.isStreamingExecution()) {
final StreamTableSourceFactory<?> factory = (StreamTableSourceFactory<?>)
TableFactoryService.find(StreamTableSourceFactory.class, sourceProperties, classLoader);
return factory.createStreamTableSource(sourceProperties);
} else if (execution.isBatchExecution()) {
final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>)
TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
return factory.createBatchTableSource(sourceProperties);
}
throw new SqlExecutionException("Unsupported execution type for sources.");
}

private static TableSink<?> createTableSink(Execution execution, Map<String, String> sinkProperties, ClassLoader classLoader) {
if (execution.isStreamingExecution()) {
final StreamTableSinkFactory<?> factory = (StreamTableSinkFactory<?>)
TableFactoryService.find(StreamTableSinkFactory.class, sinkProperties, classLoader);
return factory.createStreamTableSink(sinkProperties);
} else if (execution.isBatchExecution()) {
final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>)
TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
return factory.createBatchTableSink(sinkProperties);
}
throw new SqlExecutionException("Unsupported execution type for sinks.");
}

// --------------------------------------------------------------------------------------------

/**
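With the helpers above, the SQL Client resolves the factory matching the configured execution mode up front instead of discovering an incompatible source deep inside the planner. A hedged sketch of the same lookup done by hand for the streaming case (the "my-system" connector type is illustrative; the find() overload and the cast mirror the code above):

// normalized properties, as produced by DescriptorProperties#asMap above
Map<String, String> props = new HashMap<>();
props.put("connector.type", "my-system"); // hypothetical connector type

ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

// only StreamTableSourceFactory implementations are candidates here,
// so a batch-only connector fails fast with a clear error
StreamTableSourceFactory<?> factory = (StreamTableSourceFactory<?>)
	TableFactoryService.find(StreamTableSourceFactory.class, props, classLoader);
TableSource<?> source = factory.createStreamTableSource(props);

Dispatching on the Execution type keeps such failures at session-configuration time rather than at job-translation time.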
@@ -24,10 +24,9 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.client.gateway.local.DependencyTest;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -51,7 +50,7 @@
/**
* Table source factory for testing the classloading in {@link DependencyTest}.
*/
public class TestTableSourceFactory implements TableSourceFactory<Row>, TableFactory {
public class TestTableSourceFactory implements StreamTableSourceFactory<Row> {

@Override
public Map<String, String> requiredContext() {
@@ -73,7 +72,7 @@ public List<String> supportedProperties() {
}

@Override
public TableSource<Row> createTableSource(Map<String, String> properties) {
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties params = new DescriptorProperties(true);
params.putProperties(properties);
final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
@@ -20,10 +20,9 @@ package org.apache.flink.table.catalog

import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.DescriptorProperties
import org.apache.flink.table.factories.{TableFactoryService, TableSourceFactory}
import org.apache.flink.table.factories.{BatchTableSourceFactory, StreamTableSourceFactory, TableFactoryService}
import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceSinkTable, TableSourceTable}
import org.apache.flink.table.plan.stats.FlinkStatistic
import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource}
import org.apache.flink.table.util.Logging

/**
@@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging {
val properties = new DescriptorProperties()
externalCatalogTable.addProperties(properties)
val javaMap = properties.asMap
val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap)
.asInstanceOf[TableSourceFactory[_]]
.createTableSource(javaMap)
tableEnv match {
// check for a batch table source in this batch environment
case _: BatchTableEnvironment =>
source match {
case bts: BatchTableSource[_] =>
new TableSourceSinkTable(Some(new BatchTableSourceTable(
bts,
new FlinkStatistic(externalCatalogTable.getTableStats))), None)
case _ => throw new TableException(
s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
s"in a batch environment.")
}
val source = TableFactoryService
.find(classOf[BatchTableSourceFactory[_]], javaMap)
.createBatchTableSource(javaMap)
val sourceTable = new BatchTableSourceTable(
source,
new FlinkStatistic(externalCatalogTable.getTableStats))
new TableSourceSinkTable(Some(sourceTable), None)

// check for a stream table source in this streaming environment
case _: StreamTableEnvironment =>
source match {
case sts: StreamTableSource[_] =>
new TableSourceSinkTable(Some(new StreamTableSourceTable(
sts,
new FlinkStatistic(externalCatalogTable.getTableStats))), None)
case _ => throw new TableException(
s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
s"in a streaming environment.")
}
val source = TableFactoryService
.find(classOf[StreamTableSourceFactory[_]], javaMap)
.createStreamTableSource(javaMap)
val sourceTable = new StreamTableSourceTable(
source,
new FlinkStatistic(externalCatalogTable.getTableStats))
new TableSourceSinkTable(Some(sourceTable), None)

case _ => throw new TableException("Unsupported table environment.")
}
}
@@ -18,9 +18,9 @@

package org.apache.flink.table.descriptors

import org.apache.flink.table.api.{BatchTableEnvironment, Table, TableException, ValidationException}
import org.apache.flink.table.factories.{TableFactoryService, TableSourceFactory}
import org.apache.flink.table.sources.{BatchTableSource, TableSource}
import org.apache.flink.table.api.{BatchTableEnvironment, Table, ValidationException}
import org.apache.flink.table.factories.{BatchTableSourceFactory, TableFactoryService}
import org.apache.flink.table.sources.TableSource

class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: ConnectorDescriptor)
extends TableSourceDescriptor {
@@ -46,15 +46,10 @@ class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: Con
def toTableSource: TableSource[_] = {
val properties = new DescriptorProperties()
addProperties(properties)
val source = TableFactoryService.find(classOf[TableSourceFactory[_]], this)
.asInstanceOf[TableSourceFactory[_]]
.createTableSource(properties.asMap)
source match {
case _: BatchTableSource[_] => source
case _ => throw new TableException(
s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
s"in a batch environment.")
}
val javaMap = properties.asMap
TableFactoryService
.find(classOf[BatchTableSourceFactory[_]], javaMap)
.createBatchTableSource(javaMap)
}

/**
Expand Down
@@ -18,9 +18,9 @@

package org.apache.flink.table.descriptors

import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException, ValidationException}
import org.apache.flink.table.factories.{TableFactoryService, TableSourceFactory}
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.table.api.{StreamTableEnvironment, Table, ValidationException}
import org.apache.flink.table.factories.{StreamTableSourceFactory, TableFactoryService}
import org.apache.flink.table.sources.TableSource

/**
* Descriptor for specifying a table source in a streaming environment.
@@ -49,15 +49,10 @@ class StreamTableSourceDescriptor(tableEnv: StreamTableEnvironment, connector: C
def toTableSource: TableSource[_] = {
val properties = new DescriptorProperties()
addProperties(properties)
val source = TableFactoryService.find(classOf[TableSourceFactory[_]], this)
.asInstanceOf[TableSourceFactory[_]]
.createTableSource(properties.asMap)
source match {
case _: StreamTableSource[_] => source
case _ => throw new TableException(
s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
s"in a streaming environment.")
}
val javaMap = properties.asMap
TableFactoryService
.find(classOf[StreamTableSourceFactory[_]], javaMap)
.createStreamTableSource(javaMap)
}

/**
Expand Down
@@ -18,18 +18,24 @@

package org.apache.flink.table.factories

import org.apache.flink.table.sources.TableSource

import java.util

trait TableSourceFactory[T] extends TableFactory {
import org.apache.flink.table.sinks.BatchTableSink

/**
* A factory to create configured table sink instances in a streaming environment based on
* string-based properties. See also [[TableFactory]] for more information.
*
* @tparam T type of records that the factory consumes
*/
trait BatchTableSinkFactory[T] extends TableFactory {

/**
* Creates and configures a [[org.apache.flink.table.sources.TableSource]]
* Creates and configures a [[org.apache.flink.table.sinks.BatchTableSink]]
* using the given properties.
*
* @param properties normalized properties describing a table source.
* @return the configured table source.
* @param properties normalized properties describing a table sink.
* @return the configured table sink.
*/
def createTableSource(properties: util.Map[String, String]): TableSource[T]
def createBatchTableSink(properties: util.Map[String, String]): BatchTableSink[T]
}
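For completeness, a sketch of implementing the new trait from Java, mirroring the source-factory sketch near the top (again, the connector type, the "connector.path" key, and MySystemBatchTableSink are hypothetical, not part of this commit):

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.factories.BatchTableSinkFactory;
import org.apache.flink.table.sinks.BatchTableSink;
import org.apache.flink.types.Row;

public class MySystemBatchTableSinkFactory implements BatchTableSinkFactory<Row> {

	@Override
	public Map<String, String> requiredContext() {
		Map<String, String> context = new HashMap<>();
		context.put("connector.type", "my-system"); // hypothetical connector type
		return context;
	}

	@Override
	public List<String> supportedProperties() {
		return Collections.singletonList("connector.path"); // hypothetical key
	}

	@Override
	public BatchTableSink<Row> createBatchTableSink(Map<String, String> properties) {
		// batch-only; the streaming counterpart implements StreamTableSinkFactory
		return new MySystemBatchTableSink(properties.get("connector.path")); // hypothetical sink
	}
}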
