Skip to content

Commit

Permalink
[hotfix] [table] Refactor SqlToConverter configuration
Browse files Browse the repository at this point in the history
This closes apache#6857.
  • Loading branch information
twalthr committed Nov 5, 2018
1 parent 9f31d5c commit 34af3a7
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ abstract class TableEnvironment(val config: TableConfig) {
.costFactory(new DataSetCostFactory)
.typeSystem(new FlinkTypeSystem)
.operatorTable(getSqlOperatorTable)
.sqlToRelConverterConfig(getSqlToRelConverterConfig)
// set the executor to evaluate constant expressions
.executor(new ExpressionReducer(config))
.build
Expand All @@ -109,15 +110,6 @@ abstract class TableEnvironment(val config: TableConfig) {
// registered external catalog names -> catalog
private val externalCatalogs = new mutable.HashMap[String, ExternalCatalog]

// configuration for SqlToRelConverter
private[flink] lazy val sqlToRelConverterConfig: SqlToRelConverter.Config = {
val calciteConfig = config.getCalciteConfig
calciteConfig.getSqlToRelConverterConfig match {
case Some(c) => c
case None => getSqlToRelConverterConfig
}
}

/** Returns the table config to define the runtime behavior of the Table API. */
def getConfig: TableConfig = config

Expand All @@ -132,11 +124,18 @@ abstract class TableEnvironment(val config: TableConfig) {
* Returns the SqlToRelConverter config.
*/
protected def getSqlToRelConverterConfig: SqlToRelConverter.Config = {
SqlToRelConverter.configBuilder()
.withTrimUnusedFields(false)
.withConvertTableAccess(false)
.withInSubQueryThreshold(Integer.MAX_VALUE)
.build()
val calciteConfig = config.getCalciteConfig
calciteConfig.getSqlToRelConverterConfig match {

case None =>
SqlToRelConverter.configBuilder()
.withTrimUnusedFields(false)
.withConvertTableAccess(false)
.withInSubQueryThreshold(Integer.MAX_VALUE)
.build()

case Some(c) => c
}
}

/**
Expand Down Expand Up @@ -717,8 +716,7 @@ abstract class TableEnvironment(val config: TableConfig) {
val planner = new FlinkPlannerImpl(
getFrameworkConfig,
getPlanner,
getTypeFactory,
sqlToRelConverterConfig)
getTypeFactory)
planner.getCompletionHints(statement, position)
}

Expand All @@ -740,8 +738,7 @@ abstract class TableEnvironment(val config: TableConfig) {
* @return The result of the query as Table
*/
def sqlQuery(query: String): Table = {
val planner = new FlinkPlannerImpl(
getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
// parse the sql query
val parsed = planner.parse(query)
if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
Expand Down Expand Up @@ -801,8 +798,7 @@ abstract class TableEnvironment(val config: TableConfig) {
* @param config The [[QueryConfig]] to use.
*/
def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
val planner = new FlinkPlannerImpl(
getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
// parse the sql query
val parsed = planner.parse(stmt)
parsed match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ import scala.collection.JavaConversions._
class FlinkPlannerImpl(
config: FrameworkConfig,
planner: RelOptPlanner,
typeFactory: FlinkTypeFactory,
sqlToRelConverterConfig: SqlToRelConverter.Config) {
typeFactory: FlinkTypeFactory) {

val operatorTable: SqlOperatorTable = config.getOperatorTable
/** Holds the trait definitions to be registered with planner. May be null. */
val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
val parserConfig: SqlParser.Config = config.getParserConfig
val convertletTable: SqlRexConvertletTable = config.getConvertletTable
val defaultSchema: SchemaPlus = config.getDefaultSchema
val sqlToRelConverterConfig: SqlToRelConverter.Config = config.getSqlToRelConverterConfig

var validator: FlinkCalciteSqlValidator = _
var root: RelRoot = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ abstract class ExpressionTestBase {
private val planner = new FlinkPlannerImpl(
context._2.getFrameworkConfig,
context._2.getPlanner,
context._2.getTypeFactory,
context._2.sqlToRelConverterConfig)
context._2.getTypeFactory)
private val logicalOptProgram = Programs.ofRules(FlinkRuleSets.LOGICAL_OPT_RULES)
private val dataSetOptProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ abstract class PatternTranslatorTestBase extends TestLogger{
private val planner = new FlinkPlannerImpl(
context._2.getFrameworkConfig,
context._2.getPlanner,
context._2.getTypeFactory,
context._2.sqlToRelConverterConfig)
context._2.getTypeFactory)

private def prepareContext(typeInfo: TypeInformation[Row])
: (RelBuilder, StreamTableEnvironment, StreamExecutionEnvironment) = {
Expand Down

0 comments on commit 34af3a7

Please sign in to comment.