
Commit

Merge pull request #6 from apache/master
Merge from apache/flink master
chaojianok committed Apr 26, 2020
2 parents 432882a + ba54a8d commit 67027fc
Showing 8 changed files with 121 additions and 60 deletions.
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.rules.physical.batch

import org.apache.flink.table.api.TableException
import org.apache.flink.table.filesystem.FileSystemTableFactory
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSink
@@ -53,9 +54,16 @@ class BatchExecSinkRule extends ConverterRule(
val dynamicPartIndices =
dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))

requiredTraitSet = requiredTraitSet.plus(
FlinkRelDistribution.hash(dynamicPartIndices
.map(Integer.valueOf), requireStrict = false))
val shuffleEnable = sinkNode
.catalogTable
.getProperties
.get(FileSystemTableFactory.SINK_SHUFFLE_BY_PARTITION.key())

if (shuffleEnable != null && shuffleEnable.toBoolean) {
requiredTraitSet = requiredTraitSet.plus(
FlinkRelDistribution.hash(dynamicPartIndices
.map(Integer.valueOf), requireStrict = false))
}

if (partitionSink.configurePartitionGrouping(true)) {
// default to asc.
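For context, the guard above makes the hash shuffle on the dynamic partition columns opt-in per table. A minimal usage sketch, assuming a partitioned filesystem sink like the ones defined in the planner tests later in this commit (the table name, path and 'testcsv' format come from those tests and are illustrative only; MyTable is assumed to be an already registered source with columns a, b, c):

create table sinkShuffleBy (
  a bigint,
  b bigint,
  c bigint
) partitioned by (b, c) with (
  'connector' = 'filesystem',
  'path' = '/non',
  'sink.shuffle-by-partition.enable' = 'true',  -- the key read via FileSystemTableFactory.SINK_SHUFFLE_BY_PARTITION
  'format' = 'testcsv'
)

-- With the option set, the batch plan adds Exchange(distribution=[hash[b, c]]) below the partition
-- Sort in front of the Sink; leaving the option unset keeps the non-shuffled plan.
INSERT INTO sinkShuffleBy SELECT a, b, c FROM MyTable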
@@ -19,11 +19,11 @@
package org.apache.flink.table.planner.plan.rules.physical.stream

import org.apache.flink.table.api.TableException
import org.apache.flink.table.filesystem.FileSystemTableFactory
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSink
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink
import org.apache.flink.table.planner.sinks.DataStreamTableSink
import org.apache.flink.table.sinks.PartitionableTableSink

import org.apache.calcite.plan.RelOptRule
@@ -53,15 +53,20 @@ class StreamExecSinkRule extends ConverterRule(
val dynamicPartIndices =
dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))

if (partitionSink.configurePartitionGrouping(false)) {
throw new TableException("Partition grouping in stream mode is not supported yet!")
}
val shuffleEnable = sinkNode
.catalogTable
.getProperties
.get(FileSystemTableFactory.SINK_SHUFFLE_BY_PARTITION.key())

if (!partitionSink.isInstanceOf[DataStreamTableSink[_]]) {
if (shuffleEnable != null && shuffleEnable.toBoolean) {
requiredTraitSet = requiredTraitSet.plus(
FlinkRelDistribution.hash(dynamicPartIndices
.map(Integer.valueOf), requireStrict = false))
}

if (partitionSink.configurePartitionGrouping(false)) {
throw new TableException("Partition grouping in stream mode is not supported yet!")
}
}
case _ => throw new TableException("We need PartitionableTableSink to write data to" +
s" partitioned table: ${sinkNode.sinkName}")
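The streaming rule applies the same opt-in, while partition grouping stays unsupported in stream mode regardless of the option. A short sketch of the observable difference, reusing the sink and sinkShuffleBy tables from the stream planner tests later in this commit:

-- Option unset on 'sink': no hash distribution is requested, the plan stays Sink <- TableSourceScan.
INSERT INTO sink SELECT a, b, c FROM MyTable

-- 'sink.shuffle-by-partition.enable' = 'true' on 'sinkShuffleBy': the plan gains
-- Exchange(distribution=[hash[b, c]]) on the dynamic partition columns before the Sink.
INSERT INTO sinkShuffleBy SELECT a, b, c FROM MyTable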
@@ -19,82 +19,89 @@ limitations under the License.
<TestCase name="testDynamic">
<Resource name="sql">
<![CDATA[INSERT INTO sink SELECT a, b, c FROM MyTable]]>

</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalSink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>

</Resource>
<Resource name="planAfter">
<![CDATA[
Sink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- Sort(orderBy=[b ASC, c ASC])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testDynamicShuffleBy">
<Resource name="sql">
<![CDATA[INSERT INTO sinkShuffleBy SELECT a, b, c FROM MyTable]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalSink(name=[`default_catalog`.`default_database`.`sinkShuffleBy`], fields=[a, b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Sink(name=[`default_catalog`.`default_database`.`sinkShuffleBy`], fields=[a, b, c])
+- Sort(orderBy=[b ASC, c ASC])
+- Exchange(distribution=[hash[b, c]])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>

</Resource>
</TestCase>
<TestCase name="testPartial">
<Resource name="sql">
<![CDATA[INSERT INTO sink PARTITION (b=1) SELECT a, c FROM MyTable]]>

</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalSink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- LogicalProject(a=[$0], EXPR$1=[1:BIGINT], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>

</Resource>
<Resource name="planAfter">
<![CDATA[
Sink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- Sort(orderBy=[c ASC])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, 1:BIGINT AS EXPR$1, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Calc(select=[a, 1:BIGINT AS EXPR$1, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>

</Resource>
</TestCase>
<TestCase name="testStatic">
<Resource name="sql">
<![CDATA[INSERT INTO sink PARTITION (b=1, c=1) SELECT a FROM MyTable]]>

</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalSink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- LogicalProject(a=[$0], EXPR$1=[1:BIGINT], EXPR$2=[1:BIGINT])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>

</Resource>
<Resource name="planAfter">
<![CDATA[
Sink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- Calc(select=[a, 1:BIGINT AS EXPR$1, 1:BIGINT AS EXPR$2])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>

</Resource>
</TestCase>
<TestCase name="testWrongFields">
<Resource name="sql">
<![CDATA[INSERT INTO sink PARTITION (b=1) SELECT a, b, c FROM MyTable]]>

</Resource>
</TestCase>
<TestCase name="testWrongStatic">
<Resource name="sql">
<![CDATA[INSERT INTO sink PARTITION (a=1) SELECT b, c FROM MyTable]]>

</Resource>
</TestCase>
</Root>
@@ -19,80 +19,86 @@ limitations under the License.
<TestCase name="testDynamic">
<Resource name="sql">
<![CDATA[INSERT INTO sink SELECT a, b, c FROM MyTable]]>

</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalSink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>

</Resource>
<Resource name="planAfter">
<![CDATA[
Sink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testDynamicShuffleBy">
<Resource name="sql">
<![CDATA[INSERT INTO sinkShuffleBy SELECT a, b, c FROM MyTable]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalSink(name=[`default_catalog`.`default_database`.`sinkShuffleBy`], fields=[a, b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Sink(name=[`default_catalog`.`default_database`.`sinkShuffleBy`], fields=[a, b, c])
+- Exchange(distribution=[hash[b, c]])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>

</Resource>
</TestCase>
<TestCase name="testPartial">
<Resource name="sql">
<![CDATA[INSERT INTO sink PARTITION (b=1) SELECT a, c FROM MyTable]]>

</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalSink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- LogicalProject(a=[$0], EXPR$1=[1:BIGINT], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>

</Resource>
<Resource name="planAfter">
<![CDATA[
Sink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, 1:BIGINT AS EXPR$1, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Calc(select=[a, 1:BIGINT AS EXPR$1, c])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>

</Resource>
</TestCase>
<TestCase name="testStatic">
<Resource name="sql">
<![CDATA[INSERT INTO sink PARTITION (b=1, c=1) SELECT a FROM MyTable]]>

</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalSink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- LogicalProject(a=[$0], EXPR$1=[1:BIGINT], EXPR$2=[1:BIGINT])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>

</Resource>
<Resource name="planAfter">
<![CDATA[
Sink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
+- Calc(select=[a, 1:BIGINT AS EXPR$1, 1:BIGINT AS EXPR$2])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>

</Resource>
</TestCase>
<TestCase name="testWrongFields">
<Resource name="sql">
<![CDATA[INSERT INTO sink PARTITION (b=1) SELECT a, b, c FROM MyTable]]>

</Resource>
</TestCase>
<TestCase name="testWrongStatic">
<Resource name="sql">
<![CDATA[INSERT INTO sink PARTITION (a=1) SELECT b, c FROM MyTable]]>

</Resource>
</TestCase>
</Root>
@@ -18,12 +18,9 @@

package org.apache.flink.table.planner.plan.batch.sql

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{SqlDialect, TableConfig, ValidationException}
import org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase
import org.apache.flink.table.planner.utils.TableTestBase

import org.junit.Test
@@ -32,14 +29,23 @@ class PartitionableSinkTest extends TableTestBase {

private val util = batchTestUtil()
util.addTableSource[(Long, Long, Long)]("MyTable", 'a, 'b, 'c)
PartitionableSinkITCase.registerTableSink(
util.tableEnv,
"sink",
new RowTypeInfo(
Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.LONG),
Array("a", "b", "c")),
grouping = true,
Array("b", "c"))
createTable("sink", shuffleBy = false)

private def createTable(name: String, shuffleBy: Boolean): Unit = {
util.tableEnv.sqlUpdate(
s"""
|create table $name (
| a bigint,
| b bigint,
| c bigint
|) partitioned by (b, c) with (
| 'connector' = 'filesystem',
| 'path' = '/non',
| ${if (shuffleBy) "'sink.shuffle-by-partition.enable'='true'," else ""}
| 'format' = 'testcsv'
|)
|""".stripMargin)
}

@Test
def testStatic(): Unit = {
@@ -51,6 +57,12 @@ class PartitionableSinkTest extends TableTestBase {
util.verifySqlUpdate("INSERT INTO sink SELECT a, b, c FROM MyTable")
}

@Test
def testDynamicShuffleBy(): Unit = {
createTable("sinkShuffleBy", shuffleBy = true)
util.verifySqlUpdate("INSERT INTO sinkShuffleBy SELECT a, b, c FROM MyTable")
}

@Test
def testPartial(): Unit = {
util.verifySqlUpdate("INSERT INTO sink PARTITION (b=1) SELECT a, c FROM MyTable")
@@ -18,12 +18,9 @@

package org.apache.flink.table.planner.plan.stream.sql

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{SqlDialect, TableConfig, ValidationException}
import org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase
import org.apache.flink.table.planner.utils.TableTestBase

import org.junit.Test
@@ -32,14 +29,23 @@ class PartitionableSinkTest extends TableTestBase {

private val util = streamTestUtil()
util.addTableSource[(Long, Long, Long)]("MyTable", 'a, 'b, 'c)
PartitionableSinkITCase.registerTableSink(
util.tableEnv,
"sink",
new RowTypeInfo(
Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.LONG),
Array("a", "b", "c")),
grouping = false,
Array("b", "c"))
createTable("sink", shuffleBy = false)

private def createTable(name: String, shuffleBy: Boolean): Unit = {
util.tableEnv.sqlUpdate(
s"""
|create table $name (
| a bigint,
| b bigint,
| c bigint
|) partitioned by (b, c) with (
| 'connector' = 'filesystem',
| 'path' = '/non',
| ${if (shuffleBy) "'sink.shuffle-by-partition.enable'='true'," else ""}
| 'format' = 'testcsv'
|)
|""".stripMargin)
}

@Test
def testStatic(): Unit = {
@@ -51,6 +57,12 @@ class PartitionableSinkTest extends TableTestBase {
util.verifySqlUpdate("INSERT INTO sink SELECT a, b, c FROM MyTable")
}

@Test
def testDynamicShuffleBy(): Unit = {
createTable("sinkShuffleBy", shuffleBy = true)
util.verifySqlUpdate("INSERT INTO sinkShuffleBy SELECT a, b, c FROM MyTable")
}

@Test
def testPartial(): Unit = {
util.verifySqlUpdate("INSERT INTO sink PARTITION (b=1) SELECT a, c FROM MyTable")
