Skip to content

Commit

Permalink
[FLINK-12101] Deduplicate code by introducing ExecutionEnvironment#re…
Browse files Browse the repository at this point in the history
…solveFactory

ExecutionEnvironment#resolveFactory selects between the thread local and the global factory.
This method is used by the ExecutionEnvironment as well as the StreamExecutionEnvironment.

This closes apache#8543.
  • Loading branch information
tillrohrmann committed Jun 6, 2019
1 parent ad801b9 commit ac84e0c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ public abstract class ExecutionEnvironment {
protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);

/** The environment of the context (local by default, cluster if invoked through command line). */
private static ExecutionEnvironmentFactory contextEnvironmentFactory;
private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;

/** The ThreadLocal used to store {@link ExecutionEnvironmentFactory}. */
private static ThreadLocal<ExecutionEnvironmentFactory> contextEnvironmentFactoryThreadLocal = new ThreadLocal<>();
private static final ThreadLocal<ExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>();

/** The default parallelism used by local environments. */
private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
Expand Down Expand Up @@ -1064,11 +1064,9 @@ private static String getDefaultName() {
* @return The execution environment of the context in which the program is executed.
*/
public static ExecutionEnvironment getExecutionEnvironment() {

return contextEnvironmentFactoryThreadLocal.get() == null ?
(contextEnvironmentFactory == null ?
createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment()) :
contextEnvironmentFactoryThreadLocal.get().createExecutionEnvironment();
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(ExecutionEnvironmentFactory::createExecutionEnvironment)
.orElseGet(ExecutionEnvironment::createLocalEnvironment);
}

/**
Expand Down Expand Up @@ -1259,7 +1257,7 @@ public static void setDefaultLocalParallelism(int parallelism) {
*/
protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
contextEnvironmentFactory = Preconditions.checkNotNull(ctx);
contextEnvironmentFactoryThreadLocal.set(contextEnvironmentFactory);
threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
}

/**
Expand All @@ -1269,7 +1267,7 @@ protected static void initializeContextEnvironment(ExecutionEnvironmentFactory c
*/
protected static void resetContextEnvironment() {
contextEnvironmentFactory = null;
contextEnvironmentFactoryThreadLocal.remove();
threadLocalContextEnvironmentFactory.remove();
}

/**
Expand All @@ -1281,6 +1279,6 @@ protected static void resetContextEnvironment() {
*/
@Internal
public static boolean areExplicitEnvironmentsAllowed() {
return contextEnvironmentFactory == null && contextEnvironmentFactoryThreadLocal.get() == null;
return contextEnvironmentFactory == null && threadLocalContextEnvironmentFactory.get() == null;
}
}
21 changes: 21 additions & 0 deletions flink-java/src/main/java/org/apache/flink/api/java/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@

import org.apache.commons.lang3.StringUtils;

import javax.annotation.Nullable;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Optional;
import java.util.Random;

/**
Expand Down Expand Up @@ -296,6 +299,24 @@ private static String getGenericTypeTree(Class<?> type, int indent) {
return ret;
}

// --------------------------------------------------------------------------------------------

/**
* Resolves the given factories. The thread local factory has preference over the static factory.
* If none is set, the method returns {@link Optional#empty()}.
*
* @param threadLocalFactory containing the thread local factory
* @param staticFactory containing the global factory
* @param <T> type of factory
* @return Optional containing the resolved factory if it exists, otherwise it's empty
*/
public static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocalFactory, @Nullable T staticFactory) {
final T localFactory = threadLocalFactory.get();
final T factory = localFactory == null ? staticFactory : localFactory;

return Optional.ofNullable(factory);
}

/**
* Private constructor to prevent instantiation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
Expand Down Expand Up @@ -115,10 +116,10 @@ public abstract class StreamExecutionEnvironment {
/**
* The environment of the context (local by default, cluster if invoked through command line).
*/
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;

/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
private static ThreadLocal<StreamExecutionEnvironmentFactory> contextEnvironmentFactoryThreadLocal = new ThreadLocal<>();
private static final ThreadLocal<StreamExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>();

/** The default parallelism used when creating a local environment. */
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
Expand Down Expand Up @@ -1571,13 +1572,12 @@ public void addOperator(StreamTransformation<?> transformation) {
* executed.
*/
public static StreamExecutionEnvironment getExecutionEnvironment() {
if (contextEnvironmentFactoryThreadLocal.get() != null) {
return contextEnvironmentFactoryThreadLocal.get().createExecutionEnvironment();
}
if (contextEnvironmentFactory != null) {
return contextEnvironmentFactory.createExecutionEnvironment();
}
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
.orElseGet(StreamExecutionEnvironment::createStreamExecutionEnvironment);
}

private static StreamExecutionEnvironment createStreamExecutionEnvironment() {
// because the streaming project depends on "flink-clients" (and not the other way around)
// we currently need to intercept the data set environment and create a dependent stream env.
// this should be fixed once we rework the project dependencies
Expand Down Expand Up @@ -1772,12 +1772,12 @@ public static void setDefaultLocalParallelism(int parallelism) {

protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
contextEnvironmentFactory = ctx;
contextEnvironmentFactoryThreadLocal.set(contextEnvironmentFactory);
threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
}

protected static void resetContextEnvironment() {
contextEnvironmentFactory = null;
contextEnvironmentFactoryThreadLocal.remove();
threadLocalContextEnvironmentFactory.remove();
}

/**
Expand Down

0 comments on commit ac84e0c

Please sign in to comment.