Skip to content

Commit

Permalink
[FLINK-23734][table] Migrate PlannerFactory to Factory
Browse files Browse the repository at this point in the history
This closes apache#16788.
  • Loading branch information
Airblader authored and twalthr committed Aug 17, 2021
1 parent 0d10ad3 commit 8d28e71
Show file tree
Hide file tree
Showing 22 changed files with 259 additions and 593 deletions.

This file was deleted.

38 changes: 0 additions & 38 deletions flink-python/pyflink/table/tests/test_environment_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,10 @@
from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings
from pyflink.testing.test_case_utils import PyFlinkTestCase
from pyflink.util.java_utils import get_field_value


class EnvironmentSettingsTests(PyFlinkTestCase):

def test_planner_selection(self):

builder = EnvironmentSettings.new_instance()

# test the default behaviour to make sure it is consistent with the python doc
environment_settings = builder.build()

self.check_blink_planner(environment_settings)

# test use_blink_planner
environment_settings = EnvironmentSettings.new_instance().use_blink_planner().build()

self.check_blink_planner(environment_settings)

# test use_any_planner
environment_settings = builder.use_any_planner().build()

self.check_any_planner(environment_settings)

def test_mode_selection(self):

builder = EnvironmentSettings.new_instance()
Expand Down Expand Up @@ -115,21 +95,3 @@ def test_from_configuration(self):

actual_setting = EnvironmentSettings.from_configuration(config)
self.assertFalse(actual_setting.is_streaming_mode(), "Use batch mode.")

def check_blink_planner(self, settings: EnvironmentSettings):
gateway = get_gateway()
CLASS_NAME = gateway.jvm.EnvironmentSettings.CLASS_NAME

builder = EnvironmentSettings.new_instance()
BLINK_PLANNER_FACTORY = get_field_value(builder._j_builder, "BLINK_PLANNER_FACTORY")

self.assertEqual(
settings._j_environment_settings.toPlannerProperties()[CLASS_NAME],
BLINK_PLANNER_FACTORY)

def check_any_planner(self, settings: EnvironmentSettings):
gateway = get_gateway()
CLASS_NAME = gateway.jvm.EnvironmentSettings.CLASS_NAME

self.assertTrue(
CLASS_NAME not in settings._j_environment_settings.toPlannerProperties())
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def java_class(cls):
@classmethod
def excluded_methods(cls):
# internal interfaces, no need to expose to users.
return {'toPlannerProperties', 'getExecutor'}
return {'getPlanner', 'getExecutor'}


class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.util.TemporaryClassLoaderContext;

import java.lang.reflect.Method;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.function.Supplier;

