Skip to content

Commit

Permalink
[FLINK-20322][table-planner-blink] Rename union-all-as-breakpoint con…
Browse files Browse the repository at this point in the history
…figuration and change the default value
  • Loading branch information
godfreyhe authored Nov 26, 2020
1 parent bf2c13d commit 74fd8b7
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import scala.collection.mutable
* RelNode, the RelNode is the output node of a new block (or named break-point).
* There are several special cases that a RelNode can not be a break-point.
* (1). UnionAll is not a break-point
* when [[RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED]] is true
* when [[RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED]] is false
* (2). [[TableFunctionScan]], [[Snapshot]] or window aggregate ([[Aggregate]] on a [[Project]]
* with window attribute) are not a break-point because their physical RelNodes are a composite
* RelNode, each of them cannot be optimized individually. e.g. FlinkLogicalTableFunctionScan and
Expand Down Expand Up @@ -263,8 +263,8 @@ class RelNodeBlockPlanBuilder private(config: TableConfig) {
private val node2Wrapper = new util.IdentityHashMap[RelNode, RelNodeWrapper]()
private val node2Block = new util.IdentityHashMap[RelNode, RelNodeBlock]()

private val isUnionAllAsBreakPointDisabled = config.getConfiguration.getBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED)
private val isUnionAllAsBreakPointEnabled = config.getConfiguration.getBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED)

/**
* Decompose the [[RelNode]] plan into many [[RelNodeBlock]]s,
Expand Down Expand Up @@ -319,7 +319,7 @@ class RelNodeBlockPlanBuilder private(config: TableConfig) {
*/
private def isValidBreakPoint(node: RelNode): Boolean = node match {
case _: TableFunctionScan | _: Snapshot => false
case union: Union if union.all => !isUnionAllAsBreakPointDisabled
case union: Union if union.all => isUnionAllAsBreakPointEnabled
case project: Project => project.getProjects.forall(p => !hasWindowGroup(p))
case agg: Aggregate =>
agg.getInput match {
Expand Down Expand Up @@ -387,10 +387,12 @@ object RelNodeBlockPlanBuilder {

// It is a experimental config, will may be removed later.
@Experimental
val TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED: ConfigOption[JBoolean] =
key("table.optimizer.union-all-as-breakpoint-disabled")
.defaultValue(JBoolean.valueOf(false))
.withDescription("Disable union-all node as breakpoint when constructing common sub-graph.")
val TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED: ConfigOption[JBoolean] =
key("table.optimizer.union-all-as-breakpoint-enabled")
.defaultValue(JBoolean.valueOf(true))
.withDescription("When true, the optimizer will breakup the graph at union-all node " +
"when it's a breakpoint. When false, the optimizer will skip the union-all node " +
"even it's a breakpoint, and will try find the breakpoint in its inputs.")

// It is a experimental config, will may be removed later.
@Experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinks2(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, false)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, true)
util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)

val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b as b1 FROM MyTable WHERE a <= 10")
Expand All @@ -175,7 +175,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinks3(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)

val table1 = util.tableEnv.sqlQuery("SELECT a AS a1, b AS b1 FROM MyTable WHERE a <= 10")
Expand Down Expand Up @@ -251,7 +251,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiLevelViews(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)

val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'")
util.tableEnv.registerTable("TempTable1", table1)
Expand Down Expand Up @@ -291,7 +291,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinksWithUDTF(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
util.addFunction("split", new TableFunc1)
val sqlQuery1 =
"""
Expand Down Expand Up @@ -333,7 +333,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinksSplitOnUnion1(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)

val table = util.tableEnv.sqlQuery(
"SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1")
Expand All @@ -358,7 +358,7 @@ class DagOptimizationTest extends TableTestBase {
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)

val sqlQuery1 =
Expand Down Expand Up @@ -395,7 +395,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinksSplitOnUnion3(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)

val sqlQuery1 = "SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1"
Expand Down Expand Up @@ -427,7 +427,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinksSplitOnUnion4(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)

val sqlQuery =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class DagOptimizationTest extends TableTestBase {
@Test
def testSingleSinkSplitOnUnion(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)

val sqlQuery = "SELECT SUM(a) AS total_sum FROM " +
"(SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1)"
Expand Down Expand Up @@ -152,7 +152,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinks2(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, false)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, true)
util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)

val table1 = util.tableEnv.sqlQuery("SELECT a as a1, b as b1 FROM MyTable WHERE a <= 10")
Expand All @@ -178,7 +178,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinks3(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, false)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, true)
util.addTableSource[(Int, Long, String, Double, Boolean)]("MyTable2", 'a, 'b, 'c, 'd, 'e)

val table1 = util.tableEnv.sqlQuery("SELECT a AS a1, b AS b1 FROM MyTable WHERE a <= 10")
Expand Down Expand Up @@ -257,7 +257,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinksWithUDTF(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
util.addFunction("split", new TableFunc1)
val sqlQuery1 =
"""
Expand Down Expand Up @@ -301,7 +301,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinksSplitOnUnion1(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)

val table = util.tableEnv.sqlQuery(
"SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1")
Expand All @@ -328,7 +328,7 @@ class DagOptimizationTest extends TableTestBase {
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true)
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)

val sqlQuery1 =
Expand Down Expand Up @@ -368,7 +368,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinksSplitOnUnion3(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)

val sqlQuery1 = "SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1"
Expand Down Expand Up @@ -403,7 +403,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiSinksSplitOnUnion4(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)
util.addTableSource[(Int, Long, String)]("MyTable2", 'a, 'b, 'c)

val sqlQuery =
Expand Down Expand Up @@ -510,7 +510,7 @@ class DagOptimizationTest extends TableTestBase {
def testMultiLevelViews(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)

val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'")
util.tableEnv.registerTable("TempTable1", table1)
Expand Down Expand Up @@ -553,7 +553,7 @@ class DagOptimizationTest extends TableTestBase {
def testSharedUnionNode(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.getConfiguration.setBoolean(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED, true)
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_ENABLED, false)

val table1 = util.tableEnv.sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'")
util.tableEnv.registerTable("TempTable1", table1)
Expand Down

0 comments on commit 74fd8b7

Please sign in to comment.