[FLINK-15840][table-planner-blink] ClassCastException is thrown when use tEnv.from for temp/catalog table #10989
Conversation
…use tEnv.from for temp/catalog table under Blink planner
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. Automated checks: last check on commit 65c128c (Sat Feb 01 08:23:51 UTC 2020).
I find that Calcite … I think in this way, watermarks and computed columns in the Table API can also be supported in the future.
Hi @wuchong, you can take a look at
And I don't think this is a good way. In 1.11, there is another plan to support computed columns in the Table API; as I said above, we are considering separating the computed column translation from the complete translation in the parser stage.
Hi @JingsongLi, I'm not sure whether it works to separate the computed column translation into the parser stage, because adding a watermark node requires the … I didn't mean to create a …
Hi @wuchong, you can take a look at my solution 1, I think that is what you mean. We don't need … I wanted to do it that way, but after discussing with @danny0405 offline:
Updated description.
Thanks @JingsongLi. According to the updated description, I'm in favor of option #1, which passes
Thanks for the update @JingsongLi, I left some comments.
Besides, I think we can remove the `FlinkPlannerImpl#createSqlExprToRexConverter` method, because it should be retrieved via `PlannerContext` now.
Resolved review threads (some on outdated code) on:
- `...lanner-blink/src/main/java/org/apache/flink/table/planner/calcite/ToRexConverterFactory.java` (outdated)
- `...anner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala` (outdated)
- `...lanner-blink/src/main/java/org/apache/flink/table/planner/calcite/ToRexConverterFactory.java` (outdated)
- `...le-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java` (outdated)
- `...le-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java` (outdated)
- `...le-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java`
```java
private final FlinkContext context;
private final CalciteSchema rootSchema;
private final List<RelTraitDef> traitDefs;
private FrameworkConfig frameworkConfig;
private RelOptCluster cluster;
```
Can these member fields be `final`?
There is a factory that refers to these fields before their initialization. According to the semantics of Java, they need to be non-final.
We can have a `private SqlExprToRexConverter createSqlExprToRexConverter(RelDataType rowType)` method and pass `this::createSqlExprToRexConverter` to the `FlinkContextImpl`; then these fields can be `final`.
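A minimal sketch of that suggestion (all names here are hypothetical stand-ins, not Flink's real classes): the planner passes a method reference into the context instead of a ready-made converter, so its other fields can stay `final`. The method reference captures `this` during construction but is only invoked later.

```java
import java.util.function.Function;

// Hypothetical stand-in for SqlExprToRexConverter.
final class Converter {
    final String rowType;
    Converter(String rowType) { this.rowType = rowType; }
}

// Hypothetical stand-in for FlinkContextImpl.
final class Context {
    private final Function<String, Converter> converterFactory;
    Context(Function<String, Converter> converterFactory) {
        this.converterFactory = converterFactory;
    }
    // The factory is only invoked here, after the planner's constructor
    // has finished, so every field it reads is already initialized.
    Converter converterFor(String rowType) {
        return converterFactory.apply(rowType);
    }
}

// Hypothetical stand-in for PlannerContext.
final class Planner {
    private final Context context;
    private final String config; // stands in for frameworkConfig/cluster

    Planner(String config) {
        // `this` escapes here before `config` is assigned -- safe only
        // because the method reference is not called during construction.
        this.context = new Context(this::createConverter);
        this.config = config;
    }

    private Converter createConverter(String rowType) {
        return new Converter(config + ":" + rowType);
    }

    Context context() { return context; }
}
```

The trade-off debated below is exactly this: the fields become `final`, at the price of letting a partially constructed `this` escape through the method reference.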
I am not in favor of it.
Passing `this::createSqlExprToRexConverter` to the `FlinkContextImpl` means we pass an unfinished instance to it. Actually, `frameworkConfig` and `cluster` are not really final from the `FlinkContextImpl`'s point of view; they only become real instances later, starting from null.
So this feels like bypassing the `final` rule of Java.
The code is a little messy: `FrameworkConfig` references the `cluster`, while the `cluster` context has a member that needs a `FrameworkConfig` and the `cluster` itself ...
We may need to re-think whether the `SqlExprToRexConverter` should belong to the `cluster` context. Can we just pass the `FrameworkConfig` into the context and construct the `SqlExprToRexConverter` when we really need it (i.e. in the `CatalogSourceTable`)?
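A hedged sketch of that alternative design, again with hypothetical names: the context stores only the config, and the converter is constructed on demand at the use site, so the context never holds a half-initialized converter and all its fields can be `final`.

```java
// Hypothetical stand-in for SqlExprToRexConverter.
final class ExprConverter {
    private final String config;
    ExprConverter(String config) { this.config = config; }
    String convert(String expr) { return "rex(" + expr + " @ " + config + ")"; }
}

// Hypothetical context that holds only the config, not the converter.
final class PlannerContextSketch {
    private final String config; // stands in for FrameworkConfig
    PlannerContextSketch(String config) { this.config = config; }
    // Built lazily, e.g. from a CatalogSourceTable-like toRel(), only
    // when computed columns actually need translation.
    ExprConverter newConverter() { return new ExprConverter(config); }
}
```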
Hi @danny0405, the constructor of `FrameworkConfig` needs the cluster context, and the context needs a factory which needs the `FrameworkConfig`. IMO, a relationship like this is very common; that's one of the reasons we need the factory pattern.
I admit we can improve it, but if it requires a big refactoring, it's unnecessary, because this PR is just an urgent bug fix.
Of course, if you have a better and lightweight improvement plan, please describe it in detail.
What do you think?
After some research into the code, I found another solution:
1. Modify `FlinkRelBuilder` to add a new method `scan(Iterable<String> tableNames, ViewExpander viewExpander)`; in that method, fetch the `RelOptTable` first and then apply the same logic as `TableScanRule`.
2. In `QueryOperationConverter`, use the new `scan` method to translate the table.
3. No other code change is needed.

We do not need to pass the `SqlExprToRexConverter` around through the `FlinkContext` because it does not belong there; the computed column/watermark translation should happen as early as possible, not in planner rules.
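The shape of the proposed `scan` can be sketched roughly as follows. All types here are simplified, hypothetical stand-ins, not the real Calcite/Flink classes; the point is only the order of operations: look the table up by its path first, then run the expansion eagerly rather than in a planner rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for Calcite's ViewExpander.
interface ViewExpanderSketch {
    String expand(String tableDefinition);
}

// Hypothetical stand-in for FlinkRelBuilder.
final class RelBuilderSketch {
    private final Map<String, String> schema; // table path -> definition

    RelBuilderSketch(Map<String, String> schema) {
        this.schema = schema;
    }

    String scan(Iterable<String> tableNames, ViewExpanderSketch expander) {
        // 1. Fetch the table first (the RelOptTable lookup step).
        List<String> parts = new ArrayList<>();
        tableNames.forEach(parts::add);
        String path = String.join(".", parts);
        String definition = schema.get(path);
        if (definition == null) {
            throw new NoSuchElementException(path);
        }
        // 2. Expand computed columns/watermarks eagerly, doing the work
        //    TableScanRule would otherwise do during rule application.
        return expander.expand(definition);
    }
}
```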
Hi @wuchong, I made the `PlannerContext` fields final.
After discussing with @danny0405 offline: we can improve this to finish solution #3 in 1.11.
```scala
// @@ -80,12 +81,17 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
  .getOrElse(FlinkBatchProgram.buildProgram(config.getConfiguration))
Preconditions.checkNotNull(programs)

val context = relNode.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext])
```
Maybe we can inline this code with lines 92 ~ 94.
Not inlined because:
- I think it is better to get all fields from this context instead of getting them from the planner.
- `BatchOptimizeContext` should extend the previous context, but I did not do that because of 1.10.
Thanks for the great work @JingsongLi!
LGTM, waiting for the Travis build.
…nv.from for temp/catalog table (apache#10989)
What is the purpose of the change
`TableEnvironment.from/scan(String path)` cannot be used for all temporary tables and catalog tables (as opposed to `DataStreamTable` and `ConnectorCatalogTable`). Of course, it can be bypassed by `TableEnvironment.sqlQuery("select * from t")`, but `from/scan` are very important APIs of `TableEnvironment`, and without them the pure Table API can't be used seriously.

Brief change log
Bug 1
The problem is that `CatalogSourceTable.toRel` needs to get the translator for computed columns. At present, `CatalogSourceTable.toRel` is called from two places:
There are three solutions:
Bug 2
Another bug is that `CatalogSourceTable` needs to override `explainSourceAsString`; otherwise the HEP rule optimizer cannot distinguish `CatalogSourceTable` from `TableSourceTable`.
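A hedged illustration of why Bug 2 matters, using simplified hypothetical names rather than Flink's real classes: a digest-based optimizer tells relational nodes apart by their textual digest, so a subclass that does not override the digest method is indistinguishable from its parent.

```java
// Hypothetical stand-in for TableSourceTable.
class SourceTableSketch {
    final String name;
    SourceTableSketch(String name) { this.name = name; }
    String explainSourceAsString() { return "TableSourceTable(" + name + ")"; }
}

// Hypothetical stand-in for CatalogSourceTable. Without the override,
// both classes would produce identical digests for the same table name,
// and a digest-based optimizer could not distinguish them.
class CatalogTableSketch extends SourceTableSketch {
    CatalogTableSketch(String name) { super(name); }
    @Override
    String explainSourceAsString() { return "CatalogSourceTable(" + name + ")"; }
}
```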
Verifying this change
`TableScanTest`
Does this pull request potentially affect one of the following parts:
- `@Public(Evolving)`: (yes / no)

Documentation