Skip to content

Commit

Permalink
Merge branch 'master' into version02
Browse files Browse the repository at this point in the history
Conflicts:
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/AbstractID.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/DefaultChannelSelector.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/channels/DeserializationBuffer.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryOutputChannel.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/jobgraph/JobStatus.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/util/SerializableHashMap.java
	nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/cloud/CloudManager.java
	nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/cloud/FloatingInstance.java
	nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/ec2/EC2ClientFactory.java
	nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/ec2/EC2CloudInstance.java
	nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/ec2/EC2CloudInstanceNotifier.java
	nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/ec2/JobToInstancesMapping.java
	nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/ec2/EC2CloudInstanceTest.java
	nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/ec2/EC2CloudManagerTest.java
	nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/ec2/FloatingInstanceTest.java
	nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/ec2/JobToInstancesMappingTest.java
	nephele/nephele-queuescheduler/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueScheduler.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalScheduler.java
  • Loading branch information
Daniel Warneke committed Aug 17, 2011
2 parents d316f38 + d0c21f4 commit 5df886e
Show file tree
Hide file tree
Showing 101 changed files with 1,488 additions and 1,541 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ public abstract class AbstractJobResult implements IOReadableWritable {
* @author warneke
*/
public enum ReturnCode {
SUCCESS, ERROR

/**
* The success return code.
*/
SUCCESS,

/**
* The error return code.
*/
ERROR
};

/**
Expand Down Expand Up @@ -156,23 +165,23 @@ public boolean equals(final Object obj) {

return true;
}

/**
* {@inheritDoc}
*/
@Override
public int hashCode() {

long hashCode = 0;
if(this.returnCode != null) {

if (this.returnCode != null) {
hashCode += this.returnCode.hashCode();
}

if(this.description != null) {
if (this.description != null) {
hashCode += this.description.hashCode();
}

return (int) (hashCode % Integer.MAX_VALUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class JobCancelResult extends AbstractJobResult {
* @param description
* the optional error description
*/
public JobCancelResult(ReturnCode returnCode, String description) {
public JobCancelResult(final ReturnCode returnCode, final String description) {
super(returnCode, description);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@

/**
* The job client is able to submit, control, and abort jobs.
* <p>
* This class is thread-safe.
*
* @author warneke
*/
public class JobClient {

/**
* The logging object used for debugging
* The logging object used for debugging.
*/
private static final Log LOG = LogFactory.getLog(JobClient.class);

Expand Down Expand Up @@ -94,7 +95,7 @@ public static class JobCleanUp extends Thread {
* @param jobClient
* the job client this clean up object belongs to
*/
public JobCleanUp(JobClient jobClient) {
public JobCleanUp(final JobClient jobClient) {

this.jobClient = jobClient;
}
Expand Down Expand Up @@ -134,7 +135,7 @@ public void run() {
* @throws IOException
* thrown on error while initializing the RPC connection to the job manager
*/
public JobClient(JobGraph jobGraph) throws IOException {
public JobClient(final JobGraph jobGraph) throws IOException {

this(jobGraph, new Configuration());
}
Expand All @@ -150,7 +151,7 @@ public JobClient(JobGraph jobGraph) throws IOException {
* @throws IOException
* thrown on error while initializing the RPC connection to the job manager
*/
public JobClient(JobGraph jobGraph, Configuration configuration) throws IOException {
public JobClient(final JobGraph jobGraph, final Configuration configuration) throws IOException {

final String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
final int port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
Expand All @@ -163,8 +164,7 @@ public JobClient(JobGraph jobGraph, Configuration configuration) throws IOExcept
this.configuration = configuration;
this.jobCleanUp = new JobCleanUp(this);
}



/**
* Constructs a new job client object and instantiates a local
* RPC proxy for the {@link JobSubmissionProtocol}.
Expand All @@ -177,16 +177,18 @@ public JobClient(JobGraph jobGraph, Configuration configuration) throws IOExcept
* IP/Port of the jobmanager (not taken from provided configuration object).
* @throws IOException
* thrown on error while initializing the RPC connection to the job manager
*/
public JobClient(JobGraph jobGraph, Configuration configuration, InetSocketAddress jobManagerAddress) throws IOException {

*/
public JobClient(final JobGraph jobGraph, final Configuration configuration,
final InetSocketAddress jobManagerAddress)
throws IOException {

this.jobSubmitClient = (JobManagementProtocol) RPC.getProxy(JobManagementProtocol.class, jobManagerAddress, NetUtils
.getSocketFactory());
this.jobSubmitClient = (JobManagementProtocol) RPC.getProxy(JobManagementProtocol.class, jobManagerAddress,
NetUtils
.getSocketFactory());
this.jobGraph = jobGraph;
this.configuration = configuration;
this.jobCleanUp = new JobCleanUp(this);
}
}

/**
* Close the <code>JobClient</code>.
Expand Down Expand Up @@ -328,8 +330,11 @@ public void submitJobAndWait() throws IOException, JobExecutionException {
} else if (jobStatus == JobStatus.CANCELED || jobStatus == JobStatus.FAILED) {
Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
LOG.info(jobEvent.getOptionalMessage());
throw new JobExecutionException(jobEvent.getOptionalMessage(),
(jobStatus == JobStatus.CANCELED) ? true : false);
if (jobStatus == JobStatus.CANCELED) {
throw new JobExecutionException(jobEvent.getOptionalMessage(), true);
} else {
throw new JobExecutionException(jobEvent.getOptionalMessage(), false);
}
}
}
}
Expand All @@ -353,7 +358,7 @@ public void submitJobAndWait() throws IOException, JobExecutionException {
* @throws IOException
* thrown after the error message is written to the log
*/
private void logErrorAndRethrow(String errorMessage) throws IOException {
private void logErrorAndRethrow(final String errorMessage) throws IOException {

LOG.error(errorMessage);
throw new IOException(errorMessage);
Expand All @@ -366,7 +371,7 @@ private void logErrorAndRethrow(String errorMessage) throws IOException {
* @param sleepTime
* the sleep time in milliseconds after which old events shall be removed from the processed event queue
*/
private void cleanUpOldEvents(long sleepTime) {
private void cleanUpOldEvents(final long sleepTime) {

long mostRecentTimestamp = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public class JobExecutionException extends Exception {
*/
private static final long serialVersionUID = 2818087325120827525L;

/**
* Indicates whether the job has been aborted as a result of user request.
*/
private final boolean canceledByUser;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ public class JobProgressResult extends AbstractJobResult {
* @param events
* the job events to be transported within this object
*/
public JobProgressResult(ReturnCode returnCode, String description, SerializableArrayList<AbstractEvent> events) {
public JobProgressResult(final ReturnCode returnCode, final String description,
final SerializableArrayList<AbstractEvent> events) {

super(returnCode, description);

this.events = events;
Expand All @@ -65,7 +67,7 @@ public JobProgressResult() {
* {@inheritDoc}
*/
@Override
public void read(DataInput in) throws IOException {
public void read(final DataInput in) throws IOException {
super.read(in);

this.events.read(in);
Expand All @@ -75,7 +77,7 @@ public void read(DataInput in) throws IOException {
* {@inheritDoc}
*/
@Override
public void write(DataOutput out) throws IOException {
public void write(final DataOutput out) throws IOException {
super.write(out);

this.events.write(out);
Expand All @@ -90,36 +92,36 @@ public Iterator<AbstractEvent> getEvents() {

return this.events.iterator();
}

/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object obj) {
if(!super.equals(obj)) {
public boolean equals(final Object obj) {

if (!super.equals(obj)) {
return false;
}
if(!(obj instanceof JobProgressResult)) {

if (!(obj instanceof JobProgressResult)) {
return false;
}

final JobProgressResult jpr = (JobProgressResult) obj;
if(!this.events.equals(jpr.events)) {

if (!this.events.equals(jpr.events)) {
return false;
}

return true;
}

/**
* {@inheritDoc}
*/
@Override
public int hashCode() {

return super.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class JobSubmissionResult extends AbstractJobResult {
* @param description
* the error description
*/
public JobSubmissionResult(ReturnCode returnCode, String description) {
public JobSubmissionResult(final ReturnCode returnCode, final String description) {
super(returnCode, description);
}

Expand All @@ -54,15 +54,15 @@ public JobSubmissionResult() {
* {@inheritDoc}
*/
@Override
public void read(DataInput in) throws IOException {
public void read(final DataInput in) throws IOException {
super.read(in);
}

/**
* {@inheritDoc}
*/
@Override
public void write(DataOutput out) throws IOException {
public void write(final DataOutput out) throws IOException {
super.write(out);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public final class ConfigConstants {
public static final int DEFAULT_JOB_MANAGER_IPC_PORT = 6123;

/**
* The default network port the task manager expects incoming IPC connections
* The default network port the task manager expects incoming IPC connections.
*/
public static final int DEFAULT_TASK_MANAGER_IPC_PORT = 6122;

Expand All @@ -97,7 +97,7 @@ public final class ConfigConstants {
public static final long DEFAULT_MEMORY_MANAGER_MIN_UNRESERVED_MEMORY = 256 * 1024 * 1024;

/**
* The default directory for temporary files of the task manager
* The default directory for temporary files of the task manager.
*/
public static final String DEFAULT_TASK_MANAGER_TMP_PATH = System.getProperty("java.io.tmpdir");

Expand Down
Loading

0 comments on commit 5df886e

Please sign in to comment.