Skip to content

Commit

Permalink
[FLINK-21401] Replace explicit JobGraph generation with JobGraphTestU…
Browse files Browse the repository at this point in the history
…til factories
  • Loading branch information
tillrohrmann committed Mar 5, 2021
1 parent 8509dab commit 88c2d10
Show file tree
Hide file tree
Showing 35 changed files with 290 additions and 323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
Expand Down Expand Up @@ -277,7 +278,7 @@ public void testCancel() throws Exception {
sender.setParallelism(2);
sender.setInvokableClass(BlockingInvokable.class);

final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender);
final JobID jid = jobGraph.getJobID();

ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
Expand Down Expand Up @@ -341,7 +342,7 @@ public void testJobOverviewHandler() throws Exception {
sender.setParallelism(2);
sender.setInvokableClass(BlockingInvokable.class);

final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender);

ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
clusterClient.submitJob(jobGraph).get();
Expand Down Expand Up @@ -385,7 +386,7 @@ public void testCancelYarn() throws Exception {
sender.setParallelism(2);
sender.setInvokableClass(BlockingInvokable.class);

final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender);
final JobID jid = jobGraph.getJobID();

ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
Expand Down Expand Up @@ -659,7 +660,7 @@ private Map<OperatorID, ExecutionJobVertex> buildVertices(
jobEdge.setUpstreamSubtaskStateMapper(upstreamRescaler);
}

JobGraph jobGraph = new JobGraph("Pointwise job", jobVertices);
JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);
ExecutionGraph eg =
TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
return Arrays.stream(jobVertices)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
Expand All @@ -33,7 +33,6 @@
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand All @@ -49,7 +48,6 @@
public class DispatcherJobTest extends TestLogger {

private static final Time TIMEOUT = Time.seconds(10L);
private static final JobID TEST_JOB_ID = new JobID();

@Test
public void testStatusWhenInitializing() throws Exception {
Expand Down Expand Up @@ -277,10 +275,7 @@ public void testUnavailableJobMasterGateway() {
}

private TestContext createTestContext() {
final JobVertex testVertex = new JobVertex("testVertex");
testVertex.setInvokableClass(NoOpInvokable.class);

JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
CompletableFuture<JobManagerRunner> jobManagerRunnerCompletableFuture =
new CompletableFuture<>();
DispatcherJob dispatcherJob =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
Expand All @@ -53,7 +53,6 @@
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -141,10 +140,8 @@ public static void setupClass() {

@Before
public void setup() throws Exception {
final JobVertex testVertex = new JobVertex("testVertex");
testVertex.setInvokableClass(NoOpInvokable.class);
jobId = new JobID();
jobGraph = new JobGraph(jobId, "testJob", testVertex);
jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
jobId = jobGraph.getJobID();

configuration = new Configuration();
configuration.setString(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
Expand Down Expand Up @@ -130,8 +131,6 @@ public class DispatcherTest extends TestLogger {

private static final Time TIMEOUT = Time.seconds(10L);

private static final JobID TEST_JOB_ID = new JobID();

@Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Rule
Expand All @@ -142,6 +141,8 @@ public class DispatcherTest extends TestLogger {

private JobGraph jobGraph;

private JobID jobId;

private TestingLeaderElectionService jobMasterLeaderElectionService;

private CountDownLatch createdJobManagerRunnerLatch;
Expand Down Expand Up @@ -173,16 +174,15 @@ public static void teardownClass() throws Exception {

@Before
public void setUp() throws Exception {
final JobVertex testVertex = new JobVertex("testVertex");
testVertex.setInvokableClass(NoOpInvokable.class);
jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
jobId = jobGraph.getJobID();

heartbeatServices = new HeartbeatServices(1000L, 10000L);

jobMasterLeaderElectionService = new TestingLeaderElectionService();

haServices = new TestingHighAvailabilityServices();
haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService);
haServices.setJobMasterLeaderElectionService(jobId, jobMasterLeaderElectionService);
haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());

Expand Down Expand Up @@ -315,7 +315,7 @@ public void testJobSubmission() throws Exception {
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
TEST_JOB_ID, createdJobManagerRunnerLatch));
jobId, createdJobManagerRunnerLatch));
DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
Expand All @@ -332,15 +332,6 @@ public void testJobSubmission() throws Exception {
*/
@Test
public void testJobSubmissionWithPartialResourceConfigured() throws Exception {
dispatcher =
createAndStartDispatcher(
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
TEST_JOB_ID, createdJobManagerRunnerLatch));

DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

ResourceSpec resourceSpec = ResourceSpec.newBuilder(2.0, 0).build();

final JobVertex firstVertex = new JobVertex("firstVertex");
Expand All @@ -351,7 +342,17 @@ public void testJobSubmissionWithPartialResourceConfigured() throws Exception {
secondVertex.setInvokableClass(NoOpInvokable.class);

JobGraph jobGraphWithTwoVertices =
new JobGraph(TEST_JOB_ID, "twoVerticesJob", firstVertex, secondVertex);
JobGraphTestUtils.streamingJobGraph(firstVertex, secondVertex);
new JobGraph(jobId, "twoVerticesJob", firstVertex, secondVertex);

dispatcher =
createAndStartDispatcher(
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
jobId, createdJobManagerRunnerLatch));

DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

CompletableFuture<Acknowledge> acknowledgeFuture =
dispatcherGateway.submitJob(jobGraphWithTwoVertices, TIMEOUT);
Expand All @@ -371,7 +372,7 @@ public void testNonBlockingJobSubmission() throws Exception {
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
TEST_JOB_ID, createdJobManagerRunnerLatch));
jobId, createdJobManagerRunnerLatch));
jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

Expand Down Expand Up @@ -410,7 +411,7 @@ public void testInvalidCallDuringInitialization() throws Exception {
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
TEST_JOB_ID, createdJobManagerRunnerLatch));
jobId, createdJobManagerRunnerLatch));
jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

Expand Down Expand Up @@ -449,7 +450,7 @@ public void testCancellationDuringInitialization() throws Exception {
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
TEST_JOB_ID, createdJobManagerRunnerLatch));
jobId, createdJobManagerRunnerLatch));
jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

Expand Down Expand Up @@ -485,7 +486,7 @@ public void testErrorDuringInitialization() throws Exception {
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
TEST_JOB_ID, createdJobManagerRunnerLatch));
jobId, createdJobManagerRunnerLatch));
jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

Expand All @@ -494,12 +495,12 @@ public void testErrorDuringInitialization() throws Exception {
new FailingInitializationJobVertex("testVertex");
failingInitializationJobVertex.setInvokableClass(NoOpInvokable.class);
JobGraph blockingJobGraph =
new JobGraph(TEST_JOB_ID, "failingTestJob", failingInitializationJobVertex);
new JobGraph(jobId, "failingTestJob", failingInitializationJobVertex);

dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();

// wait till job has failed
dispatcherGateway.requestJobResult(TEST_JOB_ID, TIMEOUT).get();
dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();

// get failure cause
ArchivedExecutionGraph execGraph =
Expand All @@ -523,7 +524,7 @@ public void testCacheJobExecutionResult() throws Exception {
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
TEST_JOB_ID, createdJobManagerRunnerLatch));
jobId, createdJobManagerRunnerLatch));

final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
Expand Down Expand Up @@ -557,7 +558,7 @@ public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
TEST_JOB_ID, createdJobManagerRunnerLatch));
jobId, createdJobManagerRunnerLatch));

final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
Expand All @@ -580,7 +581,7 @@ public void testSavepointDisposal() throws Exception {
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
TEST_JOB_ID, createdJobManagerRunnerLatch));
jobId, createdJobManagerRunnerLatch));

final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
Expand Down Expand Up @@ -626,7 +627,7 @@ private URI createTestingSavepoint() throws IOException, URISyntaxException {
@Test
public void testWaitingForJobMasterLeadership() throws Exception {
ExpectedJobIdJobManagerRunnerFactory jobManagerRunnerFactor =
new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch);
new ExpectedJobIdJobManagerRunnerFactory(jobId, createdJobManagerRunnerLatch);
dispatcher =
createAndStartDispatcher(heartbeatServices, haServices, jobManagerRunnerFactor);

Expand Down Expand Up @@ -828,7 +829,7 @@ public void testJobSuspensionWhenDispatcherIsTerminated() throws Exception {
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
TEST_JOB_ID, createdJobManagerRunnerLatch));
jobId, createdJobManagerRunnerLatch));

DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

Expand Down Expand Up @@ -904,7 +905,7 @@ public void testInitializationTimestampForwardedToExecutionGraph() throws Except
heartbeatServices,
haServices,
new ExpectedJobIdJobManagerRunnerFactory(
TEST_JOB_ID, createdJobManagerRunnerLatch));
jobId, createdJobManagerRunnerLatch));
jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

Expand Down Expand Up @@ -991,7 +992,7 @@ private Tuple2<JobGraph, BlockingJobVertex> getBlockingJobGraphAndVertex() {
final BlockingJobVertex blockingJobVertex = new BlockingJobVertex("testVertex");
blockingJobVertex.setInvokableClass(NoOpInvokable.class);
return Tuple2.of(
new JobGraph(TEST_JOB_ID, "blockingTestJob", blockingJobVertex), blockingJobVertex);
new JobGraph(jobId, "blockingTestJob", blockingJobVertex), blockingJobVertex);
}

private JobGraph createFailingJobGraph(Exception failureCause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
Expand Down Expand Up @@ -79,8 +78,6 @@ public class DefaultDispatcherRunnerITCase extends TestLogger {

private static final Time TIMEOUT = Time.seconds(10L);

private static final JobID TEST_JOB_ID = new JobID();

@ClassRule
public static TestingRpcServiceResource rpcServiceResource = new TestingRpcServiceResource();

Expand Down Expand Up @@ -240,9 +237,7 @@ public Dispatcher createDispatcher(
}

private static JobGraph createJobGraph() {
final JobVertex testVertex = new JobVertex("testVertex");
testVertex.setInvokableClass(NoOpInvokable.class);
return new JobGraph(TEST_JOB_ID, "testJob", testVertex);
return JobGraphTestUtils.singleNoOpJobGraph();
}

private DispatcherRunner createDispatcherRunner() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperRunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
Expand Down Expand Up @@ -268,7 +269,7 @@ private JobGraph createJobGraphWithBlobs() throws IOException {
vertex.setInvokableClass(NoOpInvokable.class);
vertex.setParallelism(1);

final JobGraph jobGraph = new JobGraph("Test job graph", vertex);
final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);
final PermanentBlobKey permanentBlobKey =
blobServer.putPermanent(jobGraph.getJobID(), new byte[256]);
jobGraph.addUserJarBlobKey(permanentBlobKey);
Expand Down
Loading

0 comments on commit 88c2d10

Please sign in to comment.