From cb26f08ee82be30a72d9763866e7ef09fc9b3c71 Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Fri, 19 Aug 2022 11:12:16 +0200 Subject: [PATCH] [hotfix][streaming-java] Avoid deep copy in StreamExecutionEnvironment.getConfiguration StreamExecutionEnvironment.getConfiguration() was not only a ReadableConfig but also a deep copy. This caused various problems downstream (Python API) and made accessing configuration expensive. Root configuration of TableConfig should not be a snapshot but a reference that always reflects the current status of StreamExecutionEnvironment. Otherwise, when merging e.g. PipelineOptions.JARS, it is possible that entries get lost when an outdated root configuration is used during merging. --- .../state/api/runtime/MutableConfig.java | 6 ++--- .../flink/python/util/PythonConfigUtil.java | 26 ++----------------- .../StreamExecutionEnvironment.java | 18 +++++++++++-- 3 files changed, 20 insertions(+), 30 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/MutableConfig.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/MutableConfig.java index b726c4829a3c7..3c768fdf92c78 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/MutableConfig.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/MutableConfig.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.configuration.UnmodifiableConfiguration; /** A utility for creating a mutable {@link Configuration} from a {@link ReadableConfig}. */ @Internal @@ -36,11 +35,10 @@ private MutableConfig() {} * @return A mutable Configuration. 
*/ public static Configuration of(ReadableConfig config) { - if (!(config instanceof UnmodifiableConfiguration)) { + if (!(config instanceof Configuration)) { throw new IllegalStateException( "Unexpected implementation of ReadableConfig: " + config.getClass()); } - - return new Configuration((UnmodifiableConfiguration) config); + return new Configuration((Configuration) config); } } diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java index 733b874859cc1..28a484f313ff2 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java @@ -57,7 +57,6 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -74,29 +73,8 @@ public class PythonConfigUtil { public static final String STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME = "_partition_custom_map_operator"; - /** - * Get the private field {@link StreamExecutionEnvironment#configuration} by reflection - * recursively. It allows modification to the configuration compared with {@link - * StreamExecutionEnvironment#getConfiguration()}. 
- */ - public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env) - throws InvocationTargetException, IllegalAccessException, NoSuchFieldException { - Field configurationField = null; - for (Class clz = env.getClass(); clz != Object.class; clz = clz.getSuperclass()) { - try { - configurationField = clz.getDeclaredField("configuration"); - break; - } catch (NoSuchFieldException e) { - // ignore - } - } - - if (configurationField == null) { - throw new NoSuchFieldException("Field 'configuration' not found."); - } - - configurationField.setAccessible(true); - return (Configuration) configurationField.get(env); + public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env) { + return (Configuration) env.getConfiguration(); } public static void configPythonOperator(StreamExecutionEnvironment env) throws Exception { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 22b7d27be0733..111b575824cac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -62,7 +62,6 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.StateChangelogOptions; -import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.core.execution.CacheSupportedPipelineExecutor; import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.DetachedJobExecutionResult; @@ -2351,7 +2350,22 @@ public void addOperator(Transformation transformation) { */ @Internal public ReadableConfig getConfiguration() { - return new 
UnmodifiableConfiguration(configuration); + // Note to implementers: + // In theory, you can cast the return value of this method to Configuration and perform + // mutations. In practice, this could cause side effects. A better approach is to implement + // the ReadableConfig interface and create a layered configuration. + // For example: + // TableConfig implements ReadableConfig { + // underlyingLayer ReadableConfig + // thisConfigLayer Configuration + // + // get(configOption) { + // return thisConfigLayer + // .getOptional(configOption) + // .orElseGet(underlyingLayer.get(configOption)) + // } + // } + return configuration; } // --------------------------------------------------------------------------------------------