[FLINK-21069][table-planner-blink] Configuration "parallelism.default" doesn't take effect for TableEnvironment#explainSql

This closes apache#14742
SteNicholas committed Jan 25, 2021
1 parent 273899a commit 0d3c785
Showing 3 changed files with 31 additions and 13 deletions.
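For context, the behavior the new test covers can be reproduced with a small standalone sketch (not part of this commit): the environment parallelism stays at 1, "parallelism.default" is set to 4 on the TableConfig, and with this fix the JSON execution plan returned by explainSql should report parallelism 4. The ExplainParallelismExample object and the datagen/blackhole tables below are illustrative placeholders; the actual test uses CSV tables created via TestTableSourceSinks.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, ExplainDetail}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object ExplainParallelismExample {
  def main(args: Array[String]): Unit = {
    // Environment parallelism stays at 1; only "parallelism.default" is raised to 4.
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    execEnv.setParallelism(1)
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(execEnv, settings)

    // Same configuration path as the new
    // testStreamTableEnvironmentExecutionExplainWithConfParallelism test.
    val configuration = new Configuration()
    configuration.setInteger("parallelism.default", 4)
    tEnv.getConfig.addConfiguration(configuration)

    // Placeholder tables; the test registers CSV-backed MyTable/MySink instead.
    tEnv.executeSql("CREATE TABLE MyTable (first STRING) WITH ('connector' = 'datagen')")
    tEnv.executeSql("CREATE TABLE MySink (first STRING) WITH ('connector' = 'blackhole')")

    // With the execEnv now configured in translateToRel, the JSON execution plan
    // should report "parallelism" : 4 for the operators.
    println(tEnv.explainSql(
      "INSERT INTO MySink SELECT first FROM MyTable",
      ExplainDetail.JSON_EXECUTION_PLAN))
  }
}
```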
@@ -155,11 +155,6 @@ abstract class PlannerBase(
if (modifyOperations.isEmpty) {
return List.empty[Transformation[_]]
}
// prepare the execEnv before translating
getExecEnv.configure(
getTableConfig.getConfiguration,
Thread.currentThread().getContextClassLoader)
overrideEnvParallelism()

val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
@@ -186,6 +181,12 @@
*/
@VisibleForTesting
private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
// prepare the execEnv before translating
getExecEnv.configure(
getTableConfig.getConfiguration,
Thread.currentThread().getContextClassLoader)
overrideEnvParallelism()

modifyOperation match {
case s: UnregisteredSinkModifyOperation[_] =>
val input = getRelBuilder.queryOperation(s.getChild).build()
@@ -24,18 +24,18 @@ LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
"type" : "CsvTableSource(read fields: first)",
"pact" : "Operator",
"contents" : "CsvTableSource(read fields: first)",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "SourceConversion(table=[default_catalog.default_database.MyTable, source: [CsvTableSource(read fields: first)]], fields=[first])",
"pact" : "Operator",
"contents" : "SourceConversion(table=[default_catalog.default_database.MyTable, source: [CsvTableSource(read fields: first)]], fields=[first])",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
@@ -46,7 +46,7 @@ LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
"type" : "SinkConversionToRow",
"pact" : "Operator",
"contents" : "SinkConversionToRow",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
@@ -57,7 +57,7 @@ LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
"type" : "Map",
"pact" : "Operator",
"contents" : "Map",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
@@ -68,7 +68,7 @@ LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
"type" : "Sink: CsvTableSink(first)",
"pact" : "Data Sink",
"contents" : "Sink: CsvTableSink(first)",
"parallelism" : 1,
"parallelism" : 4,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "FORWARD",
@@ -20,6 +20,7 @@ package org.apache.flink.table.api

import org.apache.flink.api.common.typeinfo.Types.STRING
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, _}
@@ -104,12 +105,28 @@ class TableEnvironmentTest {
}

@Test
def testStreamTableEnvironmentExecutionExplain(): Unit = {
def testStreamTableEnvironmentExecutionExplainWithEnvParallelism(): Unit = {
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
execEnv.setParallelism(4)
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(execEnv, settings)

verifyTableEnvironmentExecutionExplain(tEnv)
}

@Test
def testStreamTableEnvironmentExecutionExplainWithConfParallelism(): Unit = {
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
execEnv.setParallelism(1)
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(execEnv, settings)
val configuration = new Configuration()
configuration.setInteger("parallelism.default", 4)
tEnv.getConfig.addConfiguration(configuration)

verifyTableEnvironmentExecutionExplain(tEnv)
}

private def verifyTableEnvironmentExecutionExplain(tEnv: TableEnvironment): Unit = {
TestTableSourceSinks.createPersonCsvTemporaryTable(tEnv, "MyTable")

TestTableSourceSinks.createCsvTemporarySinkTable(
