Skip to content

Commit

Permalink
[hotfix][table-planner-blink] Fix various checkstyle issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
KurtYoung committed Jul 8, 2019
1 parent 3fa91f2 commit 20b3c6d
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(

override def matches(call: RelOptRuleCall): Boolean = {
val config = call.getPlanner.getContext.asInstanceOf[FlinkContext].getTableConfig
if (!config.getConf.getBoolean(OptimizerConfigOptions.SQL_OPTIMIZER_PREDICATE_PUSHDOWN_ENABLED)) {
if (!config.getConf.getBoolean(
OptimizerConfigOptions.SQL_OPTIMIZER_PREDICATE_PUSHDOWN_ENABLED)) {
return false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class SortLimitTest extends TableTestBase {

private val util = batchTestUtil()
util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
util.tableEnv.getConfig.getConf.setInteger(ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, 200)
util.tableEnv.getConfig.getConf.setInteger(
ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, 200)

@Test
def testNonRangeSortWithoutOffset(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,37 +35,42 @@ class SortTest extends TableTestBase {
def testNonRangeSortOnSingleFieldWithoutForceLimit(): Unit = {
util.tableEnv.getConfig.getConf.setBoolean(
BatchExecSortRule.SQL_EXEC_SORT_RANGE_ENABLED, false)
util.tableEnv.getConfig.getConf.setInteger(ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, -1)
util.tableEnv.getConfig.getConf.setInteger(
ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, -1)
util.verifyPlan("SELECT * FROM MyTable ORDER BY a DESC")
}

@Test
def testNonRangeSortOnMultiFieldsWithoutForceLimit(): Unit = {
util.tableEnv.getConfig.getConf.setBoolean(
BatchExecSortRule.SQL_EXEC_SORT_RANGE_ENABLED, false)
util.tableEnv.getConfig.getConf.setInteger(ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, -1)
util.tableEnv.getConfig.getConf.setInteger(
ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, -1)
util.verifyPlan("SELECT * FROM MyTable ORDER BY a DESC, b")
}

@Test
def testNonRangeSortWithForceLimit(): Unit = {
util.tableEnv.getConfig.getConf.setBoolean(
BatchExecSortRule.SQL_EXEC_SORT_RANGE_ENABLED, false)
util.tableEnv.getConfig.getConf.setInteger(ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, 200)
util.tableEnv.getConfig.getConf.setInteger(
ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, 200)
util.verifyPlan("SELECT * FROM MyTable ORDER BY a DESC")
}

@Test
def testRangeSortWithoutForceLimit(): Unit = {
util.tableEnv.getConfig.getConf.setBoolean(BatchExecSortRule.SQL_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConf.setInteger(ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, -1)
util.tableEnv.getConfig.getConf.setInteger(
ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, -1)
util.verifyPlan("SELECT * FROM MyTable ORDER BY a DESC")
}

@Test
def testRangeSortWithForceLimit(): Unit = {
util.tableEnv.getConfig.getConf.setBoolean(BatchExecSortRule.SQL_EXEC_SORT_RANGE_ENABLED, true)
util.tableEnv.getConfig.getConf.setInteger(ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, 200)
util.tableEnv.getConfig.getConf.setInteger(
ExecutionConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, 200)
util.verifyPlan("SELECT * FROM MyTable ORDER BY a DESC")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
OptimizerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE")
// set smaller parallelism to avoid MemoryAllocationException
tEnv.getConfig.getConf.setInteger(ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 2)
tEnv.getConfig.getConf.setInteger(ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM, 2) // 2M
tEnv.getConfig.getConf.setInteger(ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM, 2)
testSingleAggOnTable()
}

Expand All @@ -136,7 +136,7 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
tEnv.getConfig.getConf.setString(ExecutionConfigOptions.SQL_EXEC_DISABLED_OPERATORS, "SortAgg")
tEnv.getConfig.getConf.setString(
OptimizerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE")
tEnv.getConfig.getConf.setInteger(ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM, 2) // 2M
tEnv.getConfig.getConf.setInteger(ExecutionConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM, 2)
testSingleAggOnTable()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ class StreamingWithAggTestBase(
tEnv.getConfig.withIdleStateRetentionTime(Time.hours(1))
if (aggMode.isLocalAggEnabled) {
tEnv.getConfig.getConf.setString(
OptimizerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_STRATEGY, AggPhaseEnforcer.TWO_PHASE.toString)
OptimizerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_STRATEGY,
AggPhaseEnforcer.TWO_PHASE.toString)
} else {
tEnv.getConfig.getConf.setString(
OptimizerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_STRATEGY, AggPhaseEnforcer.ONE_PHASE.toString)
OptimizerConfigOptions.SQL_OPTIMIZER_AGG_PHASE_STRATEGY,
AggPhaseEnforcer.ONE_PHASE.toString)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ abstract class StreamingWithMiniBatchTestBase(
case MiniBatchOn =>
tableConfig.getConf.setBoolean(
ExecutionConfigOptions.SQL_EXEC_MINIBATCH_ENABLED, true)
tableConfig.getConf.setString(ExecutionConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
tableConfig.getConf.setString(
ExecutionConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
tableConfig.getConf.setLong(ExecutionConfigOptions.SQL_EXEC_MINIBATCH_SIZE, 3L)
case MiniBatchOff =>
tableConfig.getConf.removeConfig(ExecutionConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY)
Expand Down

0 comments on commit 20b3c6d

Please sign in to comment.