/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.delegation.hive;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.CalciteParser;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.delegation.PlannerContext;
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;

import org.apache.calcite.tools.FrameworkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/** A Parser that uses Hive's planner to parse a statement. */
public class HiveParser extends ParserImpl {

    // Reserved for Hive-specific parsing logic; currently unused in this class.
    private static final Logger LOG = LoggerFactory.getLogger(HiveParser.class);

    // Scaffolding for the upcoming Hive-dialect parsing path: these are built eagerly
    // in the constructor but not yet consulted by parse(). TODO(review): confirm they
    // are picked up by follow-up Hive parsing commits.
    private final PlannerContext plannerContext;
    private final FlinkCalciteCatalogReader catalogReader;
    private final FrameworkConfig frameworkConfig;

    /**
     * Creates a HiveParser. Package-private: instances are created via
     * {@code HiveParserFactory}.
     *
     * @param catalogManager catalog manager used to resolve identifiers
     * @param validatorSupplier supplies a fresh planner/validator per statement
     * @param calciteParserSupplier supplies a Calcite parser reflecting the latest config
     * @param sqlExprToRexConverterCreator builds a SQL-expression-to-RexNode converter for a schema
     * @param plannerContext shared planner context used to derive catalog reader and framework
     *     config
     */
    HiveParser(
            CatalogManager catalogManager,
            Supplier<FlinkPlannerImpl> validatorSupplier,
            Supplier<CalciteParser> calciteParserSupplier,
            Function<TableSchema, SqlExprToRexConverter> sqlExprToRexConverterCreator,
            PlannerContext plannerContext) {
        super(
                catalogManager,
                validatorSupplier,
                calciteParserSupplier,
                sqlExprToRexConverterCreator);
        this.plannerContext = plannerContext;
        // false => do not relax case sensitivity; follow the parser configuration
        // (parameter is named lenientCaseSensitivity in PlannerContext).
        this.catalogReader =
                plannerContext.createCatalogReader(
                        false,
                        catalogManager.getCurrentCatalog(),
                        catalogManager.getCurrentDatabase());
        this.frameworkConfig = plannerContext.createFrameworkConfig();
    }

