Skip to content

Commit

Permalink
[FLINK-13946] Remove session-related code from execution env
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Sep 6, 2019
1 parent 6e80a83 commit 790fe27
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 401 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,17 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
throw new IllegalArgumentException("The plan may not be null.");
}

stop();

// configure the number of local slots equal to the parallelism of the local plan
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
int maxParallelism = plan.getMaximumParallelism();
if (maxParallelism > 0) {
this.taskManagerNumSlots = maxParallelism;
}
}
start();

synchronized (this.lock) {

// check if we start a session dedicated for this execution
Expand All @@ -190,14 +201,6 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
if (jobExecutorService == null) {
shutDownAtEnd = true;

// configure the number of local slots equal to the parallelism of the local plan
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
int maxParallelism = plan.getMaximumParallelism();
if (maxParallelism > 0) {
this.taskManagerNumSlots = maxParallelism;
}
}

// start the cluster for us
start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.plan.OptimizedPlan;
Expand Down Expand Up @@ -72,15 +71,9 @@ public String getExecutionPlan() throws Exception {
return gen.getOptimizerPlanAsJSON(op);
}

@Override
public void startNewSession() throws Exception {
jobID = JobID.generate();
}

@Override
public String toString() {
return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism())
+ ") : " + getIdString();
return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ")";
}

public ClusterClient<?> getClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ public String getExecutionPlan() throws Exception {
throw new ProgramAbortException();
}

@Override
public void startNewSession() {
// do nothing
}

public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {

// temporarily write syserr and sysout to a byte array.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ public String getExecutionPlan() throws Exception {
throw new OptimizerPlanEnvironment.ProgramAbortException();
}

@Override
public void startNewSession() {
}

public void setAsContext() {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,4 @@ public int getParallelism() {
public String getExecutionPlan() throws Exception {
throw new UnsupportedOperationException("Execution plans are not used for collection-based execution.");
}

@Override
public void startNewSession() throws Exception {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.io.FileInputFormat;
Expand Down Expand Up @@ -120,21 +119,14 @@ public abstract class ExecutionEnvironment {
/** Result from the latest execution, to make it retrievable when using eager execution methods. */
protected JobExecutionResult lastJobExecutionResult;

/** The ID of the session, defined by this execution environment. Sessions and Jobs are same in
* Flink, as Jobs can consist of multiple parts that are attached to the growing dataflow graph. */
protected JobID jobID;

/** The session timeout in seconds. */
protected long sessionTimeout;

/** Flag to indicate whether sinks have been cleared in previous executions. */
private boolean wasExecuted = false;

/**
* Creates a new Execution Environment.
*/
protected ExecutionEnvironment() {
jobID = JobID.generate();

}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -245,68 +237,6 @@ public JobExecutionResult getLastJobExecutionResult(){
return this.lastJobExecutionResult;
}

// --------------------------------------------------------------------------------------------
// Session Management
// --------------------------------------------------------------------------------------------

/**
* Gets the JobID by which this environment is identified. The JobID sets the execution context
* in the cluster or local environment.
*
* @return The JobID of this environment.
* @see #getIdString()
*/
@PublicEvolving
public JobID getId() {
return this.jobID;
}

/**
* Gets the JobID by which this environment is identified, as a string.
*
* @return The JobID as a string.
* @see #getId()
*/
@PublicEvolving
public String getIdString() {
return this.jobID.toString();
}

/**
* Sets the session timeout to hold the intermediate results of a job. This only
* applies the updated timeout in future executions.
*
* @param timeout The timeout, in seconds.
*/
@PublicEvolving
public void setSessionTimeout(long timeout) {
throw new IllegalStateException("Support for sessions is currently disabled. " +
"It will be enabled in future Flink versions.");
// Session management is disabled, revert this commit to enable
//if (timeout < 0) {
// throw new IllegalArgumentException("The session timeout must not be less than zero.");
//}
//this.sessionTimeout = timeout;
}

/**
* Gets the session timeout for this environment. The session timeout defines for how long
* after an execution, the job and its intermediate results will be kept for future
* interactions.
*
* @return The session timeout, in seconds.
*/
@PublicEvolving
public long getSessionTimeout() {
return sessionTimeout;
}

/**
* Starts a new session, discarding the previous data flow and all of its intermediate results.
*/
@PublicEvolving
public abstract void startNewSession() throws Exception;

// --------------------------------------------------------------------------------------------
// Registry for types and serializers
// --------------------------------------------------------------------------------------------
Expand Down
164 changes: 24 additions & 140 deletions flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
package org.apache.flink.api.java;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the
* environment is instantiated.
Expand All @@ -45,14 +45,6 @@ public class LocalEnvironment extends ExecutionEnvironment {
/** The user-defined configuration for the local execution. */
private final Configuration configuration;

/** Create lazily upon first use. */
private PlanExecutor executor;

/** In case we keep the executor alive for sessions, this reaper shuts it down eventually.
* The reaper's finalize method triggers the executor shutdown. */
@SuppressWarnings("all")
private ExecutorReaper executorReaper;

/**
* Creates a new local environment.
*/
Expand All @@ -71,150 +63,42 @@ public LocalEnvironment(Configuration config) {
"The LocalEnvironment cannot be instantiated when running in a pre-defined context " +
"(such as Command Line Client, Scala Shell, or TestEnvironment)");
}
this.configuration = config == null ? new Configuration() : config;
this.configuration = checkNotNull(config);
}

// --------------------------------------------------------------------------------------------

@Override
public JobExecutionResult execute(String jobName) throws Exception {
if (executor == null) {
startNewSession();
}

Plan p = createProgramPlan(jobName);

// Session management is disabled, revert this commit to enable
//p.setJobId(jobID);
//p.setSessionTimeout(sessionTimeout);

JobExecutionResult result = executor.executePlan(p);

this.lastJobExecutionResult = result;
return result;
}
// TODO: 31.08.19 make sure that start and stop are called in the execute.
// the other place would be here, but this can complicate code, as the
// lifecycle management would be outside the executor itself.

@Override
public String getExecutionPlan() throws Exception {
Plan p = createProgramPlan(null, false);
public JobExecutionResult execute(String jobName) throws Exception {
final Plan p = createProgramPlan(jobName);

// make sure that we do not start an executor in any case here.
// if one runs, fine, of not, we only create the class but disregard immediately afterwards
if (executor != null) {
return executor.getOptimizerPlanAsJSON(p);
}
else {
PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(configuration);
return tempExecutor.getOptimizerPlanAsJSON(p);
// TODO: 31.08.19 make the executor autocloseable
PlanExecutor executor = null;
try {
executor = PlanExecutor.createLocalExecutor(configuration);
executor.start();
lastJobExecutionResult = executor.executePlan(p);
} finally {
if (executor != null) {
executor.stop();
}
}
return lastJobExecutionResult;
}

@Override
@PublicEvolving
public void startNewSession() throws Exception {
if (executor != null) {
// we need to end the previous session
executor.stop();
// create also a new JobID
jobID = JobID.generate();
}

// create a new local executor
executor = PlanExecutor.createLocalExecutor(configuration);

// if we have a session, start the mini cluster eagerly to have it available across sessions
if (getSessionTimeout() > 0) {
executor.start();

// also install the reaper that will shut it down eventually
executorReaper = new ExecutorReaper(executor);
}
public String getExecutionPlan() throws Exception {
final Plan p = createProgramPlan("plan", false);
final PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(configuration);
return tempExecutor.getOptimizerPlanAsJSON(p);
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

@Override
public String toString() {
return "Local Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism())
+ ") : " + getIdString();
}

// ------------------------------------------------------------------------
// Reaping the local executor when in session mode
// ------------------------------------------------------------------------

/**
* This thread shuts down the local executor.
*
* <p><b>IMPORTANT:</b> This must be a static inner class to hold no reference to the outer class.
* Otherwise, the outer class could never become garbage collectible while this thread runs.
*/
private static class ShutdownThread extends Thread {

private final Object monitor = new Object();

private final PlanExecutor executor;

private volatile boolean triggered = false;

ShutdownThread(PlanExecutor executor) {
super("Local cluster reaper");
setDaemon(true);
setPriority(Thread.MIN_PRIORITY);

this.executor = executor;
}

@Override
public void run() {
synchronized (monitor) {
while (!triggered) {
try {
monitor.wait();
}
catch (InterruptedException e) {
// should never happen
}
}
}

try {
executor.stop();
}
catch (Throwable t) {
System.err.println("Cluster reaper caught exception during shutdown");
t.printStackTrace();
}
}

void trigger() {
triggered = true;
synchronized (monitor) {
monitor.notifyAll();
}
}

}

/**
* A class that, upon finalization, shuts down the local mini cluster by triggering the reaper
* thread.
*/
private static class ExecutorReaper {

private final ShutdownThread shutdownThread;

ExecutorReaper(PlanExecutor executor) {
this.shutdownThread = new ShutdownThread(executor);
this.shutdownThread.start();
}

@Override
protected void finalize() throws Throwable {
super.finalize();
shutdownThread.trigger();
}
return "Local Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ").";
}
}
Loading

0 comments on commit 790fe27

Please sign in to comment.