Skip to content

Commit

Permalink
[FLINK-15690][core] In environments, call configure() in constructors…
Browse files Browse the repository at this point in the history
… with passed Configuration

This change means it is possible to instantiate ExecutionEnvironment &
StreamExecutionEnvironment and apply a Configuration. Effectively this
enables configuring ExecutionConfig, CheckpointConfig and parameters
from environment via flink-conf.yaml.

This closes apache#10925
  • Loading branch information
dawidwys committed Jan 23, 2020
1 parent 5fcf9f6 commit 4a9a0a0
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
Expand All @@ -47,11 +46,6 @@ public class ContextEnvironment extends ExecutionEnvironment {
final Configuration configuration,
final ClassLoader userCodeClassLoader) {
super(executorServiceLoader, configuration, userCodeClassLoader);

final int parallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
if (parallelism > 0) {
setParallelism(parallelism);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut

private ClosureCleanerLevel closureCleanerLevel = ClosureCleanerLevel.RECURSIVE;

private int parallelism = PARALLELISM_DEFAULT;
private int parallelism = CoreOptions.DEFAULT_PARALLELISM.defaultValue();

/**
* The program wide maximum parallelism used for operators which haven't specified a maximum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.common.cache;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;

Expand All @@ -28,12 +29,15 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

/**
* DistributedCache provides static methods to write the registered cache files into job configuration or decode
Expand Down Expand Up @@ -183,6 +187,33 @@ public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig(C
return cacheFiles.entrySet();
}

/**
 * Parses a list of distributed cache entries encoded in a string. Can be used to parse a config option
 * described by {@link org.apache.flink.configuration.PipelineOptions#CACHED_FILES}.
 *
 * <p>See {@link org.apache.flink.configuration.PipelineOptions#CACHED_FILES} for the format. Each element
 * of {@code files} describes one entry as a comma-separated list of {@code key:value} pairs. The keys read
 * are {@code name}, {@code path} and the optional {@code executable} (treated as {@code false} if absent).
 *
 * <p>Only the first {@code ':'} of a pair separates the key from the value, so a value may itself contain
 * {@code ':'} (e.g. the scheme separator of a URI). Values cannot contain {@code ','} because pairs are
 * split on it.
 *
 * @param files List of string encoded distributed cache entries.
 * @return one (name, entry) tuple per element of {@code files}, in the same order
 * @throws IllegalArgumentException if a pair does not contain a {@code ':'} separator
 */
public static List<Tuple2<String, DistributedCacheEntry>> parseCachedFilesFromString(List<String> files) {
	return files.stream()
		.map(v -> Arrays.stream(v.split(","))
			.map(p -> {
				// Limit of 2: split on the first ':' only so that values such as
				// "file:///tmp/file1" are not truncated at their own colons.
				final String[] keyValue = p.split(":", 2);
				if (keyValue.length != 2) {
					throw new IllegalArgumentException(
						"Malformed cached file entry '" + v + "': expected 'key:value' but found '" + p + "'.");
				}
				return keyValue;
			})
			.collect(
				Collectors.toMap(
					arr -> arr[0], // key name
					arr -> arr[1] // value
				)
			)
		)
		.map(m -> Tuple2.of(
			m.get("name"),
			new DistributedCacheEntry(
				m.get("path"),
				Optional.ofNullable(m.get("executable")).map(Boolean::parseBoolean).orElse(false)))
		).collect(Collectors.toList());
}

private static final String CACHE_FILE_NUM = "DISTRIBUTED_CACHE_FILE_NUM";

private static final String CACHE_FILE_NAME = "DISTRIBUTED_CACHE_FILE_NAME_";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,5 +225,7 @@ public class PipelineOptions {
"accessible from any user-defined function in the (distributed) runtime under a local path. " +
"Files may be local files (which will be distributed via BlobServer), or files in a distributed " +
"file system. The runtime will copy the files temporarily to a local cache, if needed.")
.linebreak()
.add(TextElement.code("name:file1,path:`file:///tmp/file1`;name:file2,path:`hdfs:///tmp/file2`"))
.build());
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
Expand All @@ -50,8 +51,9 @@
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
Expand Down Expand Up @@ -166,15 +168,16 @@ public ExecutionEnvironment(
this.configuration = checkNotNull(configuration);
this.userClassloader = userClassloader == null ? getClass().getClassLoader() : userClassloader;

// the parallelism of a job or an operator can only be specified at the following places:
// i) at the operator level using the SingleOutputStreamOperator.setParallelism().
// ii) programmatically by using the env.setParallelism() method
// the configuration of a job or an operator can be specified at the following places:
// i) at the operator level, e.g. the parallelism via SingleOutputStreamOperator.setParallelism().
// ii) programmatically by using e.g. the env.setRestartStrategy() method
// iii) in the configuration passed here
//
// if specified in multiple places, the priority order is the above.
//
// Given this, it is safe to overwrite the execution config default value here because all other ways assume
// Given this, it is safe to overwrite the execution config default values here because all other ways assume
// that the env is already instantiated so they will overwrite the value passed here.
this.config.setParallelism(configuration.get(CoreOptions.DEFAULT_PARALLELISM));
this.configure(this.configuration, this.userClassloader);
}

/**
Expand Down Expand Up @@ -379,6 +382,28 @@ public void registerType(Class<?> type) {
}
}

/**
 * Applies the options contained in the given {@link ReadableConfig}, for example
 * {@link PipelineOptions#CACHED_FILES}, to this {@link ExecutionEnvironment} and to its
 * {@link ExecutionConfig}.
 *
 * <p>A setting is only changed if the corresponding option is set in the
 * {@code configuration}. If a key is not present, the current value of the field remains
 * untouched.
 *
 * @param configuration a configuration to read the values from
 * @param classLoader a class loader to use when loading classes
 */
@PublicEvolving
public void configure(ReadableConfig configuration, ClassLoader classLoader) {
	// Replace the registered cache files only when the option is explicitly set.
	configuration
		.getOptional(PipelineOptions.CACHED_FILES)
		.ifPresent(encodedFiles -> {
			cacheFile.clear();
			cacheFile.addAll(DistributedCache.parseCachedFilesFromString(encodedFiles));
		});

	// Forward the same configuration and class loader to the execution config.
	config.configure(configuration, classLoader);
}

// --------------------------------------------------------------------------------------------
// Data set creations
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def excluded_methods(cls):
'createCollectionsEnvironment', 'readFile', 'readFileOfPrimitives',
'generateSequence', 'areExplicitEnvironmentsAllowed', 'createInput',
'getUserCodeClassLoader', 'getExecutorServiceLoader', 'getConfiguration',
'executeAsync', 'registerJobListener', 'clearJobListeners'}
'executeAsync', 'registerJobListener', 'clearJobListeners', 'configure'}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public void testCancel() throws Exception {
assertEquals("application/json; charset=UTF-8", response.getType());
assertEquals("{\"jid\":\"" + jid + "\",\"name\":\"Stoppable streaming test job\"," +
"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"Cluster level default restart strategy\"," +
"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
"\"job-parallelism\":1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
}

BlockingInvokable.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.flink.api.java.operators.DataSource
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
import org.apache.flink.configuration.Configuration
import org.apache.flink.configuration.{Configuration, ReadableConfig}
import org.apache.flink.core.execution.{JobClient, JobListener, PipelineExecutor}
import org.apache.flink.core.fs.Path
import org.apache.flink.types.StringValue
Expand Down Expand Up @@ -192,6 +192,23 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.registerType(typeClass)
}

/**
 * Applies the options contained in the given [[ReadableConfig]], for example
 * [[org.apache.flink.configuration.PipelineOptions#CACHED_FILES]], by delegating to the
 * wrapped Java environment. This reconfigures the [[ExecutionEnvironment]] and its
 * [[ExecutionConfig]].
 *
 * A setting is only changed if the corresponding option is set in the `configuration`.
 * If a key is not present, the current value of the field remains untouched.
 *
 * @param configuration a configuration to read the values from
 * @param classLoader a class loader to use when loading classes
 */
@PublicEvolving
def configure(configuration: ReadableConfig, classLoader: ClassLoader): Unit =
  javaEnv.configure(configuration, classLoader)

/**
* Creates a DataSet of Strings produced by reading the given file line wise.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
Expand Down Expand Up @@ -104,7 +103,6 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -208,16 +206,16 @@ public StreamExecutionEnvironment(
this.configuration = checkNotNull(configuration);
this.userClassloader = userClassloader == null ? getClass().getClassLoader() : userClassloader;

// the parallelism of a job or an operator can only be specified at the following places:
// i) at the operator level using the SingleOutputStreamOperator.setParallelism().
// ii) programmatically by using the env.setParallelism() method, or
// the configuration of a job or an operator can be specified at the following places:
// i) at the operator level, e.g. the parallelism via SingleOutputStreamOperator.setParallelism().
// ii) programmatically by using e.g. the env.setRestartStrategy() method
// iii) in the configuration passed here
//
// if specified in multiple places, the priority order is the above.
//
// Given this, it is safe to overwrite the execution config default value here because all other ways assume
// Given this, it is safe to overwrite the execution config default values here because all other ways assume
// that the env is already instantiated so they will overwrite the value passed here.
this.config.setParallelism(configuration.get(CoreOptions.DEFAULT_PARALLELISM));
this.configure(this.configuration, this.userClassloader);
}

protected Configuration getConfiguration() {
Expand Down Expand Up @@ -758,7 +756,7 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
configuration.getOptional(PipelineOptions.CACHED_FILES)
.ifPresent(f -> {
this.cacheFile.clear();
parseCachedFiles(f).forEach(t -> registerCachedFile(t.f1, t.f0, t.f2));
this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f));
});
config.configure(configuration, classLoader);
checkpointCfg.configure(configuration);
Expand All @@ -775,24 +773,6 @@ private StateBackend loadStateBackend(ReadableConfig configuration, ClassLoader
}
}

/**
 * Parses string encoded cached file entries into (name, path, executable) triples.
 *
 * <p>Each element of {@code s} is a comma-separated list of {@code key:value} pairs. The keys read are
 * {@code name}, {@code path} and the optional {@code executable} (treated as {@code false} if absent).
 * Only the first {@code ':'} of a pair separates the key from the value, so a value may itself
 * contain {@code ':'} (e.g. the scheme separator of a URI).
 *
 * @param s list of string encoded cached file entries
 * @return one triple per element of {@code s}, in the same order
 * @throws IllegalArgumentException if a pair does not contain a {@code ':'} separator
 */
