Skip to content

Commit

Permalink
[FLINK-22686][hotfix] Rename InflightDataGateOrPartitionRescalingDesc…
Browse files Browse the repository at this point in the history
…riptor.Rescaling to MappingType
  • Loading branch information
dawidwys committed Jun 1, 2021
1 parent 4011bde commit abd321c
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ public static class InflightDataGateOrPartitionRescalingDescriptor implements Se
/** All channels where upstream duplicates data (only valid for downstream mappings). */
private final Set<Integer> ambiguousSubtaskIndexes;

private final Rescaling rescaling;
private final MappingType mappingType;

enum Rescaling {
enum MappingType {
IDENTITY,
RESCALING
}
Expand All @@ -137,15 +137,15 @@ public InflightDataGateOrPartitionRescalingDescriptor(
int[] oldSubtaskIndexes,
RescaleMappings rescaledChannelsMappings,
Set<Integer> ambiguousSubtaskIndexes,
Rescaling rescaling) {
MappingType mappingType) {
this.oldSubtaskIndexes = oldSubtaskIndexes;
this.rescaledChannelsMappings = rescaledChannelsMappings;
this.ambiguousSubtaskIndexes = ambiguousSubtaskIndexes;
this.rescaling = rescaling;
this.mappingType = mappingType;
}

public boolean isIdentity() {
return rescaling == Rescaling.IDENTITY;
return mappingType == MappingType.IDENTITY;
}

@Override
Expand All @@ -161,12 +161,13 @@ public boolean equals(Object o) {
return Arrays.equals(oldSubtaskIndexes, that.oldSubtaskIndexes)
&& Objects.equals(rescaledChannelsMappings, that.rescaledChannelsMappings)
&& Objects.equals(ambiguousSubtaskIndexes, that.ambiguousSubtaskIndexes)
&& rescaling == that.rescaling;
&& mappingType == that.mappingType;
}

@Override
public int hashCode() {
int result = Objects.hash(rescaledChannelsMappings, ambiguousSubtaskIndexes, rescaling);
int result =
Objects.hash(rescaledChannelsMappings, ambiguousSubtaskIndexes, mappingType);
result = 31 * result + Arrays.hashCode(oldSubtaskIndexes);
return result;
}
Expand All @@ -180,8 +181,8 @@ public String toString() {
+ rescaledChannelsMappings
+ ", ambiguousSubtaskIndexes="
+ ambiguousSubtaskIndexes
+ ", rescaling="
+ rescaling
+ ", mappingType="
+ mappingType
+ '}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor.Rescaling;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor.MappingType;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
Expand Down Expand Up @@ -314,7 +314,7 @@ private InflightDataRescalingDescriptor createRescalingDescriptor(
oldSubtaskInstances,
rescaleMapping.getRescaleMappings(),
ambiguousSubtasks,
isIdentity ? Rescaling.IDENTITY : Rescaling.RESCALING),
isIdentity ? MappingType.IDENTITY : MappingType.RESCALING),
instanceID.getSubtaskId(),
partition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor.Rescaling;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor.MappingType;

import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;

Expand Down Expand Up @@ -59,7 +59,7 @@ public static InflightDataRescalingDescriptor rescalingDescriptor(
oldIndices,
mapping,
ambiguousSubtasks,
Rescaling.RESCALING))
MappingType.RESCALING))
.toArray(InflightDataGateOrPartitionRescalingDescriptor[]::new));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor.Rescaling.RESCALING;
import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor.MappingType.RESCALING;
import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.array;
import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.mappings;
import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.rescalingDescriptor;
Expand Down Expand Up @@ -482,9 +482,9 @@ private InflightDataGateOrPartitionRescalingDescriptor gate(
int[] oldIndices,
RescaleMappings rescaleMapping,
Set<Integer> ambiguousSubtaskIndexes,
InflightDataGateOrPartitionRescalingDescriptor.Rescaling rescalingMode) {
InflightDataGateOrPartitionRescalingDescriptor.MappingType mappingType) {
return new InflightDataGateOrPartitionRescalingDescriptor(
oldIndices, rescaleMapping, ambiguousSubtaskIndexes, rescalingMode);
oldIndices, rescaleMapping, ambiguousSubtaskIndexes, mappingType);
}

@Test
Expand Down

0 comments on commit abd321c

Please sign in to comment.