import static org.apache.flink.table.client.gateway.context.SessionContext.SessionState;
Expand Down Expand Up @@ -135,15 +133,9 @@ private StreamTableEnvironment createStreamTableEnvironment(
FunctionCatalog functionCatalog,
ClassLoader userClassLoader) {

final Map<String, String> plannerProperties = settings.toPlannerProperties();
final Planner planner =
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(
plannerProperties,
executor,
config,
functionCatalog,
catalogManager);
PlannerFactoryUtil.createPlanner(
settings.getPlanner(), executor, config, catalogManager, functionCatalog);

return new StreamTableEnvironmentImpl(
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
Expand Down Expand Up @@ -83,7 +82,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -127,11 +125,11 @@ public static StreamTableEnvironment create(
TableConfig tableConfig) {

// temporary solution until FLINK-15635 is fixed
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

ModuleManager moduleManager = new ModuleManager();
final ModuleManager moduleManager = new ModuleManager();

CatalogManager catalogManager =
final CatalogManager catalogManager =
CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
Expand All @@ -143,21 +141,19 @@ public static StreamTableEnvironment create(
.executionConfig(executionEnvironment.getConfig())
.build();

FunctionCatalog functionCatalog =
final FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);

final Executor executor =
lookupExecutor(classLoader, settings.getExecutor(), executionEnvironment);

Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner =
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(
plannerProperties,
executor,
tableConfig,
functionCatalog,
catalogManager);
final Planner planner =
PlannerFactoryUtil.createPlanner(
settings.getPlanner(),
executor,
tableConfig,
catalogManager,
functionCatalog);

return new StreamTableEnvironmentImpl(
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.functions.UserDefinedFunction;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
Expand Down Expand Up @@ -63,13 +61,11 @@ public class EnvironmentSettings {
private static final EnvironmentSettings DEFAULT_BATCH_MODE_SETTINGS =
EnvironmentSettings.newInstance().inBatchMode().build();

public static final String STREAMING_MODE = "streaming-mode";
public static final String CLASS_NAME = "class-name";
public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog";
public static final String DEFAULT_BUILTIN_DATABASE = "default_database";

/** Canonical name of the {@link Planner} class to use. */
private final String plannerClass;
/** Factory identifier of the {@link Planner} to use. */
private final String planner;

/** Factory identifier of the {@link Executor} to use. */
private final String executor;
Expand All @@ -93,12 +89,12 @@ public class EnvironmentSettings {
private final boolean isStreamingMode;

private EnvironmentSettings(
@Nullable String plannerClass,
String planner,
@Nullable String executor,
String builtInCatalogName,
String builtInDatabaseName,
boolean isStreamingMode) {
this.plannerClass = plannerClass;
this.planner = planner;
this.executor = executor;
this.builtInCatalogName = builtInCatalogName;
this.builtInDatabaseName = builtInDatabaseName;
Expand Down Expand Up @@ -211,34 +207,23 @@ public boolean isBlinkPlanner() {
return true;
}

/** Returns the {@link Executor} that should submit and execute table programs. */
/** Returns the identifier of the {@link Planner} to be used. */
@Internal
public String getExecutor() {
return executor;
public String getPlanner() {
return planner;
}

/** Returns the {@link Executor} that should submit and execute table programs. */
@Internal
public Map<String, String> toPlannerProperties() {
Map<String, String> properties = new HashMap<>(toCommonProperties());
if (plannerClass != null) {
properties.put(CLASS_NAME, plannerClass);
}
return properties;
}

private Map<String, String> toCommonProperties() {
Map<String, String> properties = new HashMap<>();
properties.put(STREAMING_MODE, Boolean.toString(isStreamingMode));
return properties;
public String getExecutor() {
return executor;
}

/** A builder for {@link EnvironmentSettings}. */
public static class Builder {
private static final String BLINK_PLANNER_FACTORY =
"org.apache.flink.table.planner.delegation.BlinkPlannerFactory";
private final String planner = PlannerFactory.DEFAULT_IDENTIFIER;
private final String executor = ExecutorFactory.DEFAULT_IDENTIFIER;

private String plannerClass = BLINK_PLANNER_FACTORY;
private String executor = ExecutorFactory.DEFAULT_IDENTIFIER;
private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG;
private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
private boolean isStreamingMode = true;
Expand Down Expand Up @@ -267,7 +252,6 @@ public Builder useOldPlanner() {
*/
@Deprecated
public Builder useBlinkPlanner() {
this.plannerClass = BLINK_PLANNER_FACTORY;
return this;
}

Expand All @@ -284,7 +268,6 @@ public Builder useBlinkPlanner() {
*/
@Deprecated
public Builder useAnyPlanner() {
this.plannerClass = null;
return this;
}

Expand Down Expand Up @@ -341,11 +324,7 @@ public Builder withBuiltInDatabaseName(String builtInDatabaseName) {
/** Returns an immutable instance of {@link EnvironmentSettings}. */
public EnvironmentSettings build() {
return new EnvironmentSettings(
plannerClass,
executor,
builtInCatalogName,
builtInDatabaseName,
isStreamingMode);
planner, executor, builtInCatalogName, builtInDatabaseName, isStreamingMode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,10 @@
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
Expand Down Expand Up @@ -271,15 +270,15 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
private static TableEnvironmentImpl create(
EnvironmentSettings settings, Configuration configuration) {
// temporary solution until FLINK-15635 is fixed
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

// use configuration to init table config
TableConfig tableConfig = new TableConfig();
final TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(configuration);

ModuleManager moduleManager = new ModuleManager();
final ModuleManager moduleManager = new ModuleManager();

CatalogManager catalogManager =
final CatalogManager catalogManager =
CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
Expand All @@ -290,23 +289,21 @@ private static TableEnvironmentImpl create(
settings.getBuiltInDatabaseName()))
.build();

FunctionCatalog functionCatalog =
final FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);

final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
classLoader, ExecutorFactory.class, settings.getExecutor());
final Executor executor = executorFactory.create(configuration);

Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner =
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(
plannerProperties,
executor,
tableConfig,
functionCatalog,
catalogManager);
final Planner planner =
PlannerFactoryUtil.createPlanner(
settings.getPlanner(),
executor,
tableConfig,
catalogManager,
functionCatalog);

return new TableEnvironmentImpl(
catalogManager,
Expand Down
Loading

0 comments on commit 8d28e71

Please sign in to comment.