[FLINK-7244] Add parquet table source #8064
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. Automated checks: last check on commit b793cb3 (Fri Aug 23 10:13:21 UTC 2019). Mention the bot in a comment to re-run the automated checks.
Hi @HuangZhenQiu,
Thanks for the PR!
I've added a first round of comments but didn't have a look at the tests yet.
Will do that in the next days.
Thanks, Fabian
...formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
// the configuration to read the file
private final Configuration parquetConfig;

// type information of the data returned by the InputFormat
indent with tab
...formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
...s/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
private final RowTypeInfo typeInfo;
// list of selected Parquet fields to return
private final int[] selectedFields;
// list of predicates to apply
Change the comment to "predicate expressions to apply", since it's not really a list?
...formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
		}
	}
} else if (exp instanceof BinaryExpression) {
	FilterPredicate c1 = toParquetPredicate(((Or) exp).left());
exp was only checked to be a BinaryExpression, so it could also be an And. However, we know that we receive the predicates in CNF, so there should be no Ands. I think we can check for Or instead of And before.
} else {
	if (exp instanceof Or) {
		return FilterApi.or(c1, c2);
	} else if (exp instanceof And) {
There should be no Ands because we get the predicates in CNF.
...formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
518ed2a to e49b8c7
@fhueske
Sorry, I was out for a few days of vacation. Will continue with my review soon.
e72ed37 to ff12d2d
Thanks for your patience and the update @HuangZhenQiu!
I left a couple of comments.
Best, Fabian
@Override
public boolean isFilterPushedDown() {
	return predicate != null;
We can set a flag when applyPredicate() was called and return it here. This avoids multiple calls of applyPredicate() if no predicate can be successfully converted.
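A minimal, self-contained sketch of the flag idea, not the PR's actual code. FilterFlagSketch, the String stand-in for FilterPredicate, and the "starts with col" convertibility rule are all assumptions made up for illustration:

```java
import java.util.List;

/** Hypothetical stand-in for a filterable table source; not the actual Flink class. */
final class FilterFlagSketch {
	// Stand-in for the converted Parquet predicate; null when nothing could be converted.
	private final String predicate;
	// Set once applyPredicate() has run, regardless of whether a conversion succeeded.
	private final boolean filterPushedDown;

	FilterFlagSketch() {
		this(null, false);
	}

	private FilterFlagSketch(String predicate, boolean filterPushedDown) {
		this.predicate = predicate;
		this.filterPushedDown = filterPushedDown;
	}

	/** Returns a copy that remembers the push-down attempt. */
	FilterFlagSketch applyPredicate(List<String> predicates) {
		// Assumption for illustration: only predicates starting with "col" are convertible.
		String converted = null;
		for (String p : predicates) {
			if (p.startsWith("col")) {
				converted = (converted == null) ? p : converted + " AND " + p;
			}
		}
		return new FilterFlagSketch(converted, true);
	}

	boolean isFilterPushedDown() {
		// Report the attempt, not the result, so the caller does not retry.
		return filterPushedDown;
	}

	String getPredicate() {
		return predicate;
	}
}
```

With `predicate != null` as the check, a source whose predicates all fail to convert would keep reporting "not pushed down"; the flag reports the attempt instead.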
} else if (exp instanceof BinaryExpression) {
	if (exp instanceof And) {
		LOG.debug("All of the predicates should be in CNF. Found an AND expression.", exp);
	} else if (exp instanceof Or){
add space between ){
if (c1 == null || c2 == null) {
	return null;
} else {
	if (exp instanceof Or) {
I think this condition can be removed. We already checked that exp instanceof Or.
} else {
	if (exp instanceof Or) {
		return FilterApi.or(c1, c2);
	} else {
This else branch should be moved one condition outside.
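One way the branch structure suggested in these comments could look, sketched with hypothetical stand-in types (SketchExpr, SketchLeaf, SketchOr, SketchAnd) and a String in place of Parquet's FilterPredicate. This is an illustration under those assumptions, not the code that was merged:

```java
/** Hypothetical expression tree; stands in for Flink's Expression classes. */
abstract class SketchExpr {}

final class SketchLeaf extends SketchExpr {
	final String name;
	final boolean convertible;

	SketchLeaf(String name, boolean convertible) {
		this.name = name;
		this.convertible = convertible;
	}
}

final class SketchOr extends SketchExpr {
	final SketchExpr left;
	final SketchExpr right;

	SketchOr(SketchExpr left, SketchExpr right) {
		this.left = left;
		this.right = right;
	}
}

final class SketchAnd extends SketchExpr {
	final SketchExpr left;
	final SketchExpr right;

	SketchAnd(SketchExpr left, SketchExpr right) {
		this.left = left;
		this.right = right;
	}
}

final class PredicateConverterSketch {
	/** Returns a textual predicate, or null if the expression cannot be converted. */
	static String toPredicate(SketchExpr exp) {
		if (exp instanceof SketchLeaf) {
			SketchLeaf leaf = (SketchLeaf) exp;
			return leaf.convertible ? leaf.name : null;
		} else if (exp instanceof SketchOr) {
			// Or is checked once here, so no second instanceof check is needed below.
			SketchOr or = (SketchOr) exp;
			String c1 = toPredicate(or.left);
			String c2 = toPredicate(or.right);
			if (c1 == null || c2 == null) {
				return null;
			}
			return "or(" + c1 + ", " + c2 + ")";
		} else {
			// Predicates arrive in CNF, so an And (or anything else) is not converted.
			return null;
		}
	}
}
```

The single trailing else covers both the unexpected And and any unsupported expression type, which is the effect of moving the branch one condition outward.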
// ensure table schema is the same
assertEquals(parquetTableSource.getTableSchema(), projected.getTableSchema());
Check that the source description is different (!parquetTableSource.explainSource().equals(projected.explainSource())).
@Override
public String explainSource() {
	return "ParquetFile[path=" + path + ", schema=" + parquetSchema + ", filter=" + predicateString() + "]";
Include the projected fields in the source description. Flink checks whether the project pushdown worked by comparing the descriptions. If both are the same, the rule is not applied.
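A small sketch of what a description that includes the projection could look like. ExplainSourceSketch, the "fields=" label, and the "*" marker for "no projection" are assumptions for illustration, not the format used in the PR:

```java
import java.util.Arrays;

/** Hypothetical stand-in for the table source; only the description logic is shown. */
final class ExplainSourceSketch {
	private final String path;
	// null means that no projection was pushed down and all fields are read.
	private final int[] selectedFields;

	ExplainSourceSketch(String path, int[] selectedFields) {
		this.path = path;
		this.selectedFields = selectedFields;
	}

	/** Returns a copy of this source that reads only the given field indexes. */
	ExplainSourceSketch projectFields(int[] fields) {
		return new ExplainSourceSketch(path, fields.clone());
	}

	String explainSource() {
		String fields = (selectedFields == null) ? "*" : Arrays.toString(selectedFields);
		return "ParquetFile[path=" + path + ", fields=" + fields + "]";
	}
}
```

Because the projected copy prints its field indexes, its description differs from the unprojected one, which is the property the comparison described above depends on.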
private ParquetTableSource createNestedTestParquetTableSource() throws Exception {
	Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
	Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA,
No need to create temp files here.
...ats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
// ensure table schema is identical
assertEquals(parquetTableSource.getTableSchema(), projected.getTableSchema());
Check that the pushed down predicates were removed from exps.
bad6b89 to e87a4d6
e87a4d6 to b793cb3
@fhueske I resolved all of the comments except the last one. I'm not sure why the pushed-down predicates should be removed from exps.
Hi @HuangZhenQiu, the predicates should be removed to let the optimizer know that these are handled by the table source. If they are not removed, the optimizer will generate code to evaluate the predicates, which was already done by the table source. I'll check the PR now.
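The removal described here can be sketched as follows. PredicateRemovalSketch, the String predicates, and the isConvertible rule are hypothetical stand-ins; the real method works on Flink Expression objects:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch: handled predicates are removed from the caller's list. */
final class PredicateRemovalSketch {
	/** Assumption for illustration: calls to a function named udf cannot be converted. */
	static boolean isConvertible(String predicate) {
		return !predicate.startsWith("udf(");
	}

	/**
	 * Removes every predicate the source can handle from the given list and returns them.
	 * Whatever is left in the list afterwards still has to be evaluated by the caller.
	 */
	static List<String> pushDown(List<String> predicates) {
		List<String> pushed = new ArrayList<>();
		Iterator<String> it = predicates.iterator();
		while (it.hasNext()) {
			String p = it.next();
			if (isConvertible(p)) {
				pushed.add(p);
				// Removing the entry signals that the source takes over this predicate.
				it.remove();
			}
		}
		return pushed;
	}
}
```

A test along the lines requested in the review would then assert that the list passed in no longer contains the pushed-down entries.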
Thanks for the update @HuangZhenQiu!
The PR looks very good.
There are a few spots where we should respect the new code style guide, but I'll fix those and merge the PR.
Cheers, Fabian
// type information of the data returned by the InputFormat
private final RowTypeInfo typeInfo;
// list of selected Parquet fields to return
private final int[] selectedFields;
Add @Nullable annotation as required by the new code style guidelines (apache/flink-web#224)
// list of selected Parquet fields to return
private final int[] selectedFields;
// predicate expression to apply
private final FilterPredicate predicate;
Add @Nullable annotation
}

private ParquetTableSource(String path, MessageType parquetSchema, Configuration configuration,
	boolean recursiveEnumeration, int[] selectedFields, FilterPredicate predicate) {
Add @Nullable annotation to selectedFields and predicate
/**
 * Converts flink Expression to parquet FilterPredicate.
 */
private FilterPredicate toParquetPredicate(Expression exp) {
Add @Nullable annotation
);

// ensure ParquetInputFormat is configured with selected fields
ParquetTableSource spyPTS = spy(projected);
our new code style guidelines discourage the use of Mockito.
 * </pre>
 */
public class ParquetTableSource
	implements BatchTableSource<Row>, FilterableTableSource<Row>, ProjectableTableSource<Row> {
The new Blink query optimizer and execution is based on the InputFormatTableSource abstract class. We can simply implement both, BatchTableSource and InputFormatTableSource, to support both of Flink's SQL engines.
OK, seems that we would need to add a dependency on flink-streaming for this. I'll keep it as it is for now.
This closes apache#8064.
What is the purpose of the change
Add projectable and filterable ParquetTableSource based on ParquetInputFormat
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation