diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveDialectFactory.java similarity index 83% rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveDialectFactory.java index 92dce590d5bb1..10b8dce8f9a0b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveDialectFactory.java @@ -20,14 +20,15 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.delegation.ExtendedOperationExecutor; import org.apache.flink.table.delegation.Parser; -import org.apache.flink.table.planner.delegation.ParserFactory; +import org.apache.flink.table.planner.delegation.DialectFactory; import java.util.Collections; import java.util.Set; /** A Parser factory that creates {@link HiveParser}. */ -public class HiveParserFactory implements ParserFactory { +public class HiveDialectFactory implements DialectFactory { @Override public String factoryIdentifier() { @@ -52,4 +53,9 @@ public Parser create(Context context) { context.getPlannerContext()::createCalciteParser, context.getPlannerContext()); } + + @Override + public ExtendedOperationExecutor createExtendedOperationExecutor(Context context) { + return new HiveOperationExecutor(); + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java new file mode 100644 index 0000000000000..2eb02ba27a678 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.delegation.hive; + +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.delegation.ExtendedOperationExecutor; +import org.apache.flink.table.operations.Operation; + +import java.util.Optional; + +/** + * A Hive's operation executor used to execute operation in custom way instead of Flink's + * implementation. + */ +public class HiveOperationExecutor implements ExtendedOperationExecutor { + @Override + public Optional executeOperation(Operation operation) { + return Optional.empty(); + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 929abe11d6e3e..dac2f67e247f4 100644 --- a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -16,4 +16,4 @@ org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory org.apache.flink.table.endpoint.hive.HiveServer2EndpointFactory org.apache.flink.table.module.hive.HiveModuleFactory -org.apache.flink.table.planner.delegation.hive.HiveParserFactory +org.apache.flink.table.planner.delegation.hive.HiveDialectFactory diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java index a1e868ab111b9..a861cde811446 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogPartitionSpec; @@ -34,6 +35,7 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; +import org.apache.flink.table.delegation.ExtendedOperationExecutor; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.command.ClearOperation; @@ -41,6 +43,7 @@ import org.apache.flink.table.operations.command.QuitOperation; import org.apache.flink.table.operations.command.ResetOperation; import org.apache.flink.table.operations.command.SetOperation; +import org.apache.flink.table.planner.delegation.hive.HiveOperationExecutor; import org.apache.flink.table.planner.delegation.hive.HiveParser; import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.types.Row; @@ -114,18 +117,30 @@ public void tearDown() { } @Test - public void testPluggableParser() { + public void testPluggableDialect() { TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) tableEnv; Parser parser = tableEnvInternal.getParser(); // hive dialect should use HiveParser assertThat(parser).isInstanceOf(HiveParser.class); - // execute some sql and verify the parser instance is reused + ExtendedOperationExecutor operationExecutor = + ((TableEnvironmentImpl) tableEnvInternal).getExtendedOperationExecutor(); + // hive dialect should use HiveOperationExecutor + assertThat(operationExecutor).isInstanceOf(HiveOperationExecutor.class); + // execute some sql and verify the parser/operation executor instance is reused tableEnvInternal.executeSql("show databases"); assertThat(tableEnvInternal.getParser()).isSameAs(parser); + assertThat(((TableEnvironmentImpl) tableEnvInternal).getExtendedOperationExecutor()) + .isSameAs(operationExecutor); // switching dialect will result in a new parser tableEnvInternal.getConfig().setSqlDialect(SqlDialect.DEFAULT); assertThat(tableEnvInternal.getParser().getClass().getName()) .isNotEqualTo(parser.getClass().getName()); + assertThat( + ((TableEnvironmentImpl) tableEnvInternal) + .getExtendedOperationExecutor() + .getClass() + .getName()) + .isNotEqualTo(operationExecutor.getClass().getName()); } @Test diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 68b97a0d279d0..d4e10108d3893 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -73,6 +73,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.delegation.ExtendedOperationExecutor; import org.apache.flink.table.delegation.InternalPlan; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.delegation.Planner; @@ -703,7 +704,8 @@ public TableResult executeSql(String statement) { throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG); } - return executeInternal(operations.get(0)); + Operation operation = operations.get(0); + return executeInternal(operation); } @Override @@ -872,6 +874,14 @@ private TableResultInternal executeQueryOperation(QueryOperation operation) { @Override public TableResultInternal executeInternal(Operation operation) { + // try to use extended operation executor to execute the operation + Optional tableResult = + getExtendedOperationExecutor().executeOperation(operation); + // if the extended operation executor return non-empty result, return it + if (tableResult.isPresent()) { + return tableResult.get(); + } + // otherwise, fall back to internal implementation if (operation instanceof ModifyOperation) { return executeInternal(Collections.singletonList((ModifyOperation) operation)); } else if (operation instanceof StatementSetOperation) { @@ -1639,6 +1649,10 @@ public Parser getParser() { return getPlanner().getParser(); } + public ExtendedOperationExecutor getExtendedOperationExecutor() { + return getPlanner().getExtendedOperationExecutor(); + } + @Override public CatalogManager getCatalogManager() { return catalogManager; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExtendedOperationExecutor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExtendedOperationExecutor.java new file mode 100644 index 0000000000000..a3a0b2189c394 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExtendedOperationExecutor.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.delegation; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.operations.Operation; + +import java.util.Optional; + +/** + * An extended operation executor which provides method for executing operation. External pluggable + * dialect can implement this interface to execute operation in its own way instead of using Flink's + * own implementation for operation execution. + */ +@Internal +public interface ExtendedOperationExecutor { + + /** + * Execute the given operation and return the execution result. This method will delegate + * Flink's own operation execution. + * + *

If return Optional.empty(), the operation will then fall to Flink's operation execution. + */ + Optional executeOperation(Operation operation); +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java index b9e33df8bf79f..ee969a19cc3c0 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java @@ -62,6 +62,14 @@ public interface Planner { */ Parser getParser(); + /** + * Retrieves a {@link ExtendedOperationExecutor} that provides method for executing operation in + * a custom way. + * + * @return initialized {@link ExtendedOperationExecutor} + */ + ExtendedOperationExecutor getExtendedOperationExecutor(); + /** * Converts a relational tree of {@link ModifyOperation}s into a set of runnable {@link * Transformation}s. diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java index a8f45523458b3..c45a184eb59d9 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java @@ -21,6 +21,7 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.PlanReference; +import org.apache.flink.table.delegation.ExtendedOperationExecutor; import org.apache.flink.table.delegation.InternalPlan; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.delegation.Planner; @@ -29,6 +30,7 @@ import java.io.IOException; import java.util.List; +import java.util.Optional; /** Mocking {@link Planner} for tests. */ public class PlannerMock implements Planner { @@ -38,6 +40,11 @@ public Parser getParser() { return new ParserMock(); } + @Override + public ExtendedOperationExecutor getExtendedOperationExecutor() { + return (operation) -> Optional.empty(); + } + @Override public List> translate(List modifyOperations) { return null; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultDialectFactory.java similarity index 96% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultDialectFactory.java index e75efbe343209..512a5f428e834 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultDialectFactory.java @@ -26,7 +26,7 @@ import java.util.Set; /** A Parser factory that creates {@link ParserImpl}. */ -public class DefaultParserFactory implements ParserFactory { +public class DefaultDialectFactory implements DialectFactory { @Override public String factoryIdentifier() { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DialectFactory.java similarity index 59% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserFactory.java rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DialectFactory.java index 9c1ef0cd0e1f5..6deebab3dceb6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserFactory.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DialectFactory.java @@ -20,37 +20,52 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.api.internal.TableResultInternal; import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExtendedOperationExecutor; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.operations.Operation; + +import java.util.Optional; /** - * Factory that creates {@link Parser}. + * Factory that creates {@link Parser} and {@link ExtendedOperationExecutor}. * *

The {@link #factoryIdentifier()} is identified by matching it against {@link * TableConfigOptions#TABLE_SQL_DIALECT}. */ @Internal -public interface ParserFactory extends Factory { +public interface DialectFactory extends Factory { /** Creates a new parser. */ Parser create(Context context); + default ExtendedOperationExecutor createExtendedOperationExecutor(Context context) { + return new EmptyOperationExecutor(); + } + /** Context provided when a parser is created. */ interface Context { CatalogManager getCatalogManager(); PlannerContext getPlannerContext(); + + Executor getExecutor(); } /** Default implementation for {@link Context}. */ class DefaultParserContext implements Context { private final CatalogManager catalogManager; private final PlannerContext plannerContext; + private final Executor executor; - public DefaultParserContext(CatalogManager catalogManager, PlannerContext plannerContext) { + public DefaultParserContext( + CatalogManager catalogManager, PlannerContext plannerContext, Executor executor) { this.catalogManager = catalogManager; this.plannerContext = plannerContext; + this.executor = executor; } @Override @@ -62,5 +77,23 @@ public CatalogManager getCatalogManager() { public PlannerContext getPlannerContext() { return plannerContext; } + + @Override + public Executor getExecutor() { + return executor; + } + } + + /** + * Default implementation for {@link ExtendedOperationExecutor} that doesn't extend any + * operation behavior but forward all operations to the Flink planner. + */ + class EmptyOperationExecutor implements ExtendedOperationExecutor { + + @Override + public Optional executeOperation(Operation operation) { + // return empty so that it'll use Flink's own implementation for operation execution. + return Optional.empty(); + } } } diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index ac5f9b6bb1767..6121bb963fce0 100644 --- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,5 +14,5 @@ # limitations under the License. org.apache.flink.table.planner.delegation.DefaultExecutorFactory -org.apache.flink.table.planner.delegation.DefaultParserFactory +org.apache.flink.table.planner.delegation.DefaultDialectFactory org.apache.flink.table.planner.delegation.DefaultPlannerFactory diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 1c8c6b7b2ff9e..8d00017f46e5c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.catalog._ import org.apache.flink.table.catalog.ManagedTableListener.isManagedTable import org.apache.flink.table.connector.sink.DynamicTableSink -import org.apache.flink.table.delegation.{Executor, Parser, Planner} +import org.apache.flink.table.delegation.{Executor, ExtendedOperationExecutor, Parser, Planner} import org.apache.flink.table.factories.{DynamicTableSinkFactory, FactoryUtil, TableFactoryUtil} import org.apache.flink.table.module.{Module, ModuleManager} import org.apache.flink.table.operations._ @@ -36,7 +36,6 @@ import org.apache.flink.table.planner.calcite._ import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema import org.apache.flink.table.planner.connectors.DynamicSinkUtils import org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast -import org.apache.flink.table.planner.delegation.ParserFactory.DefaultParserContext import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl import org.apache.flink.table.planner.hint.FlinkHints import org.apache.flink.table.planner.operations.PlannerQueryOperation @@ -56,6 +55,7 @@ import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter import _root_.scala.collection.JavaConversions._ +import DialectFactory.DefaultParserContext import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema import org.apache.calcite.plan.{RelTrait, RelTraitDef} import org.apache.calcite.rel.RelNode @@ -100,7 +100,9 @@ abstract class PlannerBase( // temporary utility until we don't use planner expressions anymore functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE) + private var dialectFactory: DialectFactory = _ private var parser: Parser = _ + private var extendedOperationExecutor: ExtendedOperationExecutor = _ private var currentDialect: SqlDialect = getTableConfig.getSqlDialect @VisibleForTesting @@ -147,25 +149,38 @@ abstract class PlannerBase( executor.asInstanceOf[DefaultExecutor].getExecutionEnvironment } - def createNewParser: Parser = { - val factoryIdentifier = getTableConfig.getSqlDialect.name().toLowerCase - val parserFactory = FactoryUtil.discoverFactory( - getClass.getClassLoader, - classOf[ParserFactory], - factoryIdentifier) - - val context = new DefaultParserContext(catalogManager, plannerContext) - parserFactory.create(context) + def getDialectFactory: DialectFactory = { + if (dialectFactory == null || getTableConfig.getSqlDialect != currentDialect) { + val factoryIdentifier = getTableConfig.getSqlDialect.name().toLowerCase + dialectFactory = FactoryUtil.discoverFactory( + getClass.getClassLoader, + classOf[DialectFactory], + factoryIdentifier) + currentDialect = getTableConfig.getSqlDialect + parser = null + extendedOperationExecutor = null + } + dialectFactory } override def getParser: Parser = { if (parser == null || getTableConfig.getSqlDialect != currentDialect) { - parser = createNewParser - currentDialect = getTableConfig.getSqlDialect + dialectFactory = getDialectFactory + parser = + dialectFactory.create(new DefaultParserContext(catalogManager, plannerContext, executor)) } parser } + override def getExtendedOperationExecutor: ExtendedOperationExecutor = { + if (extendedOperationExecutor == null || getTableConfig.getSqlDialect != currentDialect) { + dialectFactory = getDialectFactory + extendedOperationExecutor = dialectFactory.createExtendedOperationExecutor( + new DefaultParserContext(catalogManager, plannerContext, executor)) + } + extendedOperationExecutor + } + override def translate( modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = { beforeTranslation()