[FLINK-16991][table-planner-blink] Support DynamicTableSink in blink planner

This closes apache#12086
wuchong committed May 13, 2020
1 parent 3c6df77 commit 64c732f
Showing 37 changed files with 3,488 additions and 176 deletions.
6 changes: 6 additions & 0 deletions docs/_includes/generated/execution_config_configuration.html
@@ -58,6 +58,12 @@
<td>String</td>
<td>Sets exec shuffle mode.<br />Accepted values are:<ul><li><span markdown="span">`ALL_EDGES_BLOCKING`</span>: All edges will use blocking shuffle.</li><li><span markdown="span">`FORWARD_EDGES_PIPELINED`</span>: Forward edges will use pipelined shuffle, others blocking.</li><li><span markdown="span">`POINTWISE_EDGES_PIPELINED`</span>: Pointwise edges will use pipelined shuffle, others blocking. Pointwise edges include forward and rescale edges.</li><li><span markdown="span">`ALL_EDGES_PIPELINED`</span>: All edges will use pipelined shuffle.</li><li><span markdown="span">`batch`</span>: the same as <span markdown="span">`ALL_EDGES_BLOCKING`</span>. Deprecated.</li><li><span markdown="span">`pipelined`</span>: the same as <span markdown="span">`ALL_EDGES_PIPELINED`</span>. Deprecated.</li></ul>Note: Blocking shuffle means data will be fully produced before sent to consumer tasks. Pipelined shuffle means data will be sent to consumer tasks once produced.</td>
</tr>
<tr>
<td><h5>table.exec.sink.not-null-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">ERROR</td>
<td><p>Enum</p>Possible values: [ERROR, DROP]</td>
<td>The NOT NULL column constraint on a table enforces that null values can't be inserted into the table. Flink supports 'error' (default) and 'drop' enforcement behavior. By default, Flink checks values and throws a runtime exception when null values are written into NOT NULL columns. Users can change the behavior to 'drop' to silently drop such records without throwing an exception.</td>
</tr>
<tr>
<td><h5>table.exec.sort.async-merge-enabled</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">true</td>
@@ -47,6 +47,22 @@ public class ExecutionConfigOptions {
"tasks to advance their watermarks without the need to wait for " +
"watermarks from this source while it is idle.");

// ------------------------------------------------------------------------
// Sink Options
// ------------------------------------------------------------------------

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<NotNullEnforcer> TABLE_EXEC_SINK_NOT_NULL_ENFORCER =
key("table.exec.sink.not-null-enforcer")
.enumType(NotNullEnforcer.class)
.defaultValue(NotNullEnforcer.ERROR)
.withDescription("The NOT NULL column constraint on a table enforces that " +
"null values can't be inserted into the table. Flink supports " +
"'error' (default) and 'drop' enforcement behavior. By default, " +
"Flink will check values and throw runtime exception when null values writing " +
"into NOT NULL columns. Users can change the behavior to 'drop' to " +
"silently drop such records without throwing exception.");

// ------------------------------------------------------------------------
// Sort Options
// ------------------------------------------------------------------------
@@ -250,4 +266,21 @@ public class ExecutionConfigOptions {
"Pipelined shuffle means data will be sent to consumer tasks once produced.")
.build());

// ------------------------------------------------------------------------------------------
// Enum option types
// ------------------------------------------------------------------------------------------

/**
* The enforcer to guarantee the NOT NULL column constraint when writing data into a sink.
*/
public enum NotNullEnforcer {
/**
* Throws a runtime exception when writing null values into a NOT NULL column.
*/
ERROR,
/**
* Drops records when writing null values into a NOT NULL column.
*/
DROP
}
}
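A usage sketch (not part of this commit): the new option can be set programmatically through the table configuration. The option key, enum, and default come from the definitions above; the environment setup (blink planner, streaming mode) is an assumption for illustration.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public class NotNullEnforcerExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Switch from the default ERROR behavior to silently dropping records
        // that contain null values for NOT NULL columns.
        tEnv.getConfig().getConfiguration().set(
            ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
            ExecutionConfigOptions.NotNullEnforcer.DROP);
    }
}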
@@ -27,6 +27,8 @@
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;

import javax.annotation.Nullable;

import java.util.Optional;

/**
@@ -91,6 +93,31 @@ public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context c
}
}

/**
* Creates a {@link TableSink} from a {@link CatalogTable}.
*
* <p>It considers {@link Catalog#getFactory()} if provided.
*/
@SuppressWarnings("unchecked")
public static <T> TableSink<T> findAndCreateTableSink(
@Nullable Catalog catalog,
ObjectIdentifier objectIdentifier,
CatalogTable catalogTable,
ReadableConfig configuration,
boolean isStreamingMode) {
TableSinkFactory.Context context = new TableSinkFactoryContextImpl(
objectIdentifier,
catalogTable,
configuration,
!isStreamingMode);
if (catalog == null) {
return findAndCreateTableSink(context);
} else {
return createTableSinkForCatalogTable(catalog, context)
.orElseGet(() -> findAndCreateTableSink(context));
}
}
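A caller sketch (assumed, not from the diff) of the new nullable-catalog overload, mirroring how the planner resolves a sink for a catalog table; catalogManager, identifier, catalogTable, and tableConfig are assumed to be in scope.

// Prefer a sink created by the catalog's own table factory; otherwise fall back
// to discovering a TableSinkFactory for the given context.
Catalog catalog = catalogManager
    .getCatalog(identifier.getCatalogName())
    .orElse(null); // the overload accepts a null catalog
TableSink<?> sink = TableFactoryUtil.findAndCreateTableSink(
    catalog,
    identifier,
    catalogTable,
    tableConfig.getConfiguration(),
    true); // isStreamingMode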

/**
* Creates a table sink for a {@link CatalogTable} using the table factory associated with the catalog.
*/
@@ -26,6 +26,7 @@
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import java.util.List;
@@ -77,6 +78,22 @@ public static TableSchema checkNoGeneratedColumns(TableSchema schema) {
return schema;
}

/**
* Returns the field indices of the primary key within the physical columns of
* this schema (computed columns are not included).
*/
public static int[] getPrimaryKeyIndices(TableSchema schema) {
if (schema.getPrimaryKey().isPresent()) {
RowType physicalRowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
List<String> fieldNames = physicalRowType.getFieldNames();
return schema.getPrimaryKey().get().getColumns().stream()
.mapToInt(fieldNames::indexOf)
.toArray();
} else {
return new int[0];
}
}
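A small sketch (assumed, not part of the diff) of what the new helper returns for a schema whose physical columns are [user_id, name] and whose primary key is (user_id):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.utils.TableSchemaUtils;

public class PrimaryKeyIndicesExample {
    public static void main(String[] args) {
        TableSchema schema = TableSchema.builder()
            .field("user_id", DataTypes.BIGINT().notNull())
            .field("name", DataTypes.STRING())
            .primaryKey("user_id")
            .build();
        // Indices are positions in the physical row type, so this prints [0].
        int[] pkIndices = TableSchemaUtils.getPrimaryKeyIndices(schema);
        System.out.println(java.util.Arrays.toString(pkIndices));
    }
}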

/**
* Creates a builder with given table schema.
*
@@ -193,7 +193,7 @@ private static FlinkPreparingTableBase convertCatalogTable(
RelDataType rowType,
CatalogTable catalogTable,
CatalogSchemaTable schemaTable) {
if (isLegacyConnectorOptions(catalogTable, schemaTable)) {
if (isLegacySourceOptions(catalogTable, schemaTable)) {
return new LegacyCatalogSourceTable<>(
relOptSchema,
names,
@@ -211,9 +211,9 @@
}

/**
* Checks whether the {@link CatalogTable} uses legacy connector options.
* Checks whether the {@link CatalogTable} uses legacy connector source options.
*/
private static boolean isLegacyConnectorOptions(
private static boolean isLegacySourceOptions(
CatalogTable catalogTable,
CatalogSchemaTable schemaTable) {
// normalize option keys
@@ -163,24 +163,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
val input = snapshot.getInput.accept(this)
snapshot.copy(snapshot.getTraitSet, input, snapshot.getPeriod)

case sink: LogicalLegacySink =>
var newInput = sink.getInput.accept(this)
var needsConversion = false

val projects = newInput.getRowType.getFieldList.map { field =>
if (isProctimeIndicatorType(field.getType)) {
needsConversion = true
rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE,
new RexInputRef(field.getIndex, field.getType))
} else {
new RexInputRef(field.getIndex, field.getType)
}
}
case sink: LogicalSink =>
val newInput = convertSinkInput(sink.getInput)
new LogicalSink(
sink.getCluster,
sink.getTraitSet,
newInput,
sink.tableIdentifier,
sink.catalogTable,
sink.tableSink,
sink.staticPartitions)

// add final conversion if necessary
if (needsConversion) {
newInput = LogicalProject.create(newInput, projects, newInput.getRowType.getFieldNames)
}
case sink: LogicalLegacySink =>
val newInput = convertSinkInput(sink.getInput)
new LogicalLegacySink(
sink.getCluster,
sink.getTraitSet,
@@ -373,6 +368,28 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
rowType.getFieldList.exists(field => isRowtimeIndicatorType(field.getType))
}

private def convertSinkInput(sinkInput: RelNode): RelNode = {
var newInput = sinkInput.accept(this)
var needsConversion = false

val projects = newInput.getRowType.getFieldList.map { field =>
if (isProctimeIndicatorType(field.getType)) {
needsConversion = true
rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE,
new RexInputRef(field.getIndex, field.getType))
} else {
new RexInputRef(field.getIndex, field.getType)
}
}

// add final conversion if necessary
if (needsConversion) {
LogicalProject.create(newInput, projects, newInput.getRowType.getFieldNames)
} else {
newInput
}
}

private def convertAggregate(aggregate: Aggregate): LogicalAggregate = {
// visit children and update inputs
val input = aggregate.getInput.accept(this)
@@ -24,16 +24,18 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
import org.apache.flink.table.catalog._
import org.apache.flink.table.connector.sink.DynamicTableSink
import org.apache.flink.table.delegation.{Executor, Parser, Planner}
import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl}
import org.apache.flink.table.descriptors.{ConnectorDescriptorValidator, DescriptorProperties}
import org.apache.flink.table.factories.{FactoryUtil, TableFactoryUtil}
import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
import org.apache.flink.table.operations._
import org.apache.flink.table.planner.JMap
import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalLegacySink
import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalLegacySink, LogicalSink}
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
import org.apache.flink.table.planner.plan.optimize.Optimizer
@@ -188,25 +190,42 @@ abstract class PlannerBase(
val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
val identifier = catalogSink.getTableIdentifier
val dynamicOptions = catalogSink.getDynamicOptions
getTableSink(identifier, dynamicOptions).map { case (table, sink) =>
// check the logical field type and physical field type are compatible
val queryLogicalType = FlinkTypeFactory.toLogicalRowType(input.getRowType)
// validate logical schema and physical schema are compatible
validateLogicalPhysicalTypesCompatible(table, sink, queryLogicalType)
// validate TableSink
validateTableSink(catalogSink, identifier, sink, table.getPartitionKeys)
// validate query schema and sink schema, and apply cast if possible
val query = validateSchemaAndApplyImplicitCast(
input,
TableSchemaUtils.getPhysicalSchema(table.getSchema),
getTypeFactory,
Some(catalogSink.getTableIdentifier.asSummaryString()))
LogicalLegacySink.create(
query,
sink,
identifier.toString,
table,
catalogSink.getStaticPartitions.toMap)
getTableSink(identifier, dynamicOptions).map {
case (table, sink: TableSink[_]) =>
// check the logical field type and physical field type are compatible
val queryLogicalType = FlinkTypeFactory.toLogicalRowType(input.getRowType)
// validate logical schema and physical schema are compatible
validateLogicalPhysicalTypesCompatible(table, sink, queryLogicalType)
// validate TableSink
validateTableSink(catalogSink, identifier, sink, table.getPartitionKeys)
// validate query schema and sink schema, and apply cast if possible
val query = validateSchemaAndApplyImplicitCast(
input,
TableSchemaUtils.getPhysicalSchema(table.getSchema),
getTypeFactory,
Some(catalogSink.getTableIdentifier.asSummaryString()))
LogicalLegacySink.create(
query,
sink,
identifier.toString,
table,
catalogSink.getStaticPartitions.toMap)

case (table, sink: DynamicTableSink) =>
// validate TableSink
validateTableSink(catalogSink, identifier, sink, table.getPartitionKeys)
// validate query schema and sink schema, and apply cast if possible
val query = validateSchemaAndApplyImplicitCast(
input,
TableSchemaUtils.getPhysicalSchema(table.getSchema),
getTypeFactory,
Some(catalogSink.getTableIdentifier.asSummaryString()))
LogicalSink.create(
query,
identifier,
table,
sink,
catalogSink.getStaticPartitions.toMap)
} match {
case Some(sinkRel) => sinkRel
case None =>
@@ -286,38 +305,72 @@ abstract class PlannerBase(
private def getTableSink(
objectIdentifier: ObjectIdentifier,
dynamicOptions: JMap[String, String])
: Option[(CatalogTable, TableSink[_])] = {
: Option[(CatalogTable, Any)] = {
JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier))
.map(_.getTable) match {
case Some(s) if s.isInstanceOf[ConnectorCatalogTable[_, _]] =>
val table = s.asInstanceOf[ConnectorCatalogTable[_, _]]
case Some(table: ConnectorCatalogTable[_, _]) =>
JavaScalaConversionUtil.toScala(table.getTableSink) match {
case Some(sink) => Some(table, sink)
case None => None
}

case Some(s) if s.isInstanceOf[CatalogTable] =>
case Some(table: CatalogTable) =>
val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
val table = s.asInstanceOf[CatalogTable]
val tableToFind = if (dynamicOptions.nonEmpty) {
table.copy(FlinkHints.mergeTableOptions(dynamicOptions, table.getProperties))
} else {
table
}
val context = new TableSinkFactoryContextImpl(
objectIdentifier,
tableToFind,
getTableConfig.getConfiguration,
!isStreamingMode)
if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
val sink = TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context)
if (sink.isPresent) {
return Option(table, sink.get())
}
if (isLegacyConnectorOptions(objectIdentifier, table)) {
val tableSink = TableFactoryUtil.findAndCreateTableSink(
catalog.orElse(null),
objectIdentifier,
tableToFind,
getTableConfig.getConfiguration,
isStreamingMode)
Option(table, tableSink)
} else {
val tableSink = FactoryUtil.createTableSink(
catalog.orElse(null),
objectIdentifier,
tableToFind,
getTableConfig.getConfiguration,
Thread.currentThread().getContextClassLoader)
Option(table, tableSink)
}
Option(table, TableFactoryUtil.findAndCreateTableSink(context))

case _ => None
}
}

/**
* Checks whether the [[CatalogTable]] uses legacy connector sink options.
*/
private def isLegacyConnectorOptions(
objectIdentifier: ObjectIdentifier,
catalogTable: CatalogTable) = {
// normalize option keys
val properties = new DescriptorProperties(true)
properties.putProperties(catalogTable.getOptions)
if (properties.containsKey(ConnectorDescriptorValidator.CONNECTOR_TYPE)) {
true
} else {
val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
try {
// try to create a legacy table sink using the options;
// some legacy factories already use the new 'connector' key
TableFactoryUtil.findAndCreateTableSink(
catalog.orElse(null),
objectIdentifier,
catalogTable,
getTableConfig.getConfiguration,
isStreamingMode)
// success, then we will use the legacy factories
true
} catch {
// fail, then we will use new factories
case _: Throwable => false
}
}
}
}
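An illustrative sketch (assumption, not part of the commit) of the two paths this check separates: a table declared with the legacy 'connector.type' key keeps going through the old TableSink factories and is planned as a LogicalLegacySink, while a table declared with the new 'connector' key goes through FactoryUtil.createTableSink and yields a DynamicTableSink planned as a LogicalSink. The connector names and DDL below are assumed examples; the factory lookup itself only happens once an INSERT INTO statement against these tables is planned.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LegacyVsNewSinkExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Legacy option keys ('connector.type') -> isLegacyConnectorOptions(...) returns true.
        tEnv.executeSql(
            "CREATE TABLE legacy_sink (id BIGINT, name STRING) WITH (" +
            " 'connector.type' = 'filesystem'," +
            " 'connector.path' = '/tmp/legacy_out'," +
            " 'format.type' = 'csv')");
        // New option key ('connector') -> FactoryUtil.createTableSink, yielding a DynamicTableSink.
        tEnv.executeSql(
            "CREATE TABLE new_sink (id BIGINT, name STRING) WITH (" +
            " 'connector' = 'blackhole')");
    }
}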