Skip to content

Commit

Permalink
[FLINK-13946] Remove session-related code from the PlanExecutors
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Sep 6, 2019
1 parent 790fe27 commit 9753614
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@

import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance.
*
Expand Down Expand Up @@ -77,11 +80,11 @@ public class LocalExecutor extends PlanExecutor {
// ------------------------------------------------------------------------

public LocalExecutor() {
this(null);
this(new Configuration());
}

public LocalExecutor(Configuration conf) {
this.baseConfiguration = conf != null ? conf : new Configuration();
this.baseConfiguration = checkNotNull(conf);
}

// ------------------------------------------------------------------------
Expand All @@ -106,19 +109,13 @@ public int getTaskManagerNumSlots() {

// --------------------------------------------------------------------------------------------

@Override
public void start() throws Exception {
synchronized (lock) {
if (jobExecutorService == null) {
// create the embedded runtime
jobExecutorServiceConfiguration = createConfiguration();

// start it up
jobExecutorService = createJobExecutorService(jobExecutorServiceConfiguration);
} else {
throw new IllegalStateException("The local executor was already started.");
}
}
private void start() throws Exception {
Thread.holdsLock(lock);
checkState(jobExecutorService == null);

// start the embedded runtime
jobExecutorServiceConfiguration = createConfiguration();
jobExecutorService = createJobExecutorService(jobExecutorServiceConfiguration);
}

private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
Expand Down Expand Up @@ -146,20 +143,11 @@ private JobExecutorService createJobExecutorService(Configuration configuration)
return miniCluster;
}

@Override
public void stop() throws Exception {
synchronized (lock) {
if (jobExecutorService != null) {
jobExecutorService.close();
jobExecutorService = null;
}
}
}

@Override
public boolean isRunning() {
synchronized (lock) {
return jobExecutorService != null;
private void stop() throws Exception {
Thread.holdsLock(lock);
if (jobExecutorService != null) {
jobExecutorService.close();
jobExecutorService = null;
}
}

Expand All @@ -178,38 +166,13 @@ public boolean isRunning() {
*/
@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
if (plan == null) {
throw new IllegalArgumentException("The plan may not be null.");
}

stop();

// configure the number of local slots equal to the parallelism of the local plan
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
int maxParallelism = plan.getMaximumParallelism();
if (maxParallelism > 0) {
this.taskManagerNumSlots = maxParallelism;
}
}
start();
checkNotNull(plan);

synchronized (this.lock) {

// check if we start a session dedicated for this execution
final boolean shutDownAtEnd;

if (jobExecutorService == null) {
shutDownAtEnd = true;

// start the cluster for us
try {
configureNoOfLocalSlots(plan);
start();
}
else {
// we use the existing session
shutDownAtEnd = false;
}

try {
// TODO: Set job's default parallelism to max number of slots
final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
Expand All @@ -222,11 +185,17 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

return jobExecutorService.executeJobBlocking(jobGraph);
} finally {
stop();
}
finally {
if (shutDownAtEnd) {
stop();
}
}
}

private void configureNoOfLocalSlots(final Plan plan) {
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
int maxParallelism = plan.getMaximumParallelism();
if (maxParallelism > 0) {
this.taskManagerNumSlots = maxParallelism;
}
}
}
Expand All @@ -236,10 +205,9 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
*
* @param plan The dataflow plan.
* @return The dataflow's execution plan, as a JSON string.
* @throws Exception Thrown, if the optimization process that creates the execution plan failed.
*/
@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
public String getOptimizerPlanAsJSON(Plan plan) {
final int parallelism = plan.getDefaultParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 1 : plan.getDefaultParallelism();

Optimizer pc = new Optimizer(new DataStatistics(), this.baseConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program
* and ships it to a remote Flink cluster for execution.
Expand Down Expand Up @@ -145,31 +148,19 @@ public int getDefaultParallelism() {
// Startup & Shutdown
// ------------------------------------------------------------------------

@Override
public void start() throws Exception {
synchronized (lock) {
if (client == null) {
client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
}
else {
throw new IllegalStateException("The remote executor was already started.");
}
}
}
private void start() throws Exception {
Thread.holdsLock(lock);
checkState(client == null);

@Override
public void stop() throws Exception {
synchronized (lock) {
if (client != null) {
client.shutdown();
client = null;
}
}
client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
}

@Override
public boolean isRunning() {
return client != null;
private void stop() throws Exception {
Thread.holdsLock(lock);
if (client != null) {
client.shutdown();
client = null;
}
}

// ------------------------------------------------------------------------
Expand All @@ -178,49 +169,29 @@ public boolean isRunning() {

@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
if (plan == null) {
throw new IllegalArgumentException("The plan may not be null.");
}
checkNotNull(plan);

JobWithJars p = new JobWithJars(plan, this.jarFiles, this.globalClasspaths);
return executePlanWithJars(p);
}

public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
if (program == null) {
throw new IllegalArgumentException("The job may not be null.");
}
private JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
checkNotNull(program);

synchronized (this.lock) {
// check if we start a session dedicated for this execution
final boolean shutDownAtEnd;

if (client == null) {
shutDownAtEnd = true;
// start the executor for us
start();
}
else {
// we use the existing session
shutDownAtEnd = false;
}

try {
start();
return client.run(program, defaultParallelism).getJobExecutionResult();
}
finally {
if (shutDownAtEnd) {
stop();
}
} finally {
stop();
}
}
}

@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
public String getOptimizerPlanAsJSON(Plan plan) {
Optimizer opt = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
OptimizedPlan optPlan = opt.compile(plan);
return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optPlan);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,6 @@ public abstract class PlanExecutor {
private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";
private static final String REMOTE_EXECUTOR_CLASS = "org.apache.flink.client.RemoteExecutor";

// ------------------------------------------------------------------------
// Startup & Shutdown
// ------------------------------------------------------------------------

/**
* Starts the program executor. After the executor has been started, it will keep
* running until {@link #stop()} is called.
*
* @throws Exception Thrown, if the executor startup failed.
*/
public abstract void start() throws Exception;

/**
* Shuts down the plan executor and releases all local resources.
*
* <p>This method also ends all sessions created by this executor. Remote job executions
* may complete, but the session is not kept alive after that.</p>
*
* @throws Exception Thrown, if the proper shutdown failed.
*/
public abstract void stop() throws Exception;

/**
* Checks if this executor is currently running.
*
* @return True is the executor is running, false otherwise.
*/
public abstract boolean isRunning();

// ------------------------------------------------------------------------
// Program Execution
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -154,12 +125,11 @@ public static PlanExecutor createRemoteExecutor(String hostname, int port, Confi
Collections.<URL>emptyList() : globalClasspaths;

try {
PlanExecutor executor = (clientConfiguration == null) ?
return (clientConfiguration == null) ?
reClass.getConstructor(String.class, int.class, List.class)
.newInstance(hostname, port, files) :
reClass.getConstructor(String.class, int.class, Configuration.class, List.class, List.class)
.newInstance(hostname, port, clientConfiguration, files, paths);
return executor;
}
catch (Throwable t) {
throw new RuntimeException("An error occurred while loading the remote executor ("
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,8 @@ public JobExecutionResult execute(String jobName) throws Exception {
final Plan p = createProgramPlan(jobName);

// TODO: 31.08.19 make the executor autocloseable
PlanExecutor executor = null;
try {
executor = PlanExecutor.createLocalExecutor(configuration);
executor.start();
lastJobExecutionResult = executor.executePlan(p);
} finally {
if (executor != null) {
executor.stop();
}
}
final PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
lastJobExecutionResult = executor.executePlan(p);
return lastJobExecutionResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,8 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig,
public JobExecutionResult execute(String jobName) throws Exception {
final Plan p = createProgramPlan(jobName);

PlanExecutor executor = null;
try {
executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles, globalClasspaths);
executor.start();
lastJobExecutionResult = executor.executePlan(p);
} finally {
if (executor != null) {
executor.stop();
}
}
final PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles, globalClasspaths);
lastJobExecutionResult = executor.executePlan(p);
return lastJobExecutionResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,8 @@ public JobExecutionResult execute(String jobName) throws Exception {
final Plan p = createProgramPlan(jobName);
final List<URL> allJarFiles = getUpdatedJarFiles();

PlanExecutor executor = null;
try {
executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, allJarFiles, globalClasspaths);
executor.start();
lastJobExecutionResult = executor.executePlan(p);
} finally {
if (executor != null) {
executor.stop();
}
}
final PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, allJarFiles, globalClasspaths);
lastJobExecutionResult = executor.executePlan(p);
return lastJobExecutionResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,6 @@ static class TestPlanExecutor extends PlanExecutor {
private List<String> jars;
private List<String> globalClasspaths;

@Override
public void start() throws Exception {

}

@Override
public void stop() throws Exception {

}

@Override
public boolean isRunning() {
return false;
}

@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
return null;
Expand Down
Loading

0 comments on commit 9753614

Please sign in to comment.