[FLINK-21005][table] Introduce new runtime provider for unified Sink API and implement in planner

This closes apache#14822
SteNicholas committed Feb 24, 2021
1 parent dc0c416 commit 3ac8364
Showing 10 changed files with 390 additions and 101 deletions.
@@ -29,7 +29,7 @@
*
* <p>Note: This provider is only meant for advanced connector developers. Usually, a sink should
* consist of a single entity expressed via {@link OutputFormatProvider} or {@link
* SinkFunctionProvider}.
* SinkFunctionProvider}, or {@link SinkProvider}.
*/
@PublicEvolving
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider {
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.connector.sink;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.data.RowData;

import java.util.Optional;

/** Provider of a {@link Sink} instance as a runtime implementation for {@link DynamicTableSink}. */
@PublicEvolving
public interface SinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {

/** Helper method for creating a static provider. */
static SinkProvider of(Sink<RowData, ?, ?, ?> sink) {
return () -> sink;
}

/** Helper method for creating a Sink provider with a provided sink parallelism. */
static SinkProvider of(Sink<RowData, ?, ?, ?> sink, Integer sinkParallelism) {
return new SinkProvider() {

@Override
public Sink<RowData, ?, ?, ?> createSink() {
return sink;
}

@Override
public Optional<Integer> getParallelism() {
return Optional.ofNullable(sinkParallelism);
}
};
}

/** Creates a {@link Sink} instance. */
Sink<RowData, ?, ?, ?> createSink();
}
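For orientation, a minimal usage sketch of the new provider from a connector's point of view (not part of this commit: MySink stands in for any implementation of the unified Sink API, and the parallelism value is an arbitrary example):

// Hypothetical sketch inside a DynamicTableSink implementation; MySink is assumed, not from this commit.
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    Sink<RowData, ?, ?, ?> sink = new MySink();
    // The parallelism argument is optional; SinkProvider.of(sink) leaves it unset.
    return SinkProvider.of(sink, 2);
}

The TestFileTableSink added further down in this commit follows the same pattern, wrapping a FileSink via SinkProvider.of(fileSink).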

Large diffs are not rendered by default.

@@ -19,10 +19,12 @@
package org.apache.flink.table.planner.factories;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -31,27 +33,40 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Test file source {@link DynamicTableSourceFactory}. */
public class TestFileSourceFactory implements DynamicTableSourceFactory {
/**
* Test implementation of {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory} that
* creates a file source and sink based on {@link SourceProvider} and {@link SinkProvider}.
*/
public class TestFileFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

// --------------------------------------------------------------------------------------------
// Factory
// --------------------------------------------------------------------------------------------

private static final String IDENTIFIER = "test-file";

private static final ConfigOption<String> RUNTIME_SOURCE =
ConfigOptions.key("runtime-source")
@@ -65,9 +80,15 @@ public DynamicTableSource createDynamicTableSource(Context context) {
new Path(conf.getString(FileSystemOptions.PATH)), conf.getString(RUNTIME_SOURCE));
}

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
Configuration conf = Configuration.fromMap(context.getCatalogTable().getOptions());
return new TestFileTableSink(new Path(conf.getString(FileSystemOptions.PATH)));
}

@Override
public String factoryIdentifier() {
return "filesource";
return IDENTIFIER;
}

@Override
@@ -121,6 +142,37 @@ public String asSummaryString() {
}
}

private static class TestFileTableSink implements DynamicTableSink {

private final Path path;

private TestFileTableSink(Path path) {
this.path = path;
}

@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return requestedMode;
}

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final FileSink<RowData> fileSink =
FileSink.forRowFormat(path, new RowDataEncoder()).build();
return SinkProvider.of(fileSink);
}

@Override
public DynamicTableSink copy() {
return new TestFileTableSink(path);
}

@Override
public String asSummaryString() {
return "test-file-sink";
}
}

private static class FileFormat extends SimpleStreamFormat<RowData> {

@Override
@@ -169,4 +221,25 @@ public boolean isBounded() {
return true;
}
}

private static class RowDataEncoder implements Encoder<RowData> {

private static final long serialVersionUID = 1L;

private static final byte FIELD_DELIMITER = ",".getBytes(StandardCharsets.UTF_8)[0];
private static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0];

public RowDataEncoder() {}

@Override
public void encode(RowData rowData, OutputStream stream) throws IOException {
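// Note: assumes every column is a STRING (true for the test tables in this commit); RowData.getString is not valid for other types.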
for (int index = 0; index < rowData.getArity(); index++) {
stream.write(rowData.getString(index).toBytes());
if (index != rowData.getArity() - 1) {
stream.write(FIELD_DELIMITER);
}
}
stream.write(LINE_DELIMITER);
}
}
}
@@ -153,7 +153,7 @@ private void createTestFileSource(TableEnvironment tEnv, String name, String run
+ "(\n"
+ " a STRING\n"
+ ") WITH (\n"
+ " 'connector' = 'filesource',\n"
+ " 'connector' = 'test-file',\n"
+ " 'path' = '"
+ file.toURI()
+ "',\n"
@@ -14,4 +14,4 @@
# limitations under the License.

org.apache.flink.table.planner.factories.TestValuesTableFactory
org.apache.flink.table.planner.factories.TestFileSourceFactory
org.apache.flink.table.planner.factories.TestFileFactory
@@ -27,9 +27,6 @@ import org.apache.flink.util.FileUtils

import org.junit.{Before, Test}

import java.time.{LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

class TableSourceITCase extends BatchTestBase {

@Before
@@ -282,7 +279,7 @@ class TableSourceITCase extends BatchTestBase {
|CREATE TABLE MyFileSourceTable (
| `a` STRING
|) WITH (
| 'connector' = 'filesource',
| 'connector' = 'test-file',
| 'path' = '${file.toURI}'
|)
|""".stripMargin
@@ -32,10 +32,13 @@ import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.experimental.categories.Category
import org.junit.{Rule, Test}

import java.io.File
import java.lang.{Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
import scala.collection.{Seq, mutable}
import scala.io.Source
import scala.util.{Failure, Success, Try}

class TableSinkITCase extends StreamingTestBase {
@@ -840,6 +843,97 @@ class TableSinkITCase extends StreamingTestBase {

}

@Test
def testUnifiedSinkInterfaceWithoutNotNullEnforcer(): Unit = {
val file = tempFolder.newFolder()
tEnv.executeSql(
s"""
|CREATE TABLE MyFileSinkTable (
| `a` STRING,
| `b` STRING,
| `c` STRING
|) WITH (
| 'connector' = 'test-file',
| 'path' = '${file.getAbsolutePath}'
|)
|""".stripMargin)

val stringTupleData3: Seq[(String, String, String)] = {
val data = new mutable.MutableList[(String, String, String)]
data.+=(("Test", "Sink", "Hi"))
data.+=(("Sink", "Provider", "Hello"))
data.+=(("Test", "Provider", "Hello world"))
data
}
val table = env.fromCollection(stringTupleData3).toTable(tEnv, 'a, 'b, 'c)
table.executeInsert("MyFileSinkTable").await()

// verify the content of the in-progress file generated by TestFileTableSink.
val source = Source.fromFile(
new File(file.getAbsolutePath, file.list()(0)).listFiles()(0).getAbsolutePath)
val result = source.getLines().toArray.toList
source.close()

val expected = List(
"Test,Sink,Hi",
"Sink,Provider,Hello",
"Test,Provider,Hello world")
assertEquals(expected.sorted, result.sorted)
}

@Test
def testUnifiedSinkInterfaceWithNotNullEnforcer(): Unit = {
val file = tempFolder.newFolder()
tEnv.executeSql(
s"""
|CREATE TABLE MyFileSinkTable (
| `a` STRING NOT NULL,
| `b` STRING,
| `c` STRING
|) WITH (
| 'connector' = 'test-file',
| 'path' = '${file.getAbsolutePath}'
|)
|""".stripMargin)

val stringTupleData4: Seq[(String, String, String)] = {
val data = new mutable.MutableList[(String, String, String)]
data.+=((null, "Sink", "Hi"))
data.+=(("Sink", "Provider", "Hello"))
data.+=((null, "Enforcer", "Hi world"))
data.+=(("Test", "Provider", "Hello world"))
data
}
val table = env.fromCollection(stringTupleData4).toTable(tEnv, 'a, 'b, 'c)
// default should fail, because there are null values in the source
try {
table.executeInsert("MyFileSinkTable").await()
fail("Execution should fail.")
} catch {
case t: Throwable =>
val exception = ExceptionUtils.findThrowableWithMessage(
t,
"Column 'a' is NOT NULL, however, a null value is being written into it. " +
"You can set job configuration 'table.exec.sink.not-null-enforcer'='drop' " +
"to suppress this exception and drop such records silently.")
assertTrue(exception.isPresent)
}

// enable the drop enforcer so that the query can run
tEnv.getConfig.getConfiguration.setString("table.exec.sink.not-null-enforcer", "drop")
table.executeInsert("MyFileSinkTable").await()

// verify the content of the in-progress file generated by TestFileTableSink.
val source = Source.fromFile(
new File(file.getAbsolutePath, file.list()(0)).listFiles()(0).getAbsolutePath)
val result = source.getLines().toArray.toList
source.close()

val expected = List(
"Sink,Provider,Hello",
"Test,Provider,Hello world")
assertEquals(expected.sorted, result.sorted)
}

private def innerTestSetParallelism(provider: String, parallelism: Int, index: Int): Unit = {
val dataId = TestValuesTableFactory.registerData(data1)
@@ -24,30 +24,29 @@
import org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer;
import org.apache.flink.table.data.RowData;

import static org.apache.flink.util.Preconditions.checkArgument;

/** Checks writing null values into NOT NULL columns. */
public class SinkNotNullEnforcer implements FilterFunction<RowData> {

private static final long serialVersionUID = 1L;

private final NotNullEnforcer notNullEnforcer;
private final int[] notNullFieldIndices;
private final boolean notNullCheck;
private final String[] allFieldNames;

public SinkNotNullEnforcer(
NotNullEnforcer notNullEnforcer, int[] notNullFieldIndices, String[] allFieldNames) {
checkArgument(
notNullFieldIndices.length > 0,
"SinkNotNullEnforcer requires that there are not-null fields.");
this.notNullFieldIndices = notNullFieldIndices;
this.notNullEnforcer = notNullEnforcer;
this.notNullCheck = notNullFieldIndices.length > 0;
this.allFieldNames = allFieldNames;
}

@Override
public boolean filter(RowData row) {
if (!notNullCheck) {
return true;
}

for (int index : notNullFieldIndices) {
if (row.isNullAt(index)) {
if (notNullEnforcer == NotNullEnforcer.ERROR) {
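As a hedged illustration of the reworked check (the change above removes the checkArgument and instead short-circuits through the new notNullCheck flag, so an enforcer built with no NOT NULL columns now passes every row through), the following sketch reuses the column layout of MyFileSinkTable from the tests; the concrete values and modes are illustrative only:

// Illustrative sketch; column layout (a STRING NOT NULL, b STRING, c STRING) mirrors MyFileSinkTable above.
GenericRowData row =
        GenericRowData.of(null, StringData.fromString("Sink"), StringData.fromString("Hi"));
SinkNotNullEnforcer enforcer =
        new SinkNotNullEnforcer(NotNullEnforcer.DROP, new int[] {0}, new String[] {"a", "b", "c"});
boolean keep = enforcer.filter(row); // false: the null in NOT NULL column 'a' causes the row to be dropped
// With NotNullEnforcer.ERROR the same call would throw, producing the message asserted in TableSinkITCase.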