    /**
     * Parses the given statement into operations. Currently a pure delegation to the default
     * parser; kept as an explicit override as the extension point for Hive-dialect parsing.
     */
    @Override
    public List<Operation> parse(String statement) {
        return super.parse(statement);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.delegation.hive;

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.flink.table.planner.delegation.ParserFactory;
import org.apache.flink.table.planner.delegation.PlannerContext;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/** A Parser factory that creates {@link HiveParser}. */
public class HiveParserFactory implements ParserFactory {

    /**
     * Creates a {@link HiveParser}. Suppliers are passed instead of pre-built objects so
     * each statement sees the most up-to-date planner/parser configuration.
     */
    @Override
    public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
                plannerContext::createSqlExprToRexConverter;
        return new HiveParser(
                catalogManager,
                () ->
                        plannerContext.createFlinkPlanner(
                                catalogManager.getCurrentCatalog(),
                                catalogManager.getCurrentDatabase()),
                plannerContext::createCalciteParser,
                tableSchema ->
                        sqlExprToRexConverterFactory.create(
                                plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
                plannerContext);
    }

    /** No optional properties: this factory matches on the required context only. */
    @Override
    public Map<String, String> optionalContext() {
        DescriptorProperties properties = new DescriptorProperties();
        return properties.asMap();
    }

    /** Matched by SPI discovery when the SQL dialect is set to {@code hive}. */
    @Override
    public Map<String, String> requiredContext() {
        DescriptorProperties properties = new DescriptorProperties();
        properties.putString(
                TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name().toLowerCase());
        return properties.asMap();
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
    }
}
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java index 1f125b50e18b3..6f399bf2a0cb6 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java @@ -24,12 +24,15 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.config.CatalogConfig; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; +import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.planner.delegation.hive.HiveParser; import org.apache.flink.types.Row; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FileUtils; @@ -65,6 +68,8 @@ import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; /** Test Hive syntax when Hive dialect is used. 
*/ @@ -99,6 +104,21 @@ public void tearDown() { } } + @Test + public void testPluggableParser() { + TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) tableEnv; + Parser parser = tableEnvInternal.getParser(); + // hive dialect should use HiveParser + assertTrue(parser instanceof HiveParser); + // execute some sql and verify the parser instance is reused + tableEnvInternal.executeSql("show databases"); + assertSame(parser, tableEnvInternal.getParser()); + // switching dialect will result in a new parser + tableEnvInternal.getConfig().setSqlDialect(SqlDialect.DEFAULT); + assertNotEquals( + parser.getClass().getName(), tableEnvInternal.getParser().getClass().getName()); + } + @Test public void testCreateDatabase() throws Exception { tableEnv.executeSql("create database db1 comment 'db1 comment'"); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 6de689c093709..b9d1fcfe90b03 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -174,7 +174,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { protected final Executor execEnv; protected final FunctionCatalog functionCatalog; protected final Planner planner; - protected final Parser parser; private final boolean isStreamingMode; private final ClassLoader userClassLoader; private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG = @@ -195,7 +194,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { @Override public void createTemporaryTable(String path, CatalogBaseTable table) { - UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path); + UnresolvedIdentifier 
unresolvedIdentifier = getParser().parseIdentifier(path); ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); catalogManager.createTemporaryTable(table, objectIdentifier, false); @@ -221,18 +220,17 @@ protected TableEnvironmentImpl( this.functionCatalog = functionCatalog; this.planner = planner; - this.parser = planner.getParser(); this.isStreamingMode = isStreamingMode; this.userClassLoader = userClassLoader; this.operationTreeBuilder = OperationTreeBuilder.create( tableConfig, - functionCatalog.asLookup(parser::parseIdentifier), + functionCatalog.asLookup(getParser()::parseIdentifier), catalogManager.getDataTypeFactory(), path -> { try { UnresolvedIdentifier unresolvedIdentifier = - parser.parseIdentifier(path); + getParser().parseIdentifier(path); Optional catalogQueryOperation = scanInternal(unresolvedIdentifier); return catalogQueryOperation.map( @@ -250,7 +248,7 @@ protected TableEnvironmentImpl( }, (sqlExpression, inputSchema) -> { try { - return parser.parseSqlExpression(sqlExpression, inputSchema); + return getParser().parseSqlExpression(sqlExpression, inputSchema); } catch (Throwable t) { throw new ValidationException( String.format("Invalid SQL expression: %s", sqlExpression), @@ -420,14 +418,14 @@ public void createFunction( String path, Class functionClass, boolean ignoreIfExists) { - final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path); + final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); functionCatalog.registerCatalogFunction( unresolvedIdentifier, functionClass, ignoreIfExists); } @Override public boolean dropFunction(String path) { - final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path); + final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); return functionCatalog.dropCatalogFunction(unresolvedIdentifier, true); } @@ -441,14 +439,14 @@ public void createTemporaryFunction( @Override public void 
createTemporaryFunction(String path, UserDefinedFunction functionInstance) { - final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path); + final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); functionCatalog.registerTemporaryCatalogFunction( unresolvedIdentifier, functionInstance, false); } @Override public boolean dropTemporaryFunction(String path) { - final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path); + final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true); } @@ -460,7 +458,7 @@ public void registerTable(String name, Table table) { @Override public void createTemporaryView(String path, Table view) { - UnresolvedIdentifier identifier = parser.parseIdentifier(path); + UnresolvedIdentifier identifier = getParser().parseIdentifier(path); createTemporaryView(identifier, view); } @@ -492,7 +490,7 @@ public Table scan(String... 
tablePath) { @Override public Table from(String path) { - UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path); + UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); return scanInternal(unresolvedIdentifier) .map(this::createTable) .orElseThrow( @@ -504,7 +502,7 @@ public Table from(String path) { @Override public void insertInto(String targetPath, Table table) { - UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(targetPath); + UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(targetPath); insertIntoInternal(unresolvedIdentifier, table); } @@ -587,7 +585,7 @@ public String[] listTemporaryViews() { @Override public boolean dropTemporaryTable(String path) { - UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path); + UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); try { catalogManager.dropTemporaryTable(identifier, false); @@ -599,7 +597,7 @@ public boolean dropTemporaryTable(String path) { @Override public boolean dropTemporaryView(String path) { - UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path); + UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); try { catalogManager.dropTemporaryView(identifier, false); @@ -641,7 +639,7 @@ public String explain(boolean extended) { @Override public String explainSql(String statement, ExplainDetail... 
extraDetails) { - List operations = parser.parse(statement); + List operations = getParser().parse(statement); if (operations.size() != 1) { throw new TableException( @@ -663,7 +661,7 @@ public String[] getCompletionHints(String statement, int position) { @Override public Table sqlQuery(String query) { - List operations = parser.parse(query); + List operations = getParser().parse(query); if (operations.size() != 1) { throw new ValidationException( @@ -683,7 +681,7 @@ public Table sqlQuery(String query) { @Override public TableResult executeSql(String statement) { - List operations = parser.parse(statement); + List operations = getParser().parse(statement); if (operations.size() != 1) { throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG); @@ -761,7 +759,7 @@ public TableResult executeInternal(QueryOperation operation) { @Override public void sqlUpdate(String stmt) { - List operations = parser.parse(stmt); + List operations = getParser().parse(stmt); if (operations.size() != 1) { throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG); @@ -1362,7 +1360,7 @@ public JobExecutionResult execute(String jobName) throws Exception { @Override public Parser getParser() { - return parser; + return getPlanner().getParser(); } @Override @@ -1635,12 +1633,12 @@ protected TableImpl createTable(QueryOperation tableOperation) { this, tableOperation, operationTreeBuilder, - functionCatalog.asLookup(parser::parseIdentifier)); + functionCatalog.asLookup(getParser()::parseIdentifier)); } @Override public String getJsonPlan(String stmt) { - List operations = parser.parse(stmt); + List operations = getParser().parse(stmt); if (operations.size() != 1) { throw new TableException( "Unsupported SQL query! 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.delegation;

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/** A Parser factory that creates {@link ParserImpl}. */
public class DefaultParserFactory implements ParserFactory {

    /**
     * Creates the default {@link ParserImpl}. Suppliers are passed (rather than cached
     * instances) so each statement is parsed with the latest TableConfig settings.
     */
    @Override
    public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
                plannerContext::createSqlExprToRexConverter;
        return new ParserImpl(
                catalogManager,
                () ->
                        plannerContext.createFlinkPlanner(
                                catalogManager.getCurrentCatalog(),
                                catalogManager.getCurrentDatabase()),
                plannerContext::createCalciteParser,
                tableSchema ->
                        sqlExprToRexConverterFactory.create(
                                plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)));
    }

    /** No optional properties: this factory matches on the required context only. */
    @Override
    public Map<String, String> optionalContext() {
        DescriptorProperties properties = new DescriptorProperties();
        return properties.asMap();
    }

    /** Matched by SPI discovery when the SQL dialect is set to {@code default}. */
    @Override
    public Map<String, String> requiredContext() {
        DescriptorProperties properties = new DescriptorProperties();
        properties.putString(
                TableConfigOptions.TABLE_SQL_DIALECT.key(),
                SqlDialect.DEFAULT.name().toLowerCase());
        return properties.asMap();
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.delegation;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.factories.ComponentFactory;

/**
 * Factory that creates {@link Parser}.
 *
 * <p>This factory is used with Java's Service Provider Interfaces (SPI) for discovery. A factory
 * is called with a set of normalized properties that describe the desired configuration. Those
 * properties may include table configurations like SQL dialect.
 */
@Internal
public interface ParserFactory extends ComponentFactory {
    /** Creates a {@link Parser} bound to the given catalog manager and planner context. */
    Parser create(CatalogManager catalogManager, PlannerContext plannerContext);
}
a/flink-table/flink-table-planner-blink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-table/flink-table-planner-blink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -15,3 +15,4 @@ org.apache.flink.table.planner.delegation.BlinkPlannerFactory org.apache.flink.table.planner.delegation.BlinkExecutorFactory +org.apache.flink.table.planner.delegation.DefaultParserFactory diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index ba410408e702e..da24edbac9bce 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -21,13 +21,13 @@ package org.apache.flink.table.planner.delegation import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.api.dag.Transformation import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.api.config.ExecutionConfigOptions -import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException, TableSchema} +import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions} +import org.apache.flink.table.api.{SqlDialect, TableConfig, TableEnvironment, TableException, TableSchema} import org.apache.flink.table.catalog._ import org.apache.flink.table.connector.sink.DynamicTableSink import org.apache.flink.table.delegation.{Executor, Parser, Planner} import org.apache.flink.table.descriptors.{ConnectorDescriptorValidator, DescriptorProperties} -import org.apache.flink.table.factories.{FactoryUtil, TableFactoryUtil} +import org.apache.flink.table.factories.{ComponentFactoryService, FactoryUtil, 
TableFactoryUtil} import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode import org.apache.flink.table.operations._ import org.apache.flink.table.planner.JMap @@ -57,7 +57,6 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.tools.FrameworkConfig import java.util -import java.util.function.{Function => JFunction, Supplier => JSupplier} import _root_.scala.collection.JavaConversions._ @@ -91,23 +90,8 @@ abstract class PlannerBase( plannerContext.createSqlExprToRexConverter(tableRowType) } - private val parser: Parser = new ParserImpl( - catalogManager, - new JSupplier[FlinkPlannerImpl] { - override def get(): FlinkPlannerImpl = createFlinkPlanner - }, - // we do not cache the parser in order to use the most up to - // date configuration. Users might change parser configuration in TableConfig in between - // parsing statements - new JSupplier[CalciteParser] { - override def get(): CalciteParser = plannerContext.createCalciteParser() - }, - new JFunction[TableSchema, SqlExprToRexConverter] { - override def apply(t: TableSchema): SqlExprToRexConverter = { - sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory.buildRelNodeRowType(t)) - } - } - ) + private var parser: Parser = _ + private var currentDialect: SqlDialect = getTableConfig.getSqlDialect @VisibleForTesting private[flink] val plannerContext: PlannerContext = @@ -151,7 +135,20 @@ abstract class PlannerBase( executor.asInstanceOf[ExecutorBase].getExecutionEnvironment } - override def getParser: Parser = parser + def createNewParser: Parser = { + val parserProps = Map(TableConfigOptions.TABLE_SQL_DIALECT.key() -> + getTableConfig.getSqlDialect.name().toLowerCase) + ComponentFactoryService.find(classOf[ParserFactory], parserProps) + .create(catalogManager, plannerContext) + } + + override def getParser: Parser = { + if (parser == null || getTableConfig.getSqlDialect != currentDialect) { + parser = createNewParser + currentDialect = 
getTableConfig.getSqlDialect + } + parser + } override def translate( modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {