Skip to content

Commit

Permalink
Merge branch 'warneke_v2' into datamodel
Browse files Browse the repository at this point in the history
Conflicts:
	pact/pact-clients/src/main/java/eu/stratosphere/pact/testing/MockInstanceManager.java
	stratosphere-dist/src/main/assemblies/bin.xml
  • Loading branch information
sewen committed Oct 17, 2011
2 parents 1665554 + fccd0e1 commit 068bdf4
Show file tree
Hide file tree
Showing 15 changed files with 430 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@
import eu.stratosphere.nephele.jobgraph.JobID;

/**
* An allocated slice is a part of an instance which is assigned to a job
* An allocated slice is a part of an instance which is assigned to a job.
* <p>
* This class is thread-safe.
*
* @author Dominic Battre
*/
class AllocatedSlice {

/**
* The allocation ID which idenfies the resources occupied by this slice.
* The allocation ID which identifies the resources occupied by this slice.
*/
private final AllocationID allocationID;

/**
* The machine hosting the slice
* The machine hosting the slice.
*/
private final ClusterInstance hostingInstance;

Expand All @@ -48,12 +49,12 @@ class AllocatedSlice {
private final JobID jobID;

/**
* Time when this machine has been allocation in milliseconds, {@see currentTimeMillis()}
* Time when this machine has been allocation in milliseconds, {@see currentTimeMillis()}.
*/
private final long allocationTime;

/**
* Creates a new allocated slice on the given hosting instance
* Creates a new allocated slice on the given hosting instance.
*
* @param hostingInstance
* the instance hosting the slice
Expand All @@ -64,7 +65,8 @@ class AllocatedSlice {
* @param allocationTime
* the time the instance was allocated
*/
public AllocatedSlice(ClusterInstance hostingInstance, InstanceType type, JobID jobID, long allocationTime) {
public AllocatedSlice(final ClusterInstance hostingInstance, final InstanceType type, final JobID jobID,
final long allocationTime) {

this.allocationID = new AllocationID();
this.hostingInstance = hostingInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,22 @@
class ClusterInstance extends AbstractInstance {

/**
* A map of slices allocated on this host
* A map of slices allocated on this host.
*/
private final Map<AllocationID, AllocatedSlice> allocatedSlices = new HashMap<AllocationID, AllocatedSlice>();

/**
* The remaining capacity of this host that can be used by instances
* The remaining capacity of this host that can be used by instances.
*/
private InstanceType remainingCapacity;

/** Time when last heat beat has been received from the task manager running on this instance */
/**
* Time when last heat beat has been received from the task manager running on this instance.
*/
private long lastReceivedHeartBeat = System.currentTimeMillis();

/**
* Constructor.
* Constructs a new cluster instance.
*
* @param instanceConnectionInfo
* the instance connection info identifying the host
Expand All @@ -67,8 +69,10 @@ class ClusterInstance extends AbstractInstance {
* @param hardwareDescription
* the hardware description reported by the instance itself
*/
public ClusterInstance(InstanceConnectionInfo instanceConnectionInfo, InstanceType capacity,
NetworkNode parentNode, NetworkTopology networkTopology, HardwareDescription hardwareDescription) {
public ClusterInstance(final InstanceConnectionInfo instanceConnectionInfo, final InstanceType capacity,
final NetworkNode parentNode, final NetworkTopology networkTopology,
final HardwareDescription hardwareDescription) {

super(capacity, instanceConnectionInfo, parentNode, networkTopology, hardwareDescription);

this.remainingCapacity = capacity;
Expand All @@ -87,9 +91,10 @@ synchronized void reportHeartBeat() {
* @param cleanUpInterval
* duration (in milliseconds) after which a host is
* considered dead if it has no received heat-beats.
* @return true if the host has received a heat-beat before the <code>cleanUpInterval</code> duration has expired.
* @return <code>true</code> if the host has received a heat-beat before the <code>cleanUpInterval</code> duration
* has expired, <code>false</code> otherwise
*/
synchronized boolean isStillAlive(long cleanUpInterval) {
synchronized boolean isStillAlive(final long cleanUpInterval) {

if (this.lastReceivedHeartBeat + cleanUpInterval < System.currentTimeMillis()) {
return false;
Expand All @@ -98,7 +103,7 @@ synchronized boolean isStillAlive(long cleanUpInterval) {
}

/**
* Tries to create a new slice on this instance
* Tries to create a new slice on this instance.
*
* @param reqType
* the type describing the hardware characteristics of the slice
Expand All @@ -108,7 +113,7 @@ synchronized boolean isStillAlive(long cleanUpInterval) {
* still be accommodated on this instance or <code>null</code> if the instance's remaining resources
* were insufficient to host the desired slice
*/
synchronized AllocatedSlice createSlice(InstanceType reqType, JobID jobID) {
synchronized AllocatedSlice createSlice(final InstanceType reqType, final JobID jobID) {

// check whether we can accommodate the instance
if (remainingCapacity.getNumberOfComputeUnits() >= reqType.getNumberOfComputeUnits()
Expand Down Expand Up @@ -143,7 +148,7 @@ synchronized AllocatedSlice createSlice(InstanceType reqType, JobID jobID) {
* @return the slice with has been removed from the instance or <code>null</code> if no slice
* with the given allocation ID could be found
*/
synchronized AllocatedSlice removeAllocatedSlice(AllocationID allocationID) {
synchronized AllocatedSlice removeAllocatedSlice(final AllocationID allocationID) {

final AllocatedSlice slice = this.allocatedSlices.remove(allocationID);
if (slice != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.util.List;

import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.jobgraph.JobID;
Expand Down
Loading

0 comments on commit 068bdf4

Please sign in to comment.