Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-28449][tests][JUnit5 migration] flink-parquet #20230

Merged
merged 2 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

Expand Down Expand Up @@ -60,7 +59,6 @@ class CompressionFactoryITCase {
private final List<String> testData = Arrays.asList("line1", "line2", "line3");

@Test
@Timeout(20)
void testWriteCompressedFile(@TempDir java.nio.file.Path tmpDir) throws Exception {
final File folder = tmpDir.toFile();
final Path testPath = Path.fromLocalFile(folder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
ImportOptions.ExcludeScalaImportOption.class,
ImportOptions.ExcludeShadedImportOption.class
})
public class TestCodeArchitectureTest {
class TestCodeArchitectureTest {

@ArchTest
public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,9 @@
import org.apache.flink.util.InstantiationUtil;

import org.apache.hadoop.conf.Configuration;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.File;
import java.io.IOException;
Expand All @@ -84,8 +82,7 @@
import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link ParquetColumnarRowInputFormat}. */
@RunWith(Parameterized.class)
public class ParquetColumnarRowInputFormatTest {
class ParquetColumnarRowInputFormatTest {

private static final LocalDateTime BASE_TIME = LocalDateTime.now();
private static final org.apache.flink.configuration.Configuration EMPTY_CONF =
Expand Down Expand Up @@ -129,21 +126,15 @@ public class ParquetColumnarRowInputFormatTest {
new MapType(new IntType(), new BooleanType()),
RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@TempDir private File folder;

private final int rowGroupSize;

@Parameterized.Parameters(name = "rowGroupSize-{0}")
public static Collection<Integer> parameters() {
return Arrays.asList(10, 1000);
}

public ParquetColumnarRowInputFormatTest(int rowGroupSize) {
this.rowGroupSize = rowGroupSize;
}

@Test
public void testTypesReadWithSplits() throws IOException {
@ParameterizedTest
@MethodSource("parameters")
void testTypesReadWithSplits(int rowGroupSize) throws IOException {
int number = 10000;
List<Integer> values = new ArrayList<>(number);
Random random = new Random();
Expand All @@ -152,11 +143,12 @@ public void testTypesReadWithSplits() throws IOException {
values.add(v % 10 == 0 ? null : v);
}

innerTestTypes(values);
innerTestTypes(folder, values, rowGroupSize);
}

@Test
public void testDictionary() throws IOException {
@ParameterizedTest
@MethodSource("parameters")
void testDictionary(int rowGroupSize) throws IOException {
int number = 10000;
List<Integer> values = new ArrayList<>(number);
Random random = new Random();
Expand All @@ -170,11 +162,12 @@ public void testDictionary() throws IOException {
values.add(v == 0 ? null : v);
}

innerTestTypes(values);
innerTestTypes(folder, values, rowGroupSize);
}

@Test
public void testPartialDictionary() throws IOException {
@ParameterizedTest
@MethodSource("parameters")
void testPartialDictionary(int rowGroupSize) throws IOException {
// prepare parquet file
int number = 10000;
List<Integer> values = new ArrayList<>(number);
Expand All @@ -189,11 +182,12 @@ public void testPartialDictionary() throws IOException {
values.add(v == 0 ? null : v);
}

innerTestTypes(values);
innerTestTypes(folder, values, rowGroupSize);
}

@Test
public void testContinuousRepetition() throws IOException {
@ParameterizedTest
@MethodSource("parameters")
void testContinuousRepetition(int rowGroupSize) throws IOException {
int number = 10000;
List<Integer> values = new ArrayList<>(number);
Random random = new Random();
Expand All @@ -204,11 +198,12 @@ public void testContinuousRepetition() throws IOException {
}
}

innerTestTypes(values);
innerTestTypes(folder, values, rowGroupSize);
}

@Test
public void testLargeValue() throws IOException {
@ParameterizedTest
@MethodSource("parameters")
void testLargeValue(int rowGroupSize) throws IOException {
int number = 10000;
List<Integer> values = new ArrayList<>(number);
Random random = new Random();
Expand All @@ -217,19 +212,20 @@ public void testLargeValue() throws IOException {
values.add(v % 10 == 0 ? null : v);
}

innerTestTypes(values);
innerTestTypes(folder, values, rowGroupSize);
}

@Test
public void testProjection() throws IOException {
@ParameterizedTest
@MethodSource("parameters")
void testProjection(int rowGroupSize) throws IOException {
int number = 1000;
List<RowData> records = new ArrayList<>(number);
for (int i = 0; i < number; i++) {
Integer v = i;
records.add(newRow(v));
}

Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
Path testPath = createTempParquetFile(folder, records, rowGroupSize);
// test reader
LogicalType[] fieldTypes =
new LogicalType[] {new DoubleType(), new TinyIntType(), new IntType()};
Expand All @@ -256,16 +252,17 @@ public void testProjection() throws IOException {
});
}

@Test
public void testProjectionReadUnknownField() throws IOException {
@ParameterizedTest
@MethodSource("parameters")
void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
int number = 1000;
List<RowData> records = new ArrayList<>(number);
for (int i = 0; i < number; i++) {
Integer v = i;
records.add(newRow(v));
}

Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
Path testPath = createTempParquetFile(folder, records, rowGroupSize);

// test reader
LogicalType[] fieldTypes =
Expand Down Expand Up @@ -297,8 +294,9 @@ public void testProjectionReadUnknownField() throws IOException {
});
}

@Test
public void testPartitionValues() throws IOException {
@ParameterizedTest
@MethodSource("parameters")
void testPartitionValues(int rowGroupSize) throws IOException {
// prepare parquet file
int number = 1000;
List<RowData> records = new ArrayList<>(number);
Expand All @@ -307,8 +305,6 @@ public void testPartitionValues() throws IOException {
records.add(newRow(v));
}

File root = TEMPORARY_FOLDER.newFolder();

List<String> partitionKeys =
Arrays.asList(
"f33", "f34", "f35", "f36", "f37", "f38", "f39", "f40", "f41", "f42", "f43",
Expand All @@ -332,7 +328,7 @@ public void testPartitionValues() throws IOException {
partSpec.put("f45", "f45");

String partPath = generatePartitionPath(partSpec);
Path testPath = createTempParquetFile(new File(root, partPath), records, rowGroupSize);
Path testPath = createTempParquetFile(new File(folder, partPath), records, rowGroupSize);

innerTestPartitionValues(testPath, partitionKeys, false);

Expand All @@ -343,14 +339,15 @@ public void testPartitionValues() throws IOException {
}

partPath = generatePartitionPath(partSpec);
testPath = createTempParquetFile(new File(root, partPath), records, rowGroupSize);
testPath = createTempParquetFile(new File(folder, partPath), records, rowGroupSize);

innerTestPartitionValues(testPath, partitionKeys, true);
}

private void innerTestTypes(List<Integer> records) throws IOException {
private void innerTestTypes(File folder, List<Integer> records, int rowGroupSize)
throws IOException {
List<RowData> rows = records.stream().map(this::newRow).collect(Collectors.toList());
Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), rows, rowGroupSize);
Path testPath = createTempParquetFile(folder, rows, rowGroupSize);

// test reading and splitting
long fileLen = testPath.getFileSystem().getFileStatus(testPath).getLen();
Expand Down Expand Up @@ -475,7 +472,7 @@ private int testReadingSplit(
assertThat(row.isNullAt(31)).isTrue();
assertThat(row.isNullAt(32)).isTrue();
} else {
assertThat(row.getString(0).toString()).isEqualTo("" + v);
assertThat(row.getString(0)).hasToString("" + v);
assertThat(row.getBoolean(1)).isEqualTo(v % 2 == 0);
assertThat(row.getByte(2)).isEqualTo(v.byteValue());
assertThat(row.getShort(3)).isEqualTo(v.shortValue());
Expand Down Expand Up @@ -516,7 +513,7 @@ private int testReadingSplit(
.isEqualTo(BigDecimal.valueOf(v));
assertThat(row.getDecimal(14, 20, 0).toBigDecimal())
.isEqualTo(BigDecimal.valueOf(v));
assertThat(row.getArray(15).getString(0).toString()).isEqualTo("" + v);
assertThat(row.getArray(15).getString(0)).hasToString("" + v);
assertThat(row.getArray(16).getBoolean(0)).isEqualTo(v % 2 == 0);
assertThat(row.getArray(17).getByte(0)).isEqualTo(v.byteValue());
assertThat(row.getArray(18).getShort(0)).isEqualTo(v.shortValue());
Expand All @@ -539,10 +536,9 @@ private int testReadingSplit(
assertThat(row.getArray(29).getDecimal(0, 20, 0))
.isEqualTo(
DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
assertThat(row.getMap(30).valueArray().getString(0).toString())
.isEqualTo("" + v);
assertThat(row.getMap(30).valueArray().getString(0)).hasToString("" + v);
assertThat(row.getMap(31).valueArray().getBoolean(0)).isEqualTo(v % 2 == 0);
assertThat(row.getRow(32, 2).getString(0).toString()).isEqualTo("" + v);
assertThat(row.getRow(32, 2).getString(0)).hasToString("" + v);
assertThat(row.getRow(32, 2).getInt(1)).isEqualTo(v.intValue());
}
cnt.incrementAndGet();
Expand Down Expand Up @@ -746,7 +742,7 @@ private void innerTestPartitionValues(
.isEqualTo(DecimalData.fromBigDecimal(new BigDecimal(25), 15, 0));
assertThat(row.getDecimal(14, 20, 0))
.isEqualTo(DecimalData.fromBigDecimal(new BigDecimal(26), 20, 0));
assertThat(row.getString(15).toString()).isEqualTo("f45");
assertThat(row.getString(15)).hasToString("f45");
}
cnt.incrementAndGet();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ public void testNonPartition() {
ParquetMetadata footer =
readFooter(new Configuration(), path, range(0, Long.MAX_VALUE));
if (configure) {
assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec().toString())
.isEqualTo("GZIP");
assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec())
.hasToString("GZIP");
} else {
assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec().toString())
.isEqualTo("SNAPPY");
assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec())
.hasToString("SNAPPY");
}
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testParquetFormatStatsReportWithSingleFile() throws Exception {
// insert data and get statistics.
DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
assertThat(folder.listFiles()).isNotNull().hasSize(1);
assertThat(folder.listFiles()).hasSize(1);
File[] files = folder.listFiles();
assert files != null;
TableStats tableStats =
Expand All @@ -84,7 +84,7 @@ public void testParquetFormatStatsReportWithMultiFile() throws Exception {
DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
assertThat(folder.listFiles()).isNotNull().hasSize(2);
assertThat(folder.listFiles()).hasSize(2);
File[] files = folder.listFiles();
List<Path> paths = new ArrayList<>();
assert files != null;
Expand Down
Loading