From b27b1a9f8d0abfd60a22a17b74881a7455529b98 Mon Sep 17 00:00:00 2001 From: Zhu Zhu Date: Mon, 18 Jul 2022 16:13:04 +0800 Subject: [PATCH] [hotfix] Migrate SourceCoordinator tests to JUnit5 --- .../coordinator/CoordinatorTestUtils.java | 22 +-- .../SourceCoordinatorAlignmentTest.java | 20 +-- .../SourceCoordinatorContextTest.java | 67 +++++----- .../coordinator/SourceCoordinatorTest.java | 126 ++++++++---------- .../SourceCoordinatorTestBase.java | 18 +-- 5 files changed, 111 insertions(+), 142 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java index 2b7d8dd56dc66..6811ae712a4e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java @@ -22,17 +22,14 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.util.function.ThrowingRunnable; -import org.hamcrest.Matchers; - import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** A util class containing the helper methods for the coordinator tests. */ class CoordinatorTestUtils { @@ -58,24 +55,15 @@ static SplitsAssignment getSplitsAssignment( /** Check the actual assignment meets the expectation. */ static void verifyAssignment( List expectedSplitIds, Collection actualAssignment) { - assertEquals(expectedSplitIds.size(), actualAssignment.size()); + assertThat(actualAssignment.size()).isEqualTo(expectedSplitIds.size()); int i = 0; for (MockSourceSplit split : actualAssignment) { - assertEquals(expectedSplitIds.get(i++), split.splitId()); + assertThat(split.splitId()).isEqualTo(expectedSplitIds.get(i++)); } } static void verifyException( ThrowingRunnable runnable, String failureMessage, String errorMessage) { - try { - runnable.run(); - fail(failureMessage); - } catch (Throwable t) { - Throwable rootCause = t; - while (rootCause.getCause() != null) { - rootCause = rootCause.getCause(); - } - assertThat(rootCause.getMessage(), Matchers.startsWith(errorMessage)); - } + assertThatThrownBy(runnable::run, failureMessage).hasStackTraceContaining(errorMessage); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java index f3db14136a8ef..86b3b9daca897 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java @@ -24,19 +24,18 @@ import org.apache.flink.runtime.source.event.ReportedWatermarkEvent; import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.List; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for watermark alignment of the {@link SourceCoordinator}. */ @SuppressWarnings("serial") -public class SourceCoordinatorAlignmentTest extends SourceCoordinatorTestBase { +class SourceCoordinatorAlignmentTest extends SourceCoordinatorTestBase { @Test - public void testWatermarkAlignment() throws Exception { + void testWatermarkAlignment() throws Exception { try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) { SourceCoordinator sourceCoordinator1 = getAndStartNewSourceCoordinator( @@ -59,7 +58,7 @@ public void testWatermarkAlignment() throws Exception { } @Test - public void testWatermarkAlignmentWithIdleness() throws Exception { + void testWatermarkAlignmentWithIdleness() throws Exception { try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) { SourceCoordinator sourceCoordinator1 = getAndStartNewSourceCoordinator( @@ -88,7 +87,7 @@ public void testWatermarkAlignmentWithIdleness() throws Exception { } @Test - public void testWatermarkAlignmentWithTwoGroups() throws Exception { + void testWatermarkAlignmentWithTwoGroups() throws Exception { try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) { long maxDrift = 1000L; SourceCoordinator sourceCoordinator1 = @@ -116,7 +115,7 @@ public void testWatermarkAlignmentWithTwoGroups() throws Exception { } } - protected SourceCoordinator getAndStartNewSourceCoordinator( + private SourceCoordinator getAndStartNewSourceCoordinator( WatermarkAlignmentParams watermarkAlignmentParams, AutoCloseableRegistry closeableRegistry) throws Exception { @@ -138,7 +137,8 @@ private void reportWatermarkEvent( private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWatermark) { List events = receivingTasks.getSentEventsForSubtask(subtask); - assertFalse(events.isEmpty()); - assertEquals(new WatermarkAlignmentEvent(expectedWatermark), events.get(events.size() - 1)); + assertThat(events).isNotEmpty(); + assertThat(events.get(events.size() - 1)) + .isEqualTo(new WatermarkAlignmentEvent(expectedWatermark)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java index 380e65c1c7e32..2ada5813d7d62 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java @@ -27,8 +27,7 @@ import org.apache.flink.runtime.source.event.AddSplitEvent; import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; -import org.hamcrest.Matchers; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; @@ -39,55 +38,53 @@ import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.getSplitsAssignment; import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment; import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** Unit test for {@link SourceCoordinatorContext}. */ -public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase { +class SourceCoordinatorContextTest extends SourceCoordinatorTestBase { @Test - public void testRegisterReader() throws Exception { + void testRegisterReader() throws Exception { sourceReady(); List readerInfo = registerReaders(); - assertTrue(context.registeredReaders().containsKey(0)); - assertTrue(context.registeredReaders().containsKey(1)); - assertEquals(readerInfo.get(0), context.registeredReaders().get(0)); - assertEquals(readerInfo.get(1), context.registeredReaders().get(1)); + assertThat(context.registeredReaders()).containsKey(0); + assertThat(context.registeredReaders()).containsKey(1); + assertThat(context.registeredReaders().get(0)).isEqualTo(readerInfo.get(0)); + assertThat(context.registeredReaders().get(1)).isEqualTo(readerInfo.get(1)); final TestingSplitEnumerator enumerator = getEnumerator(); - assertThat(enumerator.getRegisteredReaders(), Matchers.containsInAnyOrder(0, 1, 2)); + assertThat(enumerator.getRegisteredReaders()).containsExactlyInAnyOrder(0, 1, 2); } @Test - public void testTaskFailureUnregistersReader() throws Exception { + void testTaskFailureUnregistersReader() throws Exception { sourceReady(); List readerInfo = registerReaders(); sourceCoordinator.subtaskFailed(0, null); waitForCoordinatorToProcessActions(); - assertEquals("Only reader 2 should be registered.", 2, context.registeredReaders().size()); - assertNull(context.registeredReaders().get(0)); - assertEquals(readerInfo.get(1), context.registeredReaders().get(1)); - assertEquals(readerInfo.get(2), context.registeredReaders().get(2)); + assertThat(context.registeredReaders()) + .as("Only reader 2 should be registered.") + .hasSize(2); + assertThat(context.registeredReaders().get(0)).isNull(); + assertThat(context.registeredReaders().get(1)).isEqualTo(readerInfo.get(1)); + assertThat(context.registeredReaders().get(2)).isEqualTo(readerInfo.get(2)); } @Test - public void testUnregisterUnregisteredReader() { + void testUnregisterUnregisteredReader() { context.unregisterSourceReader(0); } @Test - public void testAssignSplitsFromCoordinatorExecutor() throws Exception { + void testAssignSplitsFromCoordinatorExecutor() throws Exception { testAssignSplits(true); } @Test - public void testAssignSplitsFromOtherThread() throws Exception { + void testAssignSplitsFromOtherThread() throws Exception { testAssignSplits(false); } @@ -112,28 +109,27 @@ private void testAssignSplits(boolean fromCoordinatorExecutor) throws Exception Arrays.asList("1", "2"), splitSplitAssignmentTracker.uncheckpointedAssignments().get(1)); // The OperatorCoordinatorContext should have received the event sending call. - assertEquals( - "There should be two events sent to the subtasks.", - 2, - receivingTasks.getNumberOfSentEvents()); + assertThat(receivingTasks.getNumberOfSentEvents()) + .as("There should be two events sent to the subtasks.") + .isEqualTo(2); // Assert the events to subtask0. List eventsToSubtask0 = receivingTasks.getSentEventsForSubtask(0); - assertEquals(1, eventsToSubtask0.size()); + assertThat(eventsToSubtask0).hasSize(1); OperatorEvent event = eventsToSubtask0.get(0); - assertTrue(event instanceof AddSplitEvent); + assertThat(event).isInstanceOf(AddSplitEvent.class); verifyAssignment( Collections.singletonList("0"), ((AddSplitEvent) event).splits(new MockSourceSplitSerializer())); } @Test - public void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() throws Exception { + void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() throws Exception { testAssignSplitToUnregisterdReader(true); } @Test - public void testAssignSplitToUnregisteredReaderFromOtherThread() throws Exception { + void testAssignSplitToUnregisteredReaderFromOtherThread() throws Exception { testAssignSplitToUnregisterdReader(false); } @@ -157,8 +153,7 @@ private void testAssignSplitToUnregisterdReader(boolean fromCoordinatorExecutor) } @Test - public void testExceptionInRunnableFailsTheJob() - throws InterruptedException, ExecutionException { + void testExceptionInRunnableFailsTheJob() throws InterruptedException, ExecutionException { ManuallyTriggeredScheduledExecutorService manualWorkerExecutor = new ManuallyTriggeredScheduledExecutorService(); // need the factory to have the exception handler set @@ -185,11 +180,11 @@ public void testExceptionInRunnableFailsTheJob() // blocks until the job is failed: wait that the uncaught exception handler calls // operatorCoordinatorContext#failJob() which completes the future operatorCoordinatorContext.getJobFailedFuture().get(); - assertTrue(operatorCoordinatorContext.isJobFailed()); + assertThat(operatorCoordinatorContext.isJobFailed()).isTrue(); } @Test - public void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException { + void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException { AtomicReference expectedError = new AtomicReference<>(null); ManuallyTriggeredScheduledExecutorService manualWorkerExecutor = @@ -222,8 +217,8 @@ public void testCallableInterruptedDuringShutdownDoNotFailJob() throws Interrupt testingContext.close(); manualCoordinatorExecutor.triggerAll(); - assertTrue(expectedError.get() instanceof InterruptedException); - assertFalse(operatorCoordinatorContext.isJobFailed()); + assertThat(expectedError.get()).isInstanceOf(InterruptedException.class); + assertThat(operatorCoordinatorContext.isJobFailed()).isFalse(); } // ------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java index 12d29bfa51268..3bb9188bd8ab9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java @@ -41,7 +41,7 @@ import org.apache.flink.runtime.source.event.SourceEventWrapper; import org.apache.flink.util.function.ThrowingRunnable; -import org.junit.Test; +import org.junit.jupiter.api.Test; import javax.annotation.Nullable; @@ -62,18 +62,14 @@ import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil; import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment; import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for {@link SourceCoordinator}. */ @SuppressWarnings("serial") -public class SourceCoordinatorTest extends SourceCoordinatorTestBase { +class SourceCoordinatorTest extends SourceCoordinatorTestBase { @Test - public void testThrowExceptionWhenNotStarted() { + void testThrowExceptionWhenNotStarted() { // The following methods should only be invoked after the source coordinator has started. String failureMessage = "Call should fail when source coordinator has not started yet."; verifyException( @@ -95,7 +91,7 @@ public void testThrowExceptionWhenNotStarted() { } @Test - public void testRestCheckpointAfterCoordinatorStarted() throws Exception { + void testRestCheckpointAfterCoordinatorStarted() throws Exception { // The following methods should only be invoked after the source coordinator has started. sourceCoordinator.start(); verifyException( @@ -105,34 +101,34 @@ public void testRestCheckpointAfterCoordinatorStarted() throws Exception { } @Test - public void testStart() throws Exception { + void testStart() throws Exception { sourceCoordinator.start(); waitForCoordinatorToProcessActions(); - assertTrue(getEnumerator().isStarted()); + assertThat(getEnumerator().isStarted()).isTrue(); } @Test - public void testClosed() throws Exception { + void testClosed() throws Exception { sourceCoordinator.start(); sourceCoordinator.close(); - assertTrue(getEnumerator().isClosed()); + assertThat(getEnumerator().isClosed()).isTrue(); } @Test - public void testHandleSourceEvent() throws Exception { + void testHandleSourceEvent() throws Exception { sourceReady(); SourceEvent sourceEvent = new SourceEvent() {}; sourceCoordinator.handleEventFromOperator(0, new SourceEventWrapper(sourceEvent)); waitForCoordinatorToProcessActions(); - assertEquals(1, getEnumerator().getHandledSourceEvent().size()); - assertEquals(sourceEvent, getEnumerator().getHandledSourceEvent().get(0)); + assertThat(getEnumerator().getHandledSourceEvent()).hasSize(1); + assertThat(getEnumerator().getHandledSourceEvent().get(0)).isEqualTo(sourceEvent); } @Test - public void testCheckpointCoordinatorAndRestore() throws Exception { + void testCheckpointCoordinatorAndRestore() throws Exception { sourceReady(); addTestingSplitSet(6); @@ -150,19 +146,17 @@ public void testCheckpointCoordinatorAndRestore() throws Exception { TestingSplitEnumerator restoredEnumerator = (TestingSplitEnumerator) restoredCoordinator.getEnumerator(); SourceCoordinatorContext restoredContext = restoredCoordinator.getContext(); - assertEquals( - "2 splits should have been assigned to reader 0", - 4, - restoredEnumerator.getUnassignedSplits().size()); - assertTrue(restoredEnumerator.getContext().registeredReaders().isEmpty()); - assertEquals( - "Registered readers should not be recovered by restoring", - 0, - restoredContext.registeredReaders().size()); + assertThat(restoredEnumerator.getUnassignedSplits()) + .as("2 splits should have been assigned to reader 0") + .hasSize(4); + assertThat(restoredEnumerator.getContext().registeredReaders()).isEmpty(); + assertThat(restoredContext.registeredReaders()) + .as("Registered readers should not be recovered by restoring") + .isEmpty(); } @Test - public void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception { + void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception { sourceReady(); addTestingSplitSet(6); @@ -179,8 +173,8 @@ public void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception // check the state. waitForCoordinatorToProcessActions(); - assertEquals(4, getEnumerator().getUnassignedSplits().size()); - assertTrue(splitSplitAssignmentTracker.uncheckpointedAssignments().isEmpty()); + assertThat(getEnumerator().getUnassignedSplits()).hasSize(4); + assertThat(splitSplitAssignmentTracker.uncheckpointedAssignments()).isEmpty(); verifyAssignment( Arrays.asList("0", "1"), splitSplitAssignmentTracker.assignmentsByCheckpointId().get(100L).get(0)); @@ -193,24 +187,24 @@ public void testSubtaskFailedAndRevertUncompletedAssignments() throws Exception sourceCoordinator.subtaskReset(0, 99L); waitForCoordinatorToProcessActions(); - assertFalse( - "Reader 0 should have been unregistered.", - context.registeredReaders().containsKey(0)); + assertThat(context.registeredReaders()) + .as("Reader 0 should have been unregistered.") + .doesNotContainKey(0); // The tracker should have reverted all the splits assignment to reader 0. for (Map assignment : splitSplitAssignmentTracker.assignmentsByCheckpointId().values()) { - assertFalse( - "Assignment in uncompleted checkpoint should have been reverted.", - assignment.containsKey(0)); + assertThat(assignment) + .as("Assignment in uncompleted checkpoint should have been reverted.") + .doesNotContainKey(0); } - assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0)); + assertThat(splitSplitAssignmentTracker.uncheckpointedAssignments()).doesNotContainKey(0); // The split enumerator should now contains the splits used to b // assigned to reader 0. - assertEquals(7, getEnumerator().getUnassignedSplits().size()); + assertThat(getEnumerator().getUnassignedSplits()).hasSize(7); } @Test - public void testFailedSubtaskDoNotRevertCompletedCheckpoint() throws Exception { + void testFailedSubtaskDoNotRevertCompletedCheckpoint() throws Exception { sourceReady(); addTestingSplitSet(6); @@ -226,15 +220,15 @@ public void testFailedSubtaskDoNotRevertCompletedCheckpoint() throws Exception { waitForCoordinatorToProcessActions(); - assertEquals(100L, (long) getEnumerator().getSuccessfulCheckpoints().get(0)); - assertFalse(context.registeredReaders().containsKey(0)); - assertEquals(4, getEnumerator().getUnassignedSplits().size()); - assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0)); - assertTrue(splitSplitAssignmentTracker.assignmentsByCheckpointId().isEmpty()); + assertThat(getEnumerator().getSuccessfulCheckpoints().get(0)).isEqualTo(100); + assertThat(context.registeredReaders()).doesNotContainKey(0); + assertThat(getEnumerator().getUnassignedSplits()).hasSize(4); + assertThat(splitSplitAssignmentTracker.uncheckpointedAssignments()).doesNotContainKey(0); + assertThat(splitSplitAssignmentTracker.assignmentsByCheckpointId()).isEmpty(); } @Test - public void testFailJobWhenExceptionThrownFromStart() throws Exception { + void testFailJobWhenExceptionThrownFromStart() throws Exception { final RuntimeException failureReason = new RuntimeException("Artificial Exception"); try (final MockSplitEnumeratorContext enumeratorContext = new MockSplitEnumeratorContext<>(1); @@ -258,12 +252,12 @@ public void start() { () -> operatorCoordinatorContext.isJobFailed(), Duration.ofSeconds(10), "The job should have failed due to the artificial exception."); - assertEquals(failureReason, operatorCoordinatorContext.getJobFailureReason()); + assertThat(operatorCoordinatorContext.getJobFailureReason()).isEqualTo(failureReason); } } @Test - public void testFailJobWhenExceptionThrownFromEnumeratorCreation() throws Exception { + void testFailJobWhenExceptionThrownFromEnumeratorCreation() throws Exception { final RuntimeException failureReason = new RuntimeException("Artificial Exception"); final SourceCoordinator coordinator = @@ -279,12 +273,12 @@ public void testFailJobWhenExceptionThrownFromEnumeratorCreation() throws Except coordinator.start(); - assertTrue(operatorCoordinatorContext.isJobFailed()); - assertEquals(failureReason, operatorCoordinatorContext.getJobFailureReason()); + assertThat(operatorCoordinatorContext.isJobFailed()).isTrue(); + assertThat(operatorCoordinatorContext.getJobFailureReason()).isEqualTo(failureReason); } @Test - public void testErrorThrownFromSplitEnumerator() throws Exception { + void testErrorThrownFromSplitEnumerator() throws Exception { final Error error = new Error("Test Error"); try (final MockSplitEnumeratorContext enumeratorContext = new MockSplitEnumeratorContext<>(1); @@ -310,12 +304,12 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { () -> operatorCoordinatorContext.isJobFailed(), Duration.ofSeconds(10), "The job should have failed due to the artificial exception."); - assertEquals(error, operatorCoordinatorContext.getJobFailureReason()); + assertThat(operatorCoordinatorContext.getJobFailureReason()).isEqualTo(error); } } @Test - public void testBlockOnClose() throws Exception { + void testBlockOnClose() throws Exception { // It is possible that the split enumerator submits some heavy-duty work to the // coordinator executor which blocks the coordinator closure. final CountDownLatch latch = new CountDownLatch(1); @@ -358,7 +352,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { future.exceptionally( e -> { - assertTrue(e instanceof TimeoutException); + assertThat(e).isInstanceOf(TimeoutException.class); return null; }) .get(); @@ -371,7 +365,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } @Test - public void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception { + void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception { final ClassLoader testClassLoader = new URLClassLoader(new URL[0]); final OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), testClassLoader); @@ -390,15 +384,15 @@ public void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception { coordinator.start(); final ClassLoaderTestEnumerator enumerator = source.createEnumeratorFuture.get(); - assertSame(testClassLoader, enumerator.constructorClassLoader); - assertSame(testClassLoader, enumerator.threadClassLoader.get()); + assertThat(enumerator.constructorClassLoader).isSameAs(testClassLoader); + assertThat(enumerator.threadClassLoader.get()).isSameAs(testClassLoader); // cleanup coordinator.close(); } @Test - public void testUserClassLoaderWhenRestoringEnumerator() throws Exception { + void testUserClassLoaderWhenRestoringEnumerator() throws Exception { final ClassLoader testClassLoader = new URLClassLoader(new URL[0]); final OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), testClassLoader); @@ -418,15 +412,15 @@ public void testUserClassLoaderWhenRestoringEnumerator() throws Exception { coordinator.start(); final ClassLoaderTestEnumerator enumerator = source.restoreEnumeratorFuture.get(); - assertSame(testClassLoader, enumerator.constructorClassLoader); - assertSame(testClassLoader, enumerator.threadClassLoader.get()); + assertThat(enumerator.constructorClassLoader).isSameAs(testClassLoader); + assertThat(enumerator.threadClassLoader.get()).isSameAs(testClassLoader); // cleanup coordinator.close(); } @Test - public void testSerdeBackwardCompatibility() throws Exception { + void testSerdeBackwardCompatibility() throws Exception { sourceReady(); addTestingSplitSet(6); @@ -445,9 +439,9 @@ public void testSerdeBackwardCompatibility() throws Exception { SourceCoordinatorContext restoredContext = restoredCoordinator.getContext(); // Check if enumerator is restored correctly - assertEquals(splits, restoredEnumerator.getUnassignedSplits()); - assertTrue(restoredEnumerator.getHandledSourceEvent().isEmpty()); - assertEquals(0, restoredContext.registeredReaders().size()); + assertThat(restoredEnumerator.getUnassignedSplits()).isEqualTo(splits); + assertThat(restoredEnumerator.getHandledSourceEvent()).isEmpty(); + assertThat(restoredContext.registeredReaders()).isEmpty(); } // ------------------------------------------------------------------------ @@ -477,14 +471,6 @@ private byte[] createCheckpointDataWithSerdeV0(Set splits) thro return serializer.getCopyOfBuffer(); } - private void check(Runnable runnable) { - try { - coordinatorExecutor.submit(runnable).get(); - } catch (Exception e) { - fail("Test failed due to " + e); - } - } - private static byte[] createEmptyCheckpoint() throws Exception { return SourceCoordinator.writeCheckpointBytes( Collections.emptySet(), new MockSplitEnumeratorCheckpointSerializer()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java index ce45e9035520c..6fd36c9f3a7aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java @@ -31,8 +31,8 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; -import org.junit.After; -import org.junit.Before; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import java.util.ArrayList; import java.util.List; @@ -44,10 +44,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static org.junit.Assert.assertNotNull; +import static org.assertj.core.api.Assertions.assertThat; /** The test base for SourceCoordinator related tests. */ -public abstract class SourceCoordinatorTestBase { +abstract class SourceCoordinatorTestBase { protected static final String OPERATOR_NAME = "TestOperator"; protected static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 5678L); @@ -69,8 +69,8 @@ public abstract class SourceCoordinatorTestBase { // ------------------------------------------------------------------------ - @Before - public void setup() throws Exception { + @BeforeEach + void setup() { receivingTasks = EventReceivingTasks.createForRunningTasks(); operatorCoordinatorContext = new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, NUM_SUBTASKS); @@ -85,8 +85,8 @@ public void setup() throws Exception { context = sourceCoordinator.getContext(); } - @After - public void cleanUp() throws InterruptedException, TimeoutException { + @AfterEach + void cleanUp() throws InterruptedException, TimeoutException { coordinatorExecutor.shutdown(); if (!coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS)) { throw new TimeoutException("Failed to close the CoordinatorExecutor before timeout."); @@ -99,7 +99,7 @@ protected TestingSplitEnumerator getEnumerator() { if (enumerator == null) { enumerator = (TestingSplitEnumerator) sourceCoordinator.getEnumerator(); - assertNotNull("source was not started", enumerator); + assertThat(enumerator).as("source was not started").isNotNull(); } return enumerator; }