Skip to content

Commit

Permalink
[FLINK-28628][table] Introduce Operation Execution Plugin (apache#20247)
Browse files Browse the repository at this point in the history
  • Loading branch information
luoyuxia committed Jul 26, 2022
1 parent 94c3daa commit c7af66b
Show file tree
Hide file tree
Showing 12 changed files with 200 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.planner.delegation.ParserFactory;
import org.apache.flink.table.planner.delegation.DialectFactory;

import java.util.Collections;
import java.util.Set;

/** A Parser factory that creates {@link HiveParser}. */
public class HiveParserFactory implements ParserFactory {
public class HiveDialectFactory implements DialectFactory {

@Override
public String factoryIdentifier() {
Expand All @@ -52,4 +53,9 @@ public Parser create(Context context) {
context.getPlannerContext()::createCalciteParser,
context.getPlannerContext());
}

@Override
public ExtendedOperationExecutor createExtendedOperationExecutor(Context context) {
return new HiveOperationExecutor();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.delegation.hive;

import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.operations.Operation;

import java.util.Optional;

/**
* A Hive's operation executor used to execute operation in custom way instead of Flink's
* implementation.
*/
public class HiveOperationExecutor implements ExtendedOperationExecutor {
@Override
public Optional<TableResultInternal> executeOperation(Operation operation) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
org.apache.flink.table.endpoint.hive.HiveServer2EndpointFactory
org.apache.flink.table.module.hive.HiveModuleFactory
org.apache.flink.table.planner.delegation.hive.HiveParserFactory
org.apache.flink.table.planner.delegation.hive.HiveDialectFactory
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
Expand All @@ -34,13 +35,15 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.command.ClearOperation;
import org.apache.flink.table.operations.command.HelpOperation;
import org.apache.flink.table.operations.command.QuitOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.planner.delegation.hive.HiveOperationExecutor;
import org.apache.flink.table.planner.delegation.hive.HiveParser;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -114,18 +117,30 @@ public void tearDown() {
}

@Test
public void testPluggableParser() {
public void testPluggableDialect() {
TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) tableEnv;
Parser parser = tableEnvInternal.getParser();
// hive dialect should use HiveParser
assertThat(parser).isInstanceOf(HiveParser.class);
// execute some sql and verify the parser instance is reused
ExtendedOperationExecutor operationExecutor =
((TableEnvironmentImpl) tableEnvInternal).getExtendedOperationExecutor();
// hive dialect should use HiveOperationExecutor
assertThat(operationExecutor).isInstanceOf(HiveOperationExecutor.class);
// execute some sql and verify the parser/operation executor instance is reused
tableEnvInternal.executeSql("show databases");
assertThat(tableEnvInternal.getParser()).isSameAs(parser);
assertThat(((TableEnvironmentImpl) tableEnvInternal).getExtendedOperationExecutor())
.isSameAs(operationExecutor);
// switching dialect will result in a new parser
tableEnvInternal.getConfig().setSqlDialect(SqlDialect.DEFAULT);
assertThat(tableEnvInternal.getParser().getClass().getName())
.isNotEqualTo(parser.getClass().getName());
assertThat(
((TableEnvironmentImpl) tableEnvInternal)
.getExtendedOperationExecutor()
.getClass()
.getName())
.isNotEqualTo(operationExecutor.getClass().getName());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.InternalPlan;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
Expand Down Expand Up @@ -703,7 +704,8 @@ public TableResult executeSql(String statement) {
throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
}

return executeInternal(operations.get(0));
Operation operation = operations.get(0);
return executeInternal(operation);
}

@Override
Expand Down Expand Up @@ -872,6 +874,14 @@ private TableResultInternal executeQueryOperation(QueryOperation operation) {

@Override
public TableResultInternal executeInternal(Operation operation) {
// try to use extended operation executor to execute the operation
Optional<TableResultInternal> tableResult =
getExtendedOperationExecutor().executeOperation(operation);
// if the extended operation executor return non-empty result, return it
if (tableResult.isPresent()) {
return tableResult.get();
}
// otherwise, fall back to internal implementation
if (operation instanceof ModifyOperation) {
return executeInternal(Collections.singletonList((ModifyOperation) operation));
} else if (operation instanceof StatementSetOperation) {
Expand Down Expand Up @@ -1639,6 +1649,10 @@ public Parser getParser() {
return getPlanner().getParser();
}

public ExtendedOperationExecutor getExtendedOperationExecutor() {
return getPlanner().getExtendedOperationExecutor();
}

@Override
public CatalogManager getCatalogManager() {
return catalogManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.delegation;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.operations.Operation;

import java.util.Optional;

/**
* An extended operation executor which provides method for executing operation. External pluggable
* dialect can implement this interface to execute operation in its own way instead of using Flink's
* own implementation for operation execution.
*/
@Internal
public interface ExtendedOperationExecutor {

/**
* Execute the given operation and return the execution result. This method will delegate
* Flink's own operation execution.
*
* <p>If return Optional.empty(), the operation will then fall to Flink's operation execution.
*/
Optional<TableResultInternal> executeOperation(Operation operation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ public interface Planner {
*/
Parser getParser();

/**
* Retrieves a {@link ExtendedOperationExecutor} that provides method for executing operation in
* a custom way.
*
* @return initialized {@link ExtendedOperationExecutor}
*/
ExtendedOperationExecutor getExtendedOperationExecutor();

/**
* Converts a relational tree of {@link ModifyOperation}s into a set of runnable {@link
* Transformation}s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.InternalPlan;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
Expand All @@ -29,6 +30,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;

/** Mocking {@link Planner} for tests. */
public class PlannerMock implements Planner {
Expand All @@ -38,6 +40,11 @@ public Parser getParser() {
return new ParserMock();
}

@Override
public ExtendedOperationExecutor getExtendedOperationExecutor() {
return (operation) -> Optional.empty();
}

@Override
public List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Set;

/** A Parser factory that creates {@link ParserImpl}. */
public class DefaultParserFactory implements ParserFactory {
public class DefaultDialectFactory implements DialectFactory {

@Override
public String factoryIdentifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,52 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.operations.Operation;

import java.util.Optional;

/**
* Factory that creates {@link Parser}.
* Factory that creates {@link Parser} and {@link ExtendedOperationExecutor}.
*
* <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
* TableConfigOptions#TABLE_SQL_DIALECT}.
*/
@Internal
public interface ParserFactory extends Factory {
public interface DialectFactory extends Factory {

/** Creates a new parser. */
Parser create(Context context);

default ExtendedOperationExecutor createExtendedOperationExecutor(Context context) {
return new EmptyOperationExecutor();
}

/** Context provided when a parser is created. */
interface Context {
CatalogManager getCatalogManager();

PlannerContext getPlannerContext();

Executor getExecutor();
}

/** Default implementation for {@link Context}. */
class DefaultParserContext implements Context {
private final CatalogManager catalogManager;
private final PlannerContext plannerContext;
private final Executor executor;

public DefaultParserContext(CatalogManager catalogManager, PlannerContext plannerContext) {
public DefaultParserContext(
CatalogManager catalogManager, PlannerContext plannerContext, Executor executor) {
this.catalogManager = catalogManager;
this.plannerContext = plannerContext;
this.executor = executor;
}

@Override
Expand All @@ -62,5 +77,23 @@ public CatalogManager getCatalogManager() {
public PlannerContext getPlannerContext() {
return plannerContext;
}

@Override
public Executor getExecutor() {
return executor;
}
}

/**
* Default implementation for {@link ExtendedOperationExecutor} that doesn't extend any
* operation behavior but forward all operations to the Flink planner.
*/
class EmptyOperationExecutor implements ExtendedOperationExecutor {

@Override
public Optional<TableResultInternal> executeOperation(Operation operation) {
// return empty so that it'll use Flink's own implementation for operation execution.
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@
# limitations under the License.

org.apache.flink.table.planner.delegation.DefaultExecutorFactory
org.apache.flink.table.planner.delegation.DefaultParserFactory
org.apache.flink.table.planner.delegation.DefaultDialectFactory
org.apache.flink.table.planner.delegation.DefaultPlannerFactory
Loading

0 comments on commit c7af66b

Please sign in to comment.