Skip to content

Commit

Permalink
Clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Mar 7, 2012
1 parent b8fdb35 commit c1ca650
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ public static boolean getDecision(final RuntimeTask task, final ResourceUtilizat
return isNetworkTask(task);
}

if (rus.getForced() != null) {
LOG.info("Checkpoint decision was forced to " + rus.getForced());
// checkpoint decision was forced by the user
return rus.getForced();
}

final double CPlower = CheckpointUtils.getCPLower();

final double CPupper = CheckpointUtils.getCPUpper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,17 @@ public final class ResourceUtilizationSnapshot implements IOReadableWritable {
* Stores the utilization of task's output channels at the time when the snapshot was created.
*/
private final Map<ChannelID, Long> channelUtilization;

/**
* userCPu Time in percent
*/
private long userCPU;

/**
* The forced decision if annotated
*/
private Boolean forced;


/**
* amount of input bytes of all input-channels
*/
private long totalInputAmount;

/**
* amount of transmitted bytes of all output-channels
*/
Expand All @@ -69,8 +65,9 @@ public final class ResourceUtilizationSnapshot implements IOReadableWritable {

private boolean isDam;


public ResourceUtilizationSnapshot(final long timestamp, final Map<ChannelID, Long> channelUtilization,long userCPU, Boolean force, long totalInputAmount2, long totalOutputAmount2, long averageOutputRecordSize2, long averageInputRecordSize2, double pactRatio, boolean isDam) {
public ResourceUtilizationSnapshot(final long timestamp, final Map<ChannelID, Long> channelUtilization,
long userCPU, long totalInputAmount2, long totalOutputAmount2, long averageOutputRecordSize2,
long averageInputRecordSize2, double pactRatio, boolean isDam) {

if (timestamp <= 0L) {
throw new IllegalArgumentException("Argument timestamp must be larger than zero");
Expand All @@ -83,16 +80,17 @@ public ResourceUtilizationSnapshot(final long timestamp, final Map<ChannelID, Lo
this.timestamp = timestamp;
this.channelUtilization = channelUtilization;
this.userCPU = userCPU;
this.forced = force;
this.totalInputAmount = totalInputAmount2;
this.totalOutputAmount = totalOutputAmount2;
this.averageInputRecordSize = averageInputRecordSize2;
this.averageOutputRecordSize = averageOutputRecordSize2;
this.pactRatio = pactRatio;
this.isDam = isDam;

}
public ResourceUtilizationSnapshot(final long timestamp, final Map<ChannelID, Long> channelUtilization,long userCPU, final Boolean forced, final long totalInputAmount, final long totalOutputAmount) {

public ResourceUtilizationSnapshot(final long timestamp, final Map<ChannelID, Long> channelUtilization,
long userCPU, final Boolean forced, final long totalInputAmount, final long totalOutputAmount) {

if (timestamp <= 0L) {
throw new IllegalArgumentException("Argument timestamp must be larger than zero");
Expand All @@ -105,9 +103,8 @@ public ResourceUtilizationSnapshot(final long timestamp, final Map<ChannelID, Lo
this.timestamp = timestamp;
this.channelUtilization = channelUtilization;
this.userCPU = userCPU;
this.forced = forced;
this.totalInputAmount = totalInputAmount;
this.totalOutputAmount = totalOutputAmount;
this.totalOutputAmount = totalOutputAmount;
}

public ResourceUtilizationSnapshot() {
Expand All @@ -128,13 +125,13 @@ public ResourceUtilizationSnapshot(long timestamp, Map<ChannelID, Long> channelU
this.timestamp = timestamp;
this.channelUtilization = channelUtilization;
this.userCPU = userCPU;
this.forced = force;
this.totalInputAmount = totalInputAmount;
this.totalOutputAmount = totalOutputAmount;
this.totalOutputAmount = totalOutputAmount;
this.averageOutputRecordSize = averageOutputRecordSize;
this.averageInputRecordSize = averageInputRecordSize;

}

/**
* {@inheritDoc}
*/
Expand All @@ -155,13 +152,7 @@ public void write(final DataOutput out) throws IOException {
}
// Write the userCPU
out.writeLong(this.userCPU);
// Write forced decision
if(this.forced == null){
out.writeByte(0);
}else{
out.writeByte(1);
out.writeBoolean(this.forced);
}

out.writeLong(this.totalInputAmount);
out.writeLong(this.totalOutputAmount);
out.writeLong(this.averageInputRecordSize);
Expand All @@ -187,10 +178,7 @@ public void read(final DataInput in) throws IOException {
this.channelUtilization.put(channelID, l);
}
this.userCPU = in.readLong();

if(in.readByte() == 1){
this.forced = in.readBoolean();
}

this.totalInputAmount = in.readLong();
this.totalOutputAmount = in.readLong();
this.averageInputRecordSize = in.readLong();
Expand Down Expand Up @@ -226,7 +214,6 @@ public long getAmountOfDataTransmitted(final ChannelID channelID) {
return l.longValue();
}


/**
* Returns the userCPU.
*
Expand All @@ -235,33 +222,28 @@ public long getAmountOfDataTransmitted(final ChannelID channelID) {
public long getUserCPU() {
return this.userCPU;
}
/**
* Returns whether a decision was forced by the user
* @return
*/
public Boolean getForced() {
return forced;
}


public long getTotalInputAmount() {
return this.totalInputAmount;
}


public long getTotalOutputAmount() {
return this.totalOutputAmount;
}

public long getAverageOutputRecordSize() {
return averageOutputRecordSize;
}

public long getAverageInputRecordSize() {
return averageInputRecordSize;
}
public double getPactRatio(){

public double getPactRatio() {
return this.pactRatio;
}
public boolean isDam(){

public boolean isDam() {
return this.isDam;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ public ExecutionVertex(final JobID jobID, final Class<? extends AbstractInvokabl
if (forcedCheckpoint != null) {
ics = forcedCheckpoint.checkpoint() ? CheckpointState.PARTIAL : CheckpointState.NONE;
}

// TODO: Consider state annotation here
}

groupVertex.setInitialCheckpointState(ics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.nephele.annotations.ForceCheckpoint;
import eu.stratosphere.nephele.annotations.Stateful;
import eu.stratosphere.nephele.annotations.Stateless;
import eu.stratosphere.nephele.checkpointing.CheckpointDecision;
import eu.stratosphere.nephele.checkpointing.EphemeralCheckpoint;
import eu.stratosphere.nephele.execution.ResourceUtilizationSnapshot;
Expand Down Expand Up @@ -265,29 +262,9 @@ void reportExhaustionOfMemoryBuffers() throws IOException, InterruptedException
if (numinrec != 0) {
averageInputRecordSize = totalInputAmount / numinrec;
}
Boolean force = null;
if (environment.getInvokable().getClass().isAnnotationPresent(Stateful.class)
&& !environment.getInvokable().getClass().isAnnotationPresent(Stateless.class)) {
// Don't checkpoint stateful tasks
force = false;
} else {
if (environment.getForced() != null) {
force = environment.getForced();
} else {
// look for a forced decision from the user
ForceCheckpoint forced = environment.getInvokable().getClass().getAnnotation(ForceCheckpoint.class);

// this.environment.getInvokable().getTaskConfiguration().getBoolean("forced_checkpoint", false)

if (forced != null) {
force = forced.checkpoint();
}
}
}

final ResourceUtilizationSnapshot rus = new ResourceUtilizationSnapshot(timestamp, channelUtilization,
userCPU,
force, totalInputAmount, totalOutputAmount, averageOutputRecordSize, averageInputRecordSize,
userCPU, totalInputAmount, totalOutputAmount, averageOutputRecordSize, averageInputRecordSize,
this.task.getPACTInputOutputRatio(), allClosed);

System.out.println("Making checkpoint decision for " + environment.getTaskNameWithIndex());
Expand Down

0 comments on commit c1ca650

Please sign in to comment.