Skip to content

Commit

Permalink
Add "stratosphere-yarn" sub-project
Browse files Browse the repository at this point in the history
based on https://github.com/hortonworks/simple-yarn-app

Build debian package only with special profile for that (to speed up the build)
Build uber-jar of stratosphere (for super-handy yarn deployment)

Old commit messages (from rebase:)

advanced configuration file handling (maven and jar magic)
Command line arguments for client (not yet done)

Change JobManager startup to allow external control
Use environment variables to pass variables to ApplicationMaster
Use constants for settings
Fix CLI options
minor refinements (pact-hbase dependencies)

Yarn runs with HDFS
jar files and configs are now shared using hdfs
many many deployment bug fixes
uberjar is automatically uploaded to dopa server
  • Loading branch information
rmetzger committed Dec 7, 2013
1 parent 7b0c53b commit 2dd2cb7
Show file tree
Hide file tree
Showing 16 changed files with 1,224 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,17 +305,30 @@ private boolean getBooleanInternal(final String key, final boolean defaultValue)
public static void loadConfiguration(final String configDir) {

if (configDir == null) {

LOG.warn("Given configuration directory is null, cannot load configuration");
return;
}

final File confDirFile = new File(configDir);
if (!(confDirFile.exists() && confDirFile.isDirectory())) {
if (!(confDirFile.exists())) {
LOG.warn("The given configuration directory name '" + configDir + "'(" + confDirFile.getAbsolutePath()
+ ") does not describe an existing directory.");
return;
}

if(confDirFile.isFile()) {
final File file = new File(configDir);
if(configDir.endsWith(".xml")) {
get().loadXMLResource( file );
} else if(configDir.endsWith(".yaml")) {
get().loadYAMLResource(file);
} else {
LOG.warn("The given configuration has an unknown extension.");
return;
}
configuration.confData.put(CONFIGDIRKEY, file.getAbsolutePath() );
return;
}

// get all XML and YAML files in the directory
final File[] xmlFiles = filterFilesBySuffix(confDirFile, ".xml");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ private static void logVersionInformation() {
* @param args
* arguments from the command line
*/
@SuppressWarnings("static-access")

public static void main(final String[] args) {

// determine if a valid log4j config exists and initialize a default logger if not
Expand All @@ -405,6 +405,19 @@ public static void main(final String[] args) {
root.setLevel(Level.INFO);
}

JobManager jobManager = initialize(args);

// Start info server for jobmanager
jobManager.startInfoServer();

// Run the main task loop
jobManager.runTaskLoop();

// Clean up task are triggered through a shutdown hook
}

@SuppressWarnings("static-access")
public static JobManager initialize(final String[] args) {
// output the version and revision information to the log
logVersionInformation();

Expand All @@ -430,15 +443,14 @@ public static void main(final String[] args) {
final String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
final String executionModeName = line.getOptionValue(executionModeOpt.getOpt(), "local");

final ExecutionMode executionMode;
ExecutionMode executionMode = null;
if ("local".equals(executionModeName)) {
executionMode = ExecutionMode.LOCAL;
} else if ("cluster".equals(executionModeName)) {
executionMode = ExecutionMode.CLUSTER;
} else {
System.err.println("Unrecognized execution mode: " + executionModeName);
System.exit(FAILURERETURNCODE);
return;
}

// First, try to load global configuration
Expand All @@ -452,14 +464,8 @@ public static void main(final String[] args) {
if (configDir != null) {
infoserverConfig.setString(ConfigConstants.STRATOSPHERE_BASE_DIR_PATH_KEY, configDir+"/..");
}

// Start info server for jobmanager
jobManager.startInfoServer(infoserverConfig);

// Run the main task loop
jobManager.runTaskLoop();

// Clean up task are triggered through a shutdown hook
GlobalConfiguration.includeConfiguration(infoserverConfig);
return jobManager;
}

/**
Expand Down Expand Up @@ -1245,9 +1251,9 @@ public InputSplitWrapper requestNextInputSplit(final JobID jobID, final Executio
/**
* Starts the Jetty Infoserver for the Jobmanager
*
* @param config
*/
public void startInfoServer(Configuration config) {
public void startInfoServer() {
final Configuration config = GlobalConfiguration.getConfiguration();
// Start InfoServer
try {
int port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_WEB_FRONTEND_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ public void runIOLoop() {
try {
this.jobManager.sendHeartbeat(this.localInstanceConnectionInfo, this.hardwareDescription);
} catch (IOException e) {
LOG.debug("sending the heart beat caused on IO Exception");
e.printStackTrace();
LOG.info("sending the heart beat caused on IO Exception");
}

// Check the status of the task threads to detect unexpected thread terminations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
import eu.stratosphere.pact.client.nephele.api.Client;
import eu.stratosphere.pact.client.nephele.api.ErrorInPlanAssemblerException;
import eu.stratosphere.pact.client.nephele.api.PactProgram;
import eu.stratosphere.pact.client.nephele.api.PlanWithJars;
import eu.stratosphere.pact.client.nephele.api.ProgramInvocationException;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.CompilerException;

/**
Expand All @@ -60,6 +62,7 @@ public class CliFrontend {

// actions
private static final String ACTION_RUN = "run";
private static final String ACTION_RUN_REMOTE = "remote";
private static final String ACTION_INFO = "info";
private static final String ACTION_LIST = "list";
private static final String ACTION_CANCEL = "cancel";
Expand Down Expand Up @@ -106,6 +109,7 @@ public CliFrontend() {
options = new HashMap<String, Options>();
options.put(GENERAL_OPTS, getGeneralOptions());
options.put(ACTION_RUN, getRunOptions());
options.put(ACTION_RUN_REMOTE, getRunOptions());
options.put(ACTION_INFO, getInfoOptions());
options.put(ACTION_LIST, getListOptions());
options.put(ACTION_CANCEL, getCancelOptions());
Expand Down Expand Up @@ -213,6 +217,35 @@ private Options getCancelOptions() {
return options;
}

/**
 * Executes the remote action: loads a PACT program from a jar file and runs its plan
 * through a {@link RemoteExecutor}.
 * <p>
 * Expected parameters, in this order: {@code [host:port] [jar] [class] [args]}. The
 * {@code [args]} entry is a single string; it is split on spaces and the resulting tokens
 * are handed to the program. Entries beyond the fourth are ignored.
 * <p>
 * Errors are reported on {@code System.err}; the method does not terminate the JVM.
 *
 * @param params
 *        the command line parameters of the remote action
 */
private void remote(String[] params) {
    // Validate before indexing into the array: the usage text is only useful (and only
    // printed) when the invocation is actually malformed.
    if (params == null || params.length < 4) {
        System.err.println("Usage: [host:port] [jar] [class] [args]");
        return;
    }

    // Echo the received arguments to ease debugging of remote submissions.
    for (int i = 0; i < params.length; i++) {
        System.err.println("arg "+i+" = "+params[i]);
    }

    final RemoteExecutor re = new RemoteExecutor(params[0], params[1]);

    final PactProgram program;
    try {
        program = new PactProgram(new File(params[1]), params[2], params[3].split(" "));
    } catch (ProgramInvocationException e1) {
        // Without a program there is nothing to submit; stop here instead of
        // dereferencing a null program further down.
        System.err.println("Could not load program '" + params[2] + "' from jar '" + params[1] + "'.");
        e1.printStackTrace();
        return;
    }

    try {
        re.executePlanWithJars(program.getPlanWithJars());
    } catch (Exception e) {
        System.err.println("Remote execution of the program failed.");
        e.printStackTrace();
    }
}

/**
* Executions the run action.
*
Expand Down Expand Up @@ -730,6 +763,8 @@ private void parseParameters(String[] args) {
// do action
if(action.equals(ACTION_RUN)) {
run(params);
} else if (action.equals(ACTION_RUN_REMOTE)) {
remote(params);
} else if (action.equals(ACTION_LIST)) {
list(params);
} else if (action.equals(ACTION_INFO)) {
Expand All @@ -744,6 +779,8 @@ private void parseParameters(String[] args) {

}



/**
* Submits the job based on the arguments
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,60 @@
package eu.stratosphere.pact.client;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;

import eu.stratosphere.pact.client.nephele.api.Client;
import eu.stratosphere.pact.client.nephele.api.ErrorInPlanAssemblerException;
import eu.stratosphere.pact.client.nephele.api.PlanWithJars;
import eu.stratosphere.pact.client.nephele.api.ProgramInvocationException;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.CompilerException;

public class RemoteExecutor implements PlanExecutor {

private Client client;

private List<String> jarFiles;

public RemoteExecutor(String hostname, int port, List<String> jarFiles) {
this.client = new Client(new InetSocketAddress(hostname, port));
/**
 * Creates a remote executor from a socket address and a list of jar files.
 *
 * @param inet
 *        the socket address used to construct the {@link Client}
 * @param jarFiles
 *        the jar files kept by this executor; they are attached to plans passed to
 *        {@code executePlan}
 */
public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles) {
// The client is created once and used for all executions of this executor.
this.client = new Client(inet);
this.jarFiles = jarFiles;

}

/**
 * Creates a remote executor from a host name, a port and a list of jar files.
 * Builds an {@link InetSocketAddress} from host name and port and delegates to the
 * address-based constructor.
 *
 * @param hostname
 *        the host name of the socket address
 * @param port
 *        the port of the socket address
 * @param jarFiles
 *        the jar files kept by this executor
 */
public RemoteExecutor(String hostname, int port, List<String> jarFiles) {
this(new InetSocketAddress(hostname, port), jarFiles);
}

/**
 * Creates a remote executor from a host name, a port and a single jar file.
 * The jar file is wrapped in a singleton list and passed on to the list-based constructor.
 *
 * @param hostname
 *        the host name of the socket address
 * @param port
 *        the port of the socket address
 * @param jarFile
 *        the single jar file kept by this executor
 */
public RemoteExecutor(String hostname, int port, String jarFile) {
this(hostname, port, Collections.singletonList(jarFile));
}

/**
 * Creates a remote executor from a combined host-and-port string and a single jar file.
 * The string is converted to a socket address by {@code getInetFromHostport}, which
 * throws a {@link RuntimeException} if host or port cannot be identified.
 *
 * @param hostport
 *        the address string to be parsed into host and port
 * @param jarFile
 *        the single jar file kept by this executor
 */
public RemoteExecutor(String hostport, String jarFile) {
this(getInetFromHostport(hostport), Collections.singletonList(jarFile));
}

/**
 * Converts a {@code host:port} string into a socket address.
 * <p>
 * The string is prefixed with a dummy scheme so that {@link URI} performs the splitting
 * and validation of host and port. Approach taken from
 * http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
 *
 * @param hostport
 *        the address in {@code host:port} form
 * @return the socket address made of the parsed host and port
 * @throws RuntimeException
 *         if the string is not a valid URI authority, or if host or port are missing
 */
private static InetSocketAddress getInetFromHostport(String hostport) {
    final URI uri;
    try {
        // The prefix must be "scheme://" so that the URI is hierarchical and the remainder
        // is parsed as authority; any other prefix produces an opaque URI without a host.
        uri = new URI("my://" + hostport);
    } catch (URISyntaxException e) {
        throw new RuntimeException("Could not identify hostname and port", e);
    }

    final String host = uri.getHost();
    final int port = uri.getPort();
    // getHost() is null and getPort() is -1 when the respective component is undefined.
    if (host == null || port == -1) {
        throw new RuntimeException("Could not identify hostname and port");
    }
    return new InetSocketAddress(host, port);
}

/**
 * Runs a plan that already carries its jar files through this executor's client.
 *
 * @param p
 *        the plan together with its jars
 * @return the value returned by the client's {@code run} call
 * @throws Exception
 *         if the client fails to run the plan
 */
public long executePlanWithJars(PlanWithJars p) throws Exception {
    final long result = this.client.run(p, true);
    return result;
}
@Override
public long executePlan(Plan plan) throws Exception {
PlanWithJars p = new PlanWithJars(plan, this.jarFiles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public PactProgram(File jarFile, String className, String... args) throws Progra
}



/**
* Returns the plan with all required jars.
* @throws IOException
Expand Down
16 changes: 1 addition & 15 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,13 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<hadoop-one.version>1.2.1</hadoop-one.version>
<hadoop-two.version>2.0.0-cdh4.2.1</hadoop-two.version>
<hadoop-two.version>2.2.0</hadoop-two.version>
</properties>

<pluginRepositories>
</pluginRepositories>

<repositories>
<repository>
<id>cloudera-releases</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>

<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
Expand Down
60 changes: 42 additions & 18 deletions stratosphere-addons/pact-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,24 @@
<version>0.4-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<repositories>
<repository>
<id>cloudera-releases</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<properties>
<hbase.version>0.96.0-hadoop2</hbase.version>
</properties>

<artifactId>pact-hbase</artifactId>
<name>pact-hbase</name>

Expand All @@ -22,32 +39,39 @@
<artifactId>pact-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.2-cdh4.2.1</version>

<exclusions>
<!-- jruby is used for the hbase shell. -->
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<version>0.94.2-cdh4.2.1</version>
<exclusions>
<!-- jruby is used for the hbase shell. -->
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<!-- <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
-->

<!--
hadoop-client is available for yarn and non-yarn, so there is no need to use profiles
See ticket https://issues.apache.org/jira/browse/HADOOP-8009 for description of hadoop-clients
-->
<!-- hadoop-client is available for yarn and non-yarn, so there is no need
to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009
for description of hadoop-clients -->

<reporting>
<plugins>
Expand Down
1 change: 1 addition & 0 deletions stratosphere-addons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
</activation>
<modules>
<module>pact-hbase</module>
<module>stratosphere-yarn</module>
</modules>
</profile>
</profiles>
Expand Down
Loading

0 comments on commit 2dd2cb7

Please sign in to comment.