Skip to content

Commit

Permalink
[hotfix][streaming-java] Avoid deep copy in StreamExecutionEnvironmen…
Browse files Browse the repository at this point in the history
…t.getConfiguration

StreamExecutionEnvironment.getConfiguration() returned not only a ReadableConfig but also a
deep copy. This caused various problems downstream (Python API) and made accessing the
configuration expensive. The root configuration of TableConfig should not be a snapshot
but a reference that always reflects the current status of StreamExecutionEnvironment.
Otherwise, when merging e.g. PipelineOptions.JARS, it is possible that entries get
lost when an outdated root configuration is used during merging.
  • Loading branch information
twalthr committed Aug 23, 2022
1 parent 9201f1e commit cb26f08
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.UnmodifiableConfiguration;

/** A utility for creating a mutable {@link Configuration} from a {@link ReadableConfig}. */
@Internal
Expand All @@ -36,11 +35,10 @@ private MutableConfig() {}
* @return A mutable Configuration.
*/
public static Configuration of(ReadableConfig config) {
if (!(config instanceof UnmodifiableConfiguration)) {
if (!(config instanceof Configuration)) {
throw new IllegalStateException(
"Unexpected implementation of ReadableConfig: " + config.getClass());
}

return new Configuration((UnmodifiableConfiguration) config);
return new Configuration((Configuration) config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -74,29 +73,8 @@ public class PythonConfigUtil {
public static final String STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME =
"_partition_custom_map_operator";

/**
* Get the private field {@link StreamExecutionEnvironment#configuration} by reflection
* recursively. It allows modification to the configuration compared with {@link
* StreamExecutionEnvironment#getConfiguration()}.
*/
public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env)
throws InvocationTargetException, IllegalAccessException, NoSuchFieldException {
Field configurationField = null;
for (Class<?> clz = env.getClass(); clz != Object.class; clz = clz.getSuperclass()) {
try {
configurationField = clz.getDeclaredField("configuration");
break;
} catch (NoSuchFieldException e) {
// ignore
}
}

if (configurationField == null) {
throw new NoSuchFieldException("Field 'configuration' not found.");
}

configurationField.setAccessible(true);
return (Configuration) configurationField.get(env);
public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env) {
return (Configuration) env.getConfiguration();
}

public static void configPythonOperator(StreamExecutionEnvironment env) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.execution.CacheSupportedPipelineExecutor;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
Expand Down Expand Up @@ -2351,7 +2350,22 @@ public void addOperator(Transformation<?> transformation) {
*/
@Internal
public ReadableConfig getConfiguration() {
return new UnmodifiableConfiguration(configuration);
// Note to implementers:
// In theory, you can cast the return value of this method to Configuration and perform
// mutations. In practice, this could cause side effects. A better approach is to implement
// the ReadableConfig interface and create a layered configuration.
// For example:
// TableConfig implements ReadableConfig {
// underlyingLayer ReadableConfig
// thisConfigLayer Configuration
//
// get(configOption) {
// return thisConfigLayer
// .getOptional(configOption)
// .orElseGet(() -> underlyingLayer.get(configOption))
// }
// }
return configuration;
}

// --------------------------------------------------------------------------------------------
Expand Down

0 comments on commit cb26f08

Please sign in to comment.