Skip to content

Commit

Permalink
[FLINK-14679][shuffle] Store number of partitions in PartitionDescriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Nov 15, 2019
1 parent 6ae01fb commit cc02399
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public ResultPartitionType getPartitionType() {
return partitionDescriptor.getPartitionType();
}

public int getTotalNumberOfPartitions() {
return partitionDescriptor.getTotalNumberOfPartitions();
}

public int getNumberOfSubpartitions() {
return partitionDescriptor.getNumberOfSubpartitions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class PartitionDescriptor implements Serializable {
/** The ID of the result this partition belongs to. */
private final IntermediateDataSetID resultId;

/** The total number of partitions for the result. */
private final int totalNumberOfPartitions;

/** The ID of the partition. */
private final IntermediateResultPartitionID partitionId;

Expand All @@ -57,11 +60,14 @@ public class PartitionDescriptor implements Serializable {
@VisibleForTesting
public PartitionDescriptor(
IntermediateDataSetID resultId,
int totalNumberOfPartitions,
IntermediateResultPartitionID partitionId,
ResultPartitionType partitionType,
int numberOfSubpartitions,
int connectionIndex) {
this.resultId = checkNotNull(resultId);
checkArgument(totalNumberOfPartitions >= 1);
this.totalNumberOfPartitions = totalNumberOfPartitions;
this.partitionId = checkNotNull(partitionId);
this.partitionType = checkNotNull(partitionType);
checkArgument(numberOfSubpartitions >= 1);
Expand All @@ -73,6 +79,10 @@ public IntermediateDataSetID getResultId() {
return resultId;
}

public int getTotalNumberOfPartitions() {
return totalNumberOfPartitions;
}

public IntermediateResultPartitionID getPartitionId() {
return partitionId;
}
Expand Down Expand Up @@ -119,6 +129,7 @@ public static PartitionDescriptor from(IntermediateResultPartition partition) {
IntermediateResult result = partition.getIntermediateResult();
return new PartitionDescriptor(
result.getId(),
partition.getIntermediateResult().getNumberOfAssignedPartitions(),
partition.getPartitionId(),
result.getResultType(),
numberOfSubpartitions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
*/
public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
private static final IntermediateDataSetID resultId = new IntermediateDataSetID();
private static final int numberOfPartitions = 5;

private static final IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
private static final ExecutionAttemptID producerExecutionId = new ExecutionAttemptID();
Expand All @@ -57,6 +58,7 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {

private static final PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
resultId,
numberOfPartitions,
partitionId,
partitionType,
numberOfSubpartitions,
Expand Down Expand Up @@ -115,6 +117,7 @@ private static ResultPartitionDeploymentDescriptor createCopyAndVerifyResultPart

private static void verifyResultPartitionDeploymentDescriptorCopy(ResultPartitionDeploymentDescriptor copy) {
assertThat(copy.getResultId(), is(resultId));
assertThat(copy.getTotalNumberOfPartitions(), is(numberOfPartitions));
assertThat(copy.getPartitionId(), is(partitionId));
assertThat(copy.getPartitionType(), is(partitionType));
assertThat(copy.getNumberOfSubpartitions(), is(numberOfSubpartitions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
public class PartitionDescriptorBuilder {
private IntermediateResultPartitionID partitionId;
private ResultPartitionType partitionType;
private int totalNumberOfPartitions = 1;

private PartitionDescriptorBuilder() {
this.partitionId = new IntermediateResultPartitionID();
Expand All @@ -44,8 +45,13 @@ public PartitionDescriptorBuilder setPartitionType(ResultPartitionType partition
return this;
}

public PartitionDescriptorBuilder setTotalNumberOfPartitions(int totalNumberOfPartitions) {
this.totalNumberOfPartitions = totalNumberOfPartitions;
return this;
}

public PartitionDescriptor build() {
return new PartitionDescriptor(new IntermediateDataSetID(), partitionId, partitionType, 1, 0);
return new PartitionDescriptor(new IntermediateDataSetID(), totalNumberOfPartitions, partitionId, partitionType, 1, 0);
}

public static PartitionDescriptorBuilder newBuilder() {
Expand Down

0 comments on commit cc02399

Please sign in to comment.