Skip to content

Commit

Permalink
[hotfix] Fix checkstyle violations in TaskExecutorTest
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Oct 15, 2018
1 parent 789663e commit 453545b
Showing 1 changed file with 42 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,15 @@ public void testHeartbeatSlotReporting() throws Exception {
new ClusterInformation("localhost", 1234)));

rmGateway.setRegisterTaskExecutorFunction(stringResourceIDIntegerHardwareDescriptionTuple4 -> {
taskExecutorRegistrationFuture.complete(stringResourceIDIntegerHardwareDescriptionTuple4.f1);
return registrationResponse;
});
taskExecutorRegistrationFuture.complete(stringResourceIDIntegerHardwareDescriptionTuple4.f1);
return registrationResponse;
});

final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();
rmGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f2);
return CompletableFuture.completedFuture(Acknowledge.get());
});
initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f2);
return CompletableFuture.completedFuture(Acknowledge.get());
});

final CompletableFuture<SlotReport> heartbeatSlotReportFuture = new CompletableFuture<>();
rmGateway.setTaskExecutorHeartbeatConsumer((resourceID, slotReport) -> heartbeatSlotReportFuture.complete(slotReport));
Expand Down Expand Up @@ -483,10 +483,10 @@ public HeartbeatManagerImpl<SlotReport, Void> answer(InvocationOnMock invocation
return spy(new HeartbeatManagerImpl<>(
heartbeatTimeout,
taskManagerLocation.getResourceID(),
(HeartbeatListener<SlotReport, Void>)invocation.getArguments()[1],
(Executor)invocation.getArguments()[2],
(ScheduledExecutor)invocation.getArguments()[2],
(Logger)invocation.getArguments()[3]));
(HeartbeatListener<SlotReport, Void>) invocation.getArguments()[1],
(Executor) invocation.getArguments()[2],
(ScheduledExecutor) invocation.getArguments()[2],
(Logger) invocation.getArguments()[3]));
}
}
);
Expand Down Expand Up @@ -670,10 +670,10 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception {
eq(taskManagerAddress), eq(taskManagerLocation.getResourceID()), anyInt(), any(HardwareDescription.class), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());

// cancel the leader
// cancel the leader
resourceManagerLeaderRetriever.notifyListener(null, null);

// set a new leader, see that a registration happens
// set a new leader, see that a registration happens
resourceManagerLeaderRetriever.notifyListener(address2, leaderId2);

verify(rmGateway2, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
Expand Down Expand Up @@ -786,7 +786,7 @@ public void testTaskSubmission() throws Exception {

tmGateway.submitTask(tdd, jobMasterId, timeout);

CompletableFuture<Boolean> completionFuture = TestInvokable.completableFuture;
CompletableFuture<Boolean> completionFuture = TestInvokable.COMPLETABLE_FUTURE;

completionFuture.get();
} finally {
Expand All @@ -800,15 +800,15 @@ public void testTaskSubmission() throws Exception {
*/
public static class TestInvokable extends AbstractInvokable {

static final CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
static final CompletableFuture<Boolean> COMPLETABLE_FUTURE = new CompletableFuture<>();

public TestInvokable(Environment environment) {
super(environment);
}

@Override
public void invoke() throws Exception {
completableFuture.complete(true);
COMPLETABLE_FUTURE.complete(true);
}
}

Expand All @@ -828,10 +828,9 @@ public void testJobLeaderDetection() throws Exception {
final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
CompletableFuture<Void> initialSlotReportFuture = new CompletableFuture<>();
resourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
initialSlotReportFuture.complete(null);
return CompletableFuture.completedFuture(Acknowledge.get());

});
initialSlotReportFuture.complete(null);
return CompletableFuture.completedFuture(Acknowledge.get());
});

final String jobManagerAddress = "jm";
final UUID jobManagerLeaderId = UUID.randomUUID();
Expand Down Expand Up @@ -941,10 +940,9 @@ public void testSlotAcceptance() throws Exception {
final CompletableFuture<ResourceID> registrationFuture = new CompletableFuture<>();
resourceManagerGateway.setRegisterTaskExecutorFunction(
stringResourceIDIntegerHardwareDescriptionTuple4 -> {
registrationFuture.complete(stringResourceIDIntegerHardwareDescriptionTuple4.f1);
return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, new ClusterInformation("localhost", 1234)));
}
);
registrationFuture.complete(stringResourceIDIntegerHardwareDescriptionTuple4.f1);
return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, new ClusterInformation("localhost", 1234)));
});

final CompletableFuture<Tuple3<InstanceID, SlotID, AllocationID>> availableSlotFuture = new CompletableFuture<>();
resourceManagerGateway.setNotifySlotAvailableConsumer(availableSlotFuture::complete);
Expand All @@ -967,7 +965,7 @@ public void testSlotAcceptance() throws Exception {

when(jobMasterGateway.offerSlots(
any(ResourceID.class), any(Collection.class), any(Time.class)))
.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1)));
.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>) Collections.singleton(offer1)));

rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
rpc.registerGateway(jobManagerAddress, jobMasterGateway);
Expand Down Expand Up @@ -1063,7 +1061,6 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
when(jobMasterGateway.updateTaskExecutionState(any(TaskExecutionState.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));


rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
rpc.registerGateway(jobManagerAddress, jobMasterGateway);

Expand Down Expand Up @@ -1194,9 +1191,7 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {

/**
* This tests makes sure that duplicate JobMaster gained leadership messages are filtered out
* by the TaskExecutor.
*
* See FLINK-7526
* by the TaskExecutor. See FLINK-7526.
*/
@Test
public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
Expand Down Expand Up @@ -1366,9 +1361,9 @@ public void testRemoveJobFromJobLeaderService() throws Exception {
final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
final CompletableFuture<Void> initialSlotReport = new CompletableFuture<>();
resourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
initialSlotReport.complete(null);
return CompletableFuture.completedFuture(Acknowledge.get());
});
initialSlotReport.complete(null);
return CompletableFuture.completedFuture(Acknowledge.get());
});
final ResourceManagerId resourceManagerId = resourceManagerGateway.getFencingToken();

rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
Expand Down Expand Up @@ -1514,9 +1509,9 @@ public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
final CompletableFuture<ResourceID> taskExecutorResourceIdFuture = new CompletableFuture<>();

testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
return registrationFuture;
});
taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
return registrationFuture;
});

rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
Expand Down Expand Up @@ -1619,10 +1614,9 @@ public void testInitialSlotReport() throws Exception {

testingResourceManagerGateway.setSendSlotReportFunction(
resourceIDInstanceIDSlotReportTuple3 -> {
initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f0);
return CompletableFuture.completedFuture(Acknowledge.get());
}
);
initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f0);
return CompletableFuture.completedFuture(Acknowledge.get());
});

rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
Expand Down Expand Up @@ -1666,7 +1660,7 @@ public void testInitialSlotReportFailure() throws Exception {
new TaskExecutorRegistrationSuccess(
new InstanceID(),
testingResourceManagerGateway.getOwnResourceId(),
new ClusterInformation("foobar", 1234) ));
new ClusterInformation("foobar", 1234)));

final CountDownLatch numberRegistrations = new CountDownLatch(2);

Expand Down Expand Up @@ -1694,15 +1688,15 @@ public CompletableFuture<RegistrationResponse> apply(Tuple4<String, ResourceID,
@Nonnull
private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices) {
return new TaskExecutor(
rpc,
TaskManagerConfiguration.fromConfiguration(configuration),
haServices,
taskManagerServices,
new HeartbeatServices(1000L, 1000L),
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
null,
dummyBlobCacheService,
testingFatalErrorHandler);
rpc,
TaskManagerConfiguration.fromConfiguration(configuration),
haServices,
taskManagerServices,
new HeartbeatServices(1000L, 1000L),
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
null,
dummyBlobCacheService,
testingFatalErrorHandler);
}

private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService {
Expand Down

0 comments on commit 453545b

Please sign in to comment.