Skip to content

Commit

Permalink
[FLINK-25549][state/changelog] Migrate flink-dstl to use JUnit5
Browse files Browse the repository at this point in the history
This closes apache#18978.
  • Loading branch information
RocMarshal authored and gaoyunhaii committed Mar 22, 2022
1 parent 948bfda commit 480956f
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.function.BiConsumerWithException;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -56,19 +56,18 @@
import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.rethrow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;

/** {@link BatchingStateChangeUploadScheduler} test. */
public class BatchingStateChangeUploadSchedulerTest {
class BatchingStateChangeUploadSchedulerTest {

private static final int MAX_BYTES_IN_FLIGHT = 10_000;
private final Random random = new Random();

@Test
public void testNoDelayAndThreshold() throws Exception {
void testNoDelayAndThreshold() throws Exception {
withStore(
0,
0,
Expand All @@ -89,7 +88,7 @@ private void upload(BatchingStateChangeUploadScheduler store, List<StateChangeSe
}

@Test
public void testSizeThreshold() throws Exception {
void testSizeThreshold() throws Exception {
int numChanges = 7;
int changeSize = 11;
int threshold = changeSize * numChanges;
Expand All @@ -108,14 +107,14 @@ public void testSizeThreshold() throws Exception {
if (runningSize >= threshold) {
assertSaved(probe, expected);
} else {
assertTrue(probe.getUploaded().isEmpty());
assertThat(probe.getUploaded()).isEmpty();
}
}
});
}

@Test
public void testDelay() throws Exception {
void testDelay() throws Exception {
int delayMs = 50;
ManuallyTriggeredScheduledExecutorService scheduler =
new ManuallyTriggeredScheduledExecutorService();
Expand All @@ -128,20 +127,17 @@ public void testDelay() throws Exception {
scheduler.triggerAll();
List<StateChangeSet> changeSets = getChanges(4);
upload(store, changeSets);
assertTrue(probe.getUploaded().isEmpty());
assertTrue(
scheduler.getAllNonPeriodicScheduledTask().stream()
.anyMatch(
scheduled ->
scheduled.getDelay(MILLISECONDS) == delayMs));
assertThat(probe.getUploaded()).isEmpty();
assertThat(scheduler.getAllNonPeriodicScheduledTask())
.anyMatch(scheduled -> scheduled.getDelay(MILLISECONDS) == delayMs);
scheduler.triggerAllNonPeriodicTasks();
assertEquals(changeSets, probe.getUploaded());
assertThat(probe.getUploaded()).isEqualTo(changeSets);
});
}

/** Test integration with {@link RetryingExecutor}. */
@Test
public void testRetry() throws Exception {
void testRetry() throws Exception {
final int maxAttempts = 5;

try (BatchingStateChangeUploadScheduler store =
Expand Down Expand Up @@ -184,7 +180,7 @@ public UploadTasksResult upload(Collection<UploadTask> tasks)
}

@Test
public void testUploadTimeout() throws Exception {
void testUploadTimeout() throws Exception {
AtomicReference<List<SequenceNumber>> failed = new AtomicReference<>();
UploadTask upload =
new UploadTask(getChanges(4), unused -> {}, (sqn, error) -> failed.set(sqn));
Expand All @@ -201,12 +197,12 @@ public void testUploadTimeout() throws Exception {
}
}

assertTrue(upload.finished.get());
assertEquals(
upload.changeSets.stream()
.map(StateChangeSet::getSequenceNumber)
.collect(Collectors.toSet()),
new HashSet<>(failed.get()));
assertThat(upload.finished.get()).isTrue();
assertThat(
upload.changeSets.stream()
.map(StateChangeSet::getSequenceNumber)
.collect(Collectors.toSet()))
.isEqualTo(new HashSet<>(failed.get()));
}

@Test
Expand Down Expand Up @@ -236,19 +232,20 @@ public void testRetryOnTimeout() throws Exception {
}
}

assertTrue(upload.finished.get());
assertEquals(
upload.changeSets.stream()
.map(StateChangeSet::getSequenceNumber)
.collect(Collectors.toSet()),
succeeded.get().stream()
.map(UploadResult::getSequenceNumber)
.collect(Collectors.toSet()));
assertTrue(failed.get().isEmpty());
assertThat(upload.finished.get()).isTrue();
assertThat(
upload.changeSets.stream()
.map(StateChangeSet::getSequenceNumber)
.collect(Collectors.toSet()))
.isEqualTo(
succeeded.get().stream()
.map(UploadResult::getSequenceNumber)
.collect(Collectors.toSet()));
assertThat(failed.get()).isEmpty();
}

@Test(expected = RejectedExecutionException.class)
public void testErrorHandling() throws Exception {
@Test
void testErrorHandling() throws Exception {
TestingStateChangeUploader probe = new TestingStateChangeUploader();
DirectScheduledExecutorService scheduler = new DirectScheduledExecutorService();
try (BatchingStateChangeUploadScheduler store =
Expand All @@ -265,12 +262,13 @@ public void testErrorHandling() throws Exception {
.getAttemptsPerUpload()),
createUnregisteredChangelogStorageMetricGroup())) {
scheduler.shutdown();
upload(store, getChanges(4));
assertThatThrownBy(() -> upload(store, getChanges(4)))
.isInstanceOf(RejectedExecutionException.class);
}
}

@Test
public void testClose() throws Exception {
void testClose() throws Exception {
TestingStateChangeUploader probe = new TestingStateChangeUploader();
DirectScheduledExecutorService scheduler = new DirectScheduledExecutorService();
DirectScheduledExecutorService retryScheduler = new DirectScheduledExecutorService();
Expand All @@ -287,13 +285,13 @@ public void testClose() throws Exception {
.getAttemptsPerUpload()),
createUnregisteredChangelogStorageMetricGroup())
.close();
assertTrue(probe.isClosed());
assertTrue(scheduler.isShutdown());
assertTrue(retryScheduler.isShutdown());
assertThat(probe.isClosed()).isTrue();
assertThat(scheduler.isShutdown()).isTrue();
assertThat(retryScheduler.isShutdown()).isTrue();
}

@Test
public void testBackPressure() throws Exception {
void testBackPressure() throws Exception {
int sizeLimit = MAX_BYTES_IN_FLIGHT;
CompletableFuture<TestingStateChangeUploader> thresholdExceededFuture =
new CompletableFuture<>();
Expand All @@ -306,13 +304,14 @@ public void testBackPressure() throws Exception {
getChanges(sizeLimit / 2).stream(),
getChanges(sizeLimit / 2).stream())
.collect(Collectors.toList());
assertTrue(uploader.getAvailabilityProvider().isAvailable());
assertTrue(uploader.getAvailabilityProvider().isApproximatelyAvailable());
assertThat(uploader.getAvailabilityProvider().isAvailable()).isTrue();
assertThat(uploader.getAvailabilityProvider().isApproximatelyAvailable())
.isTrue();
upload(uploader, changes1);
assertSaved(probe, changes1); // sent to upload, not finished yet
thresholdExceededFuture.complete(probe);
List<StateChangeSet> changes2 = getChanges(1);
assertFalse(uploader.getAvailabilityProvider().isAvailable());
assertThat(uploader.getAvailabilityProvider().isAvailable()).isFalse();
upload(uploader, changes2); // should block until capacity released
assertSaved(probe, changes1, changes2);
};
Expand All @@ -322,14 +321,14 @@ public void testBackPressure() throws Exception {
TestingStateChangeUploader probe = thresholdExceededFuture.get();
int uploadedInTheBeginning = probe.getUploaded().size();
Thread.sleep(500); // allow failing, i.e. to proceed with upload
assertEquals(uploadedInTheBeginning, probe.getUploaded().size());
assertThat(probe.getUploaded().size()).isEqualTo(uploadedInTheBeginning);
probe.completeUpload(); // release capacity
uploadFuture.join();
assertTrue(uploadedInTheBeginning < probe.getUploaded().size());
assertThat(probe.getUploaded().size()).isGreaterThan(uploadedInTheBeginning);
}

@Test
public void testInterruptedWhenBackPressured() throws Exception {
void testInterruptedWhenBackPressured() throws Exception {
int limit = MAX_BYTES_IN_FLIGHT;
TestScenario test =
(uploader, probe) -> {
Expand All @@ -342,7 +341,7 @@ public void testInterruptedWhenBackPressured() throws Exception {
fail("upload shouldn't succeed after exceeding the limit");
} catch (IOException e) {
if (findThrowable(e, InterruptedException.class).isPresent()) {
assertTrue(probe.getUploaded().isEmpty());
assertThat(probe.getUploaded()).isEmpty();
} else {
rethrow(e);
}
Expand Down Expand Up @@ -404,9 +403,11 @@ private static void withStore(
@SafeVarargs
private final void assertSaved(
TestingStateChangeUploader probe, List<StateChangeSet>... expected) {
assertEquals(
Arrays.stream(expected).flatMap(Collection::stream).collect(Collectors.toList()),
new ArrayList<>(probe.getUploaded()));
assertThat(new ArrayList<>(probe.getUploaded()))
.isEqualTo(
Arrays.stream(expected)
.flatMap(Collection::stream)
.collect(Collectors.toList()));
}

private interface TestScenario
Expand Down
Loading

0 comments on commit 480956f

Please sign in to comment.