Skip to content

Commit

Permalink
[FLINK-23971][tests] fix connector testing framework error when compa…
Browse files Browse the repository at this point in the history
…re records in different splits

Add split index parameter to generate test data, make sure T.equals(object) return false when records come from differernt splits.
  • Loading branch information
ruanhang1993 authored and becketqin committed Sep 1, 2021
1 parent d09ef68 commit ad052cc
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,23 +155,23 @@ public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
}

@Override
public Collection<String> generateTestData(long seed) {
public Collection<String> generateTestData(int splitIndex, long seed) {
Random random = new Random(seed);
List<String> randomStringRecords = new ArrayList<>();
int recordNum =
random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+ NUM_RECORDS_LOWER_BOUND;
for (int i = 0; i < recordNum; i++) {
int stringLength = random.nextInt(50) + 1;
randomStringRecords.add(generateRandomString(stringLength, random));
randomStringRecords.add(generateRandomString(splitIndex, stringLength, random));
}
return randomStringRecords;
}

private String generateRandomString(int length, Random random) {
private String generateRandomString(int splitIndex, int length, Random random) {
String alphaNumericString =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789";
StringBuilder sb = new StringBuilder();
StringBuilder sb = new StringBuilder().append(splitIndex).append("-");
for (int i = 0; i < length; ++i) {
sb.append(alphaNumericString.charAt(random.nextInt(alphaNumericString.length())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected PulsarTestContext(String displayName, PulsarTestEnvironment environmen

// Helper methods for generating data.

protected List<String> generateStringTestData(long seed) {
protected List<String> generateStringTestData(int splitIndex, long seed) {
Random random = new Random(seed);
int recordNum =
random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
Expand All @@ -52,7 +52,7 @@ protected List<String> generateStringTestData(long seed) {

for (int i = 0; i < recordNum; i++) {
int stringLength = random.nextInt(50) + 1;
records.add(randomAlphanumeric(stringLength));
records.add(splitIndex + "-" + randomAlphanumeric(stringLength));
}

return records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
}

@Override
public Collection<String> generateTestData(long seed) {
return generateStringTestData(seed);
public Collection<String> generateTestData(int splitIndex, long seed) {
return generateStringTestData(splitIndex, seed);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
}

@Override
public Collection<String> generateTestData(long seed) {
return generateStringTestData(seed);
public Collection<String> generateTestData(int splitIndex, long seed) {
return generateStringTestData(splitIndex, seed);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,14 @@ public interface ExternalContext<T> extends Serializable, AutoCloseable {
/**
* Generate test data.
*
* <p>Make sure that the {@link T#equals(Object)} returns false when the records in different
* splits.
*
* @param splitIndex index of the split.
* @param seed Seed for generating random test data set.
* @return Collection of generated test data.
*/
Collection<T> generateTestData(long seed);
Collection<T> generateTestData(int splitIndex, long seed);

/**
* Factory for {@link ExternalContext}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.util.CloseableIterator;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestTemplate;
Expand Down Expand Up @@ -103,7 +102,7 @@ public void testSourceSingleSplit(TestEnvironment testEnv, ExternalContext<T> ex
throws Exception {

// Write test data to external system
final Collection<T> testRecords = generateAndWriteTestData(externalContext);
final Collection<T> testRecords = generateAndWriteTestData(0, externalContext);

// Build and execute Flink job
StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment();
Expand Down Expand Up @@ -140,7 +139,7 @@ public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> exter
final int splitNumber = 4;
final List<Collection<T>> testRecordCollections = new ArrayList<>();
for (int i = 0; i < splitNumber; i++) {
testRecordCollections.add(generateAndWriteTestData(externalContext));
testRecordCollections.add(generateAndWriteTestData(i, externalContext));
}

LOG.debug("Build and execute Flink job");
Expand Down Expand Up @@ -174,14 +173,13 @@ public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> exter
*/
@TestTemplate
@DisplayName("Test source with at least one idle parallelism")
@Disabled
public void testIdleReader(TestEnvironment testEnv, ExternalContext<T> externalContext)
throws Exception {

final int splitNumber = 4;
final List<Collection<T>> testRecordCollections = new ArrayList<>();
for (int i = 0; i < splitNumber; i++) {
testRecordCollections.add(generateAndWriteTestData(externalContext));
testRecordCollections.add(generateAndWriteTestData(i, externalContext));
}

try (CloseableIterator<T> resultIterator =
Expand Down Expand Up @@ -216,9 +214,11 @@ public void testTaskManagerFailure(
ExternalContext<T> externalContext,
ClusterControllable controller)
throws Exception {
int splitIndex = 0;

final Collection<T> testRecordsBeforeFailure =
externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
externalContext.generateTestData(
splitIndex, ThreadLocalRandom.current().nextLong());
final SourceSplitDataWriter<T> sourceSplitDataWriter =
externalContext.createSourceSplitDataWriter();
sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure);
Expand Down Expand Up @@ -267,7 +267,8 @@ public void testTaskManagerFailure(
Deadline.fromNow(Duration.ofSeconds(30)));

final Collection<T> testRecordsAfterFailure =
externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
externalContext.generateTestData(
splitIndex, ThreadLocalRandom.current().nextLong());
sourceSplitDataWriter.writeRecords(testRecordsAfterFailure);

assertThat(
Expand All @@ -291,9 +292,11 @@ public void testTaskManagerFailure(
* @param externalContext External context
* @return Collection of generated test records
*/
protected Collection<T> generateAndWriteTestData(ExternalContext<T> externalContext) {
protected Collection<T> generateAndWriteTestData(
int splitIndex, ExternalContext<T> externalContext) {
final Collection<T> testRecordCollection =
externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
externalContext.generateTestData(
splitIndex, ThreadLocalRandom.current().nextLong());
LOG.debug("Writing {} records to external system", testRecordCollection.size());
externalContext.createSourceSplitDataWriter().writeRecords(testRecordCollection);
return testRecordCollection;
Expand Down

0 comments on commit ad052cc

Please sign in to comment.