Skip to content

Commit

Permalink
Merge branch 'stage1' into version02
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Sep 9, 2011
2 parents cb3f757 + 7e23536 commit 9c05463
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,15 @@ public void submitJobAndWait() throws IOException, JobExecutionException {
final IntegerRecord interval = this.jobSubmitClient.getRecommendedPollingInterval();
sleep = interval.getValue() * 1000;
} catch (IOException ioe) {
logErrorAndRethrow(StringUtils.stringifyException(ioe));
Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
// Rethrow error
throw ioe;
}

try {
Thread.sleep(sleep / 2);
} catch (InterruptedException e) {
Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
logErrorAndRethrow(StringUtils.stringifyException(e));
}

Expand All @@ -301,7 +304,14 @@ public void submitJobAndWait() throws IOException, JobExecutionException {
logErrorAndRethrow("Job client has been interrupted");
}

final JobProgressResult jobProgressResult = getJobProgress();
JobProgressResult jobProgressResult = null;
try {
jobProgressResult = getJobProgress();
} catch (IOException ioe) {
Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
// Rethrow error
throw ioe;
}

if (jobProgressResult == null) {
logErrorAndRethrow("Returned job progress is unexpectedly null!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1019,4 +1019,103 @@ public void testConvertSelfCross() {
}
}
}

/**
* This test checks the correctness of the instance sharing API. In particular, the test checks the behavior of the
* instance sharing as reported broken in ticket #198
*/
@Test
public void testInstanceSharing() {

final int degreeOfParallelism = 4;
File inputFile1 = null;
JobID jobID = null;

try {

inputFile1 = ServerTestUtils.createInputFile(0);

// create job graph
final JobGraph jg = new JobGraph("Instance Sharing Test Job");
jobID = jg.getJobID();

// input vertex
final JobFileInputVertex input1 = new JobFileInputVertex("Input 1", jg);
input1.setFileInputClass(FileLineReader.class);
input1.setFilePath(new Path("file:https://" + inputFile1.getAbsolutePath()));
input1.setNumberOfSubtasks(degreeOfParallelism);

// forward vertex 1
final JobTaskVertex forward1 = new JobTaskVertex("Forward 1", jg);
forward1.setTaskClass(ForwardTask1Input1Output.class);
forward1.setNumberOfSubtasks(degreeOfParallelism);

// forward vertex 2
final JobTaskVertex forward2 = new JobTaskVertex("Forward 2", jg);
forward2.setTaskClass(ForwardTask1Input1Output.class);
forward2.setNumberOfSubtasks(degreeOfParallelism);

// forward vertex 3
final JobTaskVertex forward3 = new JobTaskVertex("Forward 3", jg);
forward3.setTaskClass(ForwardTask1Input1Output.class);
forward3.setNumberOfSubtasks(degreeOfParallelism);

// output vertex
final JobFileOutputVertex output1 = new JobFileOutputVertex("Output 1", jg);
output1.setFileOutputClass(FileLineWriter.class);
output1.setFilePath(new Path("file:https://" + ServerTestUtils.getRandomFilename()));
output1.setNumberOfSubtasks(degreeOfParallelism);

// connect vertices
input1.connectTo(forward1, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION);
forward1.connectTo(forward2, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION);
forward2.connectTo(forward3, ChannelType.NETWORK, CompressionLevel.NO_COMPRESSION);
forward3.connectTo(output1, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION);

// setup instance sharing
input1.setVertexToShareInstancesWith(forward1);
forward1.setVertexToShareInstancesWith(forward2);
forward2.setVertexToShareInstancesWith(forward3);
forward3.setVertexToShareInstancesWith(output1);

LibraryCacheManager.register(jobID, new String[0]);

// now convert job graph to execution graph
final ExecutionGraph eg = new ExecutionGraph(jg, INSTANCE_MANAGER);

// Check number of stages
assertEquals(1, eg.getNumberOfStages());

// Check number of vertices in stage
final ExecutionStage stage = eg.getStage(0);
assertEquals(5, stage.getNumberOfStageMembers());

// Check number of required instances
Map<InstanceType, Integer> instanceMap = new HashMap<InstanceType, Integer>();
stage.collectRequiredInstanceTypes(instanceMap, ExecutionState.CREATED);

// First, we expect all required instances to be of the same type
assertEquals(1, instanceMap.size());

final Integer numberOfRequiredInstances = instanceMap.values().iterator().next();
assertEquals(degreeOfParallelism, numberOfRequiredInstances.intValue());

} catch (GraphConversionException e) {
fail(e.getMessage());
} catch (JobGraphDefinitionException e) {
fail(e.getMessage());
} catch (IOException ioe) {
fail(ioe.getMessage());
} finally {
if (inputFile1 != null) {
inputFile1.delete();
}
if (jobID != null) {
try {
LibraryCacheManager.unregister(jobID);
} catch (IOException e) {
}
}
}
}
}
2 changes: 1 addition & 1 deletion stratosphere-dist/src/main/assemblies/bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<id>bin</id>
<formats>
<format>dir</format>
<format>tar.gz</format>
<format>tar.gz</format>
<format>zip</format>
</formats>

Expand Down

0 comments on commit 9c05463

Please sign in to comment.