[FLINK-21195][Connectors/FileSystem] LimitableBulkFormat is invalid when format is orc

This closes apache#16122
meetjunsu authored Jun 21, 2021
1 parent 046d9b6 commit d8c19ba
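
This change makes the LIMIT pushdown in LimitableBulkFormat actually terminate reads for formats such as ORC: createReader and restoreReader previously always opened the underlying format reader, even when the shared row counter had already reached the limit. With the fix, a new reachLimit() check short-circuits both methods to a null delegate once the limit is satisfied, close() tolerates that null delegate, and new ITCases exercise `limit 5` queries against ORC and Parquet filesystem tables. A small runnable sketch of the gating pattern follows the LimitableBulkFormat diff below.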
Showing 3 changed files with 80 additions and 3 deletions.

OrcFileSystemITCase.java
@@ -140,6 +140,18 @@ public void before() {
                                         + "'path' = '%s',"
                                         + "%s)",
                                 super.resultPath(), String.join(",\n", formatProperties())));
+        super.tableEnv()
+                .executeSql(
+                        String.format(
+                                "create table orcLimitTable ("
+                                        + "x string,"
+                                        + "y int,"
+                                        + "a int"
+                                        + ") with ("
+                                        + "'connector' = 'filesystem',"
+                                        + "'path' = '%s',"
+                                        + "%s)",
+                                super.resultPath(), String.join(",\n", formatProperties())));
     }
 
     @Test
@@ -329,4 +341,20 @@ private String initNestedTypesFile(List<RowData> data) throws Exception {
         }
         return outDir.getAbsolutePath();
     }
+
+    @Test
+    public void testLimitableBulkFormat() throws ExecutionException, InterruptedException {
+        super.tableEnv()
+                .executeSql(
+                        "insert into orcLimitTable select x, y, " + "1 as a " + "from originalT")
+                .await();
+        TableResult tableResult1 =
+                super.tableEnv().executeSql("SELECT * FROM orcLimitTable limit 5");
+        List<Row> rows1 = CollectionUtil.iteratorToList(tableResult1.collect());
+        assertEquals(5, rows1.size());
+
+        check(
+                "select a from orcLimitTable limit 5",
+                Arrays.asList(Row.of(1), Row.of(1), Row.of(1), Row.of(1), Row.of(1)));
+    }
 }

ParquetFileSystemITCase.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.formats.parquet;
 
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.junit.Assert;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -34,9 +38,11 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.junit.Assert.assertEquals;
 
 /** ITCase for {@link ParquetFileFormatFactory}. */
 @RunWith(Parameterized.class)
@@ -53,6 +59,23 @@ public ParquetFileSystemITCase(boolean configure) {
         this.configure = configure;
     }
 
+    @Override
+    public void before() {
+        super.before();
+        super.tableEnv()
+                .executeSql(
+                        String.format(
+                                "create table parquetLimitTable ("
+                                        + "x string,"
+                                        + "y int,"
+                                        + "a int"
+                                        + ") with ("
+                                        + "'connector' = 'filesystem',"
+                                        + "'path' = '%s',"
+                                        + "%s)",
+                                super.resultPath(), String.join(",\n", formatProperties())));
+    }
+
     @Override
     public String[] formatProperties() {
         List<String> ret = new ArrayList<>();
@@ -91,4 +114,22 @@ public void testNonPartition() {
             throw new RuntimeException(e);
         }
     }
+
+    @Test
+    public void testLimitableBulkFormat() throws ExecutionException, InterruptedException {
+        super.tableEnv()
+                .executeSql(
+                        "insert into parquetLimitTable select x, y, "
+                                + "1 as a "
+                                + "from originalT")
+                .await();
+        TableResult tableResult1 =
+                super.tableEnv().executeSql("SELECT * FROM parquetLimitTable limit 5");
+        List<Row> rows1 = CollectionUtil.iteratorToList(tableResult1.collect());
+        assertEquals(5, rows1.size());
+
+        check(
+                "select a from parquetLimitTable limit 5",
+                Arrays.asList(Row.of(1), Row.of(1), Row.of(1), Row.of(1), Row.of(1)));
+    }
 }

LimitableBulkFormat.java
@@ -51,12 +51,14 @@ private LimitableBulkFormat(BulkFormat<T, SplitT> format, long limit) {
 
     @Override
    public Reader<T> createReader(Configuration config, SplitT split) throws IOException {
-        return wrapReader(format.createReader(config, split));
+        Reader<T> reader = reachLimit() ? null : format.createReader(config, split);
+        return wrapReader(reader);
     }
 
     @Override
     public Reader<T> restoreReader(Configuration config, SplitT split) throws IOException {
-        return wrapReader(format.restoreReader(config, split));
+        Reader<T> reader = reachLimit() ? null : format.restoreReader(config, split);
+        return wrapReader(reader);
     }
 
     private synchronized Reader<T> wrapReader(Reader<T> reader) {
@@ -66,6 +68,10 @@ private synchronized Reader<T> wrapReader(Reader<T> reader) {
         return new LimitableReader<>(reader, globalNumberRead, limit);
     }
 
+    private boolean reachLimit() {
+        return globalNumberRead != null && globalNumberRead.get() >= limit;
+    }
+
     @Override
     public boolean isSplittable() {
         return format.isSplittable();
@@ -105,7 +111,9 @@ public RecordIterator<T> readBatch() throws IOException {
 
         @Override
         public void close() throws IOException {
-            reader.close();
+            if (reader != null) {
+                reader.close();
+            }
         }
 
         private class LimitableIterator implements RecordIterator<T> {
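
To make the control flow concrete, here is a minimal, self-contained sketch of the gating pattern the fix relies on: all splits share one AtomicLong, reader creation is skipped entirely once the counter reaches the limit, and callers must tolerate the resulting null reader. All names below (LimitGateSketch, Reader, the three-rows-per-split stub) are illustrative stand-ins, not Flink API.

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only -- not Flink API. Models how LimitableBulkFormat
// gates reader creation on a counter shared by every split of the source.
public class LimitGateSketch {

    interface Reader {
        String read(); // returns null when the split (or the limit) is exhausted

        void close();
    }

    static final long LIMIT = 5;
    static final AtomicLong globalNumberRead = new AtomicLong();

    // Same shape as the reachLimit() added in this commit.
    static boolean reachLimit() {
        return globalNumberRead.get() >= LIMIT;
    }

    // Mirrors createReader()/restoreReader(): once earlier splits have already
    // produced `limit` rows, skip opening the underlying reader altogether.
    static Reader createReader(String split) {
        if (reachLimit()) {
            return null; // callers must handle null, like the patched close() does
        }
        return new Reader() {
            private int emitted = 0;

            @Override
            public String read() {
                // Stop on the global limit or after this stub's 3 rows per split.
                if (reachLimit() || emitted >= 3) {
                    return null;
                }
                emitted++;
                globalNumberRead.incrementAndGet();
                return split + "-row" + emitted;
            }

            @Override
            public void close() {}
        };
    }

    public static void main(String[] args) {
        for (String split : new String[] {"s1", "s2", "s3"}) {
            Reader reader = createReader(split);
            if (reader == null) {
                System.out.println(split + ": skipped, limit already reached");
                continue;
            }
            for (String row = reader.read(); row != null; row = reader.read()) {
                System.out.println(row);
            }
            reader.close();
        }
    }
}

Running the sketch prints three rows from s1, two rows from s2 (the counter hits 5 mid-split), and reports s3 as skipped; this mirrors how an already-satisfied limit now prevents LimitableBulkFormat from opening further ORC or Parquet readers.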