Skip to content

Commit

Permalink
[hotfix] handle in the same way in StreamExecutionEnvironment
Browse files Browse the repository at this point in the history
  • Loading branch information
leesf committed Jun 4, 2019
1 parent 83b7293 commit d8bab9a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1065,11 +1065,10 @@ private static String getDefaultName() {
*/
public static ExecutionEnvironment getExecutionEnvironment() {

return contextEnvironmentFactory == null ?
(contextEnvironmentFactoryThreadLocal.get() == null ?
createLocalEnvironment() :
contextEnvironmentFactoryThreadLocal.get().createExecutionEnvironment()) :
contextEnvironmentFactory.createExecutionEnvironment();
return contextEnvironmentFactoryThreadLocal.get() == null ?
(contextEnvironmentFactory == null ?
createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment()) :
contextEnvironmentFactoryThreadLocal.get().createExecutionEnvironment();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public abstract class StreamExecutionEnvironment {
*/
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;

/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
private static ThreadLocal<StreamExecutionEnvironmentFactory> contextEnvironmentFactoryThreadLocal = new ThreadLocal<>();

/** The default parallelism used when creating a local environment. */
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

Expand Down Expand Up @@ -1568,6 +1571,9 @@ public void addOperator(StreamTransformation<?> transformation) {
* executed.
*/
public static StreamExecutionEnvironment getExecutionEnvironment() {
if (contextEnvironmentFactoryThreadLocal.get() != null) {
return contextEnvironmentFactoryThreadLocal.get().createExecutionEnvironment();
}
if (contextEnvironmentFactory != null) {
return contextEnvironmentFactory.createExecutionEnvironment();
}
Expand Down Expand Up @@ -1766,10 +1772,12 @@ public static void setDefaultLocalParallelism(int parallelism) {

protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
contextEnvironmentFactory = ctx;
contextEnvironmentFactoryThreadLocal.set(contextEnvironmentFactory);
}

protected static void resetContextEnvironment() {
contextEnvironmentFactory = null;
contextEnvironmentFactoryThreadLocal.remove();
}

/**
Expand Down

0 comments on commit d8bab9a

Please sign in to comment.