[FLINK-15912][table-planner-blink] Support create table source/sink by context in blink planner
JingsongLi authored and wuchong committed Feb 22, 2020
1 parent 306a89a commit 69d8816
Showing 8 changed files with 245 additions and 107 deletions.
@@ -33,6 +33,34 @@
*/
public class TableFactoryUtil {

/**
* Returns a table source matching the context.
*/
@SuppressWarnings("unchecked")
public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Context context) {
try {
return TableFactoryService
.find(TableSourceFactory.class, context.getTable().toProperties())
.createTableSource(context);
} catch (Throwable t) {
throw new TableException("findAndCreateTableSource failed.", t);
}
}

/**
* Returns a table sink matching the context.
*/
@SuppressWarnings("unchecked")
public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context context) {
try {
return TableFactoryService
.find(TableSinkFactory.class, context.getTable().toProperties())
.createTableSink(context);
} catch (Throwable t) {
throw new TableException("findAndCreateTableSink failed.", t);
}
}

/**
* Returns a table source matching the properties.
*/
@@ -86,4 +114,15 @@ public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog
return Optional.empty();
}

/**
* Creates a table sink for a {@link CatalogTable} using the table factory associated with the catalog.
*/
public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context context) {
TableFactory tableFactory = catalog.getTableFactory().orElse(null);
if (tableFactory instanceof TableSinkFactory) {
return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(context));
}
return Optional.empty();
}

}
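A minimal caller-side sketch of the new helpers (not part of this commit): build a factory context from an identifier, a CatalogTable, and a configuration, then let TableFactoryUtil resolve a factory from the table's properties. The connector value is hypothetical, and the lookup only succeeds if a matching TableSourceFactory is registered via SPI.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
import org.apache.flink.table.sources.TableSource;

public class FindSourceByContextSketch {

    public static void main(String[] args) {
        // Connector properties; the value "COLLECTION" is hypothetical and a
        // factory matching these properties must be on the classpath.
        Map<String, String> options = new HashMap<>();
        options.put("connector.type", "COLLECTION");

        TableSchema schema = TableSchema.builder()
            .field("a", DataTypes.INT())
            .field("b", DataTypes.STRING())
            .build();
        CatalogTable table = new CatalogTableImpl(schema, options, "example table");

        // The context bundles everything a factory may need:
        // identifier, catalog table, and the session configuration.
        TableSourceFactory.Context context = new TableSourceFactoryContextImpl(
            ObjectIdentifier.of("default_catalog", "default_database", "t1"),
            table,
            new Configuration());

        TableSource<?> source = TableFactoryUtil.findAndCreateTableSource(context);
        System.out.println(source.getClass().getName());
    }
}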
@@ -25,7 +25,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
import org.apache.flink.table.catalog._
import org.apache.flink.table.delegation.{Executor, Parser, Planner}
import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl}
import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
import org.apache.flink.table.operations._
import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
@@ -40,7 +40,7 @@ import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle
import org.apache.flink.table.planner.sinks.DataStreamTableSink
import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateSchemaAndApplyImplicitCast, validateTableSink}
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
import org.apache.flink.table.sinks.{OverwritableTableSink, TableSink}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
import org.apache.flink.table.utils.TableSchemaUtils

@@ -296,19 +296,15 @@ abstract class PlannerBase(
case Some(s) if s.isInstanceOf[CatalogTable] =>
val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
val table = s.asInstanceOf[CatalogTable]
val context = new TableSinkFactoryContextImpl(
objectIdentifier, table, getTableConfig.getConfiguration)
if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
val objectPath = objectIdentifier.toObjectPath
val sink = TableFactoryUtil.createTableSinkForCatalogTable(
catalog.get(),
table,
objectPath)
val sink = TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context)
if (sink.isPresent) {
return Option(table, sink.get())
}
}
val sinkProperties = table.toProperties
Option(table, TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
.createTableSink(sinkProperties))
Option(table, TableFactoryUtil.findAndCreateTableSink(context))

case _ => None
}
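The sink lookup order above can be paraphrased by the following Java sketch (for illustration only, not the planner's Scala code): a catalog-provided TableFactory wins if it yields a sink, otherwise the planner falls back to property-based discovery through the factory service.

import java.util.Optional;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.sinks.TableSink;

public class SinkLookupSketch {

    /** Resolves a sink for a catalog table: catalog factory first, factory service second. */
    static TableSink<?> lookupSink(Optional<Catalog> catalog, TableSinkFactory.Context context) {
        if (catalog.isPresent() && catalog.get().getTableFactory().isPresent()) {
            Optional<TableSink> fromCatalog =
                TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context);
            if (fromCatalog.isPresent()) {
                return fromCatalog.get();
            }
        }
        // No catalog factory (or it declined): discover a TableSinkFactory
        // from the table's properties via the factory service.
        return TableFactoryUtil.findAndCreateTableSink(context);
    }
}

Trying the catalog's own factory first preserves the existing behavior where a catalog that ships its own table factories can supply sink implementations directly instead of relying on property-based discovery.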
@@ -18,12 +18,14 @@

package org.apache.flink.table.planner.plan.schema

import org.apache.flink.configuration.ReadableConfig
import org.apache.flink.table.api.TableException
import org.apache.flink.table.catalog.CatalogTable
import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory}
import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory, TableSourceFactoryContextImpl}
import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder}
import org.apache.flink.table.planner.catalog.CatalogSchemaTable
import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation}
import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName

import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
import org.apache.calcite.rel.RelNode
@@ -62,12 +64,23 @@ class CatalogSourceTable[T](
.toMap
}

lazy val tableSource: TableSource[T] = findAndCreateTableSource().asInstanceOf[TableSource[T]]

override def getQualifiedName: JList[String] = explainSourceAsString(tableSource)
override def getQualifiedName: JList[String] = {
// Do not explain the source here: the qualified names are already complete, and the table source should only be created in toRel.
val ret = new util.ArrayList[String](names)
// Add class name to distinguish TableSourceTable.
val name = generateRuntimeName(getClass, catalogTable.getSchema.getFieldNames)
ret.add(s"catalog_source: [$name]")
ret
}

override def toRel(context: RelOptTable.ToRelContext): RelNode = {
val cluster = context.getCluster
val flinkContext = cluster
.getPlanner
.getContext
.unwrap(classOf[FlinkContext])

val tableSource = findAndCreateTableSource(flinkContext.getTableConfig.getConfiguration)
val tableSourceTable = new TableSourceTable[T](
relOptSchema,
schemaTable.getTableIdentifier,
@@ -91,11 +104,7 @@
val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
relBuilder.push(scan)

val toRexFactory = cluster
.getPlanner
.getContext
.unwrap(classOf[FlinkContext])
.getSqlExprToRexConverterFactory
val toRexFactory = flinkContext.getSqlExprToRexConverterFactory

// 2. push computed column project
val fieldNames = rowType.getFieldNames.asScala
@@ -140,32 +149,25 @@
relBuilder.build()
}

/** Create the table source lazily. */
private def findAndCreateTableSource(): TableSource[_] = {
/** Create the table source. */
private def findAndCreateTableSource(conf: ReadableConfig): TableSource[T] = {
val tableFactoryOpt = schemaTable.getTableFactory
val context = new TableSourceFactoryContextImpl(
schemaTable.getTableIdentifier, catalogTable, conf)
val tableSource = if (tableFactoryOpt.isPresent) {
tableFactoryOpt.get() match {
case tableSourceFactory: TableSourceFactory[_] =>
tableSourceFactory.createTableSource(
schemaTable.getTableIdentifier.toObjectPath,
catalogTable)
tableSourceFactory.createTableSource(context)
case _ => throw new TableException("Cannot query a sink-only table. "
+ "TableFactory provided by catalog must implement TableSourceFactory")
}
} else {
TableFactoryUtil.findAndCreateTableSource(catalogTable)
TableFactoryUtil.findAndCreateTableSource(context)
}
if (!tableSource.isInstanceOf[StreamTableSource[_]]) {
throw new TableException("Catalog tables support only "
+ "StreamTableSource and InputFormatTableSource")
}
tableSource
}

override protected def explainSourceAsString(ts: TableSource[_]): JList[String] = {
val ret = new util.ArrayList[String](super.explainSourceAsString(ts))
// Add class name to distinguish TableSourceTable.
ret.add("class: " + classOf[CatalogSourceTable[_]].getSimpleName)
ret
tableSource.asInstanceOf[TableSource[T]]
}
}
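The same resolution pattern for sources, as implemented by findAndCreateTableSource(conf) above, can be paraphrased in Java roughly as follows (illustrative only): prefer a catalog-provided TableSourceFactory, fall back to the factory service, and reject anything that is not a StreamTableSource.

import java.util.Optional;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;

public class SourceLookupSketch {

    static TableSource<?> lookupSource(
            Optional<TableFactory> catalogFactory, TableSourceFactory.Context context) {
        final TableSource<?> source;
        if (catalogFactory.isPresent()) {
            if (catalogFactory.get() instanceof TableSourceFactory) {
                // A catalog-provided factory must be able to create sources.
                source = ((TableSourceFactory<?>) catalogFactory.get()).createTableSource(context);
            } else {
                throw new TableException("Cannot query a sink-only table. "
                    + "TableFactory provided by catalog must implement TableSourceFactory");
            }
        } else {
            source = TableFactoryUtil.findAndCreateTableSource(context);
        }
        if (!(source instanceof StreamTableSource)) {
            // Catalog tables in the blink planner accept only StreamTableSource
            // (InputFormatTableSource is a bounded StreamTableSource).
            throw new TableException(
                "Catalog tables support only StreamTableSource and InputFormatTableSource");
        }
        return source;
    }
}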
@@ -100,7 +100,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
<TestCase name="testTableApiScanWithDDL">
<Resource name="planBefore">
<![CDATA[
LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(a, b)]]])
]]>
</Resource>
<Resource name="planAfter">
@@ -113,7 +113,7 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [Collecti
<TestCase name="testTableApiScanWithTemporaryTable">
<Resource name="planBefore">
<![CDATA[
LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CsvTableSource(read fields: word)], class: CatalogSourceTable]])
LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(word)]]])
]]>
</Resource>
<Resource name="planAfter">
@@ -126,7 +126,7 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [CsvTable
<TestCase name="testTableApiScanWithWatermark">
<Resource name="planBefore">
<![CDATA[
LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
]]>
</Resource>
<Resource name="planAfter">
@@ -140,7 +140,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
<TestCase name="testTableApiScanWithComputedColumn">
<Resource name="planBefore">
<![CDATA[
LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
]]>
</Resource>
<Resource name="planAfter">
@@ -22,20 +22,18 @@ import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, StreamTableSinkFactory, StreamTableSourceFactory}
import org.apache.flink.table.factories.{TableSinkFactory, TableSourceFactory}
import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource, TableSource}
import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource}
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row

Expand All @@ -45,35 +43,16 @@ import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JL

import scala.collection.JavaConversions._

class TestCollectionTableFactory
extends StreamTableSourceFactory[Row]
with StreamTableSinkFactory[Row]
with BatchTableSourceFactory[Row]
with BatchTableSinkFactory[Row]
{
class TestCollectionTableFactory extends TableSourceFactory[Row] with TableSinkFactory[Row] {

override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
getCollectionSource(properties)
override def createTableSource(
context: TableSourceFactory.Context): StreamTableSource[Row] = {
getCollectionSource(context)
}

override def createTableSink(properties: JMap[String, String]): TableSink[Row] = {
getCollectionSink(properties)
}

override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = {
getCollectionSource(properties)
}

override def createStreamTableSink(properties: JMap[String, String]): StreamTableSink[Row] = {
getCollectionSink(properties)
}

override def createBatchTableSource(properties: JMap[String, String]): BatchTableSource[Row] = {
getCollectionSource(properties)
}

override def createBatchTableSink(properties: JMap[String, String]): BatchTableSink[Row] = {
getCollectionSink(properties)
override def createTableSink(
context: TableSinkFactory.Context): StreamTableSink[Row] = {
getCollectionSink(context)
}

override def requiredContext(): JMap[String, String] = {
@@ -118,18 +97,14 @@ object TestCollectionTableFactory {

def getResult: util.List[Row] = RESULT

def getCollectionSource(props: JMap[String, String]): CollectionTableSource = {
val properties = new DescriptorProperties()
properties.putProperties(props)
val schema = properties.getTableSchema(Schema.SCHEMA)
val isBounded = properties.getOptionalBoolean(IS_BOUNDED).orElse(true)
def getCollectionSource(context: TableSourceFactory.Context): CollectionTableSource = {
val schema = context.getTable.getSchema
val isBounded = context.getTable.getProperties.getOrDefault(IS_BOUNDED, "true").toBoolean
new CollectionTableSource(emitIntervalMS, physicalSchema(schema), isBounded)
}

def getCollectionSink(props: JMap[String, String]): CollectionTableSink = {
val properties = new DescriptorProperties()
properties.putProperties(props)
val schema = properties.getTableSchema(Schema.SCHEMA)
def getCollectionSink(context: TableSinkFactory.Context): CollectionTableSink = {
val schema = context.getTable.getSchema
new CollectionTableSink(physicalSchema(schema))
}
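For comparison, a hypothetical Java counterpart of the reworked Scala test factory above, showing the shape of a factory written against the new context-based methods. The "connector.type"/"COLLECTION" and "is-bounded" keys are illustrative, and construction of the actual collection-backed source and sink is elided.

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class ContextBasedCollectionFactory
        implements TableSourceFactory<Row>, TableSinkFactory<Row> {

    @Override
    public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) {
        // Schema and properties now come from the context instead of a raw property map.
        TableSchema schema = context.getTable().getSchema();
        boolean isBounded = Boolean.parseBoolean(
            context.getTable().getProperties().getOrDefault("is-bounded", "true"));
        // Construction of a collection-backed source from schema/isBounded elided.
        throw new UnsupportedOperationException("sketch only: " + schema + ", bounded=" + isBounded);
    }

    @Override
    public StreamTableSink<Row> createTableSink(TableSinkFactory.Context context) {
        TableSchema schema = context.getTable().getSchema();
        // Construction of a collection-backed sink from the schema elided.
        throw new UnsupportedOperationException("sketch only: " + schema);
    }

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> required = new HashMap<>();
        required.put("connector.type", "COLLECTION"); // hypothetical connector type
        return required;
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList("*");
    }
}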

@@ -20,20 +20,11 @@ package org.apache.flink.table.planner.plan.batch.sql

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.table.catalog.{CatalogTableImpl, GenericInMemoryCatalog, ObjectPath}
import org.apache.flink.table.factories.TableSinkFactory
import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
import org.apache.flink.table.planner.utils.TableTestBase
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.types.logical.{BigIntType, IntType}

import org.junit.Test
import org.mockito.{ArgumentMatchers, Mockito}

import java.util.Optional

import scala.collection.JavaConverters._

class SinkTest extends TableTestBase {

@@ -68,26 +59,4 @@

util.verifyPlan()
}

@Test
def testCatalogTableSink(): Unit = {
val schemaBuilder = new TableSchema.Builder()
schemaBuilder.fields(Array("i"), Array(DataTypes.INT()))
val schema = schemaBuilder.build()
val sink = util.createCollectTableSink(schema.getFieldNames, Array(INT))
val catalog = Mockito.spy(new GenericInMemoryCatalog("dummy"))
val factory = Mockito.mock(classOf[TableSinkFactory[_]])
Mockito.when[Optional[_]](catalog.getTableFactory).thenReturn(Optional.of(factory))
Mockito.when[TableSink[_]](factory.createTableSink(
ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(sink)
util.tableEnv.registerCatalog(catalog.getName, catalog)
util.tableEnv.useCatalog(catalog.getName)
val catalogTable = new CatalogTableImpl(schema, Map[String, String]().asJava, "")
catalog.createTable(new ObjectPath("default", "tbl"), catalogTable, false)
util.tableEnv.sqlQuery("select 1").insertInto("tbl")
util.tableEnv.explain(false)
// verify we tried to get table factory from catalog
Mockito.verify(catalog, Mockito.atLeast(1)).getTableFactory
}

}