Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
luoyuxia committed Jul 25, 2022
1 parent e8e448e commit 97c4a9b
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.delegation.OperationExternalExecutor;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.planner.delegation.DialectFactory;

Expand Down Expand Up @@ -55,7 +55,7 @@ public Parser create(Context context) {
}

@Override
public OperationExternalExecutor createOperatorExternalExecutor(Context context) {
public ExtendedOperationExecutor createExtendedOperationExecutor(Context context) {
return new HiveOperationExecutor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.table.planner.delegation.hive;

import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.delegation.OperationExternalExecutor;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.operations.Operation;

import java.util.Optional;
Expand All @@ -28,7 +28,7 @@
* A Hive's operation executor used to execute operation in custom way instead of Flink's
* implementation.
*/
public class HiveOperationExecutor implements OperationExternalExecutor {
public class HiveOperationExecutor implements ExtendedOperationExecutor {
@Override
public Optional<TableResultInternal> executeOperation(Operation operation) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
Expand All @@ -34,7 +35,7 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.delegation.OperationExternalExecutor;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.command.ClearOperation;
Expand Down Expand Up @@ -121,19 +122,24 @@ public void testPluggableDialect() {
Parser parser = tableEnvInternal.getParser();
// hive dialect should use HiveParser
assertThat(parser).isInstanceOf(HiveParser.class);
OperationExternalExecutor operationExecutor =
tableEnvInternal.getOperationExternalExecutor();
ExtendedOperationExecutor operationExecutor =
((TableEnvironmentImpl) tableEnvInternal).getExtendedOperationExecutor();
// hive dialect should use HiveOperationExecutor
assertThat(operationExecutor).isInstanceOf(HiveOperationExecutor.class);
// execute some sql and verify the parser/operation executor instance is reused
tableEnvInternal.executeSql("show databases");
assertThat(tableEnvInternal.getParser()).isSameAs(parser);
assertThat(tableEnvInternal.getOperationExternalExecutor()).isSameAs(operationExecutor);
assertThat(((TableEnvironmentImpl) tableEnvInternal).getExtendedOperationExecutor())
.isSameAs(operationExecutor);
// switching dialect will result in a new parser
tableEnvInternal.getConfig().setSqlDialect(SqlDialect.DEFAULT);
assertThat(tableEnvInternal.getParser().getClass().getName())
.isNotEqualTo(parser.getClass().getName());
assertThat(tableEnvInternal.getOperationExternalExecutor().getClass().getName())
assertThat(
((TableEnvironmentImpl) tableEnvInternal)
.getExtendedOperationExecutor()
.getClass()
.getName())
.isNotEqualTo(operationExecutor.getClass().getName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.InternalPlan;
import org.apache.flink.table.delegation.OperationExternalExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.ApiExpressionUtils;
Expand Down Expand Up @@ -705,7 +705,7 @@ public TableResult executeSql(String statement) {
}

Operation operation = operations.get(0);
return executeOperation(operation);
return executeInternal(operation);
}

@Override
Expand Down Expand Up @@ -800,15 +800,6 @@ public TableResultInternal executeInternal(List<ModifyOperation> operations) {
return result;
}

@Override
public TableResultInternal executeOperation(Operation operation) {
// try to use external operation executor to execute the operation
Optional<TableResultInternal> tableResult =
getOperationExternalExecutor().executeOperation(operation);
// if the external operation executor return empty, fall back to internal implementation
return tableResult.orElseGet(() -> executeInternal(operation));
}

private TableResultInternal executeInternal(
List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
final String defaultJobName = "insert-into_" + String.join(",", sinkIdentifierNames);
Expand Down Expand Up @@ -883,6 +874,14 @@ private TableResultInternal executeQueryOperation(QueryOperation operation) {

@Override
public TableResultInternal executeInternal(Operation operation) {
// try to use extended operation executor to execute the operation
Optional<TableResultInternal> tableResult =
getExtendedOperationExecutor().executeOperation(operation);
// if the extended operation executor return non-empty result, return it
if (tableResult.isPresent()) {
return tableResult.get();
}
// otherwise, fall back to internal implementation
if (operation instanceof ModifyOperation) {
return executeInternal(Collections.singletonList((ModifyOperation) operation));
} else if (operation instanceof StatementSetOperation) {
Expand Down Expand Up @@ -1650,9 +1649,8 @@ public Parser getParser() {
return getPlanner().getParser();
}

@Override
public OperationExternalExecutor getOperationExternalExecutor() {
return getPlanner().getOperationExternalExecutor();
public ExtendedOperationExecutor getExtendedOperationExecutor() {
return getPlanner().getExtendedOperationExecutor();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.InternalPlan;
import org.apache.flink.table.delegation.OperationExternalExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
Expand Down Expand Up @@ -54,14 +53,6 @@ public interface TableEnvironmentInternal extends TableEnvironment {
*/
Parser getParser();

/**
* Return a {@link OperationExternalExecutor} that provides method for executing operation in
* external implementation.
*
* @return initialized {@link OperationExternalExecutor}.
*/
OperationExternalExecutor getOperationExternalExecutor();

/** Returns a {@link CatalogManager} that deals with all catalog objects. */
CatalogManager getCatalogManager();

Expand Down Expand Up @@ -90,15 +81,6 @@ public interface TableEnvironmentInternal extends TableEnvironment {
* @param operation The operation to be executed.
* @return the content of the execution result.
*/
TableResultInternal executeOperation(Operation operation);

/**
* Execute the given operation with Flink's internal implementation and return the execution
* result.
*
* @param operation The operation to be executed.
* @return the content of the execution result.
*/
TableResultInternal executeInternal(Operation operation);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import java.util.Optional;

/**
* An external operation executor which provides method for executing operation. External pluggable
* An extended operation executor which provides method for executing operation. External pluggable
* dialect can implement this interface to execute operation in its own way instead of using Flink's
* own implementation for operation execution.
*/
@Internal
public interface OperationExternalExecutor {
public interface ExtendedOperationExecutor {

/**
* Execute the given operation and return the execution result. This method will delegate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ public interface Planner {
Parser getParser();

/**
* Retrieves a {@link OperationExternalExecutor} that provides method for executing operation in
* Retrieves a {@link ExtendedOperationExecutor} that provides method for executing operation in
* a custom way.
*
* @return initialized {@link OperationExternalExecutor}
* @return initialized {@link ExtendedOperationExecutor}
*/
OperationExternalExecutor getOperationExternalExecutor();
ExtendedOperationExecutor getExtendedOperationExecutor();

/**
* Converts a relational tree of {@link ModifyOperation}s into a set of runnable {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.InternalPlan;
import org.apache.flink.table.delegation.OperationExternalExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.operations.ModifyOperation;
Expand All @@ -40,7 +40,7 @@ public Parser getParser() {
}

@Override
public OperationExternalExecutor getOperationExternalExecutor() {
public ExtendedOperationExecutor getExtendedOperationExecutor() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.OperationExternalExecutor;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.operations.Operation;

import java.util.Optional;

/**
* Factory that creates {@link Parser} and {@link OperationExternalExecutor}.
* Factory that creates {@link Parser} and {@link ExtendedOperationExecutor}.
*
* <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
* TableConfigOptions#TABLE_SQL_DIALECT}.
Expand All @@ -42,8 +42,8 @@ public interface DialectFactory extends Factory {
/** Creates a new parser. */
Parser create(Context context);

default OperationExternalExecutor createOperatorExternalExecutor(Context context) {
return new DefaultOperationExternalExecutor();
default ExtendedOperationExecutor createExtendedOperationExecutor(Context context) {
return new EmptyOperationExecutor();
}

/** Context provided when a parser is created. */
Expand Down Expand Up @@ -84,8 +84,11 @@ public Executor getExecutor() {
}
}

/** Default implementation for {@link OperationExternalExecutor}. */
class DefaultOperationExternalExecutor implements OperationExternalExecutor {
/**
* Default implementation for {@link ExtendedOperationExecutor} that doesn't extend any
* operation behavior but forward all operations to the Flink planner.
*/
class EmptyOperationExecutor implements ExtendedOperationExecutor {

@Override
public Optional<TableResultInternal> executeOperation(Operation operation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.ManagedTableListener.isManagedTable
import org.apache.flink.table.connector.sink.DynamicTableSink
import org.apache.flink.table.delegation.{Executor, OperationExternalExecutor, Parser, Planner}
import org.apache.flink.table.delegation.{Executor, ExtendedOperationExecutor, Parser, Planner}
import org.apache.flink.table.factories.{DynamicTableSinkFactory, FactoryUtil, TableFactoryUtil}
import org.apache.flink.table.module.{Module, ModuleManager}
import org.apache.flink.table.operations._
Expand Down Expand Up @@ -102,7 +102,7 @@ abstract class PlannerBase(

private var dialectFactory: DialectFactory = _
private var parser: Parser = _
private var operationExternalExecutor: OperationExternalExecutor = _
private var extendedOperationExecutor: ExtendedOperationExecutor = _
private var currentDialect: SqlDialect = getTableConfig.getSqlDialect

@VisibleForTesting
Expand Down Expand Up @@ -158,7 +158,7 @@ abstract class PlannerBase(
factoryIdentifier)
currentDialect = getTableConfig.getSqlDialect
parser = null
operationExternalExecutor = null
extendedOperationExecutor = null
}
dialectFactory
}
Expand All @@ -172,13 +172,13 @@ abstract class PlannerBase(
parser
}

override def getOperationExternalExecutor: OperationExternalExecutor = {
if (operationExternalExecutor == null || getTableConfig.getSqlDialect != currentDialect) {
override def getExtendedOperationExecutor: ExtendedOperationExecutor = {
if (extendedOperationExecutor == null || getTableConfig.getSqlDialect != currentDialect) {
dialectFactory = getDialectFactory
operationExternalExecutor = dialectFactory.createOperatorExternalExecutor(
extendedOperationExecutor = dialectFactory.createExtendedOperationExecutor(
new DefaultParserContext(catalogManager, plannerContext, executor))
}
operationExternalExecutor
extendedOperationExecutor
}

override def translate(
Expand Down

0 comments on commit 97c4a9b

Please sign in to comment.