Skip to content

Commit

Permalink
[FLINK-17050][runtime] Rename methods getVertexOrThrow() and getResul…
Browse files Browse the repository at this point in the history
…tPartitionOrThrow()

Rename methods

- SchedulingTopology#getVertexOrThrow(ExecutionVertexID)
- SchedulingTopology#getResultPartitionOrThrow(IntermediateResultPartitionID)

to

- SchedulingTopology#getVertex(ExecutionVertexID)
- SchedulingTopology#getResultPartition(IntermediateResultPartitionID)

respectively.

This closes apache#11684.
  • Loading branch information
GJL committed Apr 13, 2020
1 parent 3e9418c commit 39ebaae
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1547,7 +1547,7 @@ private void releasePartitions(final List<IntermediateResultPartitionID> releasa

ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) {
final SchedulingResultPartition<?, ?> schedulingResultPartition =
getSchedulingTopology().getResultPartitionOrThrow(resultPartitionId);
getSchedulingTopology().getResultPartition(resultPartitionId);
final SchedulingExecutionVertex<?, ?> producer = schedulingResultPartition.getProducer();
final ExecutionVertexID producerId = producer.getId();
final JobVertexID jobVertexId = producerId.getJobVertexId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private Set<IntermediateResultPartitionID> findResultPartitionsOutsideOfRegion(f
final Set<SchedulingResultPartition<?, ?>> allConsumedPartitionsInRegion = pipelinedRegion
.getExecutionVertexIds()
.stream()
.map(schedulingTopology::getVertexOrThrow)
.map(schedulingTopology::getVertex)
.flatMap(vertex -> IterableUtils.toStream(vertex.getConsumedResults()))
.collect(Collectors.toSet());

Expand Down Expand Up @@ -157,7 +157,7 @@ private List<IntermediateResultPartitionID> filterReleasablePartitions(final Pip
}

private boolean areConsumerRegionsFinished(final IntermediateResultPartitionID resultPartitionId) {
final SchedulingResultPartition<?, ?> resultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
final SchedulingResultPartition<?, ?> resultPartition = schedulingTopology.getResultPartition(resultPartitionId);
return IterableUtils.toStream(resultPartition.getConsumers())
.map(SchedulingExecutionVertex::getId)
.allMatch(this::isRegionOfVertexFinished);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public boolean containsCoLocationConstraints() {
}

@Override
public DefaultExecutionVertex getVertexOrThrow(final ExecutionVertexID executionVertexId) {
public DefaultExecutionVertex getVertex(final ExecutionVertexID executionVertexId) {
final DefaultExecutionVertex executionVertex = executionVerticesById.get(executionVertexId);
if (executionVertex == null) {
throw new IllegalArgumentException("can not find vertex: " + executionVertexId);
Expand All @@ -96,7 +96,7 @@ public DefaultExecutionVertex getVertexOrThrow(final ExecutionVertexID execution
}

@Override
public DefaultResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID intermediateResultPartitionId) {
public DefaultResultPartition getResultPartition(final IntermediateResultPartitionID intermediateResultPartitionId) {
final DefaultResultPartition resultPartition = resultPartitionsById.get(intermediateResultPartitionId);
if (resultPartition == null) {
throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
// increase counter of the dataset first
verticesToRestart
.stream()
.map(schedulingTopology::getVertexOrThrow)
.map(schedulingTopology::getVertex)
.flatMap(vertex -> IterableUtils.toStream(vertex.getProducedResults()))
.forEach(inputConstraintChecker::resetSchedulingResultPartition);

Expand All @@ -101,7 +101,7 @@ public void onExecutionStateChange(ExecutionVertexID executionVertexId, Executio
}

final Set<SchedulingExecutionVertex<?, ?>> verticesToSchedule = IterableUtils
.toStream(schedulingTopology.getVertexOrThrow(executionVertexId).getProducedResults())
.toStream(schedulingTopology.getVertex(executionVertexId).getProducedResults())
.filter(partition -> partition.getResultType().isBlocking())
.flatMap(partition -> inputConstraintChecker.markSchedulingResultPartitionFinished(partition).stream())
.flatMap(partition -> IterableUtils.toStream(partition.getConsumers()))
Expand All @@ -113,7 +113,7 @@ public void onExecutionStateChange(ExecutionVertexID executionVertexId, Executio
@Override
public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) {
final SchedulingResultPartition<?, ?> resultPartition = schedulingTopology
.getResultPartitionOrThrow(resultPartitionId);
.getResultPartition(resultPartitionId);

if (!resultPartition.getResultType().isPipelined()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ static Set<ExecutionVertexID> getAllVertexIdsFromTopology(final SchedulingTopolo
final Set<ExecutionVertexID> vertexIds) {

return vertexIds.stream()
.map(topology::getVertexOrThrow)
.map(topology::getVertex)
.collect(Collectors.toSet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, R
* @return The respective scheduling vertex
* @throws IllegalArgumentException If the vertex does not exist
*/
V getVertexOrThrow(ExecutionVertexID executionVertexId);
V getVertex(ExecutionVertexID executionVertexId);

/**
* Looks up the {@link SchedulingResultPartition} for the given {@link IntermediateResultPartitionID}.
Expand All @@ -43,5 +43,5 @@ public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, R
* @return The respective scheduling result partition
* @throws IllegalArgumentException If the partition does not exist
*/
R getResultPartitionOrThrow(IntermediateResultPartitionID intermediateResultPartitionId);
R getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ public void cancelWhileRestartingShouldWaitForRunningTasks() {

scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId1, ExecutionState.FAILED, new RuntimeException("expected")));
scheduler.cancel();
final ExecutionState vertex2StateAfterCancel = topology.getVertexOrThrow(executionVertex2).getState();
final ExecutionState vertex2StateAfterCancel = topology.getVertex(executionVertex2).getState();
final JobStatus statusAfterCancelWhileRestarting = scheduler.requestJobStatus();
scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId2, ExecutionState.CANCELED, new RuntimeException("expected")));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testGetResultPartition() {
for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
for (Map.Entry<IntermediateResultPartitionID, IntermediateResultPartition> entry : vertex.getProducedPartitions().entrySet()) {
IntermediateResultPartition partition = entry.getValue();
DefaultResultPartition schedulingResultPartition = adapter.getResultPartitionOrThrow(entry.getKey());
DefaultResultPartition schedulingResultPartition = adapter.getResultPartition(entry.getKey());

assertPartitionEquals(partition, schedulingResultPartition);
}
Expand All @@ -114,7 +114,7 @@ public void testResultPartitionStateSupplier() {
.get();

final DefaultResultPartition schedulingResultPartition = adapter
.getResultPartitionOrThrow(intermediateResultPartition.getPartitionId());
.getResultPartition(intermediateResultPartition.getPartitionId());

assertEquals(ResultPartitionState.CREATED, schedulingResultPartition.getState());

Expand All @@ -125,7 +125,7 @@ public void testResultPartitionStateSupplier() {
@Test
public void testGetVertexOrThrow() {
try {
adapter.getVertexOrThrow(new ExecutionVertexID(new JobVertexID(), 0));
adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0));
fail("get not exist vertex");
} catch (IllegalArgumentException exception) {
// expected
Expand All @@ -135,7 +135,7 @@ public void testGetVertexOrThrow() {
@Test
public void testResultPartitionOrThrow() {
try {
adapter.getResultPartitionOrThrow(new IntermediateResultPartitionID());
adapter.getResultPartition(new IntermediateResultPartitionID());
fail("get not exist result partition");
} catch (IllegalArgumentException exception) {
// expected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void setContainsCoLocationConstraints(final boolean containsCoLocationCon
}

@Override
public TestingSchedulingExecutionVertex getVertexOrThrow(final ExecutionVertexID executionVertexId) {
public TestingSchedulingExecutionVertex getVertex(final ExecutionVertexID executionVertexId) {
final TestingSchedulingExecutionVertex executionVertex = schedulingExecutionVertices.get(executionVertexId);
if (executionVertex == null) {
throw new IllegalArgumentException("can not find vertex: " + executionVertexId);
Expand All @@ -71,7 +71,7 @@ public TestingSchedulingExecutionVertex getVertexOrThrow(final ExecutionVertexID
}

@Override
public TestingSchedulingResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID intermediateResultPartitionId) {
public TestingSchedulingResultPartition getResultPartition(final IntermediateResultPartitionID intermediateResultPartitionId) {
final TestingSchedulingResultPartition resultPartition = schedulingResultPartitions.get(intermediateResultPartitionId);
if (resultPartition == null) {
throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId);
Expand Down

0 comments on commit 39ebaae

Please sign in to comment.