implement simple session management
This pull request implements rudimentary session management. Together with backtracking (apache#640), this will enable users to submit jobs to the cluster and access intermediate results; a short usage sketch follows the list below. Session handling ensures that the results are eventually cleared.

ExecutionGraphs are kept as long as
- no timeout has occurred, and
- the session has not been explicitly ended
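For illustration only (this example is not part of the commit), a driver program could use the new session handling roughly as follows. The class name, example data, and output paths are made up; the sketch relies only on the startNewSession() method introduced below and on the existing DataSet API.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SessionUsageSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The first job runs under the environment's initial session/execution id,
        // so its ExecutionGraph stays cached after completion until the session
        // times out or is ended.
        DataSet<Integer> first = env.fromElements(1, 2, 3);
        first.writeAsText("file:///tmp/session-sketch-1");
        env.execute("first job");

        // End the session explicitly: for remote submissions the JobManager is told
        // to remove the cached jobs of the old id; a fresh id is generated either way.
        env.startNewSession();

        // Jobs submitted from here on belong to the new session.
        DataSet<Integer> second = env.fromElements(4, 5, 6);
        second.writeAsText("file:///tmp/session-sketch-2");
        env.execute("second job");
    }
}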
mxm committed May 18, 2015
1 parent 7378054 commit 9cb049a
Showing 18 changed files with 344 additions and 30 deletions.
@@ -24,6 +24,7 @@
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
@@ -52,26 +53,28 @@ public class RemoteExecutor extends PlanExecutor {

private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);

private final UUID executionID;
private final List<String> jarFiles;
private final InetSocketAddress address;

public RemoteExecutor(String hostname, int port) {
this(hostname, port, Collections.<String>emptyList());
public RemoteExecutor(UUID executionID, String hostname, int port) {
this(executionID, hostname, port, Collections.<String>emptyList());
}

public RemoteExecutor(String hostname, int port, String jarFile) {
this(hostname, port, Collections.singletonList(jarFile));
public RemoteExecutor(UUID executionID, String hostname, int port, String jarFile) {
this(executionID, hostname, port, Collections.singletonList(jarFile));
}

public RemoteExecutor(String hostport, String jarFile) {
this(getInetFromHostport(hostport), Collections.singletonList(jarFile));
public RemoteExecutor(UUID executionID, String hostport, String jarFile) {
this(executionID, getInetFromHostport(hostport), Collections.singletonList(jarFile));
}

public RemoteExecutor(String hostname, int port, List<String> jarFiles) {
this(new InetSocketAddress(hostname, port), jarFiles);
public RemoteExecutor(UUID executionID, String hostname, int port, List<String> jarFiles) {
this(executionID, new InetSocketAddress(hostname, port), jarFiles);
}

public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles) {
public RemoteExecutor(UUID executionID, InetSocketAddress inet, List<String> jarFiles) {
this.executionID = executionID;
this.jarFiles = jarFiles;
this.address = inet;
}
@@ -101,6 +104,7 @@ public JobExecutionResult executeJar(String jarPath, String assemblerClass, Stri

Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader(), -1);
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
c.setSessionID(executionID);

JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
if(result instanceof JobExecutionResult) {
@@ -26,7 +26,10 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.UUID;

import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.IllegalConfigurationException;
@@ -52,6 +55,7 @@
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -89,6 +93,11 @@ public class Client {
*/
private int maxSlots = -1;

/**
* The session id, passed from the ExecutionEnvironment.
*/
private UUID sessionID;

/** ID of the last job submitted with this client. */
private JobID lastJobId = null;

@@ -351,6 +360,7 @@ public JobSubmissionResult run(JobWithJars prog, int parallelism, boolean wait)

public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
JobGraph job = getJobGraph(compiledPlan, libraries);
job.setSessionID(sessionID);
this.lastJobId = job.getJobID();
return run(job, wait);
}
@@ -420,6 +430,40 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
}
}

/**
* Setter used by the ExecutionEnvironment to set the session id.
* @param sessionID the session id under which subsequent jobs are submitted
*/
public void setSessionID(UUID sessionID) {
this.sessionID = sessionID;
}

/**
* Finishes the current session at the job manager.
* @throws Exception if the session could not be ended at the job manager
*/
public void endSession() throws Exception {
LOG.info("Telling job manager to end the session {}.", sessionID);
final ActorSystem actorSystem;
try {
actorSystem = JobClient.startJobClientActorSystem(configuration);
}
catch (Exception e) {
throw new RuntimeException("Could start client actor system.", e);
}

LOG.info("Looking up JobManager");
final ActorRef jobManager;
try {
jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, configuration);
}
catch (IOException e) {
throw new RuntimeException("Failed to resolve JobManager", e);
}

Patterns.ask(jobManager, new JobManagerMessages.RemoveCachedJobs(sessionID), new Timeout(AkkaUtils.getDefaultTimeout()));
}

// --------------------------------------------------------------------------------------------

public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {
@@ -450,7 +494,11 @@ public String getExecutionPlan() throws Exception {
// do not go on with anything now!
throw new ProgramAbortException();
}


@Override
public void startNewSession() {
}

private void setAsContext() {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {

@@ -20,6 +20,7 @@

import java.io.File;
import java.util.List;
import java.util.UUID;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
@@ -79,6 +80,12 @@ public String getExecutionPlan() throws Exception {
return gen.getOptimizerPlanAsJSON(op);
}

@Override
public void startNewSession() throws Exception {
client.endSession();
executionId = UUID.randomUUID();
}

public boolean isWait() {
return wait;
}
@@ -712,7 +712,11 @@ public String getExecutionPlan() throws Exception {
// do not go on with anything now!
throw new Client.ProgramAbortException();
}


@Override
public void startNewSession() throws Exception {
}

public void setAsContext() {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override
@@ -26,6 +26,7 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.UUID;

import static org.junit.Assert.fail;

@@ -38,7 +39,7 @@ public class RemoteExecutorHostnameResolutionTest {
@Test
public void testUnresolvableHostname1() {
try {
RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
RemoteExecutor exec = new RemoteExecutor(UUID.randomUUID(), nonExistingHostname, port);
exec.executePlan(getProgram());
fail("This should fail with an UnknownHostException");
}
@@ -56,7 +57,7 @@ public void testUnresolvableHostname1() {
public void testUnresolvableHostname2() {
try {
InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList());
RemoteExecutor exec = new RemoteExecutor(UUID.randomUUID(), add, Collections.<String>emptyList());
exec.executePlan(getProgram());
fail("This should fail with an UnknownHostException");
}
@@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

/**
* A PlanExecutor runs a plan. The specific implementation (such as the org.apache.flink.client.LocalExecutor
@@ -109,7 +110,7 @@ public static PlanExecutor createLocalExecutor(Configuration configuration) {
* from within the UDFs.
* @return A remote executor.
*/
public static PlanExecutor createRemoteExecutor(String hostname, int port, String... jarFiles) {
public static PlanExecutor createRemoteExecutor(UUID executionID, String hostname, int port, String... jarFiles) {
if (hostname == null) {
throw new IllegalArgumentException("The hostname must not be null.");
}
@@ -123,7 +124,7 @@ public static PlanExecutor createRemoteExecutor(String hostname, int port, Strin
: Arrays.asList(jarFiles);

try {
return reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files);
return reClass.getConstructor(UUID.class, String.class, int.class, List.class).newInstance(executionID, hostname, port, files);
}
catch (Throwable t) {
throw new RuntimeException("An error occurred while loading the remote executor ("
@@ -51,4 +51,8 @@ public int getParallelism() {
public String getExecutionPlan() throws Exception {
throw new UnsupportedOperationException("Execution plans are not used for collection-based execution.");
}

@Override
public void startNewSession() throws Exception {
}
}
@@ -106,9 +106,9 @@ public abstract class ExecutionEnvironment {
private static boolean allowLocalExecution = true;

// --------------------------------------------------------------------------------------------
private final UUID executionId;

protected UUID executionId;

private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();

private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
@@ -123,7 +123,7 @@ public abstract class ExecutionEnvironment {
* Creates a new Execution Environment.
*/
protected ExecutionEnvironment() {
this.executionId = UUID.randomUUID();
executionId = UUID.randomUUID();
}

/**
@@ -242,6 +242,11 @@ public String getIdString() {
return this.executionId.toString();
}

/**
* Starts a new session, discarding all intermediate results.
*/
public abstract void startNewSession() throws Exception;

// --------------------------------------------------------------------------------------------
// Registry for types and serializers
// --------------------------------------------------------------------------------------------
@@ -24,6 +24,8 @@
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;

import java.util.UUID;

/**
* An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the
* environment is instantiated. When this environment is instantiated, it uses a default parallelism
@@ -61,6 +63,12 @@ public String getExecutionPlan() throws Exception {
PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
return executor.getOptimizerPlanAsJSON(p);
}

@Override
public void startNewSession() {
executionId = UUID.randomUUID();
}

// --------------------------------------------------------------------------------------------

@Override
@@ -21,6 +21,13 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.UUID;

/**
* An {@link ExecutionEnvironment} that sends programs
@@ -35,6 +42,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
private final int port;

private final String[] jarFiles;

private static String clientClassName = "org.apache.flink.client.program.Client";

/**
* Creates a new RemoteEnvironment that points to the master (JobManager) described by the
@@ -65,7 +74,7 @@ public RemoteEnvironment(String host, int port, String... jarFiles) {
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);

PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
PlanExecutor executor = PlanExecutor.createRemoteExecutor(executionId, host, port, jarFiles);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
return executor.executePlan(p);
}
@@ -76,10 +85,38 @@ public String getExecutionPlan() throws Exception {
p.setDefaultParallelism(getParallelism());
registerCachedFilesWithPlan(p);

PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
PlanExecutor executor = PlanExecutor.createRemoteExecutor(executionId, host, port, jarFiles);
return executor.getOptimizerPlanAsJSON(p);
}

@Override
public void startNewSession() {
try {
Class<?> clientClass = Class.forName(clientClassName);
Constructor<?> constructor = clientClass.getConstructor(Configuration.class, ClassLoader.class);
Configuration configuration = new Configuration();
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
Method setSession = clientClass.getDeclaredMethod("setSessionID", UUID.class);
Method endSession = clientClass.getDeclaredMethod("endSession");

Object client = constructor.newInstance(configuration, ClassLoader.getSystemClassLoader());
setSession.invoke(client, executionId);
endSession.invoke(client);
} catch (NoSuchMethodException e) {
throw new RuntimeException("Couldn't find the constructor/method to invoke on the Client class.", e);
} catch (InstantiationException e) {
throw new RuntimeException("Couldn't instantiate the Client class.", e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Couldn't access the Client class or its methods.", e);
} catch (InvocationTargetException e) {
throw new RuntimeException("Couldn't invoke the Client class method.", e);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Couldn't find the Client class.", e);
}
executionId = UUID.randomUUID();
}

@Override
public String toString() {
return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
@@ -168,6 +168,9 @@ public class ExecutionGraph implements Serializable {

/** Flag that indicate whether the executed dataflow should be periodically snapshotted */
private boolean snapshotCheckpointsEnabled;

/** Flag to indicate whether the Graph has been archived */
private boolean isArchived = false;


// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
@@ -283,6 +286,10 @@ public ScheduleMode getScheduleMode() {
return scheduleMode;
}

public boolean isArchived() {
return isArchived;
}

public void enableSnaphotCheckpointing(long interval, long checkpointTimeout,
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
@@ -641,6 +648,8 @@ public void prepareForArchiving() {
requiredJarFiles.clear();
jobStatusListenerActors.clear();
executionListenerActors.clear();

isArchived = true;
}

public ExecutionConfig getExecutionConfig() {
(Diffs for the remaining changed files are not shown here.)
