[FLINK-14258][table][filesystem] Integrate file system connector to streaming sink


This closes apache#11796
JingsongLi committed Apr 26, 2020
1 parent 65e43b8 commit f095f26
Showing 6 changed files with 418 additions and 19 deletions.
ParquetFsStreamingSinkITCase.java
@@ -0,0 +1,38 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.formats.parquet;

import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase;

import java.util.ArrayList;
import java.util.List;

/**
 * Checkpoint ITCase for {@link ParquetFileSystemFormatFactory}.
 */
public class ParquetFsStreamingSinkITCase extends FsStreamingSinkITCaseBase {

	@Override
	public String[] additionalProperties() {
		List<String> ret = new ArrayList<>();
		ret.add("'format'='parquet'");
		ret.add("'format.parquet.compression'='gzip'");
		return ret.toArray(new String[0]);
	}
}
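
For reference, with these properties the DDL template in FsStreamingSinkITCaseBase (next file) renders roughly as the sketch below; the path is a placeholder for the test's temporary folder, and the partitioned-by clause only appears in the partitioned variant of the test:

create table sink_table (
  a int,
  b string,
  c string,
  d string,
  e string
)
partitioned by (d, e)
with (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/...',
  'format'='parquet',
  'format.parquet.compression'='gzip'
)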
FsStreamingSinkITCaseBase.scala
@@ -0,0 +1,118 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.runtime.stream

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.util.FiniteTestSource
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, TableUtils}
import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSinkUtil}
import org.apache.flink.types.Row

import org.junit.Assert.assertEquals
import org.junit.rules.Timeout
import org.junit.{Before, Rule, Test}

import scala.collection.Seq

import scala.collection.JavaConversions._

/**
 * Base ITCase for the streaming file system sink: writes with checkpointing
 * enabled and verifies the committed files in batch mode.
 */
abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {

  @Rule
  def timeoutPerTest: Timeout = Timeout.seconds(20)

  protected var resultPath: String = _

  private val data = Seq(
    Row.of(Integer.valueOf(1), "a", "b", "c", "12345"),
    Row.of(Integer.valueOf(2), "p", "q", "r", "12345"),
    Row.of(Integer.valueOf(3), "x", "y", "z", "12345"))

  @Before
  override def before(): Unit = {
    super.before()
    resultPath = tempFolder.newFolder().toURI.toString

    env.setParallelism(1)
    // Checkpoint frequently so the sink finalizes its in-progress files quickly.
    env.enableCheckpointing(100)

    // FiniteTestSource emits the elements, waits for checkpoints to complete,
    // then emits the same elements a second time before exiting, which is why
    // the expected result below is `data ++ data`.
    val stream = new DataStream(env.getJavaEnv.addSource(
      new FiniteTestSource[Row](data: _*),
      new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)))

    tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), $("c"), $("d"), $("e"))
  }

  def additionalProperties(): Array[String] = Array()

  @Test
  def testNonPart(): Unit = {
    test(false)
  }

  @Test
  def testPart(): Unit = {
    test(true)
  }

  private def test(partition: Boolean): Unit = {
    val ddl = s"""
                 |create table sink_table (
                 |  a int,
                 |  b string,
                 |  c string,
                 |  d string,
                 |  e string
                 |)
                 |${if (partition) "partitioned by (d, e)" else ""}
                 |with (
                 |  'connector' = 'filesystem',
                 |  'path' = '$resultPath',
                 |  ${additionalProperties().mkString(",\n")}
                 |)
      """.stripMargin
    tEnv.sqlUpdate(ddl)

    tEnv.insertInto("sink_table", tEnv.sqlQuery("select * from my_table"))
    tEnv.execute("insert")

    check(
      ddl,
      "select * from sink_table",
      data ++ data)
  }

  def check(ddl: String, sqlQuery: String, expectedResult: Seq[Row]): Unit = {
    // Re-read the committed files with a fresh batch environment and compare.
    val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val tEnv = TableEnvironment.create(setting)
    tEnv.sqlUpdate(ddl)

    val result = TableUtils.collectToList(tEnv.sqlQuery(sqlQuery))

    assertEquals(
      expectedResult.map(TestSinkUtil.rowToString(_)).sorted,
      result.map(TestSinkUtil.rowToString(_)).sorted)
  }
}
FsStreamingSinkTestCsvITCase.scala
@@ -0,0 +1,47 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.runtime.stream.sql

import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase
import org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory.USE_BULK_WRITER

import org.junit.runner.RunWith
import org.junit.runners.Parameterized

import scala.collection.Seq

/**
 * Streaming sink ITCase for the file system table factory with the testcsv
 * format, parameterized on whether the format uses a bulk writer.
 */
@RunWith(classOf[Parameterized])
class FsStreamingSinkTestCsvITCase(useBulkWriter: Boolean) extends FsStreamingSinkITCaseBase {

  override def additionalProperties(): Array[String] = {
    super.additionalProperties() ++
      Seq("'format' = 'testcsv'", s"'$USE_BULK_WRITER' = '$useBulkWriter'") ++
      (if (useBulkWriter) Seq() else Seq("'sink.rolling-policy.file-size' = '1'"))
  }
}

object FsStreamingSinkTestCsvITCase {
  @Parameterized.Parameters(name = "useBulkWriter-{0}")
  def parameters(): java.util.Collection[Boolean] = {
    java.util.Arrays.asList(true, false)
  }
}
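
A note on the parameterization: for the useBulkWriter = false run, 'sink.rolling-policy.file-size' = '1' makes every written record exceed the size threshold added to FileSystemTableFactory below, so part files roll on nearly every write; it is presumably omitted for the bulk-writer run because bulk formats roll on checkpoint rather than on size. A rough sketch of the effective WITH clause for the non-bulk run (the actual use-bulk-writer property key comes from TestCsvFileSystemFormatFactory.USE_BULK_WRITER, which is not shown in this diff):

with (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/...',            -- placeholder for the test's temp folder
  'format' = 'testcsv',
  '<USE_BULK_WRITER key>' = 'false',     -- hypothetical rendering of the constant
  'sink.rolling-policy.file-size' = '1'
)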
FileSystemTableFactory.java
@@ -71,6 +71,16 @@ public class FileSystemTableFactory implements
			.withDescription("The default partition name in case the dynamic partition" +
					" column value is null/empty string");

	public static final ConfigOption<Long> SINK_ROLLING_POLICY_FILE_SIZE = key("sink.rolling-policy.file-size")
			.longType()
			.defaultValue(1024L * 1024L * 128L)
			.withDescription("The maximum part file size before rolling (by default 128MB).");

	public static final ConfigOption<Long> SINK_ROLLING_POLICY_TIME_INTERVAL = key("sink.rolling-policy.time.interval")
			.longType()
			.defaultValue(60L * 1000L)
			.withDescription("The maximum time duration a part file can stay open before rolling (by default 60 sec).");

	@Override
	public Map<String, String> requiredContext() {
		Map<String, String> context = new HashMap<>();
@@ -93,6 +103,8 @@ public List<String> supportedProperties() {
		properties.add(DescriptorProperties.PARTITION_KEYS + ".#." +
				DescriptorProperties.PARTITION_KEYS_NAME);
		properties.add(PARTITION_DEFAULT_NAME.key());
		properties.add(SINK_ROLLING_POLICY_FILE_SIZE.key());
		properties.add(SINK_ROLLING_POLICY_TIME_INTERVAL.key());

		// format
		properties.add(FORMAT);
@@ -120,10 +132,15 @@ public TableSink<BaseRow> createTableSink(TableSinkFactory.Context context) {
		properties.putProperties(context.getTable().getProperties());

		return new FileSystemTableSink(
				context.isBounded(),
				context.getTable().getSchema(),
				new Path(properties.getString(PATH)),
				context.getTable().getPartitionKeys(),
				getPartitionDefaultName(properties),
				properties.getOptionalLong(SINK_ROLLING_POLICY_FILE_SIZE.key())
						.orElse(SINK_ROLLING_POLICY_FILE_SIZE.defaultValue()),
				properties.getOptionalLong(SINK_ROLLING_POLICY_TIME_INTERVAL.key())
						.orElse(SINK_ROLLING_POLICY_TIME_INTERVAL.defaultValue()),
				getFormatProperties(context.getTable().getProperties()));
	}
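
Taken together, the two new options let a DDL tune when the streaming sink rolls part files. A hypothetical sketch (table name, schema, and path are made up; the values are plain longs, bytes for the file size and milliseconds for the time interval, matching the defaults of 128MB and 60 sec above):

create table fs_sink (
  a int,
  b string
) with (
  'connector' = 'filesystem',
  'path' = '/tmp/fs_sink',
  'format' = 'testcsv',
  'sink.rolling-policy.file-size' = '67108864',    -- roll at 64MB instead of 128MB
  'sink.rolling-policy.time.interval' = '30000'    -- roll after 30 sec instead of 60 sec
)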

(Diffs for the remaining two changed files did not load and are not included here.)
