Skip to content

Commit

Permalink
[hotfix][connector/testing-framework] Let ExternalContext#generateTes…
Browse files Browse the repository at this point in the history
…tData returns List to preserve order
  • Loading branch information
PatrickRen authored and AHeise committed Oct 27, 2021
1 parent aebd310 commit 27ec51c
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.testcontainers.containers.KafkaContainer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -155,7 +154,7 @@ public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
}

@Override
public Collection<String> generateTestData(int splitIndex, long seed) {
public List<String> generateTestData(int splitIndex, long seed) {
Random random = new Random(seed);
List<String> randomStringRecords = new ArrayList<>();
int recordNum =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
Expand Down Expand Up @@ -92,7 +92,7 @@ public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
}

@Override
public Collection<String> generateTestData(int splitIndex, long seed) {
public List<String> generateTestData(int splitIndex, long seed) {
return generateStringTestData(splitIndex, seed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

Expand Down Expand Up @@ -101,7 +101,7 @@ public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
}

@Override
public Collection<String> generateTestData(int splitIndex, long seed) {
public List<String> generateTestData(int splitIndex, long seed) {
return generateStringTestData(splitIndex, seed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.pulsar.common.util.Murmur3_32Hash;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -116,7 +115,7 @@ public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
}

@Override
public Collection<String> generateTestData(int splitIndex, long seed) {
public List<String> generateTestData(int splitIndex, long seed) {
return generateStringTestData(splitIndex, seed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
Expand Down Expand Up @@ -90,7 +89,7 @@ public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
}

@Override
public Collection<String> generateTestData(int splitIndex, long seed) {
public List<String> generateTestData(int splitIndex, long seed) {
return generateStringTestData(splitIndex, seed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.api.connector.source.Source;

import java.io.Serializable;
import java.util.Collection;
import java.util.List;

/**
* Context of the test interacting with external system.
Expand Down Expand Up @@ -64,9 +64,9 @@ public interface ExternalContext<T> extends Serializable, AutoCloseable {
*
* @param splitIndex index of the split.
* @param seed Seed for generating random test data set.
* @return Collection of generated test data.
* @return List of generated test data.
*/
Collection<T> generateTestData(int splitIndex, long seed);
List<T> generateTestData(int splitIndex, long seed);

/**
* Factory for {@link ExternalContext}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -102,7 +101,7 @@ public void testSourceSingleSplit(TestEnvironment testEnv, ExternalContext<T> ex
throws Exception {

// Write test data to external system
final Collection<T> testRecords = generateAndWriteTestData(0, externalContext);
final List<T> testRecords = generateAndWriteTestData(0, externalContext);

// Build and execute Flink job
StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment();
Expand Down Expand Up @@ -137,9 +136,9 @@ public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> exter
throws Exception {

final int splitNumber = 4;
final List<Collection<T>> testRecordCollections = new ArrayList<>();
final List<List<T>> testRecordsLists = new ArrayList<>();
for (int i = 0; i < splitNumber; i++) {
testRecordCollections.add(generateAndWriteTestData(i, externalContext));
testRecordsLists.add(generateAndWriteTestData(i, externalContext));
}

LOG.debug("Build and execute Flink job");
Expand All @@ -153,7 +152,7 @@ public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> exter
.setParallelism(splitNumber)
.executeAndCollect("Source Multiple Split Test")) {
// Check test result
assertThat(resultIterator, matchesMultipleSplitTestData(testRecordCollections));
assertThat(resultIterator, matchesMultipleSplitTestData(testRecordsLists));
}
}

Expand All @@ -177,9 +176,9 @@ public void testIdleReader(TestEnvironment testEnv, ExternalContext<T> externalC
throws Exception {

final int splitNumber = 4;
final List<Collection<T>> testRecordCollections = new ArrayList<>();
final List<List<T>> testRecordsLists = new ArrayList<>();
for (int i = 0; i < splitNumber; i++) {
testRecordCollections.add(generateAndWriteTestData(i, externalContext));
testRecordsLists.add(generateAndWriteTestData(i, externalContext));
}

try (CloseableIterator<T> resultIterator =
Expand All @@ -189,8 +188,8 @@ public void testIdleReader(TestEnvironment testEnv, ExternalContext<T> externalC
WatermarkStrategy.noWatermarks(),
"Tested Source")
.setParallelism(splitNumber + 1)
.executeAndCollect("Redundant Parallelism Test")) {
assertThat(resultIterator, matchesMultipleSplitTestData(testRecordCollections));
.executeAndCollect("Idle Reader Test")) {
assertThat(resultIterator, matchesMultipleSplitTestData(testRecordsLists));
}
}

Expand All @@ -216,7 +215,7 @@ public void testTaskManagerFailure(
throws Exception {
int splitIndex = 0;

final Collection<T> testRecordsBeforeFailure =
final List<T> testRecordsBeforeFailure =
externalContext.generateTestData(
splitIndex, ThreadLocalRandom.current().nextLong());
final SourceSplitDataWriter<T> sourceSplitDataWriter =
Expand Down Expand Up @@ -266,7 +265,7 @@ public void testTaskManagerFailure(
Collections.singletonList(JobStatus.RUNNING),
Deadline.fromNow(Duration.ofSeconds(30)));

final Collection<T> testRecordsAfterFailure =
final List<T> testRecordsAfterFailure =
externalContext.generateTestData(
splitIndex, ThreadLocalRandom.current().nextLong());
sourceSplitDataWriter.writeRecords(testRecordsAfterFailure);
Expand All @@ -290,15 +289,15 @@ public void testTaskManagerFailure(
* Generate a set of test records and write it to the given split writer.
*
* @param externalContext External context
* @return Collection of generated test records
* @return List of generated test records
*/
protected Collection<T> generateAndWriteTestData(
protected List<T> generateAndWriteTestData(
int splitIndex, ExternalContext<T> externalContext) {
final Collection<T> testRecordCollection =
final List<T> testRecords =
externalContext.generateTestData(
splitIndex, ThreadLocalRandom.current().nextLong());
LOG.debug("Writing {} records to external system", testRecordCollection.size());
externalContext.createSourceSplitDataWriter().writeRecords(testRecordCollection);
return testRecordCollection;
LOG.debug("Writing {} records to external system", testRecords.size());
externalContext.createSourceSplitDataWriter().writeRecords(testRecords);
return testRecords;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.hamcrest.TypeSafeDiagnosingMatcher;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
Expand All @@ -35,15 +34,15 @@ public class TestDataMatchers {

// ---------------------------- Matcher Builders ----------------------------------
public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
Collection<Collection<T>> testDataCollections) {
return new MultipleSplitDataMatcher<>(testDataCollections);
List<List<T>> testRecordsLists) {
return new MultipleSplitDataMatcher<>(testRecordsLists);
}

public static <T> SplitDataMatcher<T> matchesSplitTestData(Collection<T> testData) {
public static <T> SplitDataMatcher<T> matchesSplitTestData(List<T> testData) {
return new SplitDataMatcher<>(testData);
}

public static <T> SplitDataMatcher<T> matchesSplitTestData(Collection<T> testData, int limit) {
public static <T> SplitDataMatcher<T> matchesSplitTestData(List<T> testData, int limit) {
return new SplitDataMatcher<>(testData, limit);
}

Expand All @@ -57,17 +56,17 @@ public static <T> SplitDataMatcher<T> matchesSplitTestData(Collection<T> testDat
public static class SplitDataMatcher<T> extends TypeSafeDiagnosingMatcher<Iterator<T>> {
private static final int UNSET = -1;

private final Collection<T> testData;
private final List<T> testData;
private final int limit;

private String mismatchDescription = null;

public SplitDataMatcher(Collection<T> testData) {
public SplitDataMatcher(List<T> testData) {
this.testData = testData;
this.limit = UNSET;
}

public SplitDataMatcher(Collection<T> testData, int limit) {
public SplitDataMatcher(List<T> testData, int limit) {
if (limit > testData.size()) {
throw new IllegalArgumentException(
"Limit validation size should be less than number of test records");
Expand Down Expand Up @@ -121,12 +120,12 @@ public void describeTo(Description description) {
/**
* Matcher for validating test data from multiple splits.
*
* <p>Each collection has a pointer (iterator) pointing to current checking record. When a
* record is received in the stream, it will be compared to all current pointing records in
* collections, and the pointer to the identical record will move forward.
* <p>Each list has a pointer (iterator) pointing to current checking record. When a record is
* received in the stream, it will be compared to all current pointing records in lists, and the
* pointer to the identical record will move forward.
*
* <p>If the stream preserves the correctness and order of records in all splits, all pointers
* should reach the end of the collection finally.
* should reach the end of the list finally.
*
* @param <T> Type of validating record
*/
Expand All @@ -136,9 +135,9 @@ public static class MultipleSplitDataMatcher<T> extends TypeSafeDiagnosingMatche

private String mismatchDescription = null;

public MultipleSplitDataMatcher(Collection<Collection<T>> testDataCollections) {
for (Collection<T> testDataCollection : testDataCollections) {
testDataIterators.add(new IteratorWithCurrent<>(testDataCollection.iterator()));
public MultipleSplitDataMatcher(List<List<T>> testRecordsLists) {
for (List<T> testRecords : testRecordsLists) {
testDataIterators.add(new IteratorWithCurrent<>(testRecords.iterator()));
}
}

Expand Down Expand Up @@ -187,7 +186,7 @@ private boolean matchThenNext(T record) {
}

/**
* Whether all pointers have reached the end of collections.
* Whether all pointers have reached the end of lists.
*
* @return True if all pointers have reached the end.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ class MultipleSplitDataMatcherTest {
private final List<String> splitA = Arrays.asList("alpha", "beta", "gamma");
private final List<String> splitB = Arrays.asList("one", "two", "three");
private final List<String> splitC = Arrays.asList("1", "2", "3");
private final List<Collection<String>> testDataCollection =
Arrays.asList(splitA, splitB, splitC);
private final List<List<String>> testDataCollection = Arrays.asList(splitA, splitB, splitC);

@Test
public void testPositiveCase() {
Expand Down

0 comments on commit 27ec51c

Please sign in to comment.