Skip to content

Commit

Permalink
[FLINK-32843][JUnit5 Migration] Migrate the jobmaster package of flin…
Browse files Browse the repository at this point in the history
…k-runtime module to JUnit5
  • Loading branch information
RocMarshal committed May 6, 2024
1 parent 80af4d5 commit beb0b16
Show file tree
Hide file tree
Showing 28 changed files with 520 additions and 639 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.rules.ExternalResource;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -72,41 +71,6 @@ public ComponentMainThreadExecutor getMainThreadExecutor() {
return mainThreadExecutor;
}

/** Test resource for convenience. */
public static class Resource extends ExternalResource {

private long shutdownTimeoutMillis;
private TestingComponentMainThreadExecutor componentMainThreadTestExecutor;
private ScheduledExecutorService innerExecutorService;

public Resource() {
this(500L);
}

public Resource(long shutdownTimeoutMillis) {
this.shutdownTimeoutMillis = shutdownTimeoutMillis;
}

@Override
protected void before() {
this.innerExecutorService = Executors.newSingleThreadScheduledExecutor();
this.componentMainThreadTestExecutor =
new TestingComponentMainThreadExecutor(
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
innerExecutorService));
}

@Override
protected void after() {
ExecutorUtils.gracefulShutdown(
shutdownTimeoutMillis, TimeUnit.MILLISECONDS, innerExecutorService);
}

public TestingComponentMainThreadExecutor getComponentMainThreadTestExecutor() {
return componentMainThreadTestExecutor;
}
}

