Skip to content

Commit

Permalink
[FLINK-24255][tests] Test environments respect configuration when bei…
Browse files Browse the repository at this point in the history
…ng instantiated.

This closes apache#17240
  • Loading branch information
StephanEwen committed Nov 18, 2021
1 parent e6798c3 commit 57253c5
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,26 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {

public TestStreamEnvironment(
MiniCluster miniCluster,
Configuration config,
int parallelism,
Collection<Path> jarFiles,
Collection<URL> classPaths) {
super(
new MiniClusterPipelineExecutorServiceLoader(miniCluster),
MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths),
MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster(
config, jarFiles, classPaths),
null);

setParallelism(parallelism);
}

public TestStreamEnvironment(MiniCluster miniCluster, int parallelism) {
this(miniCluster, parallelism, Collections.emptyList(), Collections.emptyList());
this(
miniCluster,
new Configuration(),
parallelism,
Collections.emptyList(),
Collections.emptyList());
}

/**
Expand All @@ -83,7 +90,7 @@ public static void setAsContext(
conf -> {
TestStreamEnvironment env =
new TestStreamEnvironment(
miniCluster, parallelism, jarFiles, classpaths);
miniCluster, conf, parallelism, jarFiles, classpaths);

randomizeConfiguration(miniCluster, conf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
Expand All @@ -36,6 +37,9 @@
import org.apache.flink.runtime.minicluster.MiniClusterJobClient;
import org.apache.flink.streaming.api.graph.StreamGraph;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
Expand All @@ -48,6 +52,10 @@
* PipelineExecutors} that use a given {@link MiniCluster}.
*/
public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecutorServiceLoader {

private static final Logger LOG =
LoggerFactory.getLogger(MiniClusterPipelineExecutorServiceLoader.class);

public static final String NAME = "minicluster";

private final MiniCluster miniCluster;
Expand All @@ -60,9 +68,14 @@ public MiniClusterPipelineExecutorServiceLoader(MiniCluster miniCluster) {
* Populates a {@link Configuration} that is compatible with this {@link
* MiniClusterPipelineExecutorServiceLoader}.
*/
public static Configuration createConfiguration(
Collection<Path> jarFiles, Collection<URL> classPaths) {
Configuration config = new Configuration();
public static Configuration updateConfigurationForMiniCluster(
Configuration config, Collection<Path> jarFiles, Collection<URL> classPaths) {

checkOverridesOption(config, PipelineOptions.JARS);
checkOverridesOption(config, PipelineOptions.CLASSPATHS);
checkOverridesOption(config, DeploymentOptions.TARGET);
checkOverridesOption(config, DeploymentOptions.ATTACHED);

ConfigUtils.encodeCollectionToConfig(
config,
PipelineOptions.JARS,
Expand All @@ -75,6 +88,12 @@ public static Configuration createConfiguration(
return config;
}

private static void checkOverridesOption(Configuration config, ConfigOption<?> option) {
if (config.contains(option)) {
LOG.warn("Overriding config setting '{}' for MiniCluster.", option.key());
}
}

private static String getAbsoluteURL(Path path) {
FileSystem fs;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.util.Preconditions;
Expand All @@ -46,7 +47,8 @@ public TestEnvironment(
Collection<URL> classPaths) {
super(
new MiniClusterPipelineExecutorServiceLoader(miniCluster),
MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths),
MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster(
new Configuration(), jarFiles, classPaths),
null);

this.miniCluster = Preconditions.checkNotNull(miniCluster);
Expand Down

0 comments on commit 57253c5

Please sign in to comment.