From ac84e0c84dee39da7339722aad5ed2dcf26e2d27 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 5 Jun 2019 15:03:01 +0200 Subject: [PATCH] [FLINK-12101] Deduplicate code by introducing ExecutionEnvironment#resolveFactory ExecutionEnvironment#resolveFactory selects between the thread local and the global factory. This method is used by the ExecutionEnvironment as well as the StreamExecutionEnvironment. This closes #8543. --- .../flink/api/java/ExecutionEnvironment.java | 18 +++++++--------- .../java/org/apache/flink/api/java/Utils.java | 21 +++++++++++++++++++ .../StreamExecutionEnvironment.java | 20 +++++++++--------- 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index f9fc1ef8eb6da..be927c5402a3b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -102,10 +102,10 @@ public abstract class ExecutionEnvironment { protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class); /** The environment of the context (local by default, cluster if invoked through command line). */ - private static ExecutionEnvironmentFactory contextEnvironmentFactory; + private static ExecutionEnvironmentFactory contextEnvironmentFactory = null; /** The ThreadLocal used to store {@link ExecutionEnvironmentFactory}. */ - private static ThreadLocal contextEnvironmentFactoryThreadLocal = new ThreadLocal<>(); + private static final ThreadLocal threadLocalContextEnvironmentFactory = new ThreadLocal<>(); /** The default parallelism used by local environments. */ private static int defaultLocalDop = Runtime.getRuntime().availableProcessors(); @@ -1064,11 +1064,9 @@ private static String getDefaultName() { * @return The execution environment of the context in which the program is executed. */ public static ExecutionEnvironment getExecutionEnvironment() { - - return contextEnvironmentFactoryThreadLocal.get() == null ? - (contextEnvironmentFactory == null ? - createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment()) : - contextEnvironmentFactoryThreadLocal.get().createExecutionEnvironment(); + return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory) + .map(ExecutionEnvironmentFactory::createExecutionEnvironment) + .orElseGet(ExecutionEnvironment::createLocalEnvironment); } /** @@ -1259,7 +1257,7 @@ public static void setDefaultLocalParallelism(int parallelism) { */ protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) { contextEnvironmentFactory = Preconditions.checkNotNull(ctx); - contextEnvironmentFactoryThreadLocal.set(contextEnvironmentFactory); + threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory); } /** @@ -1269,7 +1267,7 @@ protected static void initializeContextEnvironment(ExecutionEnvironmentFactory c */ protected static void resetContextEnvironment() { contextEnvironmentFactory = null; - contextEnvironmentFactoryThreadLocal.remove(); + threadLocalContextEnvironmentFactory.remove(); } /** @@ -1281,6 +1279,6 @@ protected static void resetContextEnvironment() { */ @Internal public static boolean areExplicitEnvironmentsAllowed() { - return contextEnvironmentFactory == null && contextEnvironmentFactoryThreadLocal.get() == null; + return contextEnvironmentFactory == null && threadLocalContextEnvironmentFactory.get() == null; } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index ed86f7de0e8d0..c514b333ec969 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -31,9 +31,12 @@ import org.apache.commons.lang3.StringUtils; +import javax.annotation.Nullable; + import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.util.Optional; import java.util.Random; /** @@ -296,6 +299,24 @@ private static String getGenericTypeTree(Class type, int indent) { return ret; } + // -------------------------------------------------------------------------------------------- + + /** + * Resolves the given factories. The thread local factory has preference over the static factory. + * If none is set, the method returns {@link Optional#empty()}. + * + * @param threadLocalFactory containing the thread local factory + * @param staticFactory containing the global factory + * @param type of factory + * @return Optional containing the resolved factory if it exists, otherwise it's empty + */ + public static Optional resolveFactory(ThreadLocal threadLocalFactory, @Nullable T staticFactory) { + final T localFactory = threadLocalFactory.get(); + final T factory = localFactory == null ? staticFactory : localFactory; + + return Optional.ofNullable(factory); + } + /** * Private constructor to prevent instantiation. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index f93fd4c50da29..ffa2d47d96154 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -115,10 +116,10 @@ public abstract class StreamExecutionEnvironment { /** * The environment of the context (local by default, cluster if invoked through command line). */ - private static StreamExecutionEnvironmentFactory contextEnvironmentFactory; + private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null; /** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */ - private static ThreadLocal contextEnvironmentFactoryThreadLocal = new ThreadLocal<>(); + private static final ThreadLocal threadLocalContextEnvironmentFactory = new ThreadLocal<>(); /** The default parallelism used when creating a local environment. */ private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors(); @@ -1571,13 +1572,12 @@ public void addOperator(StreamTransformation transformation) { * executed. */ public static StreamExecutionEnvironment getExecutionEnvironment() { - if (contextEnvironmentFactoryThreadLocal.get() != null) { - return contextEnvironmentFactoryThreadLocal.get().createExecutionEnvironment(); - } - if (contextEnvironmentFactory != null) { - return contextEnvironmentFactory.createExecutionEnvironment(); - } + return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory) + .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment) + .orElseGet(StreamExecutionEnvironment::createStreamExecutionEnvironment); + } + private static StreamExecutionEnvironment createStreamExecutionEnvironment() { // because the streaming project depends on "flink-clients" (and not the other way around) // we currently need to intercept the data set environment and create a dependent stream env. // this should be fixed once we rework the project dependencies @@ -1772,12 +1772,12 @@ public static void setDefaultLocalParallelism(int parallelism) { protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) { contextEnvironmentFactory = ctx; - contextEnvironmentFactoryThreadLocal.set(contextEnvironmentFactory); + threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory); } protected static void resetContextEnvironment() { contextEnvironmentFactory = null; - contextEnvironmentFactoryThreadLocal.remove(); + threadLocalContextEnvironmentFactory.remove(); } /**