/** Test extension for convenience. */
public static class Extension implements BeforeAllCallback, AfterAllCallback {
private final long shutdownTimeoutMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -34,15 +33,13 @@

import static org.apache.flink.runtime.clusterframework.types.ResourceID.generate;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link DefaultExecutionDeploymentReconciler}. */
public class DefaultExecutionDeploymentReconcilerTest extends TestLogger {
class DefaultExecutionDeploymentReconcilerTest {

@Test
public void testMatchingDeployments() {
void testMatchingDeployments() {
TestingExecutionDeploymentReconciliationHandler handler =
new TestingExecutionDeploymentReconciliationHandler();

Expand All @@ -57,12 +54,12 @@ public void testMatchingDeployments() {
new ExecutionDeploymentReport(Collections.singleton(attemptId)),
Collections.singletonMap(attemptId, ExecutionDeploymentState.DEPLOYED));

assertThat(handler.getMissingExecutions(), empty());
assertThat(handler.getUnknownExecutions(), empty());
assertThat(handler.getMissingExecutions()).isEmpty();
assertThat(handler.getUnknownExecutions()).isEmpty();
}

@Test
public void testMissingDeployments() {
void testMissingDeployments() {
TestingExecutionDeploymentReconciliationHandler handler =
new TestingExecutionDeploymentReconciliationHandler();

Expand All @@ -77,12 +74,12 @@ public void testMissingDeployments() {
new ExecutionDeploymentReport(Collections.emptySet()),
Collections.singletonMap(attemptId, ExecutionDeploymentState.DEPLOYED));

assertThat(handler.getUnknownExecutions(), empty());
assertThat(handler.getMissingExecutions(), hasItem(attemptId));
assertThat(handler.getUnknownExecutions()).isEmpty();
assertThat(handler.getMissingExecutions()).contains(attemptId);
}

@Test
public void testUnknownDeployments() {
void testUnknownDeployments() {
TestingExecutionDeploymentReconciliationHandler handler =
new TestingExecutionDeploymentReconciliationHandler();

Expand All @@ -97,12 +94,12 @@ public void testUnknownDeployments() {
new ExecutionDeploymentReport(Collections.singleton(attemptId)),
Collections.emptyMap());

assertThat(handler.getMissingExecutions(), empty());
assertThat(handler.getUnknownExecutions(), hasItem(attemptId));
assertThat(handler.getMissingExecutions()).isEmpty();
assertThat(handler.getUnknownExecutions()).contains(attemptId);
}

@Test
public void testMissingAndUnknownDeployments() {
void testMissingAndUnknownDeployments() {
TestingExecutionDeploymentReconciliationHandler handler =
new TestingExecutionDeploymentReconciliationHandler();

Expand All @@ -120,12 +117,12 @@ public void testMissingAndUnknownDeployments() {
Stream.of(missingId, matchingId)
.collect(Collectors.toMap(x -> x, x -> ExecutionDeploymentState.DEPLOYED)));

assertThat(handler.getMissingExecutions(), hasItem(missingId));
assertThat(handler.getUnknownExecutions(), hasItem(unknownId));
assertThat(handler.getMissingExecutions()).contains(missingId);
assertThat(handler.getUnknownExecutions()).contains(unknownId);
}

@Test
public void testPendingDeployments() {
void testPendingDeployments() {
TestingExecutionDeploymentReconciliationHandler handler =
new TestingExecutionDeploymentReconciliationHandler();

Expand All @@ -143,8 +140,8 @@ public void testPendingDeployments() {
Stream.of(matchingId, missingId)
.collect(Collectors.toMap(x -> x, x -> ExecutionDeploymentState.PENDING)));

assertThat(handler.getMissingExecutions(), empty());
assertThat(handler.getUnknownExecutions(), hasItem(unknownId));
assertThat(handler.getMissingExecutions()).isEmpty();
assertThat(handler.getUnknownExecutions()).contains(unknownId);
}

private static class TestingExecutionDeploymentReconciliationHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,34 @@

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link DefaultExecutionDeploymentTracker}. */
public class DefaultExecutionDeploymentTrackerTest extends TestLogger {
class DefaultExecutionDeploymentTrackerTest {

@Test
public void testStartTracking() {
void testStartTracking() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

final ExecutionAttemptID attemptId1 = createExecutionAttemptId();
final ResourceID resourceId1 = ResourceID.generate();
tracker.startTrackingPendingDeploymentOf(attemptId1, resourceId1);

assertThat(
tracker.getExecutionsOn(resourceId1),
hasEntry(attemptId1, ExecutionDeploymentState.PENDING));
assertThat(tracker.getExecutionsOn(resourceId1))
.containsEntry(attemptId1, ExecutionDeploymentState.PENDING);

tracker.completeDeploymentOf(attemptId1);

assertThat(
tracker.getExecutionsOn(resourceId1),
hasEntry(attemptId1, ExecutionDeploymentState.DEPLOYED));
assertThat(tracker.getExecutionsOn(resourceId1))
.containsEntry(attemptId1, ExecutionDeploymentState.DEPLOYED);
}

@Test
public void testStopTrackingCompletedDeployment() {
void testStopTrackingCompletedDeployment() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

final ExecutionAttemptID attemptId1 = createExecutionAttemptId();
Expand All @@ -65,11 +57,11 @@ public void testStopTrackingCompletedDeployment() {

tracker.stopTrackingDeploymentOf(attemptId1);

assertThat(tracker.getExecutionsOn(resourceId1).entrySet(), empty());
assertThat(tracker.getExecutionsOn(resourceId1).entrySet()).isEmpty();
}

@Test
public void testStopTrackingPendingDeployment() {
void testStopTrackingPendingDeployment() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

final ExecutionAttemptID attemptId1 = createExecutionAttemptId();
Expand All @@ -78,11 +70,11 @@ public void testStopTrackingPendingDeployment() {

tracker.stopTrackingDeploymentOf(attemptId1);

assertThat(tracker.getExecutionsOn(resourceId1).entrySet(), empty());
assertThat(tracker.getExecutionsOn(resourceId1).entrySet()).isEmpty();
}

@Test
public void testStopTrackingDoesNotAffectOtherIds() {
void testStopTrackingDoesNotAffectOtherIds() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

final ExecutionAttemptID attemptId1 = createExecutionAttemptId();
Expand All @@ -92,30 +84,28 @@ public void testStopTrackingDoesNotAffectOtherIds() {

tracker.stopTrackingDeploymentOf(createExecutionAttemptId());

assertThat(tracker.getExecutionsOn(resourceId1), hasKey(attemptId1));
assertThat(tracker.getExecutionsOn(resourceId1)).containsKey(attemptId1);
}

@Test
public void testCompleteDeploymentUnknownExecutionDoesNotThrowException() {
void testCompleteDeploymentUnknownExecutionDoesNotThrowException() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

tracker.completeDeploymentOf(createExecutionAttemptId());
}

@Test
public void testStopTrackingUnknownExecutionDoesNotThrowException() {
void testStopTrackingUnknownExecutionDoesNotThrowException() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

final ExecutionAttemptID attemptId2 = createExecutionAttemptId();
tracker.stopTrackingDeploymentOf(attemptId2);
}

@Test
public void testGetExecutionsReturnsEmptySetForUnknownHost() {
void testGetExecutionsReturnsEmptySetForUnknownHost() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

assertThat(
tracker.getExecutionsOn(ResourceID.generate()).entrySet(),
allOf(notNullValue(), empty()));
assertThat(tracker.getExecutionsOn(ResourceID.generate()).entrySet()).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ void testJobMasterGatewayGetsForwarded() {
}

@Test
void testLeaderAddressGetsForwarded() throws Exception {
void testLeaderAddressGetsForwarded() {
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
Expand Down Expand Up @@ -249,7 +249,7 @@ void testIsNotInitializedAfterClosing() {
}

@Test
void testSuccessOnTerminalState() throws Exception {
void testSuccessOnTerminalState() {
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@

/** Tests for the {@link DefaultSlotPoolServiceSchedulerFactory}. */
@ExtendWith(TestLoggerExtension.class)
public class DefaultSlotPoolServiceSchedulerFactoryTest {
class DefaultSlotPoolServiceSchedulerFactoryTest {

@Test
public void testFallsBackToDefaultSchedulerIfAdaptiveSchedulerInBatchJob() {
void testFallsBackToDefaultSchedulerIfAdaptiveSchedulerInBatchJob() {
final Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);

Expand All @@ -61,7 +61,7 @@ public void testFallsBackToDefaultSchedulerIfAdaptiveSchedulerInBatchJob() {
}

@Test
public void testAdaptiveSchedulerForReactiveMode() {
void testAdaptiveSchedulerForReactiveMode() {
final Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);

Expand All @@ -76,7 +76,7 @@ public void testAdaptiveSchedulerForReactiveMode() {
}

@Test
public void testFallBackSchedulerWithAdaptiveSchedulerTestProperty() {
void testFallBackSchedulerWithAdaptiveSchedulerTestProperty() {
String propertyValue = saveAdaptiveSchedulerTestPropertyValue();
System.setProperty("flink.tests.enable-adaptive-scheduler", "true");

Expand All @@ -102,7 +102,7 @@ public void testFallBackSchedulerWithAdaptiveSchedulerTestProperty() {
}

@Test
public void testFallBackSchedulerWithoutAdaptiveSchedulerTestProperty() {
void testFallBackSchedulerWithoutAdaptiveSchedulerTestProperty() {
String propertyValue = saveAdaptiveSchedulerTestPropertyValue();
System.clearProperty("flink.tests.enable-adaptive-scheduler");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,23 @@
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Integration tests for job scheduling. */
public class JobExecutionITCase extends TestLogger {
class JobExecutionITCase {

/**
* Tests that tasks with a co-location constraint are scheduled in the same slots. In fact it
* also tests that consumers are scheduled wrt their input location if the co-location
* constraint is deactivated.
*/
@Test
public void testCoLocationConstraintJobExecution() throws Exception {
void testCoLocationConstraintJobExecution() throws Exception {
final int numSlotsPerTaskExecutor = 1;
final int numTaskExecutors = 3;
final int parallelism = numTaskExecutors * numSlotsPerTaskExecutor;
Expand All @@ -66,7 +64,7 @@ public void testCoLocationConstraintJobExecution() throws Exception {
final CompletableFuture<JobResult> jobResultFuture =
miniCluster.requestJobResult(jobGraph.getJobID());

assertThat(jobResultFuture.get().isSuccess(), is(true));
assertThat(jobResultFuture.get().isSuccess()).isTrue();
}
}

Expand Down
Loading

0 comments on commit beb0b16

Please sign in to comment.