From 3ac83641c52dfa4fa67366c62335bcb7ee8a6624 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Wed, 24 Feb 2021 10:45:54 +0800 Subject: [PATCH] [FLINK-21005][table] Introduce new runtime provider for unified Sink API and implement in planner This closes #14822 --- .../sink/DataStreamSinkProvider.java | 2 +- .../table/connector/sink/SinkProvider.java | 55 +++++ .../nodes/exec/common/CommonExecSink.java | 229 ++++++++++++------ ...ourceFactory.java => TestFileFactory.java} | 79 +++++- ...ultipleInputNodeCreationProcessorTest.java | 2 +- .../org.apache.flink.table.factories.Factory | 2 +- .../runtime/batch/sql/TableSourceITCase.scala | 5 +- .../stream/table/TableSinkITCase.scala | 94 +++++++ .../operators/sink/SinkNotNullEnforcer.java | 11 +- .../runtime/operators/sink/SinkOperator.java | 12 +- 10 files changed, 390 insertions(+), 101 deletions(-) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/SinkProvider.java rename flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/{TestFileSourceFactory.java => TestFileFactory.java} (69%) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java index a517462b72c90..8aab8c5de3f4f 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java @@ -29,7 +29,7 @@ * *

Note: This provider is only meant for advanced connector developers. Usually, a sink should * consist of a single entity expressed via {@link OutputFormatProvider} or {@link - * SinkFunctionProvider}. + * SinkFunctionProvider}, or {@link SinkProvider}. */ @PublicEvolving public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/SinkProvider.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/SinkProvider.java new file mode 100644 index 0000000000000..82914ab65f8c9 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/SinkProvider.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.table.connector.ParallelismProvider; +import org.apache.flink.table.data.RowData; + +import java.util.Optional; + +/** Provider of a {@link Sink} instance as a runtime implementation for {@link DynamicTableSink}. */ +@PublicEvolving +public interface SinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider { + + /** Helper method for creating a static provider. */ + static SinkProvider of(Sink sink) { + return () -> sink; + } + + /** Helper method for creating a Sink provider with a provided sink parallelism. */ + static SinkProvider of(Sink sink, Integer sinkParallelism) { + return new SinkProvider() { + + @Override + public Sink createSink() { + return sink; + } + + @Override + public Optional getParallelism() { + return Optional.ofNullable(sinkParallelism); + } + }; + } + + /** Creates a {@link Sink} instance. 
*/ + Sink createSink(); +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 4447297f072bc..0f30496898972 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -27,8 +27,11 @@ import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamFilter; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.api.transformations.SinkTransformation; import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; @@ -40,6 +43,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.OutputFormatProvider; import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.SinkProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; @@ -57,16 +61,18 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.types.RowKind; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * Base {@link ExecNode} to write data to an external sink defined by a {@link DynamicTableSink}. 
*/ @@ -103,21 +109,8 @@ protected Transformation createSinkTransformation( final DynamicTableSink tableSink = tableSinkSpec.getTableSink(); final DynamicTableSink.SinkRuntimeProvider runtimeProvider = tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)); - - final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer = - tableConfig - .getConfiguration() - .get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER); final TableSchema tableSchema = tableSinkSpec.getCatalogTable().getSchema(); - final int[] notNullFieldIndices = TableSinkUtils.getNotNullFieldIndices(tableSchema); - final String[] fieldNames = - ((RowType) tableSchema.toPhysicalRowDataType().getLogicalType()) - .getFieldNames() - .toArray(new String[0]); - final SinkNotNullEnforcer enforcer = - new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames); - final InternalTypeInfo inputTypeInfo = - InternalTypeInfo.of(getInputEdges().get(0).getOutputType()); + inputTransform = applyNotNullEnforcer(tableConfig, tableSchema, inputTransform); if (runtimeProvider instanceof DataStreamSinkProvider) { if (runtimeProvider instanceof ParallelismProvider) { @@ -125,87 +118,173 @@ protected Transformation createSinkTransformation( "`DataStreamSinkProvider` is not allowed to work with" + " `ParallelismProvider`, " + "please see document of `ParallelismProvider`"); - } else { - final DataStream dataStream = - new DataStream<>(env, inputTransform).filter(enforcer); - final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider; - return provider.consumeDataStream(dataStream).getTransformation(); } + + final DataStream dataStream = new DataStream<>(env, inputTransform); + final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider; + return provider.consumeDataStream(dataStream).getTransformation(); } else { - Preconditions.checkArgument( + checkArgument( runtimeProvider instanceof ParallelismProvider, - "runtimeProvider with `ParallelismProvider` implementation is required"); + "%s should implement ParallelismProvider interface.", + runtimeProvider.getClass().getName()); + final int inputParallelism = inputTransform.getParallelism(); + final int sinkParallelism = + deriveSinkParallelism((ParallelismProvider) runtimeProvider, inputParallelism); + + // apply keyBy partition transformation if needed + inputTransform = + applyKeyByForDifferentParallelism( + tableSchema, inputTransform, inputParallelism, sinkParallelism); final SinkFunction sinkFunction; if (runtimeProvider instanceof SinkFunctionProvider) { sinkFunction = ((SinkFunctionProvider) runtimeProvider).createSinkFunction(); + return createSinkFunctionTransformation( + sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkParallelism); + } else if (runtimeProvider instanceof OutputFormatProvider) { OutputFormat outputFormat = ((OutputFormatProvider) runtimeProvider).createOutputFormat(); sinkFunction = new OutputFormatSinkFunction<>(outputFormat); + return createSinkFunctionTransformation( + sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkParallelism); + + } else if (runtimeProvider instanceof SinkProvider) { + return new SinkTransformation<>( + inputTransform, + ((SinkProvider) runtimeProvider).createSink(), + getDescription(), + sinkParallelism); + } else { throw new TableException("This should not happen."); } + } + } - if (sinkFunction instanceof InputTypeConfigurable) { - ((InputTypeConfigurable) sinkFunction).setInputType(inputTypeInfo, env.getConfig()); - } - - final SinkOperator 
operator = - new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer); + /** + * Apply an operator to filter or report error to process not-null values for not-null fields. + */ + private Transformation applyNotNullEnforcer( + TableConfig config, TableSchema tableSchema, Transformation inputTransform) { + final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer = + config.getConfiguration() + .get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER); + final int[] notNullFieldIndices = TableSinkUtils.getNotNullFieldIndices(tableSchema); + final String[] fieldNames = + ((RowType) tableSchema.toPhysicalRowDataType().getLogicalType()) + .getFieldNames() + .toArray(new String[0]); - final int inputParallelism = inputTransform.getParallelism(); - final int parallelism; - final Optional parallelismOptional = - ((ParallelismProvider) runtimeProvider).getParallelism(); - if (parallelismOptional.isPresent()) { - parallelism = parallelismOptional.get(); - if (parallelism <= 0) { - throw new TableException( - String.format( - "Table: %s configured sink parallelism: " - + "%s should not be less than zero or equal to zero", - tableSinkSpec.getObjectIdentifier().asSummaryString(), - parallelism)); - } - } else { - parallelism = inputParallelism; - } + if (notNullFieldIndices.length > 0) { + final SinkNotNullEnforcer enforcer = + new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames); + final List notNullFieldNames = + Arrays.stream(notNullFieldIndices) + .mapToObj(idx -> fieldNames[idx]) + .collect(Collectors.toList()); + final String operatorName = + String.format( + "NotNullEnforcer(fields=[%s])", String.join(", ", notNullFieldNames)); + return new OneInputTransformation<>( + inputTransform, + operatorName, + new StreamFilter<>(enforcer), + getInputTypeInfo(), + inputTransform.getParallelism()); + } else { + // there are no not-null fields, just skip adding the enforcer operator + return inputTransform; + } + } - final int[] primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(tableSchema); - final Transformation finalInputTransform; - if (inputParallelism == parallelism || changelogMode.containsOnly(RowKind.INSERT)) { - // if the inputParallelism is equals to the parallelism or insert-only mode, do - // nothing. - finalInputTransform = inputTransform; - } else if (primaryKeys.length == 0) { + /** + * Returns the parallelism of sink operator, it assumes the sink runtime provider implements + * {@link ParallelismProvider}. It returns parallelism defined in {@link ParallelismProvider} if + * the parallelism is provided, otherwise it uses parallelism of input transformation. + */ + private int deriveSinkParallelism( + ParallelismProvider parallelismProvider, int inputParallelism) { + final Optional parallelismOptional = parallelismProvider.getParallelism(); + if (parallelismOptional.isPresent()) { + int sinkParallelism = parallelismOptional.get(); + if (sinkParallelism <= 0) { throw new TableException( String.format( - "Table: %s configured sink parallelism is: %s, while the input parallelism is: " - + "%s. 
Since configured parallelism is different from input parallelism and the changelog mode " - + "contains [%s], which is not INSERT_ONLY mode, primary key is required but no primary key is found", + "Table: %s configured sink parallelism: " + + "%s should not be less than zero or equal to zero", tableSinkSpec.getObjectIdentifier().asSummaryString(), - parallelism, - inputParallelism, - changelogMode.getContainedKinds().stream() - .map(Enum::toString) - .collect(Collectors.joining(",")))); - } else { - // key by before sink - final RowDataKeySelector selector = - KeySelectorUtil.getRowDataSelector(primaryKeys, inputTypeInfo); - final KeyGroupStreamPartitioner partitioner = - new KeyGroupStreamPartitioner<>( - selector, - KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM); - finalInputTransform = new PartitionTransformation<>(inputTransform, partitioner); - finalInputTransform.setParallelism(parallelism); + sinkParallelism)); } - return new LegacySinkTransformation<>( - finalInputTransform, - getDescription(), - SimpleOperatorFactory.of(operator), - parallelism); + return sinkParallelism; + } else { + // use input parallelism if not specified + return inputParallelism; + } + } + + /** + * Apply a keyBy partition transformation if the parallelism of sink operator and input operator + * is different and sink changelog-mode is not insert-only. This is used to guarantee the strict + * ordering of changelog messages. + */ + private Transformation applyKeyByForDifferentParallelism( + TableSchema tableSchema, + Transformation inputTransform, + int inputParallelism, + int sinkParallelism) { + final int[] primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(tableSchema); + if (inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT)) { + // if the inputParallelism is equals to the parallelism or insert-only mode, do nothing. + return inputTransform; + } else if (primaryKeys.length == 0) { + throw new TableException( + String.format( + "Table: %s configured sink parallelism is: %s, while the input parallelism is: " + + "%s. 
Since configured parallelism is different from input parallelism and the changelog mode " + + "contains [%s], which is not INSERT_ONLY mode, primary key is required but no primary key is found", + tableSinkSpec.getObjectIdentifier().asSummaryString(), + sinkParallelism, + inputParallelism, + changelogMode.getContainedKinds().stream() + .map(Enum::toString) + .collect(Collectors.joining(",")))); + } else { + // keyBy before sink + final RowDataKeySelector selector = + KeySelectorUtil.getRowDataSelector(primaryKeys, getInputTypeInfo()); + final KeyGroupStreamPartitioner partitioner = + new KeyGroupStreamPartitioner<>( + selector, KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM); + Transformation partitionedTransform = + new PartitionTransformation<>(inputTransform, partitioner); + partitionedTransform.setParallelism(sinkParallelism); + return partitionedTransform; + } + } + + private Transformation createSinkFunctionTransformation( + SinkFunction sinkFunction, + StreamExecutionEnvironment env, + Transformation inputTransformation, + int rowtimeFieldIndex, + int sinkParallelism) { + final SinkOperator operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex); + + if (sinkFunction instanceof InputTypeConfigurable) { + ((InputTypeConfigurable) sinkFunction) + .setInputType(getInputTypeInfo(), env.getConfig()); } + + return new LegacySinkTransformation<>( + inputTransformation, + getDescription(), + SimpleOperatorFactory.of(operator), + sinkParallelism); + } + + private InternalTypeInfo getInputTypeInfo() { + return InternalTypeInfo.of(getInputEdges().get(0).getOutputType()); } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileSourceFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java similarity index 69% rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileSourceFactory.java rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java index 00796a5d0e2e5..7bc7727ec262b 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileSourceFactory.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java @@ -19,10 +19,12 @@ package org.apache.flink.table.planner.factories; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.Encoder; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.connector.file.src.reader.SimpleStreamFormat; import org.apache.flink.core.fs.FSDataInputStream; @@ -31,6 +33,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkProvider; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import 
org.apache.flink.table.connector.source.ScanTableSource; @@ -38,6 +42,7 @@ import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; +import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.filesystem.FileSystemOptions; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -45,13 +50,23 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashSet; import java.util.Set; -/** Test file source {@link DynamicTableSourceFactory}. */ -public class TestFileSourceFactory implements DynamicTableSourceFactory { +/** + * Test implementation of {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory} that + * creates a file source and sink based on {@link SourceProvider} and {@link SinkProvider}. + */ +public class TestFileFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + // -------------------------------------------------------------------------------------------- + // Factory + // -------------------------------------------------------------------------------------------- + + private static final String IDENTIFIER = "test-file"; private static final ConfigOption RUNTIME_SOURCE = ConfigOptions.key("runtime-source") @@ -65,9 +80,15 @@ public DynamicTableSource createDynamicTableSource(Context context) { new Path(conf.getString(FileSystemOptions.PATH)), conf.getString(RUNTIME_SOURCE)); } + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + Configuration conf = Configuration.fromMap(context.getCatalogTable().getOptions()); + return new TestFileTableSink(new Path(conf.getString(FileSystemOptions.PATH))); + } + @Override public String factoryIdentifier() { - return "filesource"; + return IDENTIFIER; } @Override @@ -121,6 +142,37 @@ public String asSummaryString() { } } + private static class TestFileTableSink implements DynamicTableSink { + + private final Path path; + + private TestFileTableSink(Path path) { + this.path = path; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return requestedMode; + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + final FileSink fileSink = + FileSink.forRowFormat(path, new RowDataEncoder()).build(); + return SinkProvider.of(fileSink); + } + + @Override + public DynamicTableSink copy() { + return new TestFileTableSink(path); + } + + @Override + public String asSummaryString() { + return "test-file-sink"; + } + } + private static class FileFormat extends SimpleStreamFormat { @Override @@ -169,4 +221,25 @@ public boolean isBounded() { return true; } } + + private static class RowDataEncoder implements Encoder { + + private static final long serialVersionUID = 1L; + + private static final byte FIELD_DELIMITER = ",".getBytes(StandardCharsets.UTF_8)[0]; + private static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0]; + + public RowDataEncoder() {} + + @Override + public void encode(RowData rowData, OutputStream stream) throws IOException { + for (int index = 0; index < rowData.getArity(); index++) { + stream.write(rowData.getString(index).toBytes()); + if (index != rowData.getArity() - 1) { + stream.write(FIELD_DELIMITER); + } + } + 
stream.write(LINE_DELIMITER); + } + } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java index c7dc05bd9f043..45b85e7786c35 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java @@ -153,7 +153,7 @@ private void createTestFileSource(TableEnvironment tEnv, String name, String run + "(\n" + " a STRING\n" + ") WITH (\n" - + " 'connector' = 'filesource',\n" + + " 'connector' = 'test-file',\n" + " 'path' = '" + file.toURI() + "',\n" diff --git a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 4428f1b762c8c..11ded7d46e8b3 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,4 +14,4 @@ # limitations under the License. org.apache.flink.table.planner.factories.TestValuesTableFactory -org.apache.flink.table.planner.factories.TestFileSourceFactory +org.apache.flink.table.planner.factories.TestFileFactory \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala index bd8e2e95cf4e2..5409e6fec3b1a 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala @@ -27,9 +27,6 @@ import org.apache.flink.util.FileUtils import org.junit.{Before, Test} -import java.time.{LocalDateTime, ZoneId} -import java.time.format.DateTimeFormatter - class TableSourceITCase extends BatchTestBase { @Before @@ -282,7 +279,7 @@ class TableSourceITCase extends BatchTestBase { |CREATE TABLE MyFileSourceTable ( | `a` STRING |) WITH ( - | 'connector' = 'filesource', + | 'connector' = 'test-file', | 'path' = '${file.toURI}' |) |""".stripMargin diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala index bc5214430ec12..5fe8fff490537 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala @@ -32,10 +32,13 @@ import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail} import org.junit.experimental.categories.Category import org.junit.{Rule, 
Test} +import java.io.File import java.lang.{Long => JLong} import java.math.{BigDecimal => JBigDecimal} import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ +import scala.collection.{Seq, mutable} +import scala.io.Source import scala.util.{Failure, Success, Try} class TableSinkITCase extends StreamingTestBase { @@ -840,6 +843,97 @@ class TableSinkITCase extends StreamingTestBase { } + @Test + def testUnifiedSinkInterfaceWithoutNotNullEnforcer(): Unit = { + val file = tempFolder.newFolder() + tEnv.executeSql( + s""" + |CREATE TABLE MyFileSinkTable ( + | `a` STRING, + | `b` STRING, + | `c` STRING + |) WITH ( + | 'connector' = 'test-file', + | 'path' = '${file.getAbsolutePath}' + |) + |""".stripMargin) + + val stringTupleData3: Seq[(String, String, String)] = { + val data = new mutable.MutableList[(String, String, String)] + data.+=(("Test", "Sink", "Hi")) + data.+=(("Sink", "Provider", "Hello")) + data.+=(("Test", "Provider", "Hello world")) + data + } + val table = env.fromCollection(stringTupleData3).toTable(tEnv, 'a, 'b, 'c) + table.executeInsert("MyFileSinkTable").await() + + // verify the content of in progress file generated by TestFileTableSink. + val source = Source.fromFile( + new File(file.getAbsolutePath, file.list()(0)).listFiles()(0).getAbsolutePath) + val result = source.getLines().toArray.toList + source.close() + + val expected = List( + "Test,Sink,Hi", + "Sink,Provider,Hello", + "Test,Provider,Hello world") + assertEquals(expected.sorted, result.sorted) + } + + @Test + def testUnifiedSinkInterfaceWithNotNullEnforcer(): Unit = { + val file = tempFolder.newFolder() + tEnv.executeSql( + s""" + |CREATE TABLE MyFileSinkTable ( + | `a` STRING NOT NULL, + | `b` STRING, + | `c` STRING + |) WITH ( + | 'connector' = 'test-file', + | 'path' = '${file.getAbsolutePath}' + |) + |""".stripMargin) + + val stringTupleData4: Seq[(String, String, String)] = { + val data = new mutable.MutableList[(String, String, String)] + data.+=((null, "Sink", "Hi")) + data.+=(("Sink", "Provider", "Hello")) + data.+=((null, "Enforcer", "Hi world")) + data.+=(("Test", "Provider", "Hello world")) + data + } + val table = env.fromCollection(stringTupleData4).toTable(tEnv, 'a, 'b, 'c) + // default should fail, because there are null values in the source + try { + table.executeInsert("MyFileSinkTable").await() + fail("Execution should fail.") + } catch { + case t: Throwable => + val exception = ExceptionUtils.findThrowableWithMessage( + t, + "Column 'a' is NOT NULL, however, a null value is being written into it. " + + "You can set job configuration 'table.exec.sink.not-null-enforcer'='drop' " + + "to suppress this exception and drop such records silently.") + assertTrue(exception.isPresent) + } + + // enable drop enforcer to make the query can run + tEnv.getConfig.getConfiguration.setString("table.exec.sink.not-null-enforcer", "drop") + table.executeInsert("MyFileSinkTable").await() + + // verify the content of in progress file generated by TestFileTableSink. 
+ val source = Source.fromFile( + new File(file.getAbsolutePath, file.list()(0)).listFiles()(0).getAbsolutePath) + val result = source.getLines().toArray.toList + source.close() + + val expected = List( + "Sink,Provider,Hello", + "Test,Provider,Hello world") + assertEquals(expected.sorted, result.sorted) + } private def innerTestSetParallelism(provider: String, parallelism: Int, index: Int): Unit = { val dataId = TestValuesTableFactory.registerData(data1) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkNotNullEnforcer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkNotNullEnforcer.java index aa17442ee6e72..3fd793abe1866 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkNotNullEnforcer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkNotNullEnforcer.java @@ -24,6 +24,8 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer; import org.apache.flink.table.data.RowData; +import static org.apache.flink.util.Preconditions.checkArgument; + /** Checks writing null values into NOT NULL columns. */ public class SinkNotNullEnforcer implements FilterFunction { @@ -31,23 +33,20 @@ public class SinkNotNullEnforcer implements FilterFunction { private final NotNullEnforcer notNullEnforcer; private final int[] notNullFieldIndices; - private final boolean notNullCheck; private final String[] allFieldNames; public SinkNotNullEnforcer( NotNullEnforcer notNullEnforcer, int[] notNullFieldIndices, String[] allFieldNames) { + checkArgument( + notNullFieldIndices.length > 0, + "SinkNotNullEnforcer requires that there are not-null fields."); this.notNullFieldIndices = notNullFieldIndices; this.notNullEnforcer = notNullEnforcer; - this.notNullCheck = notNullFieldIndices.length > 0; this.allFieldNames = allFieldNames; } @Override public boolean filter(RowData row) { - if (!notNullCheck) { - return true; - } - for (int index : notNullFieldIndices) { if (row.isNullAt(index)) { if (notNullEnforcer == NotNullEnforcer.ERROR) { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkOperator.java index 00a8acf05f827..530a505947dae 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkOperator.java @@ -41,20 +41,15 @@ public class SinkOperator extends AbstractUdfStreamOperator sinkFunction, - int rowtimeFieldIndex, - SinkNotNullEnforcer enforcer) { + public SinkOperator(SinkFunction sinkFunction, int rowtimeFieldIndex) { super(sinkFunction); this.rowtimeFieldIndex = rowtimeFieldIndex; - this.enforcer = enforcer; chainingStrategy = ChainingStrategy.ALWAYS; } @@ -67,10 +62,7 @@ public void open() throws Exception { @Override public void processElement(StreamRecord element) throws Exception { sinkContext.element = element; - RowData row = element.getValue(); - if (enforcer.filter(row)) { - userFunction.invoke(row, sinkContext); - } + userFunction.invoke(element.getValue(), sinkContext); } @Override
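
---

For reviewers who want to see the new provider from a connector author's perspective, here is a minimal sketch of a DynamicTableSink that hands a unified Sink to the planner through the SinkProvider added by this patch. It is an illustration only: the class name ExampleFileTableSink and the use of SimpleStringEncoder are assumptions and not part of this change; the in-tree reference is the TestFileTableSink added to TestFileFactory above, which uses its own RowDataEncoder.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.data.RowData;

/** Illustrative sink that bridges the unified Sink API into the planner via SinkProvider. */
public class ExampleFileTableSink implements DynamicTableSink {

    private final Path path;

    public ExampleFileTableSink(Path path) {
        this.path = path;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // The example only appends rows; updates and deletes are not consumed.
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // FileSink implements org.apache.flink.api.connector.sink.Sink, so it can be
        // exposed to the planner through the new SinkProvider.
        final FileSink<RowData> fileSink =
                FileSink.forRowFormat(path, new SimpleStringEncoder<RowData>()).build();
        return SinkProvider.of(fileSink);
        // To also fix the sink parallelism, the second factory method can be used instead:
        //     return SinkProvider.of(fileSink, 2);
        // CommonExecSink then takes 2 as the sink transformation's parallelism and, when the
        // changelog is not insert-only and the input parallelism differs, keys the input by
        // the primary key first so that per-key changelog ordering is preserved.
    }

    @Override
    public DynamicTableSink copy() {
        return new ExampleFileTableSink(path);
    }

    @Override
    public String asSummaryString() {
        return "example-file-sink";
    }
}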
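On the refactored not-null handling: the enforcer now runs as a dedicated StreamFilter operator in front of the sink instead of inside SinkOperator, and CommonExecSink only adds that operator when the schema actually declares NOT NULL columns. The standalone snippet below sketches the DROP vs. ERROR semantics using the internal SinkNotNullEnforcer class from this patch; the field names and values are made up for illustration.

import org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer;

public class NotNullEnforcerSketch {

    public static void main(String[] args) {
        // Column 0 ("a") is NOT NULL; DROP mode silently filters violating rows, while
        // ERROR mode (the default of table.exec.sink.not-null-enforcer) throws a TableException.
        SinkNotNullEnforcer enforcer =
                new SinkNotNullEnforcer(
                        NotNullEnforcer.DROP, new int[] {0}, new String[] {"a", "b"});

        GenericRowData ok =
                GenericRowData.of(StringData.fromString("Sink"), StringData.fromString("Provider"));
        GenericRowData violating = GenericRowData.of(null, StringData.fromString("Enforcer"));

        System.out.println(enforcer.filter(ok));        // true  -> forwarded to the sink
        System.out.println(enforcer.filter(violating)); // false -> dropped before the sink
    }
}

Note that with this patch the constructor rejects an empty notNullFieldIndices array via checkArgument, which is why the planner skips creating the operator entirely when there are no NOT NULL fields.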