[FLINK-2373] Add configuration parameter to createRemoteEnvironment method

This closes apache#1066
akunft authored and fhueske committed Sep 15, 2015
1 parent 8a75025 commit e78b80c
Showing 9 changed files with 263 additions and 24 deletions.
@@ -54,30 +54,47 @@ public class RemoteExecutor extends PlanExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);

private final List<String> jarFiles;
- private final Configuration configuration;

+ private final Configuration clientConfiguration;

public RemoteExecutor(String hostname, int port) {
- this(hostname, port, Collections.<String>emptyList());
+ this(hostname, port, Collections.<String>emptyList(), new Configuration());
}

public RemoteExecutor(String hostname, int port, String jarFile) {
- this(hostname, port, Collections.singletonList(jarFile));
+ this(hostname, port, Collections.singletonList(jarFile), new Configuration());
}

public RemoteExecutor(String hostport, String jarFile) {
- this(getInetFromHostport(hostport), Collections.singletonList(jarFile));
+ this(getInetFromHostport(hostport), Collections.singletonList(jarFile), new Configuration());
}

public RemoteExecutor(String hostname, int port, List<String> jarFiles) {
- this(new InetSocketAddress(hostname, port), jarFiles);
+ this(new InetSocketAddress(hostname, port), jarFiles, new Configuration());
}

+ public RemoteExecutor(String hostname, int port, Configuration clientConfiguration) {
+ this(hostname, port, Collections.<String>emptyList(), clientConfiguration);
+ }

+ public RemoteExecutor(String hostname, int port, String jarFile, Configuration clientConfiguration) {
+ this(hostname, port, Collections.singletonList(jarFile), clientConfiguration);
+ }

+ public RemoteExecutor(String hostport, String jarFile, Configuration clientConfiguration) {
+ this(getInetFromHostport(hostport), Collections.singletonList(jarFile), clientConfiguration);
+ }

+ public RemoteExecutor(String hostname, int port, List<String> jarFiles, Configuration clientConfiguration) {
+ this(new InetSocketAddress(hostname, port), jarFiles, clientConfiguration);
+ }

- public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles) {
+ public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles, Configuration clientConfiguration) {
this.jarFiles = jarFiles;
- configuration = new Configuration();
+ this.clientConfiguration = clientConfiguration;

- configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
- configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
+ clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
+ clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
}
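For illustration only (not part of the commit): a minimal sketch of how the new four-argument RemoteExecutor constructor could be used. The host, port, jar path, and the "akka.ask.timeout" key/value are placeholder assumptions.

import java.util.Collections;

import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.configuration.Configuration;

public class RemoteExecutorConfigSketch {
	public static void main(String[] args) {
		// Client-side settings only; key and value are illustrative assumptions.
		Configuration clientConfig = new Configuration();
		clientConfig.setString("akka.ask.timeout", "60 s");

		// Host, port and jar path are placeholders.
		RemoteExecutor executor = new RemoteExecutor(
				"jobmanager-host", 6123,
				Collections.singletonList("/path/to/program.jar"),
				clientConfig);
		executor.setPrintStatusDuringExecution(true);
		// executor.executePlanWithJars(...) would then submit a program using clientConfig.
	}
}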

@Override
@@ -87,7 +104,7 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
}

public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
- Client c = new Client(configuration, p.getUserCodeClassLoader(), -1);
+ Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());

JobSubmissionResult result = c.run(p, -1, true);
@@ -103,7 +120,7 @@ public JobExecutionResult executeJar(String jarPath, String assemblerClass, Stri
File jarFile = new File(jarPath);
PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);

- Client c = new Client(configuration, program.getUserCodeClassLoader(), -1);
+ Client c = new Client(clientConfiguration, program.getUserCodeClassLoader(), -1);
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());

JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
@@ -118,7 +135,7 @@ public JobExecutionResult executeJar(String jarPath, String assemblerClass, Stri
@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
JobWithJars p = new JobWithJars(plan, this.jarFiles);
- Client c = new Client(configuration, p.getUserCodeClassLoader(), -1);
+ Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);

OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
+ import org.apache.flink.configuration.Configuration;
import org.apache.flink.client.program.ProgramInvocationException;
import org.junit.Test;

@@ -67,7 +68,7 @@ public void testUnresolvableHostname2() {

try {
InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
- RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList());
+ RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList(), new Configuration());
exec.executePlan(getProgram());
fail("This should fail with an ProgramInvocationException");
}
@@ -105,11 +105,12 @@ public static PlanExecutor createLocalExecutor(Configuration configuration) {
*
* @param hostname The address of the JobManager to send the program to.
* @param port The port of the JobManager to send the program to.
+ * @param clientConfiguration The configuration for the client (Akka, default.parallelism).
* @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
* from within the UDFs.
* @return A remote executor.
*/
- public static PlanExecutor createRemoteExecutor(String hostname, int port, String... jarFiles) {
+ public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration, String... jarFiles) {
if (hostname == null) {
throw new IllegalArgumentException("The hostname must not be null.");
}
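For reference, a minimal call sketch (not part of the commit) for the extended factory method; host, port and jar path are placeholders. Passing null for the configuration falls back to the three-argument RemoteExecutor constructor, as the reflective dispatch in the next hunk shows.

import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;

public class CreateRemoteExecutorSketch {
	public static void main(String[] args) {
		Configuration clientConfig = new Configuration();

		// With a client configuration: the new four-argument RemoteExecutor constructor is used.
		PlanExecutor withConfig = PlanExecutor.createRemoteExecutor(
				"jobmanager-host", 6123, clientConfig, "/path/to/program.jar");

		// Without one: null selects the old three-argument constructor.
		PlanExecutor withoutConfig = PlanExecutor.createRemoteExecutor(
				"jobmanager-host", 6123, null, "/path/to/program.jar");
	}
}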
@@ -123,7 +124,10 @@ public static PlanExecutor createRemoteExecutor(String hostname, int port, Strin
: Arrays.asList(jarFiles);

try {
- return reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files);
+ PlanExecutor executor = (clientConfiguration == null) ?
+ reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files) :
+ reClass.getConstructor(String.class, int.class, List.class, Configuration.class).newInstance(hostname, port, files, clientConfiguration);
+ return executor;
}
catch (Throwable t) {
throw new RuntimeException("An error occurred while loading the remote executor ("
@@ -1105,6 +1105,28 @@ public static ExecutionEnvironment createRemoteEnvironment(String host, int port
return new RemoteEnvironment(host, port, jarFiles);
}

+ /**
+ * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
+ * to a cluster for execution. Note that all file paths used in the program must be accessible from the
+ * cluster. The custom configuration file is used to configure Akka specific configuration parameters
+ * for the Client only; Program parallelism can be set via {@link ExecutionEnvironment#setParallelism(int)}.
+ *
+ * Cluster configuration has to be done in the remotely running Flink instance.
+ *
+ * @param host The host name or address of the master (JobManager), where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
+ * @param clientConfiguration Pass a custom configuration to the Client.
+ * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
+ * user-defined functions, user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ * @return A remote environment that executes the program on a cluster.
+ */
+ public static ExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfiguration, String... jarFiles) {
+ RemoteEnvironment rec = new RemoteEnvironment(host, port, jarFiles);
+ rec.setClientConfiguration(clientConfiguration);
+ return rec;
+ }
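For illustration only (not part of the commit), a minimal job sketch using the new factory method; the host, port, jar path and the configuration key are assumptions.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;

public class RemoteEnvironmentConfigSketch {
	public static void main(String[] args) throws Exception {
		// Client-side (Akka) settings only; cluster settings live in the remote Flink configuration.
		Configuration clientConfig = new Configuration();
		clientConfig.setString("akka.ask.timeout", "60 s"); // key name is an assumption

		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
				"jobmanager-host", 6123, clientConfig, "/path/to/program.jar");
		env.setParallelism(4); // program parallelism is set on the environment, not via clientConfig

		DataSet<Long> data = env.generateSequence(1, 1000);
		data.output(new DiscardingOutputFormat<Long>());
		env.execute("Remote job with a custom client configuration");
	}
}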

/**
* Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
* to a cluster for execution. Note that all file paths used in the program must be accessible from the
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
+ import org.apache.flink.configuration.Configuration;

/**
* An {@link ExecutionEnvironment} that sends programs
@@ -35,6 +36,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
protected final int port;

private final String[] jarFiles;

+ private Configuration clientConfiguration;

/**
* Creates a new RemoteEnvironment that points to the master (JobManager) described by the
@@ -65,7 +68,7 @@ public RemoteEnvironment(String host, int port, String... jarFiles) {
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);

- PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
+ PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());

this.lastJobExecutionResult = executor.executePlan(p);
@@ -78,7 +81,7 @@ public String getExecutionPlan() throws Exception {
p.setDefaultParallelism(getParallelism());
registerCachedFilesWithPlan(p);

- PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
+ PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
return executor.getOptimizerPlanAsJSON(p);
}

@@ -87,4 +90,8 @@ public String toString() {
return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
}

+ public void setClientConfiguration(Configuration clientConfiguration) {
+ this.clientConfiguration = clientConfiguration;
+ }
}
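For completeness, a small sketch (not part of the commit) of the new setter in isolation; this is essentially what createRemoteEnvironment(host, port, clientConfiguration, jarFiles) does internally. Host, port and jar path are placeholders.

import org.apache.flink.api.java.RemoteEnvironment;
import org.apache.flink.configuration.Configuration;

public class SetClientConfigurationSketch {
	public static void main(String[] args) {
		RemoteEnvironment env = new RemoteEnvironment("jobmanager-host", 6123, "/path/to/program.jar");

		Configuration clientConfig = new Configuration();
		env.setClientConfiguration(clientConfig);
		// execute() and getExecutionPlan() now pass clientConfig to PlanExecutor.createRemoteExecutor(...).
	}
}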
@@ -134,7 +134,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
def getId: UUID = {
javaEnv.getId
}

/**
* Gets the JobExecutionResult of the last executed job.
*/
@@ -181,13 +181,13 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
def addDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
javaEnv.addDefaultKryoSerializer(clazz, serializer)
}

/**
* Registers the given type with the serialization stack. If the type is eventually
* serialized as a POJO, then the type is registered with the POJO serializer. If the
* type ends up being serialized with Kryo, then it will be registered at Kryo to make
* sure that only tags are written.
*
*
*/
def registerType(typeClass: Class[_]) {
javaEnv.registerType(typeClass)
@@ -707,5 +707,32 @@ object ExecutionEnvironment {
javaEnv.setParallelism(parallelism)
new ExecutionEnvironment(javaEnv)
}

+ /**
+ * Creates a remote execution environment. The remote environment sends (parts of) the program
+ * to a cluster for execution. Note that all file paths used in the program must be accessible
+ * from the cluster. The custom configuration file is used to configure Akka specific
+ * configuration parameters for the Client only; Program parallelism can be set via
+ * [[ExecutionEnvironment.setParallelism]].
+ *
+ * Cluster configuration has to be done in the remotely running Flink instance.
+ *
+ * @param host The host name or address of the master (JobManager), where the program should be
+ * executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
+ * @param clientConfiguration Pass a custom configuration to the Client.
+ * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+ * program uses user-defined functions, user-defined input formats, or any
+ * libraries, those must be provided in the JAR files.
+ * @return A remote environment that executes the program on a cluster.
+ */
+ def createRemoteEnvironment(
+ host: String,
+ port: Int,
+ clientConfiguration: Configuration,
+ jarFiles: String*): ExecutionEnvironment = {
+ val javaEnv = JavaEnv.createRemoteEnvironment(host, port, clientConfiguration, jarFiles: _*)
+ new ExecutionEnvironment(javaEnv)
+ }
}

@@ -25,6 +25,7 @@
import org.apache.flink.api.common.PlanExecutor;

import org.apache.flink.api.scala.FlinkILoop;
+ import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.Arrays;
@@ -78,7 +79,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
alljars.add(jarFile);
String[] alljarsArr = new String[alljars.size()];
alljarsArr = alljars.toArray(alljarsArr);
- PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, alljarsArr);
+ PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(), alljarsArr);

executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
return executor.executePlan(p);
@@ -80,9 +80,7 @@ public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throw
out.collect(getRuntimeContext().getIndexOfThisSubtask());
}
});
- List<Integer> resultCollection = new ArrayList<Integer>();
- result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
- env.execute();
+ List<Integer> resultCollection = result.collect();
assertEquals(PARALLELISM, resultCollection.size());
}
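Aside from the configuration change, this test now uses DataSet#collect() instead of writing to a LocalCollectionOutputFormat and calling env.execute(). A minimal sketch (not part of the commit) of that pattern:

import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CollectSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Long> data = env.generateSequence(1, 10);

		// collect() triggers execution and ships the result back to the client.
		List<Long> result = data.collect();
		System.out.println(result);
	}
}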
