Skip to content

Commit

Permalink
[FLINK-16960][runtime] Rename LogicalPipelinedRegion to DefaultLogica…
Browse files Browse the repository at this point in the history
…lPipelinedRegion
  • Loading branch information
GJL committed Apr 14, 2020
1 parent 3933b04 commit 1f28494
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
/**
* Set of {@link LogicalVertex} that are connected through pipelined {@link LogicalResult}.
*/
public class LogicalPipelinedRegion {
public class DefaultLogicalPipelinedRegion {

private final Set<JobVertexID> vertexIDs;

public LogicalPipelinedRegion(final Set<? extends LogicalVertex<?, ?>> logicalVertices) {
public DefaultLogicalPipelinedRegion(final Set<? extends LogicalVertex<?, ?>> logicalVertices) {
checkNotNull(logicalVertices);

this.vertexIDs = logicalVertices.stream()
Expand All @@ -47,7 +47,7 @@ public Set<JobVertexID> getVertexIDs() {

@Override
public String toString() {
return "LogicalPipelinedRegion{" +
return "DefaultLogicalPipelinedRegion{" +
"vertexIDs=" + vertexIDs +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ private DefaultLogicalResult getResult(final IntermediateDataSetID resultId) {
.orElseThrow(() -> new IllegalArgumentException("can not find result: " + resultId));
}

public Set<LogicalPipelinedRegion> getLogicalPipelinedRegions() {
public Set<DefaultLogicalPipelinedRegion> getLogicalPipelinedRegions() {
final Set<Set<DefaultLogicalVertex>> regionsRaw = PipelinedRegionComputeUtil.computePipelinedRegions(this);

final Set<LogicalPipelinedRegion> regions = new HashSet<>();
final Set<DefaultLogicalPipelinedRegion> regions = new HashSet<>();
for (Set<DefaultLogicalVertex> regionVertices : regionsRaw) {
regions.add(new LogicalPipelinedRegion(regionVertices));
regions.add(new DefaultLogicalPipelinedRegion(regionVertices));
}
return regions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testWithoutCoLocationConstraints() {

@Test
public void testGetLogicalPipelinedRegions() {
final Set<LogicalPipelinedRegion> regions = logicalTopology.getLogicalPipelinedRegions();
final Set<DefaultLogicalPipelinedRegion> regions = logicalTopology.getLogicalPipelinedRegions();
assertEquals(2, regions.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.LogicalPipelinedRegion;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
Expand Down Expand Up @@ -690,8 +690,8 @@ private Map<JobVertexID, SlotSharingGroup> buildVertexRegionSlotSharingGroups()

final boolean allRegionsInSameSlotSharingGroup = streamGraph.isAllVerticesInSameSlotSharingGroupByDefault();

final Set<LogicalPipelinedRegion> regions = new DefaultLogicalTopology(jobGraph).getLogicalPipelinedRegions();
for (LogicalPipelinedRegion region : regions) {
final Set<DefaultLogicalPipelinedRegion> regions = new DefaultLogicalTopology(jobGraph).getLogicalPipelinedRegions();
for (DefaultLogicalPipelinedRegion region : regions) {
final SlotSharingGroup regionSlotSharingGroup;
if (allRegionsInSameSlotSharingGroup) {
regionSlotSharingGroup = defaultSlotSharingGroup;
Expand Down

0 comments on commit 1f28494

Please sign in to comment.