Skip to content

Commit

Permalink
[FLINK-33853][runtime][JUnit5 Migration] Migrate Junit5 for Declarati…
Browse files Browse the repository at this point in the history
…veSlotPoolBridge test classes of runtime module (apache#23932)
  • Loading branch information
RocMarshal authored Dec 15, 2023
1 parent 9aa7a3c commit 1136ed5
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.clock.SystemClock;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import javax.annotation.Nonnull;

Expand All @@ -44,11 +42,10 @@

import static org.assertj.core.api.Assertions.assertThat;

@ExtendWith(TestLoggerExtension.class)
public class DeclarativeSlotPoolBridgePreferredAllocationsTest {
class DeclarativeSlotPoolBridgePreferredAllocationsTest {

@Test
public void testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount() throws Exception {
void testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount() throws Exception {
final DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
new DeclarativeSlotPoolBridge(
new JobID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.CheckedSupplier;

import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -43,27 +42,23 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests how the {@link DeclarativeSlotPoolBridge} completes slot requests. */
public class DeclarativeSlotPoolBridgeRequestCompletionTest extends TestLogger {
class DeclarativeSlotPoolBridgeRequestCompletionTest {

private static final Time TIMEOUT = SlotPoolUtils.TIMEOUT;

private TestingResourceManagerGateway resourceManagerGateway;

@Before
public void setUp() throws Exception {
@BeforeEach
void setUp() {
resourceManagerGateway = new TestingResourceManagerGateway();
}

/** Tests that the {@link DeclarativeSlotPoolBridge} completes slots in request order. */
@Test
public void testRequestsAreCompletedInRequestOrder() {
void testRequestsAreCompletedInRequestOrder() {
runSlotRequestCompletionTest(
CheckedSupplier.unchecked(this::createAndSetUpSlotPool), slotPool -> {});
}
Expand All @@ -73,7 +68,7 @@ public void testRequestsAreCompletedInRequestOrder() {
* order.
*/
@Test
public void testStashOrderMaintainsRequestOrder() {
void testStashOrderMaintainsRequestOrder() {
runSlotRequestCompletionTest(
CheckedSupplier.unchecked(this::createAndSetUpSlotPoolWithoutResourceManager),
this::connectToResourceManager);
Expand Down Expand Up @@ -113,15 +108,15 @@ private void runSlotRequestCompletionTest(
new SimpleAckingTaskManagerGateway(),
Collections.singleton(slotOffer));

assertThat(acceptedSlots, containsInAnyOrder(slotOffer));
assertThat(acceptedSlots).contains(slotOffer);

final FlinkException testingReleaseException =
new FlinkException("Testing release exception");

// check that the slot requests get completed in sequential order
for (int i = 0; i < slotRequestIds.size(); i++) {
final CompletableFuture<PhysicalSlot> slotRequestFuture = slotRequests.get(i);
assertThat(slotRequestFuture.getNow(null), is(not(nullValue())));
assertThat(slotRequestFuture.getNow(null)).isNotNull();
slotPool.releaseSlot(slotRequestIds.get(i), testingReleaseException);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@
package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.TestLogger;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -44,37 +44,33 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeTest.createAllocatedSlot;
import static org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeTest.createDeclarativeSlotPoolBridge;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for the {@link DeclarativeSlotPoolBridge}. */
@RunWith(Parameterized.class)
public class DeclarativeSlotPoolBridgeResourceDeclarationTest extends TestLogger {
@ExtendWith(ParameterizedTestExtension.class)
class DeclarativeSlotPoolBridgeResourceDeclarationTest {

private static final JobMasterId jobMasterId = JobMasterId.generate();
private final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forMainThread();
private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;

@Parameter private RequestSlotMatchingStrategy requestSlotMatchingStrategy;

private RequirementListener requirementListener;
private DeclarativeSlotPoolBridge declarativeSlotPoolBridge;

@Parameterized.Parameters(name = "RequestSlotMatchingStrategy: {0}")
@Parameters(name = "RequestSlotMatchingStrategy: {0}")
public static Collection<RequestSlotMatchingStrategy> data() throws IOException {
return Arrays.asList(
SimpleRequestSlotMatchingStrategy.INSTANCE,
PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
}

public DeclarativeSlotPoolBridgeResourceDeclarationTest(
RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
this.requestSlotMatchingStrategy = requestSlotMatchingStrategy;
}

@Before
public void setup() throws Exception {
@BeforeEach
void setup() {
requirementListener = new RequirementListener();

final TestingDeclarativeSlotPoolBuilder slotPoolBuilder =
Expand All @@ -100,27 +96,26 @@ public void setup() throws Exception {
declarativeSlotPoolFactory, requestSlotMatchingStrategy);
}

@After
public void teardown() throws Exception {
@AfterEach
void teardown() {
if (declarativeSlotPoolBridge != null) {
declarativeSlotPoolBridge.close();
}
}

@Test
public void testRequirementsIncreasedOnNewAllocation() throws Exception {
@TestTemplate
void testRequirementsIncreasedOnNewAllocation() throws Exception {
declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);

// requesting the allocation of a new slot should increase the requirements
declarativeSlotPoolBridge.requestNewAllocatedSlot(
new SlotRequestId(), ResourceProfile.UNKNOWN, Time.minutes(5));
assertThat(
requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN),
is(1));
assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
.isOne();
}

@Test
public void testRequirementsDecreasedOnAllocationTimeout() throws Exception {
@TestTemplate
void testRequirementsDecreasedOnAllocationTimeout() throws Exception {
final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
try {
Expand All @@ -141,40 +136,37 @@ public void testRequirementsDecreasedOnAllocationTimeout() throws Exception {
.get();

// waiting for the timeout
assertThat(
allocationFuture,
FlinkMatchers.futureWillCompleteExceptionally(Duration.ofMinutes(1)));

assertThatFuture(allocationFuture).failsWithin(Duration.ofMinutes(1));
// when the allocation fails the requirements should be reduced (it is the users
// responsibility to retry)
CompletableFuture.runAsync(
() ->
assertThat(
requirementListener
.getRequirements()
.getResourceCount(ResourceProfile.UNKNOWN),
is(0)),
requirementListener
.getRequirements()
.getResourceCount(
ResourceProfile.UNKNOWN))
.isZero(),
mainThreadExecutor)
.join();
} finally {
scheduledExecutorService.shutdown();
}
}

@Test
public void testRequirementsUnchangedOnNewSlotsNotification() throws Exception {
@TestTemplate
void testRequirementsUnchangedOnNewSlotsNotification() throws Exception {
declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);

// notifications about new slots should not affect requirements
final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID());
declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(newSlot));
assertThat(
requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN),
is(0));
assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
.isZero();
}

@Test
public void testRequirementsIncreasedOnSlotReservation() throws Exception {
@TestTemplate
void testRequirementsIncreasedOnSlotReservation() throws Exception {
declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);

final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID());
Expand All @@ -184,13 +176,12 @@ public void testRequirementsIncreasedOnSlotReservation() throws Exception {
final SlotRequestId slotRequestId = new SlotRequestId();
declarativeSlotPoolBridge.allocateAvailableSlot(
slotRequestId, newSlot.getAllocationId(), ResourceProfile.UNKNOWN);
assertThat(
requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN),
is(1));
assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
.isOne();
}

@Test
public void testRequirementsDecreasedOnSlotFreeing() throws Exception {
@TestTemplate
void testRequirementsDecreasedOnSlotFreeing() throws Exception {
declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);

final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID());
Expand All @@ -203,13 +194,12 @@ public void testRequirementsDecreasedOnSlotFreeing() throws Exception {
// releasing (==freeing) a [reserved] slot should decrease the requirements
declarativeSlotPoolBridge.releaseSlot(
slotRequestId, new RuntimeException("Test exception"));
assertThat(
requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN),
is(0));
assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
.isZero();
}

@Test
public void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception {
@TestTemplate
void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception {
declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);

final PhysicalSlot newSlot = createAllocatedSlot(new AllocationID());
Expand All @@ -223,9 +213,8 @@ public void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception
newSlot.getTaskManagerLocation().getResourceID(),
newSlot.getAllocationId(),
new RuntimeException("Test exception"));
assertThat(
requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN),
is(0));
assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
.isZero();
}

private static final class RequirementListener {
Expand Down
Loading

0 comments on commit 1136ed5

Please sign in to comment.