Skip to content

Commit

Permalink
[FLINK-21460][table-api] Support to create TableEnvironment using Con…
Browse files Browse the repository at this point in the history
…figuration

This closes apache#15018
  • Loading branch information
fsk119 committed Mar 8, 2021
1 parent e964e40 commit ad52d3f
Show file tree
Hide file tree
Showing 27 changed files with 562 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>table.planner</h5></td>
<td style="word-wrap: break-word;">BLINK</td>
<td><p>Enum</p>Possible values: [BLINK, OLD]</td>
<td>Selects the planner to use: either the 'blink' planner or the 'old' planner. The default is the blink planner. For TableEnvironment, this option is only used when the TableEnvironment is constructed and cannot be changed afterwards. The SQL Client has no such limitation.</td>
</tr>
<tr>
<td><h5>table.dynamic-table-options.enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
23 changes: 23 additions & 0 deletions flink-python/pyflink/table/environment_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
################################################################################
from pyflink.java_gateway import get_gateway

from pyflink.common import Configuration

__all__ = ['EnvironmentSettings']


Expand Down Expand Up @@ -174,6 +176,17 @@ def is_streaming_mode(self) -> bool:
"""
return self._j_environment_settings.isStreamingMode()

def to_configuration(self) -> Configuration:
    """
    Export these settings as a `pyflink.common.Configuration`.

    The conversion is delegated to the wrapped Java EnvironmentSettings, which sets
    `table.planner` and `execution.runtime-mode` according to the current
    EnvironmentSettings.

    :return: Configuration wrapping the Java configuration produced by these settings.
    """
    j_configuration = self._j_environment_settings.toConfiguration()
    return Configuration(j_configuration=j_configuration)

@staticmethod
def new_instance() -> 'EnvironmentSettings.Builder':
"""
Expand All @@ -185,3 +198,13 @@ def new_instance() -> 'EnvironmentSettings.Builder':
:return: A builder of EnvironmentSettings.
"""
return EnvironmentSettings.Builder()

@staticmethod
def from_configuration(config: Configuration) -> 'EnvironmentSettings':
    """
    Create the EnvironmentSettings from the specified Configuration.

    :param config: The Configuration whose underlying Java configuration is passed to the
                   Java `EnvironmentSettings.fromConfiguration`.
    :return: EnvironmentSettings wrapping the Java settings created from ``config``.
    """
    j_settings_cls = get_gateway().jvm.EnvironmentSettings
    j_settings = j_settings_cls.fromConfiguration(config._j_configuration)
    return EnvironmentSettings(j_settings)
110 changes: 71 additions & 39 deletions flink-python/pyflink/table/tests/test_environment_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
################################################################################
from pyflink.java_gateway import get_gateway

from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings
from pyflink.testing.test_case_utils import PyFlinkTestCase, get_private_field

Expand All @@ -25,58 +26,27 @@ class EnvironmentSettingsTests(PyFlinkTestCase):

def test_planner_selection(self):

gateway = get_gateway()

CLASS_NAME = gateway.jvm.EnvironmentSettings.CLASS_NAME

builder = EnvironmentSettings.new_instance()

OLD_PLANNER_FACTORY = get_private_field(builder._j_builder, "OLD_PLANNER_FACTORY")
OLD_EXECUTOR_FACTORY = get_private_field(builder._j_builder, "OLD_EXECUTOR_FACTORY")
BLINK_PLANNER_FACTORY = get_private_field(builder._j_builder, "BLINK_PLANNER_FACTORY")
BLINK_EXECUTOR_FACTORY = get_private_field(builder._j_builder, "BLINK_EXECUTOR_FACTORY")

# test the default behaviour to make sure it is consistent with the python doc
envrionment_settings = builder.build()

self.assertEqual(
envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME],
BLINK_PLANNER_FACTORY)
environment_settings = builder.build()

self.assertEqual(
envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME],
BLINK_EXECUTOR_FACTORY)
self.check_blink_planner(environment_settings)

# test use_old_planner
envrionment_settings = builder.use_old_planner().build()

self.assertEqual(
envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME],
OLD_PLANNER_FACTORY)
environment_settings = builder.use_old_planner().build()

self.assertEqual(
envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME],
OLD_EXECUTOR_FACTORY)
self.check_old_planner(environment_settings)

# test use_blink_planner
envrionment_settings = builder.use_blink_planner().build()
environment_settings = EnvironmentSettings.new_instance().use_blink_planner().build()

self.assertEqual(
envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME],
BLINK_PLANNER_FACTORY)

self.assertEqual(
envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME],
BLINK_EXECUTOR_FACTORY)
self.check_blink_planner(environment_settings)

# test use_any_planner
envrionment_settings = builder.use_any_planner().build()
environment_settings = builder.use_any_planner().build()

self.assertTrue(
CLASS_NAME not in envrionment_settings._j_environment_settings.toPlannerProperties())

self.assertTrue(
CLASS_NAME not in envrionment_settings._j_environment_settings.toExecutorProperties())
self.check_any_planner(environment_settings)

def test_mode_selection(self):

Expand Down Expand Up @@ -131,3 +101,65 @@ def test_with_built_in_database_name(self):
envrionment_settings = builder.with_built_in_database_name("my_database").build()

self.assertEqual(envrionment_settings.get_built_in_database_name(), "my_database")

def test_to_Configuration(self):
    """
    Settings built with the old planner in batch mode must export
    `table.planner` = "OLD" and `execution.runtime-mode` = "BATCH".
    """
    builder = EnvironmentSettings.new_instance()
    old_batch_settings = builder.use_old_planner().in_batch_mode().build()
    configuration = old_batch_settings.to_configuration()

    # The fallback values differ from the expected ones, so a missing key fails the test.
    planner = configuration.get_string("table.planner", "blink")
    self.assertEqual("OLD", planner)

    runtime_mode = configuration.get_string("execution.runtime-mode", "stream")
    self.assertEqual("BATCH", runtime_mode)

def test_from_Configuration(self):
    """
    A Configuration selecting the old planner and batch mode must yield
    EnvironmentSettings that are not in streaming mode and use the old planner.
    """
    configuration = Configuration()
    for key, value in (("table.planner", "old"), ("execution.runtime-mode", "batch")):
        configuration.set_string(key, value)

    settings = EnvironmentSettings.from_configuration(configuration)

    self.assertFalse(settings.is_streaming_mode(), "Use batch mode.")
    self.check_old_planner(settings)

def check_blink_planner(self, settings: EnvironmentSettings):
    """
    Assert that ``settings`` report the blink planner and blink executor factories.

    The expected factory names are read from the private `BLINK_PLANNER_FACTORY` and
    `BLINK_EXECUTOR_FACTORY` fields of a fresh Java builder and compared with the
    `CLASS_NAME` entry of the planner and executor properties.

    :param settings: The EnvironmentSettings to verify.
    """
    class_name_key = get_gateway().jvm.EnvironmentSettings.CLASS_NAME

    j_builder = EnvironmentSettings.new_instance()._j_builder
    expected_planner = get_private_field(j_builder, "BLINK_PLANNER_FACTORY")
    expected_executor = get_private_field(j_builder, "BLINK_EXECUTOR_FACTORY")

    j_settings = settings._j_environment_settings
    self.assertEqual(j_settings.toPlannerProperties()[class_name_key], expected_planner)
    self.assertEqual(j_settings.toExecutorProperties()[class_name_key], expected_executor)

def check_old_planner(self, settings: EnvironmentSettings):
    """
    Assert that ``settings`` report the old planner and old executor factories.

    The expected factory names are read from the private `OLD_PLANNER_FACTORY` and
    `OLD_EXECUTOR_FACTORY` fields of a fresh Java builder and compared with the
    `CLASS_NAME` entry of the planner and executor properties.

    :param settings: The EnvironmentSettings to verify.
    """
    class_name_key = get_gateway().jvm.EnvironmentSettings.CLASS_NAME

    j_builder = EnvironmentSettings.new_instance()._j_builder
    expected_planner = get_private_field(j_builder, "OLD_PLANNER_FACTORY")
    expected_executor = get_private_field(j_builder, "OLD_EXECUTOR_FACTORY")

    j_settings = settings._j_environment_settings
    self.assertEqual(j_settings.toPlannerProperties()[class_name_key], expected_planner)
    self.assertEqual(j_settings.toExecutorProperties()[class_name_key], expected_executor)

def check_any_planner(self, settings: EnvironmentSettings):
    """
    Assert that ``settings`` do not pin a specific planner or executor factory.

    Neither the planner properties nor the executor properties may contain the
    `CLASS_NAME` key.

    :param settings: The EnvironmentSettings to verify.
    """
    class_name_key = get_gateway().jvm.EnvironmentSettings.CLASS_NAME
    j_settings = settings._j_environment_settings

    # assertNotIn reports the key and the offending properties on failure, whereas
    # assertTrue(key not in props) would only report "False is not true".
    self.assertNotIn(class_name_key, j_settings.toPlannerProperties())
    self.assertNotIn(class_name_key, j_settings.toExecutorProperties())
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ private void initializeTableEnvironment(@Nullable SessionState sessionState) {
final EnvironmentSettings settings = environment.getExecution().getEnvironmentSettings();
final boolean noInheritedState = sessionState == null;
// Step 0.0 Initialize the table configuration.
final TableConfig config = createTableConfig();
final TableConfig config = createTableConfig(settings);

if (noInheritedState) {
// --------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -575,9 +575,14 @@ private void initializeTableEnvironment(@Nullable SessionState sessionState) {
}
}

private TableConfig createTableConfig() {
private TableConfig createTableConfig(EnvironmentSettings settings) {
final TableConfig config = new TableConfig();
config.addConfiguration(flinkConfig);
// Override the value in configuration.
// TODO: use `table.planner` and `execution.runtime-mode` to configure the TableEnvironment
// But we need to wait for the FLINK-21485
config.addConfiguration(settings.toConfiguration());

Configuration conf = config.getConfiguration();
environment.getConfiguration().asMap().forEach(conf::setString);
ExecutionEntry execution = environment.getExecution();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.flink.table.api.bridge.java;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
Expand All @@ -37,6 +39,8 @@

import java.lang.reflect.Constructor;

import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;

/**
* The {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works with {@link
* DataSet}s.
Expand Down Expand Up @@ -446,7 +450,11 @@ public interface BatchTableEnvironment extends TableEnvironment {
* TableEnvironment.
*/
static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment) {
return create(executionEnvironment, new TableConfig());
Configuration configuration = new Configuration();
configuration.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
TableConfig config = new TableConfig();
config.addConfiguration(configuration);
return create(executionEnvironment, config);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ static StreamTableEnvironment create(StreamExecutionEnvironment executionEnviron
*/
static StreamTableEnvironment create(
StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
return StreamTableEnvironmentImpl.create(executionEnvironment, settings, new TableConfig());
TableConfig config = new TableConfig();
config.addConfiguration(settings.toConfiguration());
return StreamTableEnvironmentImpl.create(executionEnvironment, settings, config);
}

/**
Expand Down Expand Up @@ -142,7 +144,9 @@ static StreamTableEnvironment create(
static StreamTableEnvironment create(
StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
return StreamTableEnvironmentImpl.create(
executionEnvironment, EnvironmentSettings.newInstance().build(), tableConfig);
executionEnvironment,
EnvironmentSettings.fromConfiguration(tableConfig.getConfiguration()),
tableConfig);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.functions.ScalarFunction;
Expand All @@ -30,6 +31,11 @@
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_PLANNER;

/**
* Defines all parameters that initialize a table environment. Those parameters are used only during
* instantiation of a {@link TableEnvironment} and cannot be changed afterwards.
Expand Down Expand Up @@ -100,6 +106,56 @@ public static Builder newInstance() {
return new Builder();
}

/**
 * Creates an instance of {@link EnvironmentSettings} from {@link Configuration}.
 *
 * <p>The execution mode is read from the {@code RUNTIME_MODE} option and the planner from the
 * {@code TABLE_PLANNER} option.
 *
 * @param configuration the configuration to read the runtime mode and planner from
 * @return the settings built from the configured runtime mode and planner
 * @throws UnsupportedOperationException if the runtime mode is {@code AUTOMATIC}
 * @throws IllegalArgumentException if the runtime mode or the planner value is not recognized
 */
public static EnvironmentSettings fromConfiguration(Configuration configuration) {
    Builder builder = new Builder();

    // The execution mode is selected only here. A preceding if/else would merely repeat this
    // decision and would configure batch mode for AUTOMATIC right before it is rejected.
    switch (configuration.get(RUNTIME_MODE)) {
        case STREAMING:
            builder.inStreamingMode();
            break;
        case BATCH:
            builder.inBatchMode();
            break;
        case AUTOMATIC:
            throw new UnsupportedOperationException(
                    "TableEnvironment currently doesn't support `AUTOMATIC` mode.");
        default:
            throw new IllegalArgumentException(
                    String.format(
                            "Unrecognized value '%s' for option '%s'.",
                            configuration.get(RUNTIME_MODE), RUNTIME_MODE.key()));
    }

    switch (configuration.get(TABLE_PLANNER)) {
        case BLINK:
            builder.useBlinkPlanner();
            break;
        case OLD:
            builder.useOldPlanner();
            break;
        default:
            throw new IllegalArgumentException(
                    String.format(
                            "Unrecognized value '%s' for option '%s'.",
                            configuration.get(TABLE_PLANNER), TABLE_PLANNER.key()));
    }
    return builder.build();
}

/**
 * Converts the environment settings to a {@link Configuration}.
 *
 * <p>The returned configuration contains the {@code RUNTIME_MODE} option (streaming or batch,
 * depending on {@link #isStreamingMode()}) and the {@code TABLE_PLANNER} option (blink or old,
 * depending on {@code isBlinkPlanner()}).
 *
 * @return a new configuration holding the runtime mode and the planner type
 */
public Configuration toConfiguration() {
    final Configuration result = new Configuration();

    if (isStreamingMode()) {
        result.set(RUNTIME_MODE, STREAMING);
    } else {
        result.set(RUNTIME_MODE, BATCH);
    }

    if (isBlinkPlanner()) {
        result.set(TABLE_PLANNER, PlannerType.BLINK);
    } else {
        result.set(TABLE_PLANNER, PlannerType.OLD);
    }

    return result;
}

/**
* Gets the specified name of the initial catalog to be created when instantiating a {@link
* TableEnvironment}.
Expand All @@ -121,6 +177,13 @@ public boolean isStreamingMode() {
return isStreamingMode;
}

/**
 * Tells if the {@link TableEnvironment} should work in the blink planner or old planner.
 *
 * @return {@code true} if neither a planner class nor an executor class is set, or if both
 *     equal the blink factories of the {@link Builder}; {@code false} otherwise
 */
boolean isBlinkPlanner() {
    if (this.plannerClass == null && this.executorClass == null) {
        // No explicit planner/executor was chosen; this case is reported as blink.
        return true;
    }
    return Builder.BLINK_PLANNER_FACTORY.equals(this.plannerClass)
            && Builder.BLINK_EXECUTOR_FACTORY.equals(this.executorClass);
}

@Internal
public Map<String, String> toPlannerProperties() {
Map<String, String> properties = new HashMap<>(toCommonProperties());
Expand Down
Loading

0 comments on commit ad52d3f

Please sign in to comment.