Skip to content

Commit

Permalink
cloudmgr fix (still experimental)
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Richter committed Jul 20, 2011
2 parents 781e2c0 + 9f7550a commit b19e1bc
Show file tree
Hide file tree
Showing 45 changed files with 363 additions and 152 deletions.
4 changes: 2 additions & 2 deletions build-tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
<parent>
<artifactId>stratosphere</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>build-tools</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>build-tools</name>
<packaging>jar</packaging>
</project>
4 changes: 2 additions & 2 deletions nephele/nephele-clustermanager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>nephele</artifactId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<modelVersion>4.0.0</modelVersion>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-clustermanager</artifactId>
<name>nephele-clustermanager</name>
<version>0.1</version>
<version>0.1.1</version>
<url>https://maven.apache.org</url>

<dependencies>
Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-common</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>nephele-common</name>

<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.nephele.client.AbstractJobResult.ReturnCode;
import eu.stratosphere.nephele.configuration.ConfigConstants;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.event.job.AbstractEvent;
Expand Down Expand Up @@ -111,6 +110,8 @@ public void run() {
// Terminate the running job if the configuration says so
if (this.jobClient.getConfiguration().getBoolean(ConfigConstants.JOBCLIENT_SHUTDOWN_TERMINATEJOB_KEY,
ConfigConstants.DEFAULT_JOBCLIENT_SHUTDOWN_TERMINATEJOB)) {
System.out.println(AbstractEvent.timestampToString(System.currentTimeMillis())
+ ":\tJobClient is shutting down, canceling job...");
this.jobClient.cancelJob();
}

Expand Down Expand Up @@ -221,12 +222,7 @@ public JobSubmissionResult submitJob() throws IOException {

synchronized (this.jobSubmitClient) {

final JobSubmissionResult result = this.jobSubmitClient.submitJob(this.jobGraph);
if (result.getReturnCode() == ReturnCode.SUCCESS) {
// Make sure the job is properly terminated when the user shut's down the client
Runtime.getRuntime().addShutdownHook(this.jobCleanUp);
}
return result;
return this.jobSubmitClient.submitJob(this.jobGraph);
}
}

Expand All @@ -239,8 +235,6 @@ public JobSubmissionResult submitJob() throws IOException {
*/
public JobCancelResult cancelJob() throws IOException {

Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);

synchronized (this.jobSubmitClient) {
return this.jobSubmitClient.cancelJob(this.jobGraph.getJobID());
}
Expand Down Expand Up @@ -277,10 +271,10 @@ public void submitJobAndWait() throws IOException, JobExecutionException {
if (submissionResult.getReturnCode() == AbstractJobResult.ReturnCode.ERROR) {
LOG.error("ERROR: " + submissionResult.getDescription());
throw new JobExecutionException(submissionResult.getDescription(), false);
} else {
// Make sure the job is properly terminated when the user shut's down the client
Runtime.getRuntime().addShutdownHook(this.jobCleanUp);
}

// Make sure the job is properly terminated when the user shut's down the client
Runtime.getRuntime().addShutdownHook(this.jobCleanUp);
}

long sleep = 0;
Expand Down Expand Up @@ -334,7 +328,8 @@ public void submitJobAndWait() throws IOException, JobExecutionException {
} else if (jobStatus == JobStatus.CANCELED || jobStatus == JobStatus.FAILED) {
Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
LOG.info(jobEvent.getOptionalMessage());
throw new JobExecutionException(jobEvent.getOptionalMessage(), (jobStatus == JobStatus.CANCELED) ? true : false);
throw new JobExecutionException(jobEvent.getOptionalMessage(),
(jobStatus == JobStatus.CANCELED) ? true : false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public long getTimestamp() {
* the timestamp in milliseconds since the beginning of "the epoch"
* @return the string unified representation of the timestamp
*/
protected static String timestampToString(long timestamp) {
public static String timestampToString(long timestamp) {

return dateFormatter.format(new Date(timestamp));

Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-compression-bzip2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-compression-bzip2</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>nephele-compression-bzip2</name>

<dependencies>
Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-compression-lzma/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-compression-lzma</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>nephele-compression-lzma</name>

<dependencies>
Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-compression-snappy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-compression-snappy</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>nephele-compression-snappy</name>

<dependencies>
Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-compression-zlib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-compression-zlib</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>nephele-compression-zlib</name>

<dependencies>
Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-ec2cloudmanager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-ec2cloudmanager</artifactId>
<name>nephele-ec2cloudmanager</name>
<packaging>jar</packaging>

<version>0.1</version>
<version>0.1.1</version>

<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,8 @@ public void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, I
* }
*/

final String awsAccessId = "AKIAJYQJNI7QH227NDQA";
final String awsSecretKey = "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";
final String awsAccessId = "HERE";
final String awsSecretKey = "HERE";
final String owner = "nobody";

final String sshKeyPair = conf.getString("job.cloud.sshkeypair", null);
Expand Down Expand Up @@ -640,7 +640,7 @@ private LinkedList<String> allocateCloudInstance(String awsAccessId, String awsS
* return null;
* }
*/
final String imageID = "ami-ea5b6b9e";
final String imageID = "ami-a24272d6";
final String jobManagerIPAddress = GlobalConfiguration.getString("jobmanager.rpc.address", null);
if (jobManagerIPAddress == null) {
LOG.error("JobManager IP address is not set (jobmanager.rpc.address)");
Expand Down Expand Up @@ -750,6 +750,30 @@ private LinkedList<CloudInstance> anyFloatingInstanceAvailable(String owner, Str

// Check if instance entry is a floating instance


final Iterator<Map.Entry<InstanceConnectionInfo, FloatingInstance>> it = this.floatingInstances.entrySet().iterator();

while (it.hasNext()) {

Entry<InstanceConnectionInfo, FloatingInstance> e = it.next();
if (e.getKey().getAddress().equals(inetAddress)) {
LOG.info("Suitable floating instance found.");
final FloatingInstance floatingInstance = e.getValue();
it.remove();

this.floatingInstanceIDs.remove(floatingInstance.getInstanceID());

floatinginstances.add(convertIntoCloudInstance(t,
floatingInstance.getInstanceConnectionInfo(), owner));

// If we already have enough floating instances found: return!
if (floatinginstances.size() >= count) {
return floatinginstances;
}
}

}

for (Entry<InstanceConnectionInfo, FloatingInstance> e : this.floatingInstances.entrySet()) {
if (e.getKey().getAddress().equals(inetAddress)) {
LOG.info("Suitable floating instance found.");
Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-examples</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>nephele-examples</name>
<packaging>jar</packaging>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public void invoke() throws Exception {

StringRecord s = input.next();
this.output.emit(s);
i++;
}

System.out.println("GREP: Emmited all " + i + " records");
Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-hdfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-hdfs</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>nephele-hdfs</name>
<packaging>jar</packaging>

Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-management/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-management</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>nephele-management</name>
<packaging>jar</packaging>

Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-profiling/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-profiling</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>nephele-profiling</name>
<packaging>jar</packaging>

Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-queuescheduler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
<parent>
<groupId>eu.stratosphere</groupId>
<artifactId>nephele</artifactId>
<version>0.1</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>eu.stratosphere</groupId>
<artifactId>nephele-queuescheduler</artifactId>
<name>nephele-queuescheduler</name>
<version>0.1</version>
<version>0.1.1</version>
<url>https://maven.apache.org</url>

<dependencies>
Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-s3</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>nephele-s3</name>
<packaging>jar</packaging>

Expand Down
4 changes: 2 additions & 2 deletions nephele/nephele-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
<version>0.1.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-server</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>nephele-server</name>

<packaging>jar</packaging>
Expand Down
Loading

0 comments on commit b19e1bc

Please sign in to comment.