From 4920ac89fa34d20e06674da67bf55a594162a29a Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 20 Dec 2019 15:42:22 +0800 Subject: [PATCH] [FLINK-15341][client] Reset context classload in PackagedProgramUtils#createJobGraph --- .../client/program/PackagedProgramUtils.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index edf361780f74a..07511667f6410 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -51,13 +51,7 @@ public static JobGraph createJobGraph( Configuration configuration, int defaultParallelism, @Nullable JobID jobID) throws ProgramInvocationException { - - Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader()); - - final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(); - optimizerPlanEnvironment.setParallelism(defaultParallelism); - final Pipeline pipeline = optimizerPlanEnvironment.getPipeline(packagedProgram); - + final Pipeline pipeline = getPipelineFromProgram(packagedProgram, defaultParallelism); final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, defaultParallelism); if (jobID != null) { @@ -81,14 +75,15 @@ public static JobGraph createJobGraph( * @throws ProgramInvocationException if the JobGraph generation failed */ public static JobGraph createJobGraph( - PackagedProgram packagedProgram, - Configuration configuration, - int defaultParallelism) throws ProgramInvocationException { + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism) throws ProgramInvocationException { return createJobGraph(packagedProgram, configuration, defaultParallelism, null); } - public static Pipeline getPipelineFromProgram(PackagedProgram prog, int parallelism) - throws CompilerException, ProgramInvocationException { + public static Pipeline getPipelineFromProgram( + PackagedProgram prog, + int parallelism) throws CompilerException, ProgramInvocationException { final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());