Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side #20350

Merged
merged 2 commits into from
Aug 8, 2022

Conversation

wsry
Copy link
Contributor

@wsry wsry commented Jul 25, 2022

What is the purpose of the change

Currently, one intermediate dataset can only be consumed by one downstream consumer vertex. If there are multiple consumer vertices consuming the same output of the same upstream vertex, multiple intermediate datasets will be produced. We can optimize this behavior to produce only one intermediate dataset which can be shared by multiple consumer vertices. As the first step, we should allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side. (Note that this optimization only works for blocking shuffle because pipelined shuffle result partition can not be consumed multiple times)

Brief change log

  • Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

Verifying this change

This change added tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@wsry wsry requested a review from zhuzhurk July 25, 2022 08:10
@flinkbot
Copy link
Collaborator

flinkbot commented Jul 25, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

wsry added a commit to wsry/flink that referenced this pull request Jul 27, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Jul 27, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, JobIntermediateDatasetReuseTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Jul 27, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Jul 27, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, JobIntermediateDatasetReuseTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Jul 28, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Jul 28, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, JobIntermediateDatasetReuseTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Jul 29, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Jul 29, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, JobIntermediateDatasetReuseTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Jul 29, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Jul 29, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, JobIntermediateDatasetReuseTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
Copy link
Contributor

@zhuzhurk zhuzhurk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for opening this PR! @wsry
I have just reviewed the test migration part and here are some comments for it.
Will continue reviewing the core changes commit.

Copy link
Contributor

@zhuzhurk zhuzhurk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work!
Here are a few comments.

wsry added a commit to wsry/flink that referenced this pull request Aug 4, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, JobIntermediateDatasetReuseTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 4, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 4, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, JobIntermediateDatasetReuseTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 5, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, JobIntermediateDatasetReuseTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 5, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
@wsry
Copy link
Contributor Author

wsry commented Aug 5, 2022

@zhuzhurk @TanYuxin-tyx Thanks for the review and feedbacks. I have updated the PR.

wsry added a commit to wsry/flink that referenced this pull request Aug 5, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 5, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, JobIntermediateDatasetReuseTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 5, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, JobIntermediateDatasetReuseTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 5, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
@wsry
Copy link
Contributor Author

wsry commented Aug 6, 2022

@zhuzhurk I have updated the PR and add two fixup commits. Please take another look.

wsry added a commit to wsry/flink that referenced this pull request Aug 6, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, JobIntermediateDatasetReuseTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 6, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
@wsry
Copy link
Contributor Author

wsry commented Aug 6, 2022

@flinkbot run azure

Copy link
Contributor

@zhuzhurk zhuzhurk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing all the comments! @wsry
The change looks good to me.
Would you squash it? I will merge it after CI gives green.

wsry added a commit to wsry/flink that referenced this pull request Aug 7, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 7, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
@wsry
Copy link
Contributor Author

wsry commented Aug 7, 2022

Thanks for addressing all the comments! @wsry The change looks good to me. Would you squash it? I will merge it after CI gives green.

@zhuzhurk Thanks a lot. Rebased and squashed.

wsry added a commit to wsry/flink that referenced this pull request Aug 8, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 8, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 8, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 8, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 8, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
wsry added a commit to wsry/flink that referenced this pull request Aug 8, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
@wsry wsry merged commit 7240536 into apache:master Aug 8, 2022
wsry added a commit that referenced this pull request Aug 8, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes #20350.
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022
Migrated tests include DefaultExecutionGraphConstructionTest, EdgeManagerBuildUtilTest, EdgeManagerTest, ExecutionJobVertexTest, IntermediateResultPartitionTest, RemoveCachedShuffleDescriptorTest, JobTaskVertexTest, DefaultExecutionTopologyTest, DefaultExecutionVertexTest, DefaultResultPartitionTest, AdaptiveBatchSchedulerTest and ForwardGroupComputeUtilTest.

This closes apache#20350.
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022
…s sharing the same intermediate dataset at scheduler side

This closes apache#20350.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants