From 38e5e8161a9c763cf7df3b642830b5a97371bb00 Mon Sep 17 00:00:00 2001 From: Peter Huang Date: Sun, 24 Mar 2019 23:19:18 -0700 Subject: [PATCH] [FLINK-7244][parquet] Add ParquetTableSource. This closes #8064. --- flink-formats/flink-parquet/pom.xml | 28 +- .../formats/parquet/ParquetInputFormat.java | 16 +- .../parquet/ParquetRowInputFormat.java | 4 +- .../formats/parquet/ParquetTableSource.java | 568 ++++++++++++++++++ .../parquet/utils/ParquetSchemaConverter.java | 2 +- .../parquet/ParquetMapInputFormatTest.java | 2 +- .../parquet/ParquetTableSourceITCase.java | 116 ++++ .../parquet/ParquetTableSourceTest.java | 234 ++++++++ .../utils/ParquetRecordReaderTest.java | 4 +- .../utils/ParquetSchemaConverterTest.java | 39 +- .../flink/formats/parquet/utils/TestUtil.java | 72 ++- .../src/test/resources/avro/nested.avsc | 2 +- 12 files changed, 1041 insertions(+), 46 deletions(-) create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml index 4a2fd32b42328..b43715d5a8d87 100644 --- a/flink-formats/flink-parquet/pom.xml +++ b/flink-formats/flink-parquet/pom.xml @@ -29,7 +29,7 @@ under the License. .. - flink-parquet + flink-parquet_${scala.binary.version} flink-parquet jar @@ -39,7 +39,6 @@ under the License. - @@ -49,6 +48,31 @@ under the License. provided + + org.apache.flink + flink-table-common + ${project.version} + provided + + + + + + org.apache.flink + flink-table-api-java-bridge_${scala.binary.version} + ${project.version} + provided + true + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${project.version} + provided + true + + diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java index e7484cbddee1f..554313ed9b6fc 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java @@ -18,6 +18,7 @@ package org.apache.flink.formats.parquet; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.io.CheckpointableInputFormat; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -35,6 +36,7 @@ import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.InputFile; @@ -85,6 +87,8 @@ public abstract class ParquetInputFormat private String[] fieldNames; + private FilterPredicate filterPredicate; + private transient Counter recordConsumed; private transient MessageType expectedFileSchema; @@ -143,6 +147,10 @@ public void selectFields(String[] fieldNames) { this.fieldTypes = selectFieldTypes; } + public void setFilterPredicate(FilterPredicate filterPredicate) { + this.filterPredicate = filterPredicate; + } + @Override public Tuple2 getCurrentState() { 
return parquetRecordReader.getCurrentReadPosition(); @@ -164,7 +172,8 @@ public void open(FileInputSplit split) throws IOException { "Escaped the file split [%s] due to mismatch of file schema to expected result schema", split.getPath().toString())); } else { - this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema, FilterCompat.NOOP); + this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema, + filterPredicate == null ? FilterCompat.NOOP : FilterCompat.get(filterPredicate)); this.parquetRecordReader.initialize(fileReader, configuration); this.parquetRecordReader.setSkipCorruptedRecord(this.skipCorruptedRecord); @@ -203,6 +212,11 @@ protected TypeInformation[] getFieldTypes() { return fieldTypes; } + @VisibleForTesting + protected FilterPredicate getPredicate() { + return this.filterPredicate; + } + @Override public void close() throws IOException { if (parquetRecordReader != null) { diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java index f010a50cc8ec2..13da9c73ce0f9 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java @@ -31,16 +31,14 @@ */ public class ParquetRowInputFormat extends ParquetInputFormat implements ResultTypeQueryable { private static final long serialVersionUID = 11L; - private RowTypeInfo returnType; public ParquetRowInputFormat(Path path, MessageType messageType) { super(path, messageType); - this.returnType = new RowTypeInfo(getFieldTypes(), getFieldNames()); } @Override public TypeInformation getProducedType() { - return returnType; + return new RowTypeInfo(getFieldTypes(), getFieldNames()); } @Override diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java new file mode 100644 index 0000000000000..0b5d168bd2801 --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.expressions.And; +import org.apache.flink.table.expressions.Attribute; +import org.apache.flink.table.expressions.BinaryComparison; +import org.apache.flink.table.expressions.BinaryExpression; +import org.apache.flink.table.expressions.EqualTo; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.GreaterThan; +import org.apache.flink.table.expressions.GreaterThanOrEqual; +import org.apache.flink.table.expressions.LessThan; +import org.apache.flink.table.expressions.LessThanOrEqual; +import org.apache.flink.table.expressions.Literal; +import org.apache.flink.table.expressions.Not; +import org.apache.flink.table.expressions.NotEqualTo; +import org.apache.flink.table.expressions.Or; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.table.sources.FilterableTableSource; +import org.apache.flink.table.sources.ProjectableTableSource; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; +import org.apache.parquet.filter2.predicate.Operators.BooleanColumn; +import org.apache.parquet.filter2.predicate.Operators.Column; +import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; +import org.apache.parquet.filter2.predicate.Operators.FloatColumn; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** + * A TableSource to read Parquet files. + * + *

The {@link ParquetTableSource} supports projection and filter push-down.

+ * + *

A {@link ParquetTableSource} is used as shown in the example below.

+ * {@code
+ * ParquetTableSource parquetSource = ParquetTableSource.builder()
+ *   .path("file:///my/data/file.parquet")
+ *   .forParquetSchema(messageType)
+ *   .build();
+ *
+ * tableEnv.registerTableSource("parquetTable", parquetSource);
+ * Table res = tableEnv.sqlQuery("SELECT * FROM parquetTable");
+ * }
+ * 
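+ *
+ * <p>Once the source is registered, suitable projections and filters are pushed into the
+ * Parquet reader by the planner. As an illustrative sketch (assuming the files contain a
+ * LONG column named "foo"), a query such as the following would read only the "foo" column
+ * and let Parquet evaluate the comparison:
+ *
+ * {@code
+ * Table filtered = tableEnv.sqlQuery("SELECT foo FROM parquetTable WHERE foo > 100");
+ * }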
+ */ +public class ParquetTableSource + implements BatchTableSource, FilterableTableSource, ProjectableTableSource { + + private static final Logger LOG = LoggerFactory.getLogger(ParquetTableSource.class); + + // path to read Parquet files from + private final String path; + // schema of the Parquet file + private final MessageType parquetSchema; + // the schema of table + private final TableSchema tableSchema; + // the configuration to read the file + private final Configuration parquetConfig; + // type information of the data returned by the InputFormat + private final RowTypeInfo typeInfo; + // list of selected Parquet fields to return + @Nullable + private final int[] selectedFields; + // predicate expression to apply + @Nullable + private final FilterPredicate predicate; + // flag whether a path is recursively enumerated + private final boolean recursiveEnumeration; + + private boolean isFilterPushedDown; + + private ParquetTableSource(String path, MessageType parquetSchema, Configuration configuration, + boolean recursiveEnumeration) { + this(path, parquetSchema, configuration, recursiveEnumeration, null, null); + } + + private ParquetTableSource(String path, MessageType parquetSchema, Configuration configuration, + boolean recursiveEnumeration, @Nullable int[] selectedFields, @Nullable FilterPredicate predicate) { + Preconditions.checkNotNull(path, "Path must not be null."); + Preconditions.checkNotNull(parquetSchema, "ParquetSchema must not be null."); + Preconditions.checkNotNull(configuration, "Configuration must not be null"); + this.path = path; + this.parquetSchema = parquetSchema; + this.parquetConfig = configuration; + this.selectedFields = selectedFields; + this.predicate = predicate; + this.recursiveEnumeration = recursiveEnumeration; + + if (predicate != null) { + this.isFilterPushedDown = true; + } + // determine the type information from the Parquet schema + RowTypeInfo typeInfoFromSchema = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(parquetSchema); + + // set return type info + if (selectedFields == null) { + this.typeInfo = typeInfoFromSchema; + } else { + this.typeInfo = RowTypeInfo.projectFields(typeInfoFromSchema, selectedFields); + } + + // create a TableSchema that corresponds to the Parquet schema + this.tableSchema = new TableSchema( + typeInfoFromSchema.getFieldNames(), + typeInfoFromSchema.getFieldTypes() + ); + } + + @Override + public TableSource projectFields(int[] fields) { + return new ParquetTableSource(path, parquetSchema, parquetConfig, recursiveEnumeration, fields, null); + } + + @Override + public DataSet getDataSet(ExecutionEnvironment executionEnvironment) { + ParquetRowInputFormat parquetRowInputFormat = new ParquetRowInputFormat(new Path(path), parquetSchema); + parquetRowInputFormat.setNestedFileEnumeration(recursiveEnumeration); + if (selectedFields != null) { + parquetRowInputFormat.selectFields(typeInfo.getFieldNames()); + } + + if (predicate != null) { + parquetRowInputFormat.setFilterPredicate(predicate); + } + + return executionEnvironment.createInput(parquetRowInputFormat).name(explainSource()); + } + + @Override + public TableSource applyPredicate(List predicates) { + + // try to convert Flink filter expressions to Parquet FilterPredicates + List convertedPredicates = new ArrayList<>(predicates.size()); + List unsupportedExpressions = new ArrayList<>(predicates.size()); + + for (Expression toConvert : predicates) { + FilterPredicate convertedPredicate = toParquetPredicate(toConvert); + if (convertedPredicate != null) { + 
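+				// Parquet can evaluate this predicate; collect it for push-down into the reader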
+				convertedPredicates.add(convertedPredicate);
+			} else {
+				unsupportedExpressions.add(toConvert);
+			}
+		}
+
+		// update list of Flink expressions to unsupported expressions
+		predicates.clear();
+		predicates.addAll(unsupportedExpressions);
+
+		// construct single Parquet FilterPredicate
+		FilterPredicate parquetPredicate = null;
+		if (!convertedPredicates.isEmpty()) {
+			// concat converted predicates with AND
+			parquetPredicate = convertedPredicates.get(0);
+
+			for (FilterPredicate converted : convertedPredicates.subList(1, convertedPredicates.size())) {
+				parquetPredicate = FilterApi.and(parquetPredicate, converted);
+			}
+		}
+
+		// create and return a new ParquetTableSource with Parquet FilterPredicate
+		return new ParquetTableSource(path, parquetSchema, this.parquetConfig, recursiveEnumeration, selectedFields, parquetPredicate);
+	}
+
+	@Override
+	public boolean isFilterPushedDown() {
+		return isFilterPushedDown;
+	}
+
+	@Override
+	public TypeInformation getReturnType() {
+		return typeInfo;
+	}
+
+	@Override
+	public TableSchema getTableSchema() {
+		return tableSchema;
+	}
+
+	@Override
+	public String explainSource() {
+		return "ParquetFile[path=" + path + ", schema=" + parquetSchema + ", filter=" + predicateString()
+			+ ", typeInfo=" + typeInfo + "]";
+	}
+
+	private String predicateString() {
+		if (predicate != null) {
+			return predicate.toString();
+		} else {
+			return "TRUE";
+		}
+	}
+
+	/**
+	 * Converts a Flink Expression into a Parquet FilterPredicate.
+	 */
+	@Nullable
+	private FilterPredicate toParquetPredicate(Expression exp) {
+		if (exp instanceof Not) {
+			FilterPredicate c = toParquetPredicate(((Not) exp).child());
+			if (c == null) {
+				return null;
+			} else {
+				return FilterApi.not(c);
+			}
+		} else if (exp instanceof BinaryComparison) {
+			BinaryComparison binComp = (BinaryComparison) exp;
+
+			if (!isValid(binComp)) {
+				// unsupported literal type
+				LOG.debug("Unsupported predicate [{}] cannot be pushed to ParquetTableSource.", exp);
+				return null;
+			}
+
+			boolean onRight = literalOnRight(binComp);
+			Tuple2 columnPair = extractColumnAndLiteral(binComp);
+
+			if (columnPair != null) {
+				if (exp instanceof EqualTo) {
+					if (columnPair.f0 instanceof IntColumn) {
+						return FilterApi.eq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+					} else if (columnPair.f0 instanceof LongColumn) {
+						return FilterApi.eq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+					} else if (columnPair.f0 instanceof DoubleColumn) {
+						return FilterApi.eq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+					} else if (columnPair.f0 instanceof FloatColumn) {
+						return FilterApi.eq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+					} else if (columnPair.f0 instanceof BooleanColumn) {
+						return FilterApi.eq((BooleanColumn) columnPair.f0, (Boolean) columnPair.f1);
+					} else if (columnPair.f0 instanceof BinaryColumn) {
+						return FilterApi.eq((BinaryColumn) columnPair.f0, (Binary) columnPair.f1);
+					}
+				} else if (exp instanceof NotEqualTo) {
+					if (columnPair.f0 instanceof IntColumn) {
+						return FilterApi.notEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+					} else if (columnPair.f0 instanceof LongColumn) {
+						return FilterApi.notEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+					} else if (columnPair.f0 instanceof DoubleColumn) {
+						return FilterApi.notEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+					} else if (columnPair.f0 instanceof FloatColumn) {
+						return FilterApi.notEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+					} else if (columnPair.f0 instanceof BooleanColumn) {
+						return FilterApi.notEq((BooleanColumn) columnPair.f0, (Boolean) columnPair.f1);
+					} else if (columnPair.f0 instanceof BinaryColumn) {
+						return FilterApi.notEq((BinaryColumn) columnPair.f0, (Binary) columnPair.f1);
+					}
+				} else if (exp instanceof GreaterThan) {
+					if (onRight) {
+						return greaterThan(exp, columnPair);
+					} else {
+						// literal on the left: "literal > column" is equivalent to "column < literal"
+						return lessThan(exp, columnPair);
+					}
+				} else if (exp instanceof GreaterThanOrEqual) {
+					if (onRight) {
+						return greaterThanOrEqual(exp, columnPair);
+					} else {
+						return lessThanOrEqual(exp, columnPair);
+					}
+				} else if (exp instanceof LessThan) {
+					if (onRight) {
+						return lessThan(exp, columnPair);
+					} else {
+						return greaterThan(exp, columnPair);
+					}
+				} else if (exp instanceof LessThanOrEqual) {
+					if (onRight) {
+						return lessThanOrEqual(exp, columnPair);
+					} else {
+						return greaterThanOrEqual(exp, columnPair);
+					}
+				} else {
+					// Unsupported Predicate
+					LOG.debug("Unsupported predicate [{}] cannot be pushed into ParquetTableSource.", exp);
+					return null;
+				}
+			}
+		} else if (exp instanceof BinaryExpression) {
+			if (exp instanceof And) {
+				LOG.debug("All predicates should be in CNF, but found an AND expression: [{}].", exp);
+			} else if (exp instanceof Or) {
+				FilterPredicate c1 = toParquetPredicate(((Or) exp).left());
+				FilterPredicate c2 = toParquetPredicate(((Or) exp).right());
+
+				if (c1 == null || c2 == null) {
+					return null;
+				} else {
+					return FilterApi.or(c1, c2);
+				}
+			} else {
+				// Unsupported Predicate
+				LOG.debug("Unsupported predicate [{}] cannot be pushed into ParquetTableSource.", exp);
+				return null;
+			}
+		}
+
+		return null;
+	}
+
+	@Nullable
+	private FilterPredicate greaterThan(Expression exp, Tuple2 columnPair) {
+		Preconditions.checkArgument(exp instanceof GreaterThan || exp instanceof LessThan,
+			"exp has to be GreaterThan or a mirrored LessThan");
+		if (columnPair.f0 instanceof IntColumn) {
+			return FilterApi.gt((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+		} else if (columnPair.f0 instanceof LongColumn) {
+			return FilterApi.gt((LongColumn) columnPair.f0, (Long) columnPair.f1);
+		} else if (columnPair.f0 instanceof DoubleColumn) {
+			return FilterApi.gt((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+		} else if (columnPair.f0 instanceof FloatColumn) {
+			return FilterApi.gt((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+		}
+
+		return null;
+	}
+
+	@Nullable
+	private FilterPredicate lessThan(Expression exp, Tuple2 columnPair) {
+		Preconditions.checkArgument(exp instanceof LessThan || exp instanceof GreaterThan,
+			"exp has to be LessThan or a mirrored GreaterThan");
+
+		if (columnPair.f0 instanceof IntColumn) {
+			return FilterApi.lt((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+		} else if (columnPair.f0 instanceof LongColumn) {
+			return FilterApi.lt((LongColumn) columnPair.f0, (Long) columnPair.f1);
+		} else if (columnPair.f0 instanceof DoubleColumn) {
+			return FilterApi.lt((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+		} else if (columnPair.f0 instanceof FloatColumn) {
+			return FilterApi.lt((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+		}
+
+		return null;
+	}
+
+	@Nullable
+	private FilterPredicate greaterThanOrEqual(Expression exp, Tuple2 columnPair) {
+		Preconditions.checkArgument(exp instanceof GreaterThanOrEqual || exp instanceof LessThanOrEqual,
+			"exp has to be GreaterThanOrEqual or a mirrored LessThanOrEqual");
+		if (columnPair.f0 instanceof IntColumn) {
+			return FilterApi.gtEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+		} else if (columnPair.f0 instanceof LongColumn) {
+			return FilterApi.gtEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+		} else if (columnPair.f0 instanceof DoubleColumn) {
+			return FilterApi.gtEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+		} else if (columnPair.f0 instanceof FloatColumn) {
+			return FilterApi.gtEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+		}
+
+		return null;
+	}
+
+	@Nullable
+	private FilterPredicate lessThanOrEqual(Expression exp, Tuple2 columnPair) {
+		Preconditions.checkArgument(exp instanceof LessThanOrEqual || exp instanceof GreaterThanOrEqual,
+			"exp has to be LessThanOrEqual or a mirrored GreaterThanOrEqual");
+		if (columnPair.f0 instanceof IntColumn) {
+			return FilterApi.ltEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+		} else if (columnPair.f0 instanceof LongColumn) {
+			return FilterApi.ltEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+		} else if (columnPair.f0 instanceof DoubleColumn) {
+			return FilterApi.ltEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+		} else if (columnPair.f0 instanceof FloatColumn) {
+			return FilterApi.ltEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+		}
+
+		return null;
+	}
+
+	private boolean isValid(BinaryComparison comp) {
+		return (comp.left() instanceof Literal && comp.right() instanceof Attribute) ||
+			(comp.left() instanceof Attribute && comp.right() instanceof Literal);
+	}
+
+	private boolean literalOnRight(BinaryComparison comp) {
+		if (comp.left() instanceof Literal && comp.right() instanceof Attribute) {
+			return false;
+		} else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) {
+			return true;
+		} else {
+			throw new RuntimeException("Invalid binary comparison.");
+		}
+	}
+
+	private TypeInformation getLiteralType(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return ((Literal) comp.right()).resultType();
+		} else {
+			return ((Literal) comp.left()).resultType();
+		}
+	}
+
+	private Object getLiteral(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return ((Literal) comp.right()).value();
+		} else {
+			return ((Literal) comp.left()).value();
+		}
+	}
+
+	private String getColumnName(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return ((Attribute) comp.left()).name();
+		} else {
+			return ((Attribute) comp.right()).name();
+		}
+	}
+
+	@Nullable
+	private Tuple2 extractColumnAndLiteral(BinaryComparison comp) {
+		TypeInformation typeInfo = getLiteralType(comp);
+		String columnName = getColumnName(comp);
+
+		// fetch the literal
+		Object value = getLiteral(comp);
+		// ensure that the literal is comparable
+		if (!(value instanceof Comparable)) {
+			LOG.warn("Encountered a non-comparable literal of type {}. " +
+				"Cannot push predicate [{}] into ParquetTableSource. " +
+				"This is a bug and should be reported.", value.getClass().getCanonicalName(), comp);
+			return null;
+		}
+
+		if (typeInfo == BasicTypeInfo.BYTE_TYPE_INFO ||
+			typeInfo == BasicTypeInfo.SHORT_TYPE_INFO ||
+			typeInfo == BasicTypeInfo.INT_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.intColumn(columnName), (Integer) value);
+		} else if (typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.longColumn(columnName), (Long) value);
+		} else if (typeInfo == BasicTypeInfo.FLOAT_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.floatColumn(columnName), (Float) value);
+		} else if (typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.booleanColumn(columnName), (Boolean) value);
+		} else if (typeInfo == BasicTypeInfo.DOUBLE_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.doubleColumn(columnName), (Double) value);
+		} else if (typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.binaryColumn(columnName), Binary.fromString((String) value));
+		} else {
+			// unsupported type
+			return null;
+		}
+	}
+
+	// Builder
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * Constructs a {@link ParquetTableSource}.
+	 */
+	public static class Builder {
+
+		private String path;
+
+		private MessageType schema;
+
+		private Configuration config;
+
+		private boolean recursive = true;
+
+		/**
+		 * Sets the path of the Parquet files.
+		 * If the path specifies a directory, it is recursively enumerated.
+		 *
+		 * @param path The path of the Parquet files.
+		 * @return The Builder
+		 */
+		public Builder path(String path) {
+			Preconditions.checkNotNull(path, "Path must not be null");
+			Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty");
+			this.path = path;
+			return this;
+		}
+
+		/**
+		 * Sets the path of the Parquet files.
+		 *
+		 * @param path The path of the Parquet files.
+		 * @param recursive Flag whether a directory path should be recursively enumerated.
+		 * @return The Builder
+		 */
+		public Builder path(String path, boolean recursive) {
+			Preconditions.checkNotNull(path, "Path must not be null");
+			Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty");
+			this.path = path;
+			this.recursive = recursive;
+			return this;
+		}
+
+		/**
+		 * Sets the Parquet schema of the files to read.
+		 *
+		 * @param parquetSchema The Parquet schema of the files to read.
+		 * @return The Builder
+		 */
+		public Builder forParquetSchema(MessageType parquetSchema) {
+			Preconditions.checkNotNull(parquetSchema, "Parquet schema must not be null");
+			this.schema = parquetSchema;
+			return this;
+		}
+
+		/**
+		 * Sets a Hadoop {@link Configuration} for the Parquet reader. If no configuration is provided,
+		 * an empty configuration is used.
+		 *
+		 * @param config The Hadoop Configuration for the Parquet reader.
+		 * @return The Builder
+		 */
+		public Builder withConfiguration(Configuration config) {
+			Preconditions.checkNotNull(config, "Configuration must not be null.");
+			this.config = config;
+			return this;
+		}
+
+		/**
+		 * Builds the ParquetTableSource for this builder.
+		 *
+		 * @return The ParquetTableSource for this builder.
+ */ + public ParquetTableSource build() { + Preconditions.checkNotNull(path, "Path must not be null"); + Preconditions.checkNotNull(schema, "Parquet schema must not be null"); + if (config == null) { + this.config = new Configuration(); + } + + return new ParquetTableSource(this.path, this.schema, this.config, this.recursive); + } + } +} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java index 35e197761423c..084c0604d8ab8 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java @@ -43,7 +43,7 @@ * Schema converter converts Parquet schema to and from Flink internal types. */ public class ParquetSchemaConverter { - private static final Logger LOGGER = LoggerFactory.getLogger(RowConverter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ParquetSchemaConverter.class); public static final String MAP_VALUE = "value"; public static final String LIST_ARRAY_TYPE = "array"; public static final String LIST_ELEMENT = "element"; diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java index 29111661bcfbf..f36b12c5f3e74 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java @@ -77,7 +77,7 @@ public void testReadMapFromNestedRecord() throws IOException { List> nestedArray = (List>) map.get("nestedArray"); assertEquals(1, nestedArray.size()); assertEquals("color", nestedArray.get(0).get("type")); - assertEquals("yellow", nestedArray.get(0).get("value")); + assertEquals(1L, nestedArray.get(0).get("value")); } @Test diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java new file mode 100644 index 0000000000000..d8eba5e6e0e2d --- /dev/null +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.TestUtil; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.Row; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Integration tests for {@link ParquetTableSource}. + */ +public class ParquetTableSourceITCase extends MultipleProgramsTestBase { + private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter(); + private static Path testPath; + + @ClassRule + public static TemporaryFolder tempRoot = new TemporaryFolder(); + + public ParquetTableSourceITCase() { + super(TestExecutionMode.COLLECTION); + } + + @BeforeClass + public static void setup() throws Exception { + testPath = createTestParquetFile(1000); + } + + @Test + public void testFullScan() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env); + ParquetTableSource tableSource = createParquetTableSource(testPath); + batchTableEnvironment.registerTableSource("ParquetTable", tableSource); + String query = + "SELECT foo " + + "FROM ParquetTable"; + + Table table = batchTableEnvironment.sqlQuery(query); + DataSet dataSet = batchTableEnvironment.toDataSet(table, Row.class); + List result = dataSet.collect(); + + assertEquals(1000, result.size()); + } + + @Test + public void testScanWithProjectionAndFilter() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env); + ParquetTableSource tableSource = createParquetTableSource(testPath); + batchTableEnvironment.registerTableSource("ParquetTable", tableSource); + String query = + "SELECT foo " + + "FROM ParquetTable WHERE bar.spam >= 30 AND CARDINALITY(arr) >= 1 AND arr[1] <= 50"; + + Table table = batchTableEnvironment.sqlQuery(query); + DataSet dataSet = batchTableEnvironment.toDataSet(table, Row.class); + List result = dataSet.collect(); + + assertEquals(21, result.size()); + } + + /** + * Create test Parquet table source that reads a test file created by {@link #createTestParquetFile(int)}. + */ + private ParquetTableSource createParquetTableSource(Path path) throws IOException { + MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA); + ParquetTableSource parquetTableSource = ParquetTableSource.builder() + .path(path.getPath()) + .forParquetSchema(nestedSchema) + .build(); + return parquetTableSource; + } + + /** + * Create a test Parquet file with a given number of rows. 
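+ *
+ * @param numberOfRows The number of rows to write to the file.
+ * @return The path of the created Parquet file.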
+ */ + private static Path createTestParquetFile(int numberOfRows) throws Exception { + List records = TestUtil.createRecordList(numberOfRows); + Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, records); + return path; + } +} diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java new file mode 100644 index 0000000000000..1e6ebf8fdda86 --- /dev/null +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.TestUtil; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.expressions.EqualTo; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.GetCompositeField; +import org.apache.flink.table.expressions.GreaterThan; +import org.apache.flink.table.expressions.ItemAt; +import org.apache.flink.table.expressions.Literal; +import org.apache.flink.table.expressions.PlannerResolvedFieldReference; +import org.apache.flink.types.Row; + +import org.apache.avro.specific.SpecificRecord; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.schema.MessageType; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for {@link ParquetTableSource}. 
+ */ +public class ParquetTableSourceTest extends TestUtil { + private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter(); + private static Path testPath; + + @ClassRule + public static TemporaryFolder tempRoot = new TemporaryFolder(); + + @BeforeClass + public static void setup() throws Exception { + testPath = createTestParquetFile(); + } + + @Test + public void testGetReturnType() { + MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA); + ParquetTableSource parquetTableSource = ParquetTableSource.builder() + .path("dummy-path") + .forParquetSchema(nestedSchema) + .build(); + + TypeInformation returnType = parquetTableSource.getReturnType(); + assertNotNull(returnType); + assertTrue(returnType instanceof RowTypeInfo); + RowTypeInfo rowType = (RowTypeInfo) returnType; + assertEquals(NESTED_ROW_TYPE, rowType); + } + + @Test + public void testGetTableSchema() { + MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA); + ParquetTableSource parquetTableSource = ParquetTableSource.builder() + .path("dummy-path") + .forParquetSchema(nestedSchema) + .build(); + + TableSchema schema = parquetTableSource.getTableSchema(); + assertNotNull(schema); + + RowTypeInfo expectedSchema = (RowTypeInfo) NESTED_ROW_TYPE; + assertArrayEquals(expectedSchema.getFieldNames(), schema.getFieldNames()); + assertArrayEquals(expectedSchema.getFieldTypes(), schema.getFieldTypes()); + } + + @Test + public void testFieldsProjection() throws Exception { + ParquetTableSource parquetTableSource = createNestedTestParquetTableSource(testPath); + ParquetTableSource projected = (ParquetTableSource) parquetTableSource.projectFields(new int[] {2, 4, 6}); + + // ensure a new reference is returned + assertNotSame(projected, parquetTableSource); + + // ensure table schema is the same + assertEquals(parquetTableSource.getTableSchema(), projected.getTableSchema()); + + // ensure that table source description differs + assertNotEquals(parquetTableSource.explainSource(), projected.explainSource()); + + String[] fieldNames = ((RowTypeInfo) NESTED_ROW_TYPE).getFieldNames(); + TypeInformation[] fieldTypes = ((RowTypeInfo) NESTED_ROW_TYPE).getFieldTypes(); + assertEquals( + Types.ROW_NAMED( + new String[] {fieldNames[2], fieldNames[4], fieldNames[6]}, + fieldTypes[2], fieldTypes[4], fieldTypes[6] + ), + projected.getReturnType() + ); + + // ensure ParquetInputFormat is configured with selected fields + DataSet data = projected.getDataSet(ExecutionEnvironment.createLocalEnvironment()); + InputFormat inputFormat = ((DataSource) data).getInputFormat(); + assertTrue(inputFormat instanceof ParquetRowInputFormat); + ParquetRowInputFormat parquetIF = (ParquetRowInputFormat) inputFormat; + assertArrayEquals(new String[]{fieldNames[2], fieldNames[4], fieldNames[6]}, parquetIF.getFieldNames()); + assertArrayEquals(new TypeInformation[]{fieldTypes[2], fieldTypes[4], fieldTypes[6]}, parquetIF.getFieldTypes()); + } + + @Test + public void testFieldsFilter() throws Exception { + ParquetTableSource parquetTableSource = createNestedTestParquetTableSource(testPath); + + // expressions for supported predicates + Expression exp1 = new GreaterThan( + new PlannerResolvedFieldReference("foo", Types.LONG), + new Literal(100L, Types.LONG)); + Expression exp2 = new EqualTo( + new Literal(100L, Types.LONG), + new PlannerResolvedFieldReference("bar.spam", Types.LONG)); + + // unsupported predicate + Expression unsupported = new EqualTo( + new GetCompositeField( + new ItemAt( + new 
PlannerResolvedFieldReference(
+					"nestedArray",
+					ObjectArrayTypeInfo.getInfoFor(
+						Types.ROW_NAMED(new String[] {"type", "name"}, Types.STRING, Types.STRING))),
+				new Literal(1, Types.INT)),
+				"type"),
+			new Literal("test", Types.STRING));
+		// invalid predicate
+		Expression invalidPred = new EqualTo(
+			new PlannerResolvedFieldReference("nonField", Types.LONG),
+			// some invalid, non-serializable literal (here: an object of this test class)
+			new Literal(new ParquetTableSourceTest(), Types.LONG)
+		);
+
+		List exps = new ArrayList<>();
+		exps.add(exp1);
+		exps.add(exp2);
+		exps.add(unsupported);
+		exps.add(invalidPred);
+
+		// apply predicates on the TableSource
+		ParquetTableSource filtered = (ParquetTableSource) parquetTableSource.applyPredicate(exps);
+
+		// ensure copy is returned
+		assertNotSame(parquetTableSource, filtered);
+
+		// ensure table schema is identical
+		assertEquals(parquetTableSource.getTableSchema(), filtered.getTableSchema());
+
+		// ensure return type is identical
+		assertEquals(NESTED_ROW_TYPE, filtered.getReturnType());
+
+		// ensure source description is not the same
+		assertNotEquals(parquetTableSource.explainSource(), filtered.explainSource());
+
+		// check that pushdown was recorded
+		assertTrue(filtered.isFilterPushedDown());
+		assertFalse(parquetTableSource.isFilterPushedDown());
+
+		// ensure that supported predicates were removed from the list of offered expressions
+		assertEquals(2, exps.size());
+		assertTrue(exps.contains(unsupported));
+		assertTrue(exps.contains(invalidPred));
+
+		// ensure ParquetInputFormat is correctly configured with the filter
+		DataSet data = filtered.getDataSet(ExecutionEnvironment.createLocalEnvironment());
+		InputFormat inputFormat = ((DataSource) data).getInputFormat();
+		assertTrue(inputFormat instanceof ParquetRowInputFormat);
+		ParquetRowInputFormat parquetIF = (ParquetRowInputFormat) inputFormat;
+
+		// expected predicate
+		FilterPredicate a = FilterApi.gt(FilterApi.longColumn("foo"), 100L);
+		FilterPredicate b = FilterApi.eq(FilterApi.longColumn("bar.spam"), 100L);
+		FilterPredicate expected = FilterApi.and(a, b);
+		// actual predicate
+		FilterPredicate predicate = parquetIF.getPredicate();
+		// check predicate
+		assertEquals(expected, predicate);
+	}
+
+	private static Path createTestParquetFile() throws Exception {
+		Tuple3, SpecificRecord, Row> nested = getNestedRecordTestData();
+		Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+			Collections.singletonList(nested.f1));
+		return path;
+	}
+
+	private ParquetTableSource createNestedTestParquetTableSource(Path path) throws Exception {
+		MessageType nestedSchema = SCHEMA_CONVERTER.convert(NESTED_SCHEMA);
+		ParquetTableSource parquetTableSource = ParquetTableSource.builder()
+			.path(path.getPath())
+			.forParquetSchema(nestedSchema)
+			.build();
+		return parquetTableSource;
+	}
+}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
index d021fc3f43fdb..6c7960568d510 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
@@ -279,7 +279,7 @@ public void testNestedArrayGroup() throws IOException {
 		Schema arrayItemSchema = nestedArraySchema.getElementType();
 		GenericRecord item = new
GenericRecordBuilder(arrayItemSchema) .set("type", "nested") - .set("value", "nested_value").build(); + .set("value", 1L).build(); ImmutableList.Builder list = ImmutableList.builder(); list.add(item); @@ -310,7 +310,7 @@ public void testNestedArrayGroup() throws IOException { Row nestedRow = (Row) result[0]; assertEquals("nested", nestedRow.getField(0)); - assertEquals("nested_value", nestedRow.getField(1)); + assertEquals(1L, nestedRow.getField(1)); } private Schema unWrapSchema(Schema o) { diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java index ce13c8d891e3a..10db6d2fa3315 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java @@ -18,12 +18,7 @@ package org.apache.flink.formats.parquet.utils; -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.types.Row; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; @@ -32,7 +27,6 @@ import org.junit.Test; import java.util.Arrays; -import java.util.Map; import static org.junit.Assert.assertEquals; @@ -40,27 +34,6 @@ * Simple test case for conversion between Parquet schema and Flink date types. */ public class ParquetSchemaConverterTest extends TestUtil { - private final TypeInformation simplyRowType = Types.ROW_NAMED(new String[] {"foo", "bar", "arr"}, - BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO); - - private final TypeInformation nestedArray = Types.OBJECT_ARRAY(Types.ROW_NAMED(new String[] {"type", "value"}, - BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); - - @SuppressWarnings("unchecked") - private final TypeInformation> nestedMap = Types.MAP(BasicTypeInfo.STRING_TYPE_INFO, - Types.ROW_NAMED(new String[] {"type", "value"}, - BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); - - @SuppressWarnings("unchecked") - private final TypeInformation nestedRowType = Types.ROW_NAMED( - new String[] {"foo", "spamMap", "bar", "arr", "strArray", "nestedMap", "nestedArray"}, - BasicTypeInfo.LONG_TYPE_INFO, - Types.MAP(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), - Types.ROW_NAMED(new String[] {"spam"}, BasicTypeInfo.LONG_TYPE_INFO), - BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO, - BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, - nestedMap, - nestedArray); private final Type[] simpleStandardTypes = { org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL) @@ -102,8 +75,8 @@ public class ParquetSchemaConverterTest extends TestUtil { org.apache.parquet.schema.Types.optionalGroup().addField(org.apache.parquet.schema.Types.repeatedGroup() .addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED) .as(OriginalType.UTF8).named("type")) - .addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED) - 
.as(OriginalType.UTF8).named("value")) + .addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED) + .as(OriginalType.INT_64).named("value")) .named("element")).as(OriginalType.LIST) .named("nestedArray") }; @@ -112,25 +85,25 @@ public class ParquetSchemaConverterTest extends TestUtil { public void testSimpleSchemaConversion() { MessageType simpleType = new MessageType("simple", simpleStandardTypes); RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(simpleType); - assertEquals(simplyRowType, rowTypeInfo); + assertEquals(SIMPLE_ROW_TYPE, rowTypeInfo); } @Test public void testNestedSchemaConversion() { MessageType nestedTypes = new MessageType("nested", this.nestedTypes); RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(nestedTypes); - assertEquals(nestedRowType, rowTypeInfo); + assertEquals(NESTED_ROW_TYPE, rowTypeInfo); } @Test public void testSimpleRowTypeConversion() { - MessageType simpleSchema = ParquetSchemaConverter.toParquetType(simplyRowType, true); + MessageType simpleSchema = ParquetSchemaConverter.toParquetType(SIMPLE_ROW_TYPE, true); assertEquals(Arrays.asList(simpleStandardTypes), simpleSchema.getFields()); } @Test public void testNestedRowTypeConversion() { - MessageType nestedSchema = ParquetSchemaConverter.toParquetType(nestedRowType, true); + MessageType nestedSchema = ParquetSchemaConverter.toParquetType(NESTED_ROW_TYPE, true); assertEquals(Arrays.asList(nestedTypes), nestedSchema.getFields()); } } diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java index ed640419c1616..6b5cf2a56359c 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java @@ -19,6 +19,10 @@ package org.apache.flink.formats.parquet.utils; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.generated.ArrayItem; @@ -52,11 +56,33 @@ * Utilities for testing schema conversion and test parquet file creation. 
*/ public class TestUtil { + private static final TypeInformation nestedArray = Types.OBJECT_ARRAY(Types.ROW_NAMED( + new String[] {"type", "value"}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)); + + @SuppressWarnings("unchecked") + private static final TypeInformation> nestedMap = Types.MAP(BasicTypeInfo.STRING_TYPE_INFO, + Types.ROW_NAMED(new String[] {"type", "value"}, + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); + @ClassRule public static TemporaryFolder tempRoot = new TemporaryFolder(); public static final Schema NESTED_SCHEMA = getTestSchema("nested.avsc"); public static final Schema SIMPLE_SCHEMA = getTestSchema("simple.avsc"); + public static final TypeInformation SIMPLE_ROW_TYPE = Types.ROW_NAMED(new String[] {"foo", "bar", "arr"}, + BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO); + + @SuppressWarnings("unchecked") + public static final TypeInformation NESTED_ROW_TYPE = Types.ROW_NAMED( + new String[] {"foo", "spamMap", "bar", "arr", "strArray", "nestedMap", "nestedArray"}, + BasicTypeInfo.LONG_TYPE_INFO, + Types.MAP(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), + Types.ROW_NAMED(new String[] {"spam"}, BasicTypeInfo.LONG_TYPE_INFO), + BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO, + BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, + nestedMap, + nestedArray); + public static Path createTempParquetFile(File folder, Schema schema, List records) throws IOException { Path path = new Path(folder.getPath(), UUID.randomUUID().toString()); ParquetWriter writer = AvroParquetWriter.builder( @@ -96,7 +122,7 @@ public static Tuple3, SpecificRecord, Row> getNe final ArrayItem arrayItem = ArrayItem.newBuilder() .setType("color") - .setValue("yellow").build(); + .setValue(1L).build(); final MapItem mapItem = MapItem.newBuilder() .setType("map") @@ -129,7 +155,7 @@ public static Tuple3, SpecificRecord, Row> getNe final Row arrayItemRow = new Row(2); arrayItemRow.setField(0, "color"); - arrayItemRow.setField(1, "yellow"); + arrayItemRow.setField(1, 1L); final Row mapItemRow = new Row(2); mapItemRow.setField(0, "map"); @@ -154,6 +180,48 @@ public static Tuple3, SpecificRecord, Row> getNe return t; } + /** + * Create a list of NestedRecord with the NESTED_SCHEMA. 
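+ *
+ * @param numberOfRows The number of records to create.
+ * @return The list of generated NestedRecord instances.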
+ */ + public static List createRecordList(long numberOfRows) { + List records = new ArrayList<>(0); + for (long i = 0; i < numberOfRows; i++) { + final Bar bar = Bar.newBuilder() + .setSpam(i).build(); + + final ArrayItem arrayItem = ArrayItem.newBuilder() + .setType("color") + .setValue(i).build(); + + final MapItem mapItem = MapItem.newBuilder() + .setType("map") + .setValue("hashMap").build(); + + List nestedArray = new ArrayList<>(); + nestedArray.add(arrayItem); + + Map nestedMap = new HashMap<>(); + nestedMap.put("mapItem", mapItem); + + List longArray = new ArrayList<>(); + longArray.add(i); + + List stringArray = new ArrayList<>(); + stringArray.add("String"); + + final NestedRecord nestedRecord = NestedRecord.newBuilder() + .setBar(bar) + .setNestedArray(nestedArray) + .setStrArray(stringArray) + .setNestedMap(nestedMap) + .setArr(longArray).build(); + + records.add(nestedRecord); + } + + return records; + } + public static RuntimeContext getMockRuntimeContext() { RuntimeContext mockContext = Mockito.mock(RuntimeContext.class); Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()) diff --git a/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc b/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc index 2517c6194334b..eb60752d8c150 100644 --- a/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc +++ b/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc @@ -26,7 +26,7 @@ "name": "ArrayItem", "fields": [ {"name": "type", "type": "string"}, - {"name": "value", "type": "string"}]} + {"name": "value", "type": "long"}]} }] } ],