From 27ec51c56ad23217b162c458d2878fe2af4e963f Mon Sep 17 00:00:00 2001 From: Qingsheng Ren Date: Mon, 23 Aug 2021 17:18:13 +0800 Subject: [PATCH] [hotfix][connector/testing-framework] Let ExternalContext#generateTestData returns List to preserve order --- .../KafkaSingleTopicExternalContext.java | 3 +- .../cases/MultipleTopicTemplateContext.java | 4 +-- .../cases/SingleTopicConsumingContext.java | 4 +-- .../cases/KeySharedSubscriptionContext.java | 3 +- .../cases/SharedSubscriptionContext.java | 3 +- .../test/common/external/ExternalContext.java | 6 ++-- .../testsuites/SourceTestSuiteBase.java | 33 +++++++++---------- .../test/common/utils/TestDataMatchers.java | 31 +++++++++-------- .../common/utils/TestDataMatchersTest.java | 3 +- 9 files changed, 42 insertions(+), 48 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java index ad5e31d0d6369..d816500512d8b 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java @@ -42,7 +42,6 @@ import org.testcontainers.containers.KafkaContainer; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -155,7 +154,7 @@ public SourceSplitDataWriter createSourceSplitDataWriter() { } @Override - public Collection generateTestData(int splitIndex, long seed) { + public List generateTestData(int splitIndex, long seed) { Random random = new Random(seed); List randomStringRecords = new ArrayList<>(); int recordNum = diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java index a0801ec3ba939..0a5670172127a 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java @@ -32,8 +32,8 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; -import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; @@ -92,7 +92,7 @@ public SourceSplitDataWriter createSourceSplitDataWriter() { } @Override - public Collection generateTestData(int splitIndex, long seed) { + public List generateTestData(int splitIndex, long seed) { return generateStringTestData(splitIndex, seed); } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java index b89511c0b91eb..e649898ec9b1b 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java @@ -29,8 +29,8 @@ import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; -import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -101,7 +101,7 @@ public SourceSplitDataWriter createSourceSplitDataWriter() { } @Override - public Collection generateTestData(int splitIndex, long seed) { + public List generateTestData(int splitIndex, long seed) { return generateStringTestData(splitIndex, seed); } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java index e442418784471..5d937ba407802 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java @@ -36,7 +36,6 @@ import org.apache.pulsar.common.util.Murmur3_32Hash; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import static java.util.Collections.singletonList; @@ -116,7 +115,7 @@ public SourceSplitDataWriter createSourceSplitDataWriter() { } @Override - public Collection generateTestData(int splitIndex, long seed) { + public List generateTestData(int splitIndex, long seed) { return generateStringTestData(splitIndex, seed); } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java index f936b6fa6e48b..52e30b3d30349 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java @@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.SubscriptionType; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; @@ -90,7 +89,7 @@ public SourceSplitDataWriter createSourceSplitDataWriter() { } @Override - public Collection generateTestData(int splitIndex, long seed) { + public List generateTestData(int splitIndex, long seed) { return generateStringTestData(splitIndex, seed); } diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java index 5ed06a90fca4b..f67dcacab95c8 100644 --- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java @@ -23,7 +23,7 @@ import org.apache.flink.api.connector.source.Source; import java.io.Serializable; -import java.util.Collection; +import java.util.List; /** * Context of the test interacting with external system. @@ -64,9 +64,9 @@ public interface ExternalContext extends Serializable, AutoCloseable { * * @param splitIndex index of the split. * @param seed Seed for generating random test data set. - * @return Collection of generated test data. + * @return List of generated test data. */ - Collection generateTestData(int splitIndex, long seed); + List generateTestData(int splitIndex, long seed); /** * Factory for {@link ExternalContext}. diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java index 5aff6e6606d1a..4639a3f67e42b 100644 --- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java @@ -50,7 +50,6 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -102,7 +101,7 @@ public void testSourceSingleSplit(TestEnvironment testEnv, ExternalContext ex throws Exception { // Write test data to external system - final Collection testRecords = generateAndWriteTestData(0, externalContext); + final List testRecords = generateAndWriteTestData(0, externalContext); // Build and execute Flink job StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(); @@ -137,9 +136,9 @@ public void testMultipleSplits(TestEnvironment testEnv, ExternalContext exter throws Exception { final int splitNumber = 4; - final List> testRecordCollections = new ArrayList<>(); + final List> testRecordsLists = new ArrayList<>(); for (int i = 0; i < splitNumber; i++) { - testRecordCollections.add(generateAndWriteTestData(i, externalContext)); + testRecordsLists.add(generateAndWriteTestData(i, externalContext)); } LOG.debug("Build and execute Flink job"); @@ -153,7 +152,7 @@ public void testMultipleSplits(TestEnvironment testEnv, ExternalContext exter .setParallelism(splitNumber) .executeAndCollect("Source Multiple Split Test")) { // Check test result - assertThat(resultIterator, matchesMultipleSplitTestData(testRecordCollections)); + assertThat(resultIterator, matchesMultipleSplitTestData(testRecordsLists)); } } @@ -177,9 +176,9 @@ public void testIdleReader(TestEnvironment testEnv, ExternalContext externalC throws Exception { final int splitNumber = 4; - final List> testRecordCollections = new ArrayList<>(); + final List> testRecordsLists = new ArrayList<>(); for (int i = 0; i < splitNumber; i++) { - testRecordCollections.add(generateAndWriteTestData(i, externalContext)); + testRecordsLists.add(generateAndWriteTestData(i, externalContext)); } try (CloseableIterator resultIterator = @@ -189,8 +188,8 @@ public void testIdleReader(TestEnvironment testEnv, ExternalContext externalC WatermarkStrategy.noWatermarks(), "Tested Source") .setParallelism(splitNumber + 1) - .executeAndCollect("Redundant Parallelism Test")) { - assertThat(resultIterator, matchesMultipleSplitTestData(testRecordCollections)); + .executeAndCollect("Idle Reader Test")) { + assertThat(resultIterator, matchesMultipleSplitTestData(testRecordsLists)); } } @@ -216,7 +215,7 @@ public void testTaskManagerFailure( throws Exception { int splitIndex = 0; - final Collection testRecordsBeforeFailure = + final List testRecordsBeforeFailure = externalContext.generateTestData( splitIndex, ThreadLocalRandom.current().nextLong()); final SourceSplitDataWriter sourceSplitDataWriter = @@ -266,7 +265,7 @@ public void testTaskManagerFailure( Collections.singletonList(JobStatus.RUNNING), Deadline.fromNow(Duration.ofSeconds(30))); - final Collection testRecordsAfterFailure = + final List testRecordsAfterFailure = externalContext.generateTestData( splitIndex, ThreadLocalRandom.current().nextLong()); sourceSplitDataWriter.writeRecords(testRecordsAfterFailure); @@ -290,15 +289,15 @@ public void testTaskManagerFailure( * Generate a set of test records and write it to the given split writer. * * @param externalContext External context - * @return Collection of generated test records + * @return List of generated test records */ - protected Collection generateAndWriteTestData( + protected List generateAndWriteTestData( int splitIndex, ExternalContext externalContext) { - final Collection testRecordCollection = + final List testRecords = externalContext.generateTestData( splitIndex, ThreadLocalRandom.current().nextLong()); - LOG.debug("Writing {} records to external system", testRecordCollection.size()); - externalContext.createSourceSplitDataWriter().writeRecords(testRecordCollection); - return testRecordCollection; + LOG.debug("Writing {} records to external system", testRecords.size()); + externalContext.createSourceSplitDataWriter().writeRecords(testRecords); + return testRecords; } } diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java index 87d4905070d52..19e34789346ae 100644 --- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java @@ -24,7 +24,6 @@ import org.hamcrest.TypeSafeDiagnosingMatcher; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -35,15 +34,15 @@ public class TestDataMatchers { // ---------------------------- Matcher Builders ---------------------------------- public static MultipleSplitDataMatcher matchesMultipleSplitTestData( - Collection> testDataCollections) { - return new MultipleSplitDataMatcher<>(testDataCollections); + List> testRecordsLists) { + return new MultipleSplitDataMatcher<>(testRecordsLists); } - public static SplitDataMatcher matchesSplitTestData(Collection testData) { + public static SplitDataMatcher matchesSplitTestData(List testData) { return new SplitDataMatcher<>(testData); } - public static SplitDataMatcher matchesSplitTestData(Collection testData, int limit) { + public static SplitDataMatcher matchesSplitTestData(List testData, int limit) { return new SplitDataMatcher<>(testData, limit); } @@ -57,17 +56,17 @@ public static SplitDataMatcher matchesSplitTestData(Collection testDat public static class SplitDataMatcher extends TypeSafeDiagnosingMatcher> { private static final int UNSET = -1; - private final Collection testData; + private final List testData; private final int limit; private String mismatchDescription = null; - public SplitDataMatcher(Collection testData) { + public SplitDataMatcher(List testData) { this.testData = testData; this.limit = UNSET; } - public SplitDataMatcher(Collection testData, int limit) { + public SplitDataMatcher(List testData, int limit) { if (limit > testData.size()) { throw new IllegalArgumentException( "Limit validation size should be less than number of test records"); @@ -121,12 +120,12 @@ public void describeTo(Description description) { /** * Matcher for validating test data from multiple splits. * - *