private List<Tuple3<String, String, Boolean>> parseCachedFiles(List<String> s) {
	return s.stream()
		.map(v -> Arrays.stream(v.split(","))
			.map(p -> {
				// Limit of 2: split on the first ':' only so that values such as
				// "file:///tmp/file1" are not truncated at their own colons.
				final String[] keyValue = p.split(":", 2);
				if (keyValue.length != 2) {
					throw new IllegalArgumentException(
						"Malformed cached file entry '" + v + "': expected 'key:value' but found '" + p + "'.");
				}
				return keyValue;
			})
			.collect(
				Collectors.toMap(
					arr -> arr[0], // key name
					arr -> arr[1] // value
				)
			)
		)
		.map(m -> Tuple3.of(
			m.get("name"),
			m.get("path"),
			Optional.ofNullable(m.get("executable")).map(Boolean::parseBoolean).orElse(false)))
		.collect(Collectors.toList());
}

// --------------------------------------------------------------------------------------------
// Data stream creations
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static Collection<TestSpec> specs() {
.whenSetFromFile("pipeline.operator-chaining", "false")
.viaSetter((env, b) -> {
if (b) {
throw new IllegalArgumentException("Cannot programatically enable operator chaining");
throw new IllegalArgumentException("Cannot programmatically enable operator chaining");
} else {
env.disableOperatorChaining();
}
Expand Down

0 comments on commit 4a9a0a0

Please sign in to comment.