Skip to content

Commit

Permalink
[FLINK-14067] Refactor ExecutionEnvironment.getExecutionPlan() to be …
Browse files Browse the repository at this point in the history
…in root class

Before, each subclass had a slightly different way of getting the
execution plan (as JSON). Now, we factor that part out into a utility
and use that in the ExecutionEnvironment root class. This does mean,
that we don't take into account special information that a cluster
client or some other environment might have for plan generation.

Also, we can't remove the throws Exception from getExecutionPlan()
because it is a @public method.
  • Loading branch information
aljoscha committed Sep 18, 2019
1 parent 82cb7da commit 394a6e3
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.net.URL;
Expand Down Expand Up @@ -74,15 +72,6 @@ public JobExecutionResult execute(String jobName) throws Exception {
return lastJobExecutionResult;
}

@Override
public String getExecutionPlan() throws Exception {
Plan plan = createProgramPlan("unnamed job");

OptimizedPlan op = ClusterClient.getOptimizedPlan(client.compiler, plan, getParallelism());
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
return gen.getOptimizerPlanAsJSON(op);
}

private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
if (alreadyCalled && detached) {
throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,6 @@ public JobExecutionResult execute(String jobName) throws Exception {
throw new ProgramAbortException();
}

@Override
public String getExecutionPlan() throws Exception {
Plan plan = createProgramPlan(null, false);
this.optimizerPlan = compiler.compile(plan);

// do not go on with anything now!
throw new ProgramAbortException();
}

public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {

// temporarily write syserr and sysout to a byte array.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,4 @@ public JobExecutionResult execute(String jobName) throws Exception {
public int getParallelism() {
return 1; // always serial
}

@Override
public String getExecutionPlan() throws Exception {
throw new UnsupportedOperationException("Execution plans are not used for collection-based execution.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -766,13 +766,14 @@ public JobExecutionResult execute() throws Exception {
/**
* Creates the plan with which the system will execute the program, and returns it as
* a String using a JSON representation of the execution data flow graph.
* Note that this needs to be called, before the plan is executed.
*
* @return The execution plan of the program, as a JSON String.
* @throws Exception Thrown, if the compiler could not be instantiated, or the master could not
* be contacted to retrieve information relevant to the execution planning.
* @throws Exception Thrown, if the compiler could not be instantiated.
*/
public abstract String getExecutionPlan() throws Exception;
public String getExecutionPlan() throws Exception {
Plan p = createProgramPlan(getDefaultName(), false);
return ExecutionPlanUtil.getExecutionPlanAsJSON(p);
}

/**
* Registers a file at the distributed cache under the given name. The file will be accessible
Expand Down Expand Up @@ -836,7 +837,7 @@ protected void registerCachedFilesWithPlan(Plan p) throws IOException {
*/
@Internal
public Plan createProgramPlan() {
return createProgramPlan(null);
return createProgramPlan(getDefaultName());
}

/**
Expand Down Expand Up @@ -868,6 +869,8 @@ public Plan createProgramPlan(String jobName) {
*/
@Internal
public Plan createProgramPlan(String jobName, boolean clearSinks) {
checkNotNull(jobName);

if (this.sinks.isEmpty()) {
if (wasExecuted) {
throw new RuntimeException("No new data sinks have been defined since the " +
Expand All @@ -880,10 +883,6 @@ public Plan createProgramPlan(String jobName, boolean clearSinks) {
}
}

if (jobName == null) {
jobName = getDefaultName();
}

OperatorTranslation translator = new OperatorTranslation();
Plan plan = translator.translateToPlan(this.sinks, jobName);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.api.java;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;

/**
* A utility for extracting an execution plan (as JSON) from a {@link Plan}.
*/
class ExecutionPlanUtil {

/**
* Extracts the execution plan (as JSON) from the given {@link Plan}.
*/
static String getExecutionPlanAsJSON(Plan plan) {
// make sure that we do not start an executor in any case here.
// if one runs, fine, of not, we only create the class but disregard immediately afterwards
PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(new Configuration());
return tempExecutor.getOptimizerPlanAsJSON(plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,6 @@ public JobExecutionResult execute(String jobName) throws Exception {
return lastJobExecutionResult;
}

@Override
public String getExecutionPlan() throws Exception {
final Plan p = createProgramPlan("plan", false);
final PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(configuration);
return tempExecutor.getOptimizerPlanAsJSON(p);
}

@Override
public String toString() {
return "Local Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ").";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,6 @@ public JobExecutionResult execute(String jobName) throws Exception {
return lastJobExecutionResult;
}

@Override
public String getExecutionPlan() throws Exception {
final Plan p = createProgramPlan("plan", false);
final PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(new Configuration());
return tempExecutor.getOptimizerPlanAsJSON(p);
}

@Override
public String toString() {
return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.JobExecutor;
Expand Down Expand Up @@ -113,14 +112,6 @@ public JobExecutionResult execute(String jobName) throws Exception {
return this.lastJobExecutionResult;
}

@Override
public String getExecutionPlan() throws Exception {
OptimizedPlan op = compileProgram("unused");

PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
return jsonGen.getOptimizerPlanAsJSON(op);
}

private OptimizedPlan compileProgram(String jobName) {
Plan p = createProgramPlan(jobName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,6 @@ public JobExecutionResult execute(String jobName) throws Exception {
throw new AbortError();
}

@Override
public String getExecutionPlan() throws Exception {
throw new UnsupportedOperationException();
}

public static void setAsNext(final JsonValidator validator, final int defaultParallelism) {
initializeContextEnvironment(new ExecutionEnvironmentFactory() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ public JobExecutionResult execute(String jobName) throws Exception {
throw new OptimizerPlanEnvironment.ProgramAbortException();
}

@Override
public String getExecutionPlan() throws Exception {
throw new OptimizerPlanEnvironment.ProgramAbortException();
}

public void setAsContext() {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override
Expand Down

0 comments on commit 394a6e3

Please sign in to comment.