Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-11854] [table-planner-blink] Introduce batch physical nodes #7931

Merged
merged 5 commits into from
Mar 11, 2019

Conversation

godfreyhe
Copy link
Contributor

@godfreyhe godfreyhe commented Mar 7, 2019

What is the purpose of the change

Introduce batch physical nodes

Brief change log

  • adds all batch physical nodes, excludes BatchExecCorrelate, BatchExecTemporalTableJoin, BatchExecXXXWindowAggregate

Verifying this change

This change is an initialization work without any test coverage.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 7, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

Copy link
Contributor

@KurtYoung KurtYoung left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this, i left some comments

}

public static boolean isMutable(InternalType type) {
return MUTABLE_FIELD_TYPES.contains(type) || type instanceof DecimalType;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we also need to check Decimal's precision just like isFixedLength does?

Copy link
Contributor

@JingsongLi JingsongLi Mar 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No matter how precision is, it is mutable.
Precision is compact, Decimal is stored in 8 bytes in fix-length-part.
Precision is not compact, Decimal is stored in 16 bytes in var-length-part.
It can always be update.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

method isFixedLength should be isInFixedLengthPart

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just add the dicimal type to the MUTABLE_FIELD_TYPES, and give some explainations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dicimal type can not be added into MUTABLE_FIELD_TYPES. Because its instance is not unique, but it has different instances for different precision or scale value.

*/
object ExpandUtil {

def projectsToString(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try to move all methods like these to one util class? Various logical or physical operators will share some name formatting utilities. There will be a lot repetition if we organize it by operator.

/**
* Utility methods for RelNode.
*/
object RelNodeUtil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this utility class should focus on operator name formatting, things like compute cost should be operator specific

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, this class name is also too big

@godfreyhe
Copy link
Contributor Author

@KurtYoung @JingsongLi I have updated this pr based on the comments.

@KurtYoung
Copy link
Contributor

Looks like there are some compilation errors

extends Calc(cluster, traitSet, input, calcProgram)
with FlinkRelNode {


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete extra blank line

.itemIf("where", conditionToString(), calcProgram.getCondition != null)
}

protected def conditionToString(): String = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this to RelExplainUtil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is only used in this class

}
}

protected def projectionToString(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

.item("distribution", distributionToString())
}

private def distributionToString(): String = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

length
}

def computeSortMemory(mq: RelMetadataQuery, inputOfSort: RelNode): Double = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we should place this method here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move binaryRowAverageSize and computeSortMemory methods into FlinkRelMdUtil class

}

public static boolean isMutable(InternalType type) {
return MUTABLE_FIELD_TYPES.contains(type) || type instanceof DecimalType;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just add the dicimal type to the MUTABLE_FIELD_TYPES, and give some explainations

Copy link
Contributor Author

@godfreyhe godfreyhe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KurtYoung I have updated this pr.

@KurtYoung
Copy link
Contributor

LGTM, merging..

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants