[FLINK-7244][parquet] Add ParquetTableSource.
This closes apache#8064.
HuangZhenQiu authored and fhueske committed Jul 10, 2019
1 parent f150665 commit 38e5e81
Showing 12 changed files with 1,041 additions and 46 deletions.
28 changes: 26 additions & 2 deletions flink-formats/flink-parquet/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-parquet</artifactId>
+	<artifactId>flink-parquet_${scala.binary.version}</artifactId>
 	<name>flink-parquet</name>
 
 	<packaging>jar</packaging>
@@ -39,7 +39,6 @@ under the License.
 	</properties>
 
 	<dependencies>
-
 		<!-- Flink dependencies -->
 
 		<dependency>
@@ -49,6 +48,31 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Table ecosystem -->
+		<!-- Projects depending on this project won't depend on flink-table-*. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+		<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+
 		<!-- Parquet Dependencies -->
 
 		<dependency>
16 changes: 15 additions & 1 deletion flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.parquet;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.CheckpointableInputFormat;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,6 +36,7 @@
 
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.InputFile;
@@ -85,6 +87,8 @@ public abstract class ParquetInputFormat<E>
 
 	private String[] fieldNames;
 
+	private FilterPredicate filterPredicate;
+
 	private transient Counter recordConsumed;
 
 	private transient MessageType expectedFileSchema;
@@ -143,6 +147,10 @@ public void selectFields(String[] fieldNames) {
 		this.fieldTypes = selectFieldTypes;
 	}
 
+	public void setFilterPredicate(FilterPredicate filterPredicate) {
+		this.filterPredicate = filterPredicate;
+	}
+
 	@Override
 	public Tuple2<Long, Long> getCurrentState() {
 		return parquetRecordReader.getCurrentReadPosition();
@@ -164,7 +172,8 @@ public void open(FileInputSplit split) throws IOException {
 				"Escaped the file split [%s] due to mismatch of file schema to expected result schema",
 				split.getPath().toString()));
 		} else {
-			this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema, FilterCompat.NOOP);
+			this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema,
+				filterPredicate == null ? FilterCompat.NOOP : FilterCompat.get(filterPredicate));
 			this.parquetRecordReader.initialize(fileReader, configuration);
 			this.parquetRecordReader.setSkipCorruptedRecord(this.skipCorruptedRecord);
 
@@ -203,6 +212,11 @@ protected TypeInformation[] getFieldTypes() {
 		return fieldTypes;
 	}
 
+	@VisibleForTesting
+	protected FilterPredicate getPredicate() {
+		return this.filterPredicate;
+	}
+
 	@Override
 	public void close() throws IOException {
 		if (parquetRecordReader != null) {
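
The hunks above add a filter pushdown hook to ParquetInputFormat: a Parquet FilterPredicate can be set on the format and is handed to the ParquetRecordReader through FilterCompat when a split is opened, so non-matching records are dropped inside the Parquet reader rather than in Flink. Below is a minimal sketch of how that hook could be used from a DataSet program; the file path, schema, and column names are hypothetical, and the FilterApi calls are the standard Parquet filter2 API, not something introduced by this commit.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetRowInputFormat;
import org.apache.flink.types.Row;

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetFilterExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Hypothetical Parquet schema and file location, for illustration only.
		MessageType schema = MessageTypeParser.parseMessageType(
			"message example { required int64 id; required binary name (UTF8); }");
		ParquetRowInputFormat format =
			new ParquetRowInputFormat(new Path("file:///tmp/example.parquet"), schema);

		// Only rows with id > 100 are materialized; the predicate is evaluated
		// by the Parquet reader via the FilterCompat plumbing added above.
		FilterPredicate predicate = FilterApi.gt(FilterApi.longColumn("id"), 100L);
		format.setFilterPredicate(predicate);

		env.createInput(format)
			.map((Row row) -> row.getField(1).toString())
			.returns(String.class)
			.print();
	}
}
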
4 changes: 1 addition & 3 deletions flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
@@ -31,16 +31,14 @@
  */
 public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
 	private static final long serialVersionUID = 11L;
-	private RowTypeInfo returnType;
 
 	public ParquetRowInputFormat(Path path, MessageType messageType) {
 		super(path, messageType);
-		this.returnType = new RowTypeInfo(getFieldTypes(), getFieldNames());
 	}
 
 	@Override
 	public TypeInformation<Row> getProducedType() {
-		return returnType;
+		return new RowTypeInfo(getFieldTypes(), getFieldNames());
 	}
 
 	@Override
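
The change above stops caching a RowTypeInfo in the constructor and instead rebuilds it on every getProducedType() call. A plausible reason is that selectFields(...) can re-project the format after construction, so a type computed in the constructor would go stale. A short sketch under that assumption, reusing the hypothetical schema and path from the previous example:

import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetRowInputFormat;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ProjectionTypeExample {

	public static void main(String[] args) {
		// Hypothetical schema and path, as in the previous sketch.
		MessageType schema = MessageTypeParser.parseMessageType(
			"message example { required int64 id; required binary name (UTF8); }");
		ParquetRowInputFormat format =
			new ParquetRowInputFormat(new Path("file:///tmp/example.parquet"), schema);

		// Project down to a single column after the format has been constructed.
		format.selectFields(new String[] {"name"});

		// getProducedType() now derives the RowTypeInfo from the current field
		// names and types, so the projection is reflected in the produced type.
		RowTypeInfo producedType = (RowTypeInfo) format.getProducedType();
		System.out.println(java.util.Arrays.toString(producedType.getFieldNames()));
	}
}
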
(The remaining 9 changed files in this commit, including the new ParquetTableSource, are not shown here.)
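
The ParquetTableSource class named in the commit title sits in the portion of the diff that did not load above. For orientation, here is a rough sketch of how it might be wired into a batch Table program; the builder methods (builder(), path(), forParquetSchema(), build()) match the API documented for the released connector, but treat them as an assumption, since the file is not visible in this excerpt. Filters such as the id > 100 predicate below are candidates for pushdown into the Parquet reader through the setFilterPredicate(...) hook shown earlier.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.formats.parquet.ParquetTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetTableSourceExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

		// Hypothetical schema and path, for illustration only.
		MessageType parquetSchema = MessageTypeParser.parseMessageType(
			"message example { required int64 id; required binary name (UTF8); }");

		ParquetTableSource tableSource = ParquetTableSource.builder()
			.path("file:///tmp/example.parquet")
			.forParquetSchema(parquetSchema)
			.build();

		tEnv.registerTableSource("ParquetTable", tableSource);

		Table result = tEnv.sqlQuery("SELECT name FROM ParquetTable WHERE id > 100");
		tEnv.toDataSet(result, Row.class).print();
	}
}
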

0 comments on commit 38e5e81
