
[FLINK-15840][table-planner-blink] ClassCastException is thrown when use tEnv.from for temp/catalog table #10989

Merged (5 commits) on Feb 3, 2020

Conversation

@JingsongLi (Contributor) commented Feb 1, 2020

What is the purpose of the change

TableEnvironment.from/scan(String path) cannot be used for temporary tables or CatalogTables in general (only DataStreamTable and ConnectorCatalogTable work). It can be worked around with TableEnvironment.sqlQuery("select * from t"), but from/scan are essential TableEnvironment APIs, and without them the pure Table API is seriously limited.

Brief change log

Bug 1

The problem is that CatalogSourceTable.toRel needs the computed-column expression translator. Currently, CatalogSourceTable.toRel is called from two places:

  1. The parser stage, which passes the correct computed-column translator.
  2. The rule-optimization stage, where TableScanRule does not pass the correct computed-column translator, so an error is reported.

There are three solutions:

  1. Don't use ToRelContext to carry the computed-column translators. Use FlinkContext instead, so that we can get the correct translators at any stage.
  2. In CatalogSourceTable.toRel, report an error when there is a computed column but no translator can be obtained, and pass through normally in all other cases. The disadvantage is that computed columns could not be supported on the Table API.
  3. @danny0405's suggestion: in the next version, move the computed-column and watermark translation to the parser stage. But this needs Calcite interface changes too, with no guarantee.
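The shape of solution 1 can be sketched in isolation. All names below are simplified, hypothetical stand-ins for the real FlinkContext, SqlExprToRexConverter, and CatalogSourceTable classes, not the actual planner code:

```java
import java.util.function.Function;

// Hypothetical stand-in for FlinkContext: it carries the expression
// translator factory, so every stage can reach the same one.
class MiniFlinkContext {
    private final Function<String, String> exprTranslatorFactory;

    MiniFlinkContext(Function<String, String> exprTranslatorFactory) {
        this.exprTranslatorFactory = exprTranslatorFactory;
    }

    Function<String, String> getExprTranslatorFactory() {
        return exprTranslatorFactory;
    }
}

// Hypothetical stand-in for CatalogSourceTable: toRel no longer depends
// on what a particular ToRelContext happens to carry, because both the
// parser stage and the planner rules reach the same shared context.
class MiniCatalogSourceTable {
    String toRel(MiniFlinkContext context, String computedColumnExpr) {
        return context.getExprTranslatorFactory().apply(computedColumnExpr);
    }
}
```

The point of the sketch is only the dependency direction: the translator lives in one shared context object instead of being threaded through each ToRelContext call site.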

Bug 2

Another bug is that CatalogSourceTable needs to override explainSourceAsString; otherwise the HEP rule optimizer cannot distinguish CatalogSourceTable from TableSourceTable.
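Bug 2 hinges on the planner identifying scans by their rendered digest string. A toy sketch of that idea, with hypothetical names that are not Flink or Calcite code:

```java
import java.util.HashMap;
import java.util.Map;

// Toy scan node identified only by a digest string, the way a rule
// planner deduplicates equivalent RelNodes.
class ScanNode {
    final String digest;
    ScanNode(String digest) { this.digest = digest; }
}

// Toy planner: if two different table classes render the same digest,
// they are wrongly merged into one node. Overriding explainSourceAsString
// is what makes the digests distinct in the real code.
class ToyPlanner {
    private final Map<String, ScanNode> canonical = new HashMap<>();

    // Returns the canonical node for a digest, reusing an existing one.
    ScanNode register(ScanNode node) {
        return canonical.computeIfAbsent(node.digest, d -> node);
    }
}
```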

Verifying this change

TableScanTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)

@flinkbot (Collaborator) commented Feb 1, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 65c128c (Sat Feb 01 08:23:51 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator) commented Feb 1, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@wuchong (Member) commented Feb 1, 2020

I find that Calcite's TableScanRule uses ViewExpanders.simpleContext, which isn't compatible with CatalogSourceTable. Is it possible to provide a new TableScanRule with the proper FlinkToRelContext? cc @danny0405

I think in this way, watermark and computed column on Table API can also be supported in the future.

@JingsongLi (Contributor Author)

Hi @wuchong, take a look at FlinkPlannerImpl; it needs many things:

config: FrameworkConfig,
catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader],
typeFactory: FlinkTypeFactory,
cluster: RelOptCluster

TableScanRule cannot supply these arguments either.

And I don't think this is a good way; FlinkToRelContext is a bad idea to me. Just fixing TableScanRule looks like a very temporary fix, and we don't know whether there are other ToRelContext call sites.

In 1.11 there is another plan to support computed columns in the Table API: as I said above, the idea is to separate the computed-column translation out and complete it in the parser stage.

@wuchong (Member) commented Feb 2, 2020

Hi @JingsongLi, I'm not sure whether it works to separate the computed-column translation into the parser stage, because adding a watermark node requires the FlinkToRelContext too, and that has to happen in CatalogSourceTable#toRel.

I didn't mean to create a FlinkPlannerImpl in the rule; I mean we can pass the FlinkToRelContext to the rule via FlinkContext.

@JingsongLi (Contributor Author)

Hi @wuchong, take a look at my solution 1; I think that is what you mean. We don't need FlinkToRelContext any more; we can get FlinkPlannerImpl directly from FlinkContext. As I said, I think FlinkToRelContext is a bad approach.

I wanted to do it that way.

But after discussing with @danny0405 offline:

  • Supporting computed columns and watermarks (I don't think there is much difference between them) in the Table API is not a goal of 1.10.
  • "Separating the computed-column translation into the parser stage" is his plan for 1.11.

@JingsongLi (Contributor Author)

Updated description.

@wuchong (Member) commented Feb 2, 2020

Thanks @JingsongLi. According to the updated description, I'm in favor of option #1, which passes FlinkPlannerImpl via the context, because it is a true fix rather than a temporary workaround, and it gives us the ability to extend features in the future. Btw, please try to expose a small interface rather than the whole FlinkPlannerImpl.

@wuchong (Member) left a comment

Thanks for the update @JingsongLi, I left some comments.

Besides, I think we can remove the FlinkPlannerImpl#createSqlExprToRexConverter method, because it can be obtained via PlannerContext now.

private final FlinkContext context;
private final CalciteSchema rootSchema;
private final List<RelTraitDef> traitDefs;
private FrameworkConfig frameworkConfig;
private RelOptCluster cluster;
@wuchong (Member): Can these member fields be final?

@JingsongLi (Contributor Author): There is a factory that refers to these fields before their initialization. According to Java semantics, they need to be non-final.

@wuchong (Member): We can have a private SqlExprToRexConverter createSqlExprToRexConverter(RelDataType rowType) method and pass this::createSqlExprToRexConverter to the FlinkContextImpl; then these fields can be final.
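The method-reference trick being suggested can be sketched in isolation. All names below are hypothetical toy stand-ins, not the real planner classes:

```java
import java.util.function.Function;

// Toy converter standing in for SqlExprToRexConverter.
class MiniConverter {
    final String description;
    MiniConverter(String description) { this.description = description; }
}

// Toy context standing in for FlinkContextImpl: it holds only a factory
// function, not the planner itself.
class MiniContext {
    private final Function<String, MiniConverter> converterFactory;

    MiniContext(Function<String, MiniConverter> converterFactory) {
        this.converterFactory = converterFactory;
    }

    MiniConverter createConverter(String rowType) {
        return converterFactory.apply(rowType);
    }
}

class MiniPlanner {
    private final String frameworkConfig = "cfg"; // stands in for real state
    private final MiniContext context;

    MiniPlanner() {
        // Only a method reference escapes the constructor; the planner's
        // fields are not read until the factory is actually invoked, so
        // both fields can stay final.
        this.context = new MiniContext(this::createConverter);
    }

    private MiniConverter createConverter(String rowType) {
        return new MiniConverter(frameworkConfig + ":" + rowType);
    }

    MiniContext getContext() { return context; }
}
```

Note that the method reference still captures the under-construction instance; it is only safe as long as nobody invokes the factory before construction completes.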

@JingsongLi (Contributor Author): I am not in favor of it. Passing this::createSqlExprToRexConverter to the FlinkContextImpl means passing an unfinished instance to it. Actually, frameworkConfig and cluster are not really final to FlinkContextImpl; they will turn from null into real instances. So this feels like bypassing Java's final rule.

@danny0405 (Contributor): The code is a little messy: FrameworkConfig references the cluster, while the cluster context has a member that needs a FrameworkConfig and the cluster itself...

We may need to rethink whether the SqlExprToRexConverter should belong to the cluster context. Can we just pass the FrameworkConfig into the context and construct the SqlExprToRexConverter only when we really need it (i.e. in CatalogSourceTable)?

@JingsongLi (Contributor Author): Hi @danny0405, the constructor of FrameworkConfig needs the cluster context.

The context needs a factory, which needs a FrameworkConfig. IMO, relationships like this are very common; that's one of the reasons we need the factory pattern. I admit we can improve it, but a big refactoring is unnecessary, because this PR is an urgent bug fix. Of course, if you have a better, lighter improvement plan, please describe it in detail.

What do you think?

@danny0405 (Contributor): After some research of the code, I found another solution:

  • Modify FlinkRelBuilder to add a new method scan(Iterable<String> tableNames, ViewExpander viewExpander); in that method, fetch the RelOptTable first and then apply the same logic as TableScanRule.
  • In QueryOperationConverter, use the new scan method to translate the table.
  • No other code change is needed.

We do not need to pass the SqlToExprConverter around through the FlinkContext, because it does not belong there; the computed-column/watermark translation should happen as early as possible, not in planner rules.

@JingsongLi (Contributor Author): Hi @wuchong, made the PlannerContext fields final.

@JingsongLi (Contributor Author): After discussing with @danny0405 offline, we can improve this to finish solution #3 in 1.11.

@@ -80,12 +81,17 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
.getOrElse(FlinkBatchProgram.buildProgram(config.getConfiguration))
Preconditions.checkNotNull(programs)

val context = relNode.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext])

@danny0405 (Contributor): Maybe we can inline this code with lines 92~94.

@JingsongLi (Contributor Author): Not inlined, because:

I think it is better to get all fields from this context instead of getting them from the planner. BatchOptimizeContext should extend the previous context. We don't do that now because this targets 1.10.
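The unwrap call in the diff above follows Calcite's Context pattern, where planner-wide state is retrieved by type from the planner's context. A minimal sketch of the pattern, with hypothetical stand-in names rather than the real Calcite/Flink classes:

```java
// Minimal sketch of the Context#unwrap pattern; ContextImpl is a
// hypothetical stand-in for Flink's FlinkContext implementation.
interface PlannerContext {
    <T> T unwrap(Class<T> clazz);
}

class ContextImpl implements PlannerContext {
    final String tableConfig = "config"; // stands in for real planner state

    @Override
    public <T> T unwrap(Class<T> clazz) {
        // Hand out this context only when the requested type matches.
        return clazz.isInstance(this) ? clazz.cast(this) : null;
    }
}
```

This is why a rule can reach shared state without new constructor arguments: it asks the planner's context for the type it needs and gets null if that state is absent.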

@wuchong (Member) left a comment

Thanks for the great work @JingsongLi !

LGTM, waiting for Travis.

@wuchong wuchong merged commit 183c8d1 into apache:master Feb 3, 2020
wuchong added a commit to wuchong/flink that referenced this pull request Feb 3, 2020
wuchong added a commit that referenced this pull request Feb 3, 2020
JTaky pushed a commit to JTaky/flink that referenced this pull request Feb 20, 2020
@JingsongLi JingsongLi deleted the toRel branch April 26, 2020 05:32