Skip to content

Commit

Permalink
[FLINK-22674][runtime] Provide JobID when applying for shuffle resour…
Browse files Browse the repository at this point in the history
…ces by ShuffleMaster#registerPartitionWithProducer
  • Loading branch information
wsry authored and zhuzhurk committed Aug 5, 2021
1 parent 81e1db3 commit 6bc8399
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,8 @@ public CompletableFuture<Void> registerProducedPartitions(
CompletableFuture<? extends ShuffleDescriptor> shuffleDescriptorFuture =
vertex.getExecutionGraphAccessor()
.getShuffleMaster()
.registerPartitionWithProducer(partitionDescriptor, producerDescriptor);
.registerPartitionWithProducer(
vertex.getJobId(), partitionDescriptor, producerDescriptor);

CompletableFuture<ResultPartitionDeploymentDescriptor> partitionRegistration =
shuffleDescriptorFuture.thenApply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.shuffle;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
Expand Down Expand Up @@ -60,7 +61,9 @@ public NettyShuffleMaster(Configuration conf) {

@Override
public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
JobID jobID,
PartitionDescriptor partitionDescriptor,
ProducerDescriptor producerDescriptor) {

ResultPartitionID resultPartitionID =
new ResultPartitionID(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.shuffle;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.MemorySize;

import java.util.Collection;
Expand All @@ -39,14 +40,17 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> {
* internally within the shuffle service. The descriptor should provide enough information to
* read from or write data to the partition.
*
* @param jobID job ID of the corresponding job which registered the partition
* @param partitionDescriptor general job graph information about the partition
* @param producerDescriptor general producer information (location, execution id, connection
* info)
* @return future with the partition shuffle descriptor used for producer/consumer deployment
* and their data exchange.
*/
CompletableFuture<T> registerPartitionWithProducer(
PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor);
JobID jobID,
PartitionDescriptor partitionDescriptor,
ProducerDescriptor producerDescriptor);

/**
* Release any external resources occupied by the given partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.deployment;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
Expand Down Expand Up @@ -54,18 +55,20 @@ public class ShuffleDescriptorTest extends TestLogger {
@Test
public void testMixedLocalRemoteUnknownDeployment() throws Exception {
ResourceID consumerResourceID = ResourceID.generate();
JobID jobID = new JobID();

// Local and remote channel are only allowed for certain execution
// states.
for (ExecutionState state : ExecutionState.values()) {
ResultPartitionID localPartitionId = new ResultPartitionID();
ResultPartitionDeploymentDescriptor localPartition =
createResultPartitionDeploymentDescriptor(localPartitionId, consumerResourceID);
createResultPartitionDeploymentDescriptor(
jobID, localPartitionId, consumerResourceID);

ResultPartitionID remotePartitionId = new ResultPartitionID();
ResultPartitionDeploymentDescriptor remotePartition =
createResultPartitionDeploymentDescriptor(
remotePartitionId, ResourceID.generate());
jobID, remotePartitionId, ResourceID.generate());

ResultPartitionID unknownPartitionId = new ResultPartitionID();

Expand Down Expand Up @@ -196,7 +199,7 @@ private static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(
}

private static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(
ResultPartitionID id, ResourceID location)
JobID jobID, ResultPartitionID id, ResourceID location)
throws ExecutionException, InterruptedException {
ProducerDescriptor producerDescriptor =
new ProducerDescriptor(
Expand All @@ -208,7 +211,8 @@ private static ResultPartitionDeploymentDescriptor createResultPartitionDeployme
PartitionDescriptorBuilder.newBuilder().setPartitionId(id.getPartitionId()).build();
ShuffleDescriptor shuffleDescriptor =
ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER
.registerPartitionWithProducer(partitionDescriptor, producerDescriptor)
.registerPartitionWithProducer(
jobID, partitionDescriptor, producerDescriptor)
.get();
return new ResultPartitionDeploymentDescriptor(
partitionDescriptor, shuffleDescriptor, 1, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,9 @@ private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescri

@Override
public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
JobID jobID,
PartitionDescriptor partitionDescriptor,
ProducerDescriptor producerDescriptor) {
return CompletableFuture.completedFuture(
new TestingShuffleDescriptor(
partitionDescriptor.getPartitionId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescri

@Override
public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
JobID jobID,
PartitionDescriptor partitionDescriptor,
ProducerDescriptor producerDescriptor) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.scheduler;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
Expand Down Expand Up @@ -130,7 +131,9 @@ private static JobGraph createJobGraph(final List<SlotSharingGroup> slotSharingG
private static class TestShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
@Override
public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
JobID jobID,
PartitionDescriptor partitionDescriptor,
ProducerDescriptor producerDescriptor) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.shuffle;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
Expand Down Expand Up @@ -48,7 +49,9 @@ public class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {

@Override
public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
JobID jobID,
PartitionDescriptor partitionDescriptor,
ProducerDescriptor producerDescriptor) {
if (throwExceptionalOnRegistration) {
throw new RuntimeException("Forced partition registration failure");
} else if (autoCompleteRegistration) {
Expand Down

0 comments on commit 6bc8399

Please sign in to comment.