Skip to content

Commit

Permalink
[hotfix] Migrate SourceCoordinator tests to JUnit5
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuzhurk committed Jul 22, 2022
1 parent 81379a5 commit b27b1a9
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,14 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.util.function.ThrowingRunnable;

import org.hamcrest.Matchers;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** A util class containing the helper methods for the coordinator tests. */
class CoordinatorTestUtils {
Expand All @@ -58,24 +55,15 @@ static SplitsAssignment<MockSourceSplit> getSplitsAssignment(
/** Check the actual assignment meets the expectation. */
static void verifyAssignment(
List<String> expectedSplitIds, Collection<MockSourceSplit> actualAssignment) {
assertEquals(expectedSplitIds.size(), actualAssignment.size());
assertThat(actualAssignment.size()).isEqualTo(expectedSplitIds.size());
int i = 0;
for (MockSourceSplit split : actualAssignment) {
assertEquals(expectedSplitIds.get(i++), split.splitId());
assertThat(split.splitId()).isEqualTo(expectedSplitIds.get(i++));
}
}

static void verifyException(
ThrowingRunnable<Throwable> runnable, String failureMessage, String errorMessage) {
try {
runnable.run();
fail(failureMessage);
} catch (Throwable t) {
Throwable rootCause = t;
while (rootCause.getCause() != null) {
rootCause = rootCause.getCause();
}
assertThat(rootCause.getMessage(), Matchers.startsWith(errorMessage));
}
assertThatThrownBy(runnable::run, failureMessage).hasStackTraceContaining(errorMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,18 @@
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.assertj.core.api.Assertions.assertThat;

/** Unit tests for watermark alignment of the {@link SourceCoordinator}. */
@SuppressWarnings("serial")
public class SourceCoordinatorAlignmentTest extends SourceCoordinatorTestBase {
class SourceCoordinatorAlignmentTest extends SourceCoordinatorTestBase {

@Test
public void testWatermarkAlignment() throws Exception {
void testWatermarkAlignment() throws Exception {
try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) {
SourceCoordinator<?, ?> sourceCoordinator1 =
getAndStartNewSourceCoordinator(
Expand All @@ -59,7 +58,7 @@ public void testWatermarkAlignment() throws Exception {
}

@Test
public void testWatermarkAlignmentWithIdleness() throws Exception {
void testWatermarkAlignmentWithIdleness() throws Exception {
try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) {
SourceCoordinator<?, ?> sourceCoordinator1 =
getAndStartNewSourceCoordinator(
Expand Down Expand Up @@ -88,7 +87,7 @@ public void testWatermarkAlignmentWithIdleness() throws Exception {
}

@Test
public void testWatermarkAlignmentWithTwoGroups() throws Exception {
void testWatermarkAlignmentWithTwoGroups() throws Exception {
try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) {
long maxDrift = 1000L;
SourceCoordinator<?, ?> sourceCoordinator1 =
Expand Down Expand Up @@ -116,7 +115,7 @@ public void testWatermarkAlignmentWithTwoGroups() throws Exception {
}
}

protected SourceCoordinator<?, ?> getAndStartNewSourceCoordinator(
private SourceCoordinator<?, ?> getAndStartNewSourceCoordinator(
WatermarkAlignmentParams watermarkAlignmentParams,
AutoCloseableRegistry closeableRegistry)
throws Exception {
Expand All @@ -138,7 +137,8 @@ private void reportWatermarkEvent(

private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWatermark) {
List<OperatorEvent> events = receivingTasks.getSentEventsForSubtask(subtask);
assertFalse(events.isEmpty());
assertEquals(new WatermarkAlignmentEvent(expectedWatermark), events.get(events.size() - 1));
assertThat(events).isNotEmpty();
assertThat(events.get(events.size() - 1))
.isEqualTo(new WatermarkAlignmentEvent(expectedWatermark));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;

import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -39,55 +38,53 @@
import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.getSplitsAssignment;
import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/** Unit test for {@link SourceCoordinatorContext}. */
public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {

@Test
public void testRegisterReader() throws Exception {
void testRegisterReader() throws Exception {
sourceReady();
List<ReaderInfo> readerInfo = registerReaders();

assertTrue(context.registeredReaders().containsKey(0));
assertTrue(context.registeredReaders().containsKey(1));
assertEquals(readerInfo.get(0), context.registeredReaders().get(0));
assertEquals(readerInfo.get(1), context.registeredReaders().get(1));
assertThat(context.registeredReaders()).containsKey(0);
assertThat(context.registeredReaders()).containsKey(1);
assertThat(context.registeredReaders().get(0)).isEqualTo(readerInfo.get(0));
assertThat(context.registeredReaders().get(1)).isEqualTo(readerInfo.get(1));

final TestingSplitEnumerator<?> enumerator = getEnumerator();
assertThat(enumerator.getRegisteredReaders(), Matchers.containsInAnyOrder(0, 1, 2));
assertThat(enumerator.getRegisteredReaders()).containsExactlyInAnyOrder(0, 1, 2);
}

@Test
public void testTaskFailureUnregistersReader() throws Exception {
void testTaskFailureUnregistersReader() throws Exception {
sourceReady();
List<ReaderInfo> readerInfo = registerReaders();

sourceCoordinator.subtaskFailed(0, null);
waitForCoordinatorToProcessActions();

assertEquals("Only reader 2 should be registered.", 2, context.registeredReaders().size());
assertNull(context.registeredReaders().get(0));
assertEquals(readerInfo.get(1), context.registeredReaders().get(1));
assertEquals(readerInfo.get(2), context.registeredReaders().get(2));
assertThat(context.registeredReaders())
.as("Only reader 2 should be registered.")
.hasSize(2);
assertThat(context.registeredReaders().get(0)).isNull();
assertThat(context.registeredReaders().get(1)).isEqualTo(readerInfo.get(1));
assertThat(context.registeredReaders().get(2)).isEqualTo(readerInfo.get(2));
}

@Test
public void testUnregisterUnregisteredReader() {
void testUnregisterUnregisteredReader() {
context.unregisterSourceReader(0);
}

@Test
public void testAssignSplitsFromCoordinatorExecutor() throws Exception {
void testAssignSplitsFromCoordinatorExecutor() throws Exception {
testAssignSplits(true);
}

@Test
public void testAssignSplitsFromOtherThread() throws Exception {
void testAssignSplitsFromOtherThread() throws Exception {
testAssignSplits(false);
}

Expand All @@ -112,28 +109,27 @@ private void testAssignSplits(boolean fromCoordinatorExecutor) throws Exception
Arrays.asList("1", "2"),
splitSplitAssignmentTracker.uncheckpointedAssignments().get(1));
// The OperatorCoordinatorContext should have received the event sending call.
assertEquals(
"There should be two events sent to the subtasks.",
2,
receivingTasks.getNumberOfSentEvents());
assertThat(receivingTasks.getNumberOfSentEvents())
.as("There should be two events sent to the subtasks.")
.isEqualTo(2);

// Assert the events to subtask0.
List<OperatorEvent> eventsToSubtask0 = receivingTasks.getSentEventsForSubtask(0);
assertEquals(1, eventsToSubtask0.size());
assertThat(eventsToSubtask0).hasSize(1);
OperatorEvent event = eventsToSubtask0.get(0);
assertTrue(event instanceof AddSplitEvent);
assertThat(event).isInstanceOf(AddSplitEvent.class);
verifyAssignment(
Collections.singletonList("0"),
((AddSplitEvent<MockSourceSplit>) event).splits(new MockSourceSplitSerializer()));
}

@Test
public void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() throws Exception {
void testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() throws Exception {
testAssignSplitToUnregisterdReader(true);
}

@Test
public void testAssignSplitToUnregisteredReaderFromOtherThread() throws Exception {
void testAssignSplitToUnregisteredReaderFromOtherThread() throws Exception {
testAssignSplitToUnregisterdReader(false);
}

Expand All @@ -157,8 +153,7 @@ private void testAssignSplitToUnregisterdReader(boolean fromCoordinatorExecutor)
}

@Test
public void testExceptionInRunnableFailsTheJob()
throws InterruptedException, ExecutionException {
void testExceptionInRunnableFailsTheJob() throws InterruptedException, ExecutionException {
ManuallyTriggeredScheduledExecutorService manualWorkerExecutor =
new ManuallyTriggeredScheduledExecutorService();
// need the factory to have the exception handler set
Expand All @@ -185,11 +180,11 @@ public void testExceptionInRunnableFailsTheJob()
// blocks until the job is failed: wait that the uncaught exception handler calls
// operatorCoordinatorContext#failJob() which completes the future
operatorCoordinatorContext.getJobFailedFuture().get();
assertTrue(operatorCoordinatorContext.isJobFailed());
assertThat(operatorCoordinatorContext.isJobFailed()).isTrue();
}

@Test
public void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {
void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedException {
AtomicReference<Throwable> expectedError = new AtomicReference<>(null);

ManuallyTriggeredScheduledExecutorService manualWorkerExecutor =
Expand Down Expand Up @@ -222,8 +217,8 @@ public void testCallableInterruptedDuringShutdownDoNotFailJob() throws Interrupt
testingContext.close();
manualCoordinatorExecutor.triggerAll();

assertTrue(expectedError.get() instanceof InterruptedException);
assertFalse(operatorCoordinatorContext.isJobFailed());
assertThat(expectedError.get()).isInstanceOf(InterruptedException.class);
assertThat(operatorCoordinatorContext.isJobFailed()).isFalse();
}

// ------------------------
Expand Down
Loading

0 comments on commit b27b1a9

Please sign in to comment.