Skip to content

Commit

Permalink
[FLINK-1879] [client] Simplify JobClient. Hide actorRefs behind method calls where possible.
Browse files Browse the repository at this point in the history

 - Drop redundant routing actor
 - Consistently set the flag to subscribe to updates or not.
 - Scala style cleanups: Drop default values for some method parameters.
  • Loading branch information
StephanEwen committed Apr 13, 2015
1 parent f81d9f0 commit ad63707
Show file tree
Hide file tree
Showing 28 changed files with 678 additions and 604 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,17 @@
* limitations under the License.
*/


package org.apache.flink.client;

import java.util.List;

import akka.actor.ActorRef;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.common.Program;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand Down Expand Up @@ -180,12 +177,8 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {

JobGraphGenerator jgg = new JobGraphGenerator();
JobGraph jobGraph = jgg.compileJobGraph(op);

ActorRef jobClient = flink.getJobClient();

SerializedJobExecutionResult result =
JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, jobClient, flink.timeout());


SerializedJobExecutionResult result = flink.submitJobAndWait(jobGraph, printStatusDuringExecution);
return result.toJobExecutionResult(ClassLoader.getSystemClassLoader());
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand Down Expand Up @@ -316,36 +316,50 @@ public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries,

public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
this.lastJobId = jobGraph.getJobID();
final String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
if (hostname == null) {
throw new ProgramInvocationException("Could not find hostname of job manager.");
}

FiniteDuration timeout = AkkaUtils.getTimeout(configuration);

InetSocketAddress jobManagerAddress;
try {
jobManagerAddress = JobClient.getJobManagerAddress(configuration);
}
catch (IOException e) {
throw new ProgramInvocationException(e.getMessage(), e);
}
LOG.info("JobManager actor system address is " + jobManagerAddress);

LOG.info("Starting client actor system");
final ActorSystem actorSystem;
final ActorRef client;

try {
Tuple2<ActorSystem, ActorRef> pair = JobClient.startActorSystemAndActor(configuration, false);
actorSystem = pair._1();
client = pair._2();
actorSystem = JobClient.startJobClientActorSystem(configuration);
}
catch (Exception e) {
throw new ProgramInvocationException("Could not build up connection to JobManager.", e);
throw new ProgramInvocationException("Could start client actor system.", e);
}

LOG.info("Looking up JobManager");
ActorRef jobManager;
try {
jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, configuration);
}
catch (IOException e) {
throw new ProgramInvocationException("Failed to resolve JobManager", e);
}
LOG.info("JobManager runs at " + jobManager.path());

FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
LOG.info("Communication between client and JobManager will have a timeout of " + timeout);

LOG.info("Checking and uploading JAR files");
try {
JobClient.uploadJarFiles(jobGraph, hostname, client, timeout);
JobClient.uploadJarFiles(jobGraph, jobManager, timeout);
}
catch (IOException e) {
throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
}

try{
if (wait) {
SerializedJobExecutionResult result =
JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client, timeout);
SerializedJobExecutionResult result = JobClient.submitJobAndWait(actorSystem,
jobManager, jobGraph, timeout, printStatusDuringExecution);
try {
return result.toJobExecutionResult(this.userCodeClassLoader);
}
Expand All @@ -355,8 +369,8 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
}
}
else {
JobClient.submitJobDetached(jobGraph, client, timeout);
// return a "Fake" execution result with the JobId
JobClient.submitJobDetached(jobManager, jobGraph, timeout);
// return a dummy execution result with the JobId
return new JobSubmissionResult(jobGraph.getJobID());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import akka.actor.Status;
import akka.actor.UntypedActor;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand All @@ -38,6 +37,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.NetUtils;
import org.junit.After;

Expand Down Expand Up @@ -224,7 +224,13 @@ public static class SuccessReturningActor extends UntypedActor {

@Override
public void onReceive(Object message) throws Exception {
getSender().tell(new Status.Success(new JobID()), getSelf());
if (message instanceof JobManagerMessages.SubmitJob) {
JobID jid = ((JobManagerMessages.SubmitJob) message).jobGraph().getJobID();
getSender().tell(new Status.Success(jid), getSelf());
}
else {
getSender().tell(new Status.Failure(new Exception("Unknown message " + message)), getSelf());
}
}
}

Expand Down
Loading

0 comments on commit ad63707

Please sign in to comment.