diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index d3aa0e2ee97ae..4ef2bbfcebbeb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1547,7 +1547,7 @@ private void releasePartitions(final List releasa ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) { final SchedulingResultPartition schedulingResultPartition = - getSchedulingTopology().getResultPartitionOrThrow(resultPartitionId); + getSchedulingTopology().getResultPartition(resultPartitionId); final SchedulingExecutionVertex producer = schedulingResultPartition.getProducer(); final ExecutionVertexID producerId = producer.getId(); final JobVertexID jobVertexId = producerId.getJobVertexId(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java index a05a24a407bc2..d5529b7cbfcf7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java @@ -88,7 +88,7 @@ private Set findResultPartitionsOutsideOfRegion(f final Set> allConsumedPartitionsInRegion = pipelinedRegion .getExecutionVertexIds() .stream() - .map(schedulingTopology::getVertexOrThrow) + .map(schedulingTopology::getVertex) .flatMap(vertex -> IterableUtils.toStream(vertex.getConsumedResults())) .collect(Collectors.toSet()); @@ -157,7 +157,7 @@ private List filterReleasablePartitions(final Pip } private boolean areConsumerRegionsFinished(final IntermediateResultPartitionID resultPartitionId) { - final SchedulingResultPartition resultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId); + final SchedulingResultPartition resultPartition = schedulingTopology.getResultPartition(resultPartitionId); return IterableUtils.toStream(resultPartition.getConsumers()) .map(SchedulingExecutionVertex::getId) .allMatch(this::isRegionOfVertexFinished); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java index 162a888699152..a535fdc796af6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java @@ -87,7 +87,7 @@ public boolean containsCoLocationConstraints() { } @Override - public DefaultExecutionVertex getVertexOrThrow(final ExecutionVertexID executionVertexId) { + public DefaultExecutionVertex getVertex(final ExecutionVertexID executionVertexId) { final DefaultExecutionVertex executionVertex = executionVerticesById.get(executionVertexId); if (executionVertex == null) { throw new IllegalArgumentException("can not find vertex: " + executionVertexId); @@ -96,7 +96,7 @@ public DefaultExecutionVertex getVertexOrThrow(final ExecutionVertexID execution } @Override - public DefaultResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID intermediateResultPartitionId) { + public DefaultResultPartition getResultPartition(final IntermediateResultPartitionID intermediateResultPartitionId) { final DefaultResultPartition resultPartition = resultPartitionsById.get(intermediateResultPartitionId); if (resultPartition == null) { throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java index da4c148c93754..6520100e7ea6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java @@ -86,7 +86,7 @@ public void restartTasks(Set verticesToRestart) { // increase counter of the dataset first verticesToRestart .stream() - .map(schedulingTopology::getVertexOrThrow) + .map(schedulingTopology::getVertex) .flatMap(vertex -> IterableUtils.toStream(vertex.getProducedResults())) .forEach(inputConstraintChecker::resetSchedulingResultPartition); @@ -101,7 +101,7 @@ public void onExecutionStateChange(ExecutionVertexID executionVertexId, Executio } final Set> verticesToSchedule = IterableUtils - .toStream(schedulingTopology.getVertexOrThrow(executionVertexId).getProducedResults()) + .toStream(schedulingTopology.getVertex(executionVertexId).getProducedResults()) .filter(partition -> partition.getResultType().isBlocking()) .flatMap(partition -> inputConstraintChecker.markSchedulingResultPartitionFinished(partition).stream()) .flatMap(partition -> IterableUtils.toStream(partition.getConsumers())) @@ -113,7 +113,7 @@ public void onExecutionStateChange(ExecutionVertexID executionVertexId, Executio @Override public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) { final SchedulingResultPartition resultPartition = schedulingTopology - .getResultPartitionOrThrow(resultPartitionId); + .getResultPartition(resultPartitionId); if (!resultPartition.getResultType().isPipelined()) { return; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java index 807217e05a6dd..b1e9dd946e98c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java @@ -43,7 +43,7 @@ static Set getAllVertexIdsFromTopology(final SchedulingTopolo final Set vertexIds) { return vertexIds.stream() - .map(topology::getVertexOrThrow) + .map(topology::getVertex) .collect(Collectors.toSet()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java index fef82cfe4cb8a..ae26be9bd9e35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java @@ -34,7 +34,7 @@ public interface SchedulingTopology, R * @return The respective scheduling vertex * @throws IllegalArgumentException If the vertex does not exist */ - V getVertexOrThrow(ExecutionVertexID executionVertexId); + V getVertex(ExecutionVertexID executionVertexId); /** * Looks up the {@link SchedulingResultPartition} for the given {@link IntermediateResultPartitionID}. @@ -43,5 +43,5 @@ public interface SchedulingTopology, R * @return The respective scheduling result partition * @throws IllegalArgumentException If the partition does not exist */ - R getResultPartitionOrThrow(IntermediateResultPartitionID intermediateResultPartitionId); + R getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 2f20f40d43cbd..0b854e4c90c36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -654,7 +654,7 @@ public void cancelWhileRestartingShouldWaitForRunningTasks() { scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId1, ExecutionState.FAILED, new RuntimeException("expected"))); scheduler.cancel(); - final ExecutionState vertex2StateAfterCancel = topology.getVertexOrThrow(executionVertex2).getState(); + final ExecutionState vertex2StateAfterCancel = topology.getVertex(executionVertex2).getState(); final JobStatus statusAfterCancelWhileRestarting = scheduler.requestJobStatus(); scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId2, ExecutionState.CANCELED, new RuntimeException("expected"))); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java index 8dcdfffde02bd..f0edb10493252 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java @@ -98,7 +98,7 @@ public void testGetResultPartition() { for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) { for (Map.Entry entry : vertex.getProducedPartitions().entrySet()) { IntermediateResultPartition partition = entry.getValue(); - DefaultResultPartition schedulingResultPartition = adapter.getResultPartitionOrThrow(entry.getKey()); + DefaultResultPartition schedulingResultPartition = adapter.getResultPartition(entry.getKey()); assertPartitionEquals(partition, schedulingResultPartition); } @@ -114,7 +114,7 @@ public void testResultPartitionStateSupplier() { .get(); final DefaultResultPartition schedulingResultPartition = adapter - .getResultPartitionOrThrow(intermediateResultPartition.getPartitionId()); + .getResultPartition(intermediateResultPartition.getPartitionId()); assertEquals(ResultPartitionState.CREATED, schedulingResultPartition.getState()); @@ -125,7 +125,7 @@ public void testResultPartitionStateSupplier() { @Test public void testGetVertexOrThrow() { try { - adapter.getVertexOrThrow(new ExecutionVertexID(new JobVertexID(), 0)); + adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0)); fail("get not exist vertex"); } catch (IllegalArgumentException exception) { // expected @@ -135,7 +135,7 @@ public void testGetVertexOrThrow() { @Test public void testResultPartitionOrThrow() { try { - adapter.getResultPartitionOrThrow(new IntermediateResultPartitionID()); + adapter.getResultPartition(new IntermediateResultPartitionID()); fail("get not exist result partition"); } catch (IllegalArgumentException exception) { // expected diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java index dd2fa889b6cb5..5e930b413cb86 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java @@ -62,7 +62,7 @@ public void setContainsCoLocationConstraints(final boolean containsCoLocationCon } @Override - public TestingSchedulingExecutionVertex getVertexOrThrow(final ExecutionVertexID executionVertexId) { + public TestingSchedulingExecutionVertex getVertex(final ExecutionVertexID executionVertexId) { final TestingSchedulingExecutionVertex executionVertex = schedulingExecutionVertices.get(executionVertexId); if (executionVertex == null) { throw new IllegalArgumentException("can not find vertex: " + executionVertexId); @@ -71,7 +71,7 @@ public TestingSchedulingExecutionVertex getVertexOrThrow(final ExecutionVertexID } @Override - public TestingSchedulingResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID intermediateResultPartitionId) { + public TestingSchedulingResultPartition getResultPartition(final IntermediateResultPartitionID intermediateResultPartitionId) { final TestingSchedulingResultPartition resultPartition = schedulingResultPartitions.get(intermediateResultPartitionId); if (resultPartition == null) { throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId);