[FLINK-12098] [table-planner-blink] Add support for generating optimized logical plan for simple group aggregate on stream #8110
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
import java.sql.{Date, Time, Timestamp}

/** The initial accumulator for Max with retraction aggregate function */
class MaxWithRetractAccumulator[T] {
Can we make this Java?
OK. Should MinWithRetractAggFunctionTest also be implemented in Java in this PR?
import java.sql.{Date, Time, Timestamp}

/** The initial accumulator for Min with retraction aggregate function */
class MinWithRetractAccumulator[T] {
Can we make this Java?
    groupSize: Int,
    needRetraction: Boolean,
    aggs: Seq[AggregateCall]): Array[Boolean] = {
Delete the blank line after the opening brace.
    inputRowType: RelDataType,
    groupSet: Array[Int],
    typeFactory: FlinkTypeFactory): RelDataType = {
Delete the blank line after the opening brace.
 * Derives accumulators names from aggregate
 */
def inferAggAccumulatorNames(aggInfoList: AggregateInfoList): Array[String] = {
Delete the blank line after the opening brace.
/** The initial accumulator for Max with retraction aggregate function. */
public static class MaxWithRetractAccumulator<T> {
    public T max;
    public Long distinctCount;
Why not just use mapSize? The name distinctCount is easy to confuse with distinct aggregation.
OK
@Override
public MaxWithRetractAccumulator<T> createAccumulator() {
    MaxWithRetractAccumulator<T> acc = new MaxWithRetractAccumulator<>();
    acc.max = getInitValue(); // max
Why not just use NULL to represent the initial value for all subclasses?
I did not change any logic when porting to Java.
In Java, using NULL as the initial value is OK.
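To illustrate the suggestion, here is a minimal sketch of a max accumulator that uses null as the initial value for every element type, so no per-type initial value is needed. The class NullInitMaxSketch and its members are hypothetical names made up for this example. It is not the code in this PR and it leaves out retraction entirely.

```java
/** Hypothetical generic accumulator: null means "no value accumulated yet". */
final class NullInitMaxSketch<T extends Comparable<T>> {
    T max; // stays null until the first non-null value arrives

    void accumulate(T value) {
        // Ignore null inputs; the first non-null value always replaces the null initial value.
        if (value != null && (max == null || value.compareTo(max) > 0)) {
            max = value;
        }
    }

    public static void main(String[] args) {
        NullInitMaxSketch<String> acc = new NullInitMaxSketch<>();
        acc.accumulate("b");
        acc.accumulate("a");
        System.out.println(acc.max);
    }
}
```

Because null cannot collide with a real value, the same accumulate logic works for numeric, string, and date types alike, at the cost of one null check per call.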
            hasMax = true;
        }
    }
    if (!hasMax) {
Why would this happen?
The state backend gives no guarantee about when expired data is deleted. When both have expired, the mapSize data may still exist while the map data has already been deleted, so the loop may find no max.
I have added some comments explaining this.
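The case described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the code in this PR: MaxWithRetractSketch is a hypothetical class, values are plain long, and an ordinary HashMap stands in for the state-backed map. Clearing that map by hand imitates expired map data that was deleted while mapSize survived.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified accumulator; a plain HashMap stands in for state-backed storage. */
final class MaxWithRetractSketch {
    Long max;                                     // current max, null when nothing is accumulated
    long mapSize;                                 // number of distinct values believed to be in map
    final Map<Long, Long> map = new HashMap<>();  // value -> number of occurrences

    void accumulate(long value) {
        if (max == null || value > max) {
            max = value;
        }
        Long count = map.get(value);
        if (count == null) {
            map.put(value, 1L);
            mapSize++;
        } else {
            map.put(value, count + 1L);
        }
    }

    void retract(long value) {
        Long count = map.get(value);
        if (count != null && count > 1L) {
            map.put(value, count - 1L);   // other occurrences remain, max is unchanged
            return;
        }
        map.remove(value);                // no-op if the entry is already gone
        mapSize--;
        if (mapSize <= 0L) {
            mapSize = 0L;
            max = null;
            return;
        }
        if (max != null && value == max) {
            updateMax();                  // the retracted value was the max, so rescan
        }
    }

    private void updateMax() {
        boolean hasMax = false;
        for (Long key : map.keySet()) {
            if (!hasMax || key > max) {
                max = key;
                hasMax = true;
            }
        }
        if (!hasMax) {
            // mapSize says entries exist, but the map is empty: treat everything as expired.
            mapSize = 0L;
            max = null;
        }
    }
}
```

Without the final `!hasMax` branch, the accumulator would keep reporting a stale max and a non-zero mapSize after the map contents disappeared.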
Force-pushed from f980267 to bf9ee8c.
…zed logical plan for simple group aggregate on stream
… java, and move agg functions to org.apache.flink.table.functions.aggfunctions
fix NPE in MaxWithRetractAggFunction and MinWithRetractAggFunction
…ctAggFunction from String to BinaryString
LGTM, +1 to merge
…ed logical plan for simple group aggregate on stream (apache#8110)
What is the purpose of the change
Add support for generating optimized logical plan for simple group aggregate on stream
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation