Skip to content

Commit

Permalink
[FLINK-13946] Refactor executor code to ban resource sharing between …
Browse files Browse the repository at this point in the history
…plan executions.
  • Loading branch information
kl0u committed Sep 6, 2019
1 parent 9753614 commit c746cbb
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance.
Expand All @@ -59,18 +58,9 @@ public class LocalExecutor extends PlanExecutor {

private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;

/** we lock to ensure singleton execution. */
private final Object lock = new Object();

/** Custom user configuration for the execution. */
private final Configuration baseConfiguration;

/** Service for executing Flink jobs. */
private JobExecutorService jobExecutorService;

/** Current job executor service configuration. */
private Configuration jobExecutorServiceConfiguration;

/** Config value for how many slots to provide in the local cluster. */
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;

Expand Down Expand Up @@ -109,15 +99,6 @@ public int getTaskManagerNumSlots() {

// --------------------------------------------------------------------------------------------

private void start() throws Exception {
Thread.holdsLock(lock);
checkState(jobExecutorService == null);

// start the embedded runtime
jobExecutorServiceConfiguration = createConfiguration();
jobExecutorService = createJobExecutorService(jobExecutorServiceConfiguration);
}

private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
if (!configuration.contains(RestOptions.BIND_PORT)) {
configuration.setString(RestOptions.BIND_PORT, "0");
Expand All @@ -143,14 +124,6 @@ private JobExecutorService createJobExecutorService(Configuration configuration)
return miniCluster;
}

private void stop() throws Exception {
Thread.holdsLock(lock);
if (jobExecutorService != null) {
jobExecutorService.close();
jobExecutorService = null;
}
}

/**
* Executes the given program on a local runtime and waits for the job to finish.
*
Expand All @@ -168,30 +141,28 @@ private void stop() throws Exception {
public JobExecutionResult executePlan(Plan plan) throws Exception {
checkNotNull(plan);

synchronized (this.lock) {
try {
configureNoOfLocalSlots(plan);
start();
final Configuration jobExecutorServiceConfiguration = configureExecution(plan);

// TODO: Set job's default parallelism to max number of slots
final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
try (final JobExecutorService executorService = createJobExecutorService(jobExecutorServiceConfiguration)) {

Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
OptimizedPlan op = pc.compile(plan);
Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
OptimizedPlan op = pc.compile(plan);

JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

return jobExecutorService.executeJobBlocking(jobGraph);
} finally {
stop();
}
return executorService.executeJobBlocking(jobGraph);
}
}

private void configureNoOfLocalSlots(final Plan plan) {
private Configuration configureExecution(final Plan plan) {
setNumberOfTaskSlots(plan);
final Configuration executorConfiguration = createExecutorServiceConfig();
setPlanParallelism(plan, executorConfiguration);
return executorConfiguration;
}

private void setNumberOfTaskSlots(final Plan plan) {
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
int maxParallelism = plan.getMaximumParallelism();
if (maxParallelism > 0) {
Expand All @@ -200,6 +171,25 @@ private void configureNoOfLocalSlots(final Plan plan) {
}
}

private Configuration createExecutorServiceConfig() {
final Configuration newConfiguration = new Configuration();
newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, defaultOverwriteFiles);

newConfiguration.addAll(baseConfiguration);

return newConfiguration;
}

private void setPlanParallelism(final Plan plan, final Configuration executorServiceConfig) {
// TODO: Set job's default parallelism to max number of slots
final int slotsPerTaskManager = executorServiceConfig.getInteger(
TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
final int numTaskManagers = executorServiceConfig.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
}

/**
* Creates a JSON representation of the given dataflow's execution plan.
*
Expand All @@ -217,16 +207,6 @@ public String getOptimizerPlanAsJSON(Plan plan) {
return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
}

private Configuration createConfiguration() {
Configuration newConfiguration = new Configuration();
newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());

newConfiguration.addAll(baseConfiguration);

return newConfiguration;
}

// --------------------------------------------------------------------------------------------
// Static variants that internally bring up an instance and shut it down after the execution
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program
Expand All @@ -54,16 +53,12 @@
*/
public class RemoteExecutor extends PlanExecutor {

private final Object lock = new Object();

private final List<URL> jarFiles;

private final List<URL> globalClasspaths;

private final Configuration clientConfiguration;

private ClusterClient<?> client;

private int defaultParallelism = 1;

public RemoteExecutor(String hostname, int port) {
Expand Down Expand Up @@ -143,26 +138,7 @@ public void setDefaultParallelism(int defaultParallelism) {
public int getDefaultParallelism() {
return defaultParallelism;
}

// ------------------------------------------------------------------------
// Startup & Shutdown
// ------------------------------------------------------------------------

private void start() throws Exception {
Thread.holdsLock(lock);
checkState(client == null);

client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
}

private void stop() throws Exception {
Thread.holdsLock(lock);
if (client != null) {
client.shutdown();
client = null;
}
}


// ------------------------------------------------------------------------
// Executing programs
// ------------------------------------------------------------------------
Expand All @@ -178,12 +154,13 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
private JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
checkNotNull(program);

synchronized (this.lock) {
try {
start();
return client.run(program, defaultParallelism).getJobExecutionResult();
} finally {
stop();
ClusterClient<?> client = null;
try {
client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
return client.run(program, defaultParallelism).getJobExecutionResult();
} finally {
if (client != null) {
client.shutdown();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,10 @@ public LocalEnvironment(Configuration config) {

// --------------------------------------------------------------------------------------------

// TODO: 31.08.19 make sure that start and stop are called in the execute.
// the other place would be here, but this can complicate code, as the
// lifecycle management would be outside the executor itself.

@Override
public JobExecutionResult execute(String jobName) throws Exception {
final Plan p = createProgramPlan(jobName);

// TODO: 31.08.19 make the executor autocloseable
final PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
lastJobExecutionResult = executor.executePlan(p);
return lastJobExecutionResult;
Expand Down

0 comments on commit c746cbb

Please sign in to comment.