Skip to content

Commit

Permalink
[FLINK-17050][runtime] Remove methods getVertex() and getResultPartit…
Browse files Browse the repository at this point in the history
…ion()

Remove methods

- SchedulingTopology#getVertex(ExecutionVertexID);
- SchedulingTopology#getResultPartition(IntermediateResultPartitionID);
  • Loading branch information
GJL committed Apr 13, 2020
1 parent b0719f0 commit 3e9418c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -88,13 +87,21 @@ public boolean containsCoLocationConstraints() {
}

@Override
public Optional<DefaultExecutionVertex> getVertex(ExecutionVertexID executionVertexId) {
return Optional.ofNullable(executionVerticesById.get(executionVertexId));
public DefaultExecutionVertex getVertexOrThrow(final ExecutionVertexID executionVertexId) {
final DefaultExecutionVertex executionVertex = executionVerticesById.get(executionVertexId);
if (executionVertex == null) {
throw new IllegalArgumentException("can not find vertex: " + executionVertexId);
}
return executionVertex;
}

@Override
public Optional<DefaultResultPartition> getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId) {
return Optional.ofNullable(resultPartitionsById.get(intermediateResultPartitionId));
public DefaultResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID intermediateResultPartitionId) {
final DefaultResultPartition resultPartition = resultPartitionsById.get(intermediateResultPartitionId);
if (resultPartition == null) {
throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId);
}
return resultPartition;
}

private static List<DefaultResultPartition> generateProducedSchedulingResultPartition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,20 @@
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.topology.Topology;

import java.util.Optional;

/**
* Topology of {@link SchedulingExecutionVertex}.
*/
public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, R extends SchedulingResultPartition<V, R>>
extends Topology<ExecutionVertexID, IntermediateResultPartitionID, V, R> {

/**
* Looks up the {@link SchedulingExecutionVertex} for the given {@link ExecutionVertexID}.
*
* @param executionVertexId identifying the respective scheduling vertex
* @return Optional containing the respective scheduling vertex or none if the vertex does not exist
*/
Optional<V> getVertex(ExecutionVertexID executionVertexId);

/**
* Looks up the {@link SchedulingExecutionVertex} for the given {@link ExecutionVertexID}.
*
* @param executionVertexId identifying the respective scheduling vertex
* @return The respective scheduling vertex
* @throws IllegalArgumentException If the vertex does not exist
*/
default V getVertexOrThrow(ExecutionVertexID executionVertexId) {
return getVertex(executionVertexId).orElseThrow(
() -> new IllegalArgumentException("can not find vertex: " + executionVertexId));
}

/**
* Looks up the {@link SchedulingResultPartition} for the given {@link IntermediateResultPartitionID}.
*
* @param intermediateResultPartitionId identifying the respective scheduling result partition
* @return Optional containing the respective scheduling result partition or none if the partition does not exist
*/
Optional<R> getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId);
V getVertexOrThrow(ExecutionVertexID executionVertexId);

/**
* Looks up the {@link SchedulingResultPartition} for the given {@link IntermediateResultPartitionID}.
Expand All @@ -64,8 +43,5 @@ default V getVertexOrThrow(ExecutionVertexID executionVertexId) {
* @return The respective scheduling result partition
* @throws IllegalArgumentException If the partition does not exist
*/
default R getResultPartitionOrThrow(IntermediateResultPartitionID intermediateResultPartitionId) {
return getResultPartition(intermediateResultPartitionId).orElseThrow(
() -> new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId));
}
R getResultPartitionOrThrow(IntermediateResultPartitionID intermediateResultPartitionId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkState;

Expand Down Expand Up @@ -63,14 +62,21 @@ public void setContainsCoLocationConstraints(final boolean containsCoLocationCon
}

@Override
public Optional<TestingSchedulingExecutionVertex> getVertex(ExecutionVertexID executionVertexId) {
return Optional.ofNullable(schedulingExecutionVertices.get(executionVertexId));
public TestingSchedulingExecutionVertex getVertexOrThrow(final ExecutionVertexID executionVertexId) {
final TestingSchedulingExecutionVertex executionVertex = schedulingExecutionVertices.get(executionVertexId);
if (executionVertex == null) {
throw new IllegalArgumentException("can not find vertex: " + executionVertexId);
}
return executionVertex;
}

@Override
public Optional<TestingSchedulingResultPartition> getResultPartition(
IntermediateResultPartitionID intermediateResultPartitionId) {
return Optional.of(schedulingResultPartitions.get(intermediateResultPartitionId));
public TestingSchedulingResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID intermediateResultPartitionId) {
final TestingSchedulingResultPartition resultPartition = schedulingResultPartitions.get(intermediateResultPartitionId);
if (resultPartition == null) {
throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId);
}
return resultPartition;
}

void addSchedulingExecutionVertex(TestingSchedulingExecutionVertex schedulingExecutionVertex) {
Expand Down

0 comments on commit 3e9418c

Please sign in to comment.