Each collection has a pointer (iterator) pointing to current checking record. When a - * record is received in the stream, it will be compared to all current pointing records in - * collections, and the pointer to the identical record will move forward. + *

Each list has a pointer (iterator) pointing to current checking record. When a record is + * received in the stream, it will be compared to all current pointing records in lists, and the + * pointer to the identical record will move forward. * *

If the stream preserves the correctness and order of records in all splits, all pointers - * should reach the end of the collection finally. + * should reach the end of the list finally. * * @param Type of validating record */ @@ -136,9 +135,9 @@ public static class MultipleSplitDataMatcher extends TypeSafeDiagnosingMatche private String mismatchDescription = null; - public MultipleSplitDataMatcher(Collection> testDataCollections) { - for (Collection testDataCollection : testDataCollections) { - testDataIterators.add(new IteratorWithCurrent<>(testDataCollection.iterator())); + public MultipleSplitDataMatcher(List> testRecordsLists) { + for (List testRecords : testRecordsLists) { + testDataIterators.add(new IteratorWithCurrent<>(testRecords.iterator())); } } @@ -187,7 +186,7 @@ private boolean matchThenNext(T record) { } /** - * Whether all pointers have reached the end of collections. + * Whether all pointers have reached the end of lists. * * @return True if all pointers have reached the end. */ diff --git a/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java index 18331853595b4..4e4d8a190a2b9 100644 --- a/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java +++ b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java @@ -102,8 +102,7 @@ class MultipleSplitDataMatcherTest { private final List splitA = Arrays.asList("alpha", "beta", "gamma"); private final List splitB = Arrays.asList("one", "two", "three"); private final List splitC = Arrays.asList("1", "2", "3"); - private final List> testDataCollection = - Arrays.asList(splitA, splitB, splitC); + private final List> testDataCollection = Arrays.asList(splitA, splitB, splitC); @Test public void testPositiveCase() {