Skip to content

Commit

Permalink
[FLINK-21745][tests] Sets the parallelism of 1 for testing JobVertex …
Browse files Browse the repository at this point in the history
…instances

This was necessary to make the tests comply with the AdaptiveScheduler
requirements of having the parallelism set for each JobVertex.

This closes apache#15262
  • Loading branch information
XComp authored and rmetzger committed Mar 25, 2021
1 parent da77da3 commit 18a32e8
Show file tree
Hide file tree
Showing 8 changed files with 10 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ private static JobGraph getNoopJobGraph() {
private static JobGraph getCancellableJobGraph() {
JobVertex jobVertex = new JobVertex("jobVertex");
jobVertex.setInvokableClass(MyCancellableInvokable.class);
jobVertex.setParallelism(1);
return JobGraphTestUtils.streamingJobGraph(jobVertex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void testJobManagerJMXMetricAccess() throws Exception {
try {
JobVertex sourceJobVertex = new JobVertex("Source");
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
sourceJobVertex.setParallelism(1);

final JobCheckpointingSettings jobCheckpointingSettings =
new JobCheckpointingSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,9 @@ public TestingJobManagerRunner createJobManagerRunner(
private Tuple2<JobGraph, BlockingJobVertex> getBlockingJobGraphAndVertex() {
final BlockingJobVertex blockingJobVertex = new BlockingJobVertex("testVertex");
blockingJobVertex.setInvokableClass(NoOpInvokable.class);
// AdaptiveScheduler expects the parallelism to be set for each vertex
blockingJobVertex.setParallelism(1);

return Tuple2.of(
JobGraphBuilder.newStreamingJobGraphBuilder()
.setJobId(jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static JobGraph emptyJobGraph() {
public static JobGraph singleNoOpJobGraph() {
JobVertex jobVertex = new JobVertex("jobVertex");
jobVertex.setInvokableClass(NoOpInvokable.class);
jobVertex.setParallelism(1);

return JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(jobVertex).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1783,6 +1783,7 @@ private JobGraph createJobGraphWithCheckpointing(
SavepointRestoreSettings savepointRestoreSettings) {
final JobVertex source = new JobVertex("source");
source.setInvokableClass(NoOpInvokable.class);
source.setParallelism(1);

return TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
savepointRestoreSettings, source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ private void setUpWithCheckpointInterval(long checkpointInterval) throws Excepti

final JobVertex vertex = new JobVertex("testVertex");
vertex.setInvokableClass(NoOpBlockingInvokable.class);
vertex.setParallelism(1);

final JobCheckpointingSettings jobCheckpointingSettings =
new JobCheckpointingSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public void tearDown() {
public void testJobRetrieval() throws Exception {
final JobVertex imalock = new JobVertex("imalock");
imalock.setInvokableClass(SemaphoreInvokable.class);
imalock.setParallelism(1);

final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(imalock);
final JobID jobId = jobGraph.getJobID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private static JobGraph getWorkingJobGraph() {
public void testExceptionInInitializeOnMaster() throws Exception {
final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
failingJobVertex.setInvokableClass(NoOpInvokable.class);
failingJobVertex.setParallelism(1);

final JobGraph failingJobGraph = JobGraphTestUtils.streamingJobGraph(failingJobVertex);
runJobSubmissionTest(
Expand Down

0 comments on commit 18a32e8

Please sign in to comment.