[FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch #8102

Merged
merged 2 commits into from
Apr 10, 2019

Conversation

JingsongLi
Contributor

What is the purpose of the change

Introduce NonBufferOverWindowOperator: Some over windows do not need to buffer data, such as rank, or rows between unbounded preceding and the current row (offset 0). We introduce NonBufferOverWindowOperator to avoid the overhead of copying data into a buffer.
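
To illustrate why such windows need no buffer, here is a minimal sketch. It is not the operator from this PR: the class name `NonBufferSketch`, both method names, and the use of plain `Long` keys instead of Flink rows are assumptions made for illustration. Each output depends only on the rows seen so far, so a single pass with a small amount of state is enough.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: single-pass evaluation without buffering the partition.
public class NonBufferSketch {

    /** SUM over ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: only one accumulator is kept. */
    static List<Long> runningSum(List<Long> partition) {
        List<Long> out = new ArrayList<>(partition.size());
        long acc = 0L;
        for (long value : partition) {
            acc += value;   // the frame only ever grows, so nothing has to be retracted
            out.add(acc);   // emit the result as soon as the row arrives
        }
        return out;
    }

    /** RANK() over a partition that is already sorted by the order key. */
    static List<Integer> rank(List<Long> sortedKeys) {
        List<Integer> out = new ArrayList<>(sortedKeys.size());
        int rank = 0;
        Long previous = null;
        for (int i = 0; i < sortedKeys.size(); i++) {
            Long key = sortedKeys.get(i);
            if (previous == null || !previous.equals(key)) {
                rank = i + 1; // a new key value takes its 1-based position as rank
            }
            out.add(rank);    // ties keep the rank of the first row with that key
            previous = key;
        }
        return out;
    }
}
```

The only state carried between rows is the accumulator, or the previous key and rank, which is why no copy of the input rows is required in these cases.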

Introduce BufferDataOverWindowOperator and OverWindowFrame: 1. Minimize duplicate computation in various OverWindowFrame implementations. 2. An OverWindowOperator can compute multiple window frames.

Verifying this change

Unit tests (ut).

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@flinkbot
Collaborator

flinkbot commented Apr 3, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@JingsongLi
Contributor Author

Thanks @KurtYoung for reviewing. I changed the method name and added comments.

@JingsongLi
Contributor Author

@KurtYoung
I think that even with RowSlidingOverFrame and RangeSlidingOverFrame, we still need an interface like compare(BaseRow inputRow, int inputIndex, BaseRow currentRow, int currentIndex) for code reuse, so I prefer to keep BoundComparator and not introduce RowXXFrame and RangeXXFrame.
I added the default implementations RowBoundComparator and RangeBoundComparator to make BoundComparator easier to understand.
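
A rough sketch of how one compare-style interface could serve both frame kinds follows. It is an illustration only, not the code in this PR: `long` order keys stand in for BaseRow, and the class `BoundComparatorSketch`, the factory methods `rowBound` and `rangeBound`, and the helper `firstInFrame` are hypothetical names.

```java
// Hypothetical sketch, not Flink code: one comparator shape shared by ROWS and RANGE bounds.
public class BoundComparatorSketch {

    /** Negative: input is before the bound; zero: exactly on it; positive: after it. */
    interface BoundComparator {
        int compare(long inputKey, int inputIndex, long currentKey, int currentIndex);
    }

    /** ROWS bound: uses only the indexes; offset is negative for PRECEDING. */
    static BoundComparator rowBound(int offset) {
        return (inputKey, inputIndex, currentKey, currentIndex) ->
                Integer.compare(inputIndex, currentIndex + offset);
    }

    /** RANGE bound: uses only the order keys; offset is negative for PRECEDING. */
    static BoundComparator rangeBound(long offset) {
        return (inputKey, inputIndex, currentKey, currentIndex) ->
                Long.compare(inputKey, currentKey + offset);
    }

    /** Shared logic: index of the first row at or after the lower bound of the current row's frame. */
    static int firstInFrame(long[] sortedKeys, int currentIndex, BoundComparator lower) {
        for (int i = 0; i < sortedKeys.length; i++) {
            if (lower.compare(sortedKeys[i], i, sortedKeys[currentIndex], currentIndex) >= 0) {
                return i;
            }
        }
        return sortedKeys.length; // no row qualifies
    }
}
```

With keys {1, 2, 4, 7, 8} and the current row at index 3, `rowBound(-1)` and `rangeBound(-3)` both give a frame starting at index 2, through the same `firstInFrame` code path.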

@KurtYoung
Contributor

Why does RowSlidingOverFrame need a comparator class?

@JingsongLi
Contributor Author

Why does RowSlidingOverFrame need a comparator class?

It doesn't need a comparator class, but it does need a compare method to reuse code in BaseSlidingOverFrame.

@KurtYoung
Contributor

KurtYoung commented Apr 8, 2019

Isn't (currentIndex - precedingCount, currentIndex + followingCount) enough for RowSlidingOverFrame?

@JingsongLi
Contributor Author

JingsongLi commented Apr 8, 2019

Consider a new BaseSlidingOverFrame with two abstract methods:
1. public abstract boolean isLeftBoundRowInWindow(BaseRow inputRow, int inputIndex, BaseRow currentRow, int currentIndex);
2. public abstract boolean isRightBoundRowInWindow(BaseRow inputRow, int inputIndex, BaseRow currentRow, int currentIndex);
In process, it invokes isLeftBoundRowInWindow and isRightBoundRowInWindow to determine the boundaries.

In RowSlidingOverFrame:

	@Override
	public boolean isLeftBoundRowInWindow(BaseRow inputRow, int inputIndex, BaseRow currentRow,
			int currentIndex) {
		return inputIndex >= currentIndex + leftOffset;
	}

	@Override
	public boolean isRightBoundRowInWindow(BaseRow inputRow, int inputIndex, BaseRow currentRow,
			int currentIndex) {
		return inputIndex <= currentIndex + rightOffset;
	}

In RangeSlidingOverFrame:

	@Override
	public boolean isLeftBoundRowInWindow(BaseRow inputRow, int inputIndex, BaseRow currentRow,
			int currentIndex) {
		return lbound.compare(inputRow, currentRow) >= 0;
	}

	@Override
	public boolean isRightBoundRowInWindow(BaseRow inputRow, int inputIndex, BaseRow currentRow,
			int currentIndex) {
		return rbound.compare(inputRow, currentRow) <= 0;
	}

My point is that isLeftBoundRowInWindow and isRightBoundRowInWindow make things more complicated, and they still take the same arguments: (BaseRow inputRow, int inputIndex, BaseRow currentRow, int currentIndex).

@KurtYoung
Contributor

I think the problem is the methods you proposed: isLeftBoundRowInWindow and isRightBoundRowInWindow. In RowSlidingOverFrame, there is no need to iterate over the preceding and following rows and check one by one whether each belongs to the current frame. As I said before, you can immediately determine the left and right boundary with currentIndex - precedingCount, currentIndex + followingCount.
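
The index arithmetic described in this comment can be sketched as follows. This is only an illustration of the suggestion, not code from the PR: the class `RowFrameBounds`, the method `bounds`, and the clamping to the partition edges are assumptions.

```java
// Hypothetical sketch, not Flink code: ROWS frame boundaries computed directly from indexes.
public class RowFrameBounds {

    /**
     * Returns {start, end}, both inclusive, for a frame of precedingCount rows before and
     * followingCount rows after the current row, clamped to [0, partitionSize - 1].
     */
    static int[] bounds(int currentIndex, int precedingCount, int followingCount, int partitionSize) {
        int start = Math.max(0, currentIndex - precedingCount);
        int end = Math.min(partitionSize - 1, currentIndex + followingCount);
        return new int[] {start, end};
    }
}
```

For a partition of 5 rows with one preceding and one following row, the frame of row 0 is [0, 1], of row 2 is [1, 3], and of row 4 is [3, 4].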

@JingsongLi
Contributor Author

JingsongLi commented Apr 8, 2019

As I said before, you can immediately determine the left and right boundary with currentIndex - precedingCount, currentIndex + followingCount

I think we don't need this optimization now.
In RowSlidingFrame, we don't need to determine the boundary immediately, because from one current row to the next the head boundary and the end boundary each advance by exactly 1. Even if we changed it to determine the boundary immediately, the algorithmic complexity would be the same.
See SlidingOverFrame.process for more detail: the computation for the next current row reuses the previous buffer.
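
The complexity argument can be seen in a small two-pointer sketch. It is not SlidingOverFrame.process itself: the class `SlidingRowSum`, the method `slidingSum`, and the choice of a retractable SUM aggregate over a `long[]` are assumptions made to keep the example short.

```java
// Hypothetical sketch, not Flink code: a ROWS sliding frame that reuses the previous frame's state.
public class SlidingRowSum {

    /** SUM over ROWS BETWEEN preceding PRECEDING AND following FOLLOWING for every row. */
    static long[] slidingSum(long[] values, int preceding, int following) {
        int n = values.length;
        long[] out = new long[n];
        long sum = 0L;
        int head = 0; // index of the first row currently in the frame
        int tail = 0; // index one past the last row currently in the frame
        for (int current = 0; current < n; current++) {
            int end = Math.min(n, current + following + 1);
            while (tail < end) {
                sum += values[tail++];   // rows entering on the right
            }
            int start = Math.max(0, current - preceding);
            while (head < start) {
                sum -= values[head++];   // rows leaving on the left
            }
            out[current] = sum;
        }
        return out;
    }
}
```

Both pointers only move forward, so each row enters and leaves the frame once and the whole pass is linear in the partition size, whether the boundaries are found by comparison or by index arithmetic.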

@KurtYoung
Contributor

I don't think it's an optimization. I would say it's intuition every reader will have in the first place.

@JingsongLi
Contributor Author

I tried implementing it as "immediately determine the left and right boundary", but even then we need to take the records one by one for the calculation, so the final implementation is basically the same as the current one.
Could you give me more details about an implementation that would be easier to understand?

Contributor

@KurtYoung KurtYoung left a comment

I'm totally confused by your current implementation (some code + comments). If you are sure it's correct, let's discuss it offline tomorrow.

Contributor

@KurtYoung KurtYoung left a comment

LGTM

@KurtYoung KurtYoung merged commit 91221d6 into apache:master Apr 10, 2019
HuangZhenQiu pushed a commit to HuangZhenQiu/flink that referenced this pull request Apr 22, 2019
sunhaibotb pushed a commit to sunhaibotb/flink that referenced this pull request May 8, 2019
tianchen92 pushed a commit to tianchen92/flink that referenced this pull request May 13, 2019
@JingsongLi JingsongLi deleted the over branch June 11, 2019 01:54