Skip to content

Commit

Permalink
[FLINK-23482][table] Simplify BlinkExecutorFactory stack
Browse files Browse the repository at this point in the history
This closes apache#16585.
  • Loading branch information
twalthr committed Jul 26, 2021
1 parent abbc658 commit 45ccc9f
Show file tree
Hide file tree
Showing 23 changed files with 245 additions and 334 deletions.
8 changes: 0 additions & 8 deletions flink-python/pyflink/table/tests/test_environment_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,14 @@ def check_blink_planner(self, settings: EnvironmentSettings):

builder = EnvironmentSettings.new_instance()
BLINK_PLANNER_FACTORY = get_private_field(builder._j_builder, "BLINK_PLANNER_FACTORY")
BLINK_EXECUTOR_FACTORY = get_private_field(builder._j_builder, "BLINK_EXECUTOR_FACTORY")

self.assertEqual(
settings._j_environment_settings.toPlannerProperties()[CLASS_NAME],
BLINK_PLANNER_FACTORY)

self.assertEqual(
settings._j_environment_settings.toExecutorProperties()[CLASS_NAME],
BLINK_EXECUTOR_FACTORY)

def check_any_planner(self, settings: EnvironmentSettings):
gateway = get_gateway()
CLASS_NAME = gateway.jvm.EnvironmentSettings.CLASS_NAME

self.assertTrue(
CLASS_NAME not in settings._j_environment_settings.toPlannerProperties())

self.assertTrue(
CLASS_NAME not in settings._j_environment_settings.toExecutorProperties())
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def java_class(cls):
@classmethod
def excluded_methods(cls):
# internal interfaces, no need to expose to users.
return {'toPlannerProperties', 'toExecutorProperties'}
return {'toPlannerProperties', 'getExecutor'}


class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.util.TemporaryClassLoaderContext;

Expand Down Expand Up @@ -112,8 +113,7 @@ private StreamTableEnvironment createTableEnvironment() {

StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();

final Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, streamExecEnv);
final Executor executor = lookupExecutor(settings.getExecutor(), streamExecEnv);
return createStreamTableEnvironment(
streamExecEnv,
settings,
Expand Down Expand Up @@ -158,18 +158,17 @@ private StreamTableEnvironment createStreamTableEnvironment(
}

private Executor lookupExecutor(
Map<String, String> executorProperties,
StreamExecutionEnvironment executionEnvironment) {
String executorIdentifier, StreamExecutionEnvironment executionEnvironment) {
try {
ExecutorFactory executorFactory =
ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
Method createMethod =
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
classLoader, ExecutorFactory.class, executorIdentifier);
final Method createMethod =
executorFactory
.getClass()
.getMethod("create", Map.class, StreamExecutionEnvironment.class);
.getMethod("create", StreamExecutionEnvironment.class);

return (Executor)
createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
return (Executor) createMethod.invoke(executorFactory, executionEnvironment);
} catch (Exception e) {
throw new TableException(
"Could not instantiate the executor. Make sure a planner module is on the classpath",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
Expand Down Expand Up @@ -57,6 +56,7 @@
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
Expand Down Expand Up @@ -152,8 +152,8 @@ public static StreamTableEnvironment create(
FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);

Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, executionEnvironment);
final Executor executor =
lookupExecutor(classLoader, settings.getExecutor(), executionEnvironment);

Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner =
Expand All @@ -178,18 +178,19 @@ public static StreamTableEnvironment create(
}

private static Executor lookupExecutor(
Map<String, String> executorProperties,
ClassLoader classLoader,
String executorIdentifier,
StreamExecutionEnvironment executionEnvironment) {
try {
ExecutorFactory executorFactory =
ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
Method createMethod =
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
classLoader, ExecutorFactory.class, executorIdentifier);
final Method createMethod =
executorFactory
.getClass()
.getMethod("create", Map.class, StreamExecutionEnvironment.class);
.getMethod("create", StreamExecutionEnvironment.class);

return (Executor)
createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
return (Executor) createMethod.invoke(executorFactory, executionEnvironment);
} catch (Exception e) {
throw new TableException(
"Could not instantiate the executor. Make sure a planner module is on the classpath",
Expand Down Expand Up @@ -549,11 +550,6 @@ public StreamExecutionEnvironment execEnv() {
return executionEnvironment;
}

/** This method is used for sql client to submit job. */
public Pipeline getPipeline(String jobName) {
return execEnv.createPipeline(translateAndClearBuffer(), tableConfig, jobName);
}

@Override
protected void validateTableSource(TableSource<?> tableSource) {
super.validateTableSource(tableSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.functions.UserDefinedFunction;

Expand Down Expand Up @@ -69,8 +70,8 @@ public class EnvironmentSettings {
/** Canonical name of the {@link Planner} class to use. */
private final String plannerClass;

/** Canonical name of the {@link Executor} class to use. */
private final String executorClass;
/** Factory identifier of the {@link Executor} to use. */
private final String executor;

/**
* Specifies the name of the initial catalog to be created when instantiating {@link
Expand All @@ -92,12 +93,12 @@ public class EnvironmentSettings {

private EnvironmentSettings(
@Nullable String plannerClass,
@Nullable String executorClass,
@Nullable String executor,
String builtInCatalogName,
String builtInDatabaseName,
boolean isStreamingMode) {
this.plannerClass = plannerClass;
this.executorClass = executorClass;
this.executor = executor;
this.builtInCatalogName = builtInCatalogName;
this.builtInDatabaseName = builtInDatabaseName;
this.isStreamingMode = isStreamingMode;
Expand Down Expand Up @@ -216,20 +217,17 @@ public boolean isBlinkPlanner() {
return true;
}

/** Returns the {@link Executor} that should submit and execute table programs. */
@Internal
public Map<String, String> toPlannerProperties() {
Map<String, String> properties = new HashMap<>(toCommonProperties());
if (plannerClass != null) {
properties.put(CLASS_NAME, plannerClass);
}
return properties;
public String getExecutor() {
return executor;
}

@Internal
public Map<String, String> toExecutorProperties() {
public Map<String, String> toPlannerProperties() {
Map<String, String> properties = new HashMap<>(toCommonProperties());
if (executorClass != null) {
properties.put(CLASS_NAME, executorClass);
if (plannerClass != null) {
properties.put(CLASS_NAME, plannerClass);
}
return properties;
}
Expand All @@ -244,11 +242,9 @@ private Map<String, String> toCommonProperties() {
public static class Builder {
private static final String BLINK_PLANNER_FACTORY =
"org.apache.flink.table.planner.delegation.BlinkPlannerFactory";
private static final String BLINK_EXECUTOR_FACTORY =
"org.apache.flink.table.planner.delegation.BlinkExecutorFactory";

private String plannerClass = BLINK_PLANNER_FACTORY;
private String executorClass = BLINK_EXECUTOR_FACTORY;
private String executor = ExecutorFactory.DEFAULT_IDENTIFIER;
private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG;
private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
private boolean isStreamingMode = true;
Expand Down Expand Up @@ -278,7 +274,6 @@ public Builder useOldPlanner() {
@Deprecated
public Builder useBlinkPlanner() {
this.plannerClass = BLINK_PLANNER_FACTORY;
this.executorClass = BLINK_EXECUTOR_FACTORY;
return this;
}

Expand All @@ -296,7 +291,6 @@ public Builder useBlinkPlanner() {
@Deprecated
public Builder useAnyPlanner() {
this.plannerClass = null;
this.executorClass = null;
return this;
}

Expand Down Expand Up @@ -354,7 +348,7 @@ public Builder withBuiltInDatabaseName(String builtInDatabaseName) {
public EnvironmentSettings build() {
return new EnvironmentSettings(
plannerClass,
executorClass,
executor,
builtInCatalogName,
builtInDatabaseName,
isStreamingMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,10 @@ private static TableEnvironmentImpl create(
FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);

Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor =
ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
.create(executorProperties);
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
classLoader, ExecutorFactory.class, settings.getExecutor());
final Executor executor = executorFactory.create(configuration);

Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner =
Expand Down Expand Up @@ -816,7 +816,8 @@ public TableResult executeInternal(List<ModifyOperation> operations) {
private TableResult executeInternal(
List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
String jobName = getJobName("insert-into_" + String.join(",", sinkIdentifierNames));
Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
Pipeline pipeline =
execEnv.createPipeline(transformations, tableConfig.getConfiguration(), jobName);
try {
JobClient jobClient = execEnv.executeAsync(pipeline);
final List<Column> columns = new ArrayList<>();
Expand Down Expand Up @@ -852,7 +853,8 @@ private TableResult executeQueryOperation(QueryOperation operation) {
List<Transformation<?>> transformations =
translate(Collections.singletonList(sinkOperation));
String jobName = getJobName("collect");
Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
Pipeline pipeline =
execEnv.createPipeline(transformations, tableConfig.getConfiguration(), jobName);
try {
JobClient jobClient = execEnv.executeAsync(pipeline);
CollectResultProvider resultProvider = sinkOperation.getSelectResultProvider();
Expand Down Expand Up @@ -1658,7 +1660,9 @@ public TableConfig getConfig() {

@Override
public JobExecutionResult execute(String jobName) throws Exception {
Pipeline pipeline = execEnv.createPipeline(translateAndClearBuffer(), tableConfig, jobName);
Pipeline pipeline =
execEnv.createPipeline(
translateAndClearBuffer(), tableConfig.getConfiguration(), jobName);
return execEnv.execute(pipeline);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,32 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;

import java.util.List;

/**
* It enables execution of a {@link Transformation}s graph generated by {@link Planner}.
* It enables the execution of a graph of {@link Transformation}s generated by the {@link Planner}.
*
* <p>This uncouples the {@link TableEnvironment} from any given runtime.
*
* @see ExecutorFactory
*/
@Internal
public interface Executor {

/**
* Translates the given transformations to a Pipeline.
* Translates the given transformations to a {@link Pipeline}.
*
* @param transformations list of transformations
* @param configuration configuration options
* @param jobName what should be the name of the job
* @return The pipeline representing the transformations.
*/
Pipeline createPipeline(
List<Transformation<?>> transformations, TableConfig tableConfig, String jobName);
List<Transformation<?>> transformations, Configuration configuration, String jobName);

/**
* Executes the given pipeline.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,33 @@
package org.apache.flink.table.delegation;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.factories.ComponentFactory;

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.factories.Factory;

/**
* Factory that creates {@link Executor}.
* Factory that creates an {@link Executor} for submitting table programs.
*
* <p>The factory is used with Java's Service Provider Interfaces (SPI) for discovering. See {@link
* Factory} for more information.
*
* <p>This factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory
* is called with a set of normalized properties that describe the desired configuration. Those
* properties may include execution configurations such as watermark interval, max parallelism etc.,
* table specific initialization configuration such as if the queries should be executed in batch
* mode.
* <p>Usually, there should only be one executor factory in the class path. However, advanced users
* can implement a custom one for hooking into the submission process.
*
* <p><b>Important:</b> The implementations of this interface should also implement method
* <p><b>Important:</b> Implementations of this interface should also implement the method
*
* <pre>
* {@code public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment);}
* public Executor create(Configuration, StreamExecutionEnvironment);
* </pre>
*
* <p>This method will be used when instantiating a {@link
* org.apache.flink.table.api.TableEnvironment} from a bridging module which enables conversion
* from/to {@code DataStream} API and requires a pre configured {@code StreamTableEnvironment}.
* <p>This method will be used when instantiating a {@link TableEnvironment} from one of the
* bridging modules which enables conversion from/to {@code DataStream} API.
*/
@Internal
public interface ExecutorFactory extends ComponentFactory {
public interface ExecutorFactory extends Factory {

String DEFAULT_IDENTIFIER = "default";

/**
* Creates a corresponding {@link Executor}.
*
* @param properties Static properties of the {@link Executor}, the same that were used for
* factory lookup.
* @return instance of a {@link Executor}
*/
Executor create(Map<String, String> properties);
/** Creates a corresponding {@link Executor}. */
Executor create(Configuration configuration);
}
Loading

0 comments on commit 45ccc9f

Please sign in to comment.