Skip to content

Commit

Permalink
[FLINK-20002] Add a StreamExecutionEnvironment#getExecutionEnvironmen…
Browse files Browse the repository at this point in the history
…t(Configuration) method
  • Loading branch information
dawidwys committed Nov 7, 2020
1 parent dc56273 commit c037dcb
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,17 @@ public static void setAsContext(
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution,
final boolean suppressSysout) {
StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
StreamExecutionEnvironmentFactory factory = conf -> {
Configuration mergedConfiguration = new Configuration();
mergedConfiguration.addAll(configuration);
mergedConfiguration.addAll(conf);
return new StreamContextEnvironment(
executorServiceLoader,
mergedConfiguration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
};
initializeContextEnvironment(factory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ public JobClient executeAsync(StreamGraph streamGraph) {
}

public void setAsContext() {
StreamExecutionEnvironmentFactory factory = () -> this;
StreamExecutionEnvironmentFactory factory = conf -> {
this.configure(conf, getUserClassloader());
return this;
};
initializeContextEnvironment(factory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private List<URL> getUpdatedJarFiles() throws MalformedURLException {
}

public static void disableAllContextAndOtherEnvironments() {
initializeContextEnvironment(() -> {
initializeContextEnvironment(configuration -> {
throw new UnsupportedOperationException("Execution Environment is already defined for this shell.");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@
* The LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally,
* multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded
* Flink cluster in the background and executes the program on that cluster.
*
* <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
* parallelism can be set via {@link #setParallelism(int)}.
*/
@Public
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
Expand All @@ -54,7 +51,6 @@ public LocalStreamEnvironment() {
*/
public LocalStreamEnvironment(@Nonnull Configuration configuration) {
super(validateAndGetConfiguration(configuration));
setParallelism(1);
}

private static Configuration validateAndGetConfiguration(final Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
Expand Down Expand Up @@ -2043,9 +2044,27 @@ public void addOperator(Transformation<?> transformation) {
* executed.
*/
public static StreamExecutionEnvironment getExecutionEnvironment() {
return getExecutionEnvironment(new Configuration());
}

/**
* Creates an execution environment that represents the context in which the
* program is currently executed. If the program is invoked standalone, this
* method returns a local execution environment, as returned by
* {@link #createLocalEnvironment(Configuration)}.
*
* <p>When executed from the command line the given configuration is stacked on top of the
* global configuration which comes from the {@code flink-conf.yaml}, potentially overriding
* duplicated options.
*
* @param configuration The configuration to instantiate the environment with.
* @return The execution environment of the context in which the program is
* executed.
*/
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
.orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
.map(factory -> factory.createExecutionEnvironment(configuration))
.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}

/**
Expand Down Expand Up @@ -2088,12 +2107,30 @@ public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
* @return A local execution environment with the specified parallelism.
*/
public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
final LocalStreamEnvironment currentEnvironment;

currentEnvironment = new LocalStreamEnvironment(configuration);
currentEnvironment.setParallelism(parallelism);
Configuration copyOfConfiguration = new Configuration();
copyOfConfiguration.addAll(configuration);
copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
return createLocalEnvironment(copyOfConfiguration);
}

return currentEnvironment;
/**
* Creates a {@link LocalStreamEnvironment}. The local execution environment
* will run the program in a multi-threaded fashion in the same JVM as the
* environment was created in.
*
* @param configuration
* Pass a custom configuration into the cluster
* @return A local execution environment with the specified parallelism.
*/
public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
return new LocalStreamEnvironment(configuration);
} else {
Configuration copyOfConfiguration = new Configuration();
copyOfConfiguration.addAll(configuration);
copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
return new LocalStreamEnvironment(copyOfConfiguration);
}
}

/**
Expand All @@ -2116,7 +2153,7 @@ public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configu
conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
}

return createLocalEnvironment(defaultLocalParallelism, conf);
return createLocalEnvironment(conf);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
package org.apache.flink.streaming.api.environment;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;

/**
* Factory class for stream execution environments.
*/
@PublicEvolving
@FunctionalInterface
public interface StreamExecutionEnvironmentFactory {

/**
* Creates a StreamExecutionEnvironment from this factory.
*
* @return A StreamExecutionEnvironment.
*/
StreamExecutionEnvironment createExecutionEnvironment();
StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration);

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
Expand All @@ -34,6 +36,7 @@

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -123,6 +126,37 @@ public void testLoadingListenersFromConfiguration() {
assertThat(envFromConfiguration.getJobListeners().get(1), instanceOf(BasicJobExecutedCounter.class));
}

@Test
public void testGettingEnvironmentWithConfiguration() {
Configuration configuration = new Configuration();
configuration.setString("state.backend", "jobmanager");
configuration.set(CoreOptions.DEFAULT_PARALLELISM, 10);
configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, Duration.ofMillis(100));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(
configuration);

assertThat(env.getParallelism(), equalTo(10));
assertThat(env.getConfig().getAutoWatermarkInterval(), equalTo(100L));
assertThat(env.getStateBackend(), instanceOf(MemoryStateBackend.class));
}

@Test
public void testLocalEnvironmentExplicitParallelism() {
Configuration configuration = new Configuration();
configuration.setString("state.backend", "jobmanager");
configuration.set(CoreOptions.DEFAULT_PARALLELISM, 10);
configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, Duration.ofMillis(100));

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
2,
configuration);

assertThat(env.getParallelism(), equalTo(2));
assertThat(env.getConfig().getAutoWatermarkInterval(), equalTo(100L));
assertThat(env.getStateBackend(), instanceOf(MemoryStateBackend.class));
}

/**
* JobSubmitted counter listener for unit test.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,15 @@ public static void setAsContext(
final Collection<Path> jarFiles,
final Collection<URL> classpaths) {

StreamExecutionEnvironmentFactory factory = () -> new TestStreamEnvironment(
StreamExecutionEnvironmentFactory factory = conf -> {
TestStreamEnvironment env = new TestStreamEnvironment(
miniCluster,
parallelism,
jarFiles,
classpaths);
env.configure(conf, env.getUserClassloader());
return env;
};

initializeContextEnvironment(factory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,19 @@ public void batchSumSingleResultPerKey() throws Exception {
}

private StreamExecutionEnvironment getExecutionEnvironment() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Configuration config = new Configuration();
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
env.configure(config, DataStreamBatchExecutionITCase.class.getClassLoader());

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(
config);
env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.milliseconds(1)));

// trick the collecting sink into working even in the face of failures 🙏
env.enableCheckpointing(42);

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.milliseconds(1)));

return env;
}

Expand Down

0 comments on commit c037dcb

Please sign in to comment.