[FLINK-26709][table] Replace TableConfig.getConfiguration

Replace `TableConfig.getConfiguration()` with directly passing
`TableConfig`, since `TableConfig` is now a `ReadableConfig` and the
calls to `get`/`getOptional` give a full view, including the
`rootConfiguration`, which makes all the options coming from the
environment (flink-conf.yaml, CLI params) available.
matriv authored and twalthr committed Mar 22, 2022
1 parent 0b4b535 commit 09f2c59
Showing 30 changed files with 62 additions and 83 deletions.
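
Before the per-file diffs, a minimal sketch of the behavior the commit relies on. This example is illustrative and not part of the change; it assumes the Flink 1.15-era Table API, where `TableConfig` implements `ReadableConfig`:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableConfigViewsDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // TableConfig is now a ReadableConfig, so factories and planner code
        // can accept it directly instead of unwrapping it first.
        ReadableConfig fullView = tEnv.getConfig();

        // get()/getOptional() on the full view consult options set on the
        // Table API level first and then fall back to the rootConfiguration
        // (flink-conf.yaml, CLI params).
        fullView.getOptional(PipelineOptions.NAME)
                .ifPresent(name -> System.out.println("pipeline.name = " + name));

        // getConfiguration() still exists, but it exposes only the options set
        // on the Table API level, without the environment fallback.
        Configuration applicationOnly = tEnv.getConfig().getConfiguration();
        System.out.println(applicationOnly.toMap());
    }
}
```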

---- changed file ----
@@ -290,7 +290,7 @@ private DynamicTableSource getTableSource(String tableName) throws Exception {
 hiveCatalog.getFactory().orElseThrow(IllegalStateException::new),
 tableIdentifier,
 tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable),
-tableEnv.getConfig().getConfiguration(),
+tableEnv.getConfig(),
 Thread.currentThread().getContextClassLoader(),
 false);
 }
@@ -306,7 +306,7 @@ private DynamicTableSink getTableSink(String tableName) throws Exception {
 hiveCatalog.getFactory().orElseThrow(IllegalStateException::new),
 tableIdentifier,
 tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable),
-tableEnv.getConfig().getConfiguration(),
+tableEnv.getConfig(),
 Thread.currentThread().getContextClassLoader(),
 false);
 }

---- changed file ----
@@ -408,7 +408,7 @@ private FileSystemLookupFunction<HiveTablePartition> getLookupFunction(String ta
 tableEnvInternal
         .getCatalogManager()
         .resolveCatalogTable(catalogTable),
-tableEnv.getConfig().getConfiguration(),
+tableEnv.getConfig(),
 Thread.currentThread().getContextClassLoader(),
 false);
 FileSystemLookupFunction<HiveTablePartition> lookupFunction =

---- changed file ----
@@ -22,6 +22,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.python.util.PythonDependencyUtils;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.functions.python.PythonFunction;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -124,7 +125,11 @@ static PythonFunction getPythonFunction(
 Configuration mergedConfig =
         new Configuration(
                 ExecutionEnvironment.getExecutionEnvironment().getConfiguration());
-PythonDependencyUtils.merge(mergedConfig, (Configuration) config);
+if (config instanceof TableConfig) {
+    PythonDependencyUtils.merge(mergedConfig, ((TableConfig) config).getConfiguration());
+} else {
+    PythonDependencyUtils.merge(mergedConfig, (Configuration) config);
+}
 PythonFunctionFactory pythonFunctionFactory =
         PYTHON_FUNCTION_FACTORY_CACHE.get(CacheKey.of(mergedConfig, classLoader));
 ensureCacheCleanupExecutorServiceStarted();
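
The branch introduced above is needed because `PythonDependencyUtils.merge` consumes a concrete `Configuration`, while callers may now hand in the `TableConfig` itself. A standalone sketch of the same unwrap pattern; the `mergeFrom` helper and the simplified `merge` are hypothetical, written only to isolate the idea:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableConfig;

final class ConfigUnwrapSketch {
    // Illustrative stand-in for PythonDependencyUtils.merge, which requires a
    // concrete Configuration rather than the ReadableConfig interface.
    static void merge(Configuration target, Configuration source) {
        target.addAll(source);
    }

    static void mergeFrom(Configuration target, ReadableConfig config) {
        if (config instanceof TableConfig) {
            // Unwrap the application-level options carried by the TableConfig.
            merge(target, ((TableConfig) config).getConfiguration());
        } else {
            // Other ReadableConfig instances reaching this code path are
            // assumed to be plain Configurations, as in the diff above.
            merge(target, (Configuration) config);
        }
    }
}
```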

---- changed file ----
@@ -21,7 +21,6 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -139,8 +138,7 @@ private static final class MockExecutor implements Executor {

 private final TypedResult<?> typedResult;
 private final CountDownLatch cancellationCounter;
-private static final Configuration defaultConfig =
-        TableConfig.getDefault().getConfiguration();
+private static final Configuration defaultConfig = new Configuration();

 public MockExecutor(TypedResult<?> typedResult, CountDownLatch cancellationCounter) {
 this.typedResult = typedResult;

---- changed file ----
@@ -19,7 +19,6 @@

 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.client.cli.utils.SqlParserHelper;
 import org.apache.flink.table.client.gateway.Executor;
@@ -40,7 +39,7 @@
 /** A customizable {@link Executor} for testing purposes. */
 class TestingExecutor implements Executor {

-private static final Configuration defaultConfig = TableConfig.getDefault().getConfiguration();
+private static final Configuration defaultConfig = new Configuration();
 private int numCancelCalls = 0;

 private int numRetrieveResultChancesCalls = 0;

---- changed file ----
@@ -20,6 +20,7 @@

 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
 import org.apache.flink.table.utils.TestUserClassLoaderJar;

@@ -219,12 +220,8 @@ private Map<String, String> getConfigurationMap() {
 .toMap();
 }

-private Configuration getConfiguration() {
-    return sessionContext
-            .getExecutionContext()
-            .getTableEnvironment()
-            .getConfig()
-            .getConfiguration();
+private ReadableConfig getConfiguration() {
+    return sessionContext.getExecutionContext().getTableEnvironment().getConfig();
 }

 private void validateAddJar(String jarPath) throws IOException {

---- changed file ----
@@ -225,7 +225,8 @@ protected <T> DataStream<T> toStreamInternal(Table table, ModifyOperation modify
 final Transformation<T> transformation = getTransformation(table, transformations);
 executionEnvironment.addOperator(transformation);

-// reconfigure whenever planner transformations are added
+// Reconfigure whenever planner transformations are added
+// We pass only the configuration to avoid reconfiguration with the rootConfiguration
 executionEnvironment.configure(tableConfig.getConfiguration());

 return new DataStream<>(executionEnvironment, transformation);
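
The new comment marks the one place where the narrow view is still wanted: only the options set on the Table API level are pushed back into the execution environment, so the `rootConfiguration` (flink-conf.yaml, CLI params), which the environment already knows, is not re-applied. A hedged sketch of that pattern; the `reconfigure` helper is hypothetical:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableConfig;

final class ReconfigureSketch {
    // Push only the options the user set on the Table API level back into the
    // StreamExecutionEnvironment, mirroring toStreamInternal above.
    static void reconfigure(StreamExecutionEnvironment env, TableConfig tableConfig) {
        // Passing the full TableConfig view here would also re-apply every
        // rootConfiguration option onto the environment; the narrow view
        // avoids that redundant reconfiguration.
        Configuration applicationOnly = tableConfig.getConfiguration();
        env.configure(applicationOnly);
    }
}
```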

---- changed file ----
@@ -178,7 +178,7 @@ default TableSchema getTableSchema() {
 *
 * <p>This method has slightly different behaviors under different checkpointing settings (to
 * enable checkpointing for a streaming job, set checkpointing properties through {@link
-* TableConfig#getConfiguration()}).
+* TableConfig}).
 *
 * <ul>
 * <li>For batch jobs or streaming jobs without checkpointing, this method has neither
@@ -205,7 +205,7 @@
 *
 * <p>This method has slightly different behaviors under different checkpointing settings (to
 * enable checkpointing for a streaming job, set checkpointing properties through {@link
-* TableConfig#getConfiguration()}).
+* TableConfig}).
 *
 * <ul>
 * <li>For batch jobs or streaming jobs without checkpointing, this method has neither
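
The updated Javadoc now points at `TableConfig` itself for checkpointing settings. A small illustrative example, assuming the 1.15-era `TableConfig#set` and `ExecutionCheckpointingOptions` (not part of this commit):

```java
import java.time.Duration;

import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CheckpointedTableJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Checkpointing properties are set through TableConfig directly,
        // no longer through TableConfig#getConfiguration().
        tEnv.getConfig()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                        Duration.ofSeconds(10));
    }
}
```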

---- changed file ----
@@ -287,7 +287,7 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
 final CatalogManager catalogManager =
         CatalogManager.newBuilder()
                 .classLoader(classLoader)
-                .config(tableConfig.getConfiguration())
+                .config(tableConfig)
                 .defaultCatalog(
                         settings.getBuiltInCatalogName(),
                         new GenericInMemoryCatalog(
@@ -796,6 +796,7 @@ public TableResultInternal executeInternal(List<ModifyOperation> operations) {
 private TableResultInternal executeInternal(
         List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
 final String defaultJobName = "insert-into_" + String.join(",", sinkIdentifierNames);
+// We pass only the configuration to avoid reconfiguration with the rootConfiguration
 Pipeline pipeline =
         execEnv.createPipeline(
                 transformations, tableConfig.getConfiguration(), defaultJobName);
@@ -826,6 +827,7 @@ private TableResultInternal executeQueryOperation(QueryOperation operation) {
 List<Transformation<?>> transformations =
         translate(Collections.singletonList(sinkOperation));
 final String defaultJobName = "collect";
+// We pass only the configuration to avoid reconfiguration with the rootConfiguration
 Pipeline pipeline =
         execEnv.createPipeline(
                 transformations, tableConfig.getConfiguration(), defaultJobName);
@@ -1352,10 +1354,7 @@ private TableResultInternal createCatalog(CreateCatalogOperation operation) {

 Catalog catalog =
         FactoryUtil.createCatalog(
-                catalogName,
-                properties,
-                tableConfig.getConfiguration(),
-                userClassLoader);
+                catalogName, properties, tableConfig, userClassLoader);
 catalogManager.registerCatalog(catalogName, catalog);

 return TableResultImpl.TABLE_RESULT_OK;
@@ -1371,7 +1370,7 @@ private TableResultInternal loadModule(LoadModuleOperation operation) {
 FactoryUtil.createModule(
         operation.getModuleName(),
         operation.getOptions(),
-        tableConfig.getConfiguration(),
+        tableConfig,
         userClassLoader);
 moduleManager.loadModule(operation.getModuleName(), module);
 return TableResultImpl.TABLE_RESULT_OK;

---- changed file ----
@@ -21,7 +21,6 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -76,11 +75,6 @@ public final class FunctionCatalog {
 */
 private PlannerTypeInferenceUtil plannerTypeInferenceUtil;

-public FunctionCatalog(
-        TableConfig tableConfig, CatalogManager catalogManager, ModuleManager moduleManager) {
-    this(checkNotNull(tableConfig).getConfiguration(), catalogManager, moduleManager);
-}
-
 public FunctionCatalog(
         ReadableConfig config, CatalogManager catalogManager, ModuleManager moduleManager) {
 this.config = checkNotNull(config);

---- changed file ----
@@ -137,7 +137,7 @@ private ExpressionResolver(
 List<LocalReferenceExpression> localReferences,
 @Nullable DataType outputDataType,
 boolean isGroupedAggregation) {
-this.config = Preconditions.checkNotNull(tableConfig).getConfiguration();
+this.config = Preconditions.checkNotNull(tableConfig);
 this.tableLookup = Preconditions.checkNotNull(tableLookup);
 this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
 this.functionLookup = Preconditions.checkNotNull(functionLookup);

---- changed file ----
@@ -78,8 +78,7 @@ private static TableEnvironmentMock getInstance(boolean isStreamingMode) {
 moduleManager,
 tableConfig,
 createExecutor(),
-createFunctionCatalog(
-        tableConfig.getConfiguration(), catalogManager, moduleManager),
+createFunctionCatalog(tableConfig, catalogManager, moduleManager),
 createPlanner(),
 isStreamingMode);
 }

---- changed file ----
@@ -312,7 +312,7 @@ object StreamTableEnvironmentImpl {

 val catalogManager = CatalogManager.newBuilder
         .classLoader(classLoader)
-        .config(tableConfig.getConfiguration)
+        .config(tableConfig)
         .defaultCatalog(
                 settings.getBuiltInCatalogName,
                 new GenericInMemoryCatalog(

---- changed file ----
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.plan;

 import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
@@ -527,10 +526,9 @@ private RelNode convertToExternalScan(
 boolean isTopLevelRecord,
 ChangelogMode changelogMode) {
 final FlinkContext flinkContext = ShortcutUtils.unwrapContext(relBuilder);
-final ReadableConfig config = flinkContext.getTableConfig().getConfiguration();
 return DynamicSourceUtils.convertDataStreamToRel(
         flinkContext.isBatchMode(),
-        config,
+        flinkContext.getTableConfig(),
         relBuilder,
         contextResolvedTable,
         dataStream,

---- changed file ----
@@ -20,7 +20,6 @@

 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
@@ -75,15 +74,13 @@ public void apply(DynamicTableSource tableSource, SourceAbilityContext context)
 if (tableSource instanceof SupportsWatermarkPushDown) {
 GeneratedWatermarkGenerator generatedWatermarkGenerator =
         WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
-                context.getTableConfig().getConfiguration(),
+                context.getTableConfig(),
                 context.getSourceRowType(),
                 watermarkExpr,
                 Option.apply("context"));
-Configuration configuration = context.getTableConfig().getConfiguration();

 WatermarkGeneratorSupplier<RowData> supplier =
-        new GeneratedWatermarkGeneratorSupplier(
-                configuration, generatedWatermarkGenerator);
+        new GeneratedWatermarkGeneratorSupplier(generatedWatermarkGenerator);

 WatermarkStrategy<RowData> watermarkStrategy = WatermarkStrategy.forGenerator(supplier);
 if (idleTimeoutMillis > 0) {

---- changed file ----
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;

 import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
@@ -71,8 +71,8 @@ public Parser getParser() {
 return parser;
 }

-public Configuration getConfiguration() {
-    return flinkContext.getTableConfig().getConfiguration();
+public ReadableConfig getConfiguration() {
+    return flinkContext.getTableConfig();
 }

 public ClassLoader getClassLoader() {

---- changed file ----
@@ -87,7 +87,7 @@ public DynamicTableSink getTableSink(FlinkContext flinkContext) {
 contextResolvedTable.getIdentifier(),
 contextResolvedTable.getResolvedTable(),
 loadOptionsFromCatalogTable(contextResolvedTable, flinkContext),
-flinkContext.getTableConfig().getConfiguration(),
+flinkContext.getTableConfig(),
 flinkContext.getClassLoader(),
 contextResolvedTable.isTemporary());
 if (sinkAbilities != null) {

---- changed file ----
@@ -80,7 +80,7 @@ private DynamicTableSource getTableSource(FlinkContext flinkContext) {
 contextResolvedTable.getIdentifier(),
 contextResolvedTable.getResolvedTable(),
 loadOptionsFromCatalogTable(contextResolvedTable, flinkContext),
-flinkContext.getTableConfig().getConfiguration(),
+flinkContext.getTableConfig(),
 flinkContext.getClassLoader(),
 contextResolvedTable.isTemporary());


---- changed file ----
@@ -38,10 +38,7 @@ Map<String, String> loadOptionsFromCatalogTable(
 // In case of CatalogPlanRestore.IDENTIFIER, getCatalogTable() already returns the table
 // loaded from the catalog
 final TableConfigOptions.CatalogPlanRestore catalogPlanRestore =
-        flinkContext
-                .getTableConfig()
-                .getConfiguration()
-                .get(TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS);
+        flinkContext.getTableConfig().get(TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS);
 if (!contextResolvedTable.isPermanent()
         || catalogPlanRestore != TableConfigOptions.CatalogPlanRestore.ALL) {
 return Collections.emptyMap();

---- changed file ----
@@ -18,7 +18,6 @@

 package org.apache.flink.table.planner.plan.schema;

-import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.Catalog;
@@ -118,7 +117,7 @@ public RelNode toRel(ToRelContext toRelContext) {
 // prepare table source and convert to RelNode
 return DynamicSourceUtils.convertSourceToRel(
         !schemaTable.isStreamingMode(),
-        context.getTableConfig().getConfiguration(),
+        context.getTableConfig(),
         relBuilder,
         schemaTable.getContextResolvedTable(),
         schemaTable.getStatistic(),
@@ -153,7 +152,6 @@ private ContextResolvedTable computeContextResolvedTable(

 private DynamicTableSource createDynamicTableSource(
         FlinkContext context, ResolvedCatalogTable catalogTable) {
-final ReadableConfig config = context.getTableConfig().getConfiguration();

 final Optional<DynamicTableSourceFactory> factoryFromCatalog =
         schemaTable
@@ -178,7 +176,7 @@ private DynamicTableSource createDynamicTableSource(
 factory,
 schemaTable.getContextResolvedTable().getIdentifier(),
 catalogTable,
-config,
+context.getTableConfig(),
 Thread.currentThread().getContextClassLoader(),
 schemaTable.isTemporary());
 }

---- changed file ----
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -84,6 +85,11 @@ public ExecutionConfig getConfig() {
 return realExecEnv.getConfig();
 }

+@Override
+public ReadableConfig getConfiguration() {
+    return realExecEnv.getConfiguration();
+}
+
 @Override
 public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
 return realExecEnv.getCachedFiles();

---- changed file ----
@@ -89,7 +89,7 @@ public static FlinkContext unwrapContext(Context context) {
 }

 public static ReadableConfig unwrapTableConfig(RelNode relNode) {
-    return unwrapContext(relNode).getTableConfig().getConfiguration();
+    return unwrapContext(relNode).getTableConfig();
 }

 public static @Nullable FunctionDefinition unwrapFunctionDefinition(