From 1aa9074e8a14765825f56e84caca806d4c17e227 Mon Sep 17 00:00:00 2001 From: Zhu Zhu Date: Tue, 22 Dec 2020 17:42:17 +0800 Subject: [PATCH] [FLINK-20439][runtime] Rename `scheduleOrUpdateConsumers` to `notifyPartitionDataAvailable` to avoid confusion --- .../ResultPartitionDeploymentDescriptor.java | 12 ++++++------ .../flink/runtime/executiongraph/Execution.java | 8 ++++---- .../apache/flink/runtime/jobmaster/JobMaster.java | 4 ++-- .../flink/runtime/jobmaster/JobMasterGateway.java | 4 ++-- .../flink/runtime/scheduler/DefaultScheduler.java | 8 +++++--- .../flink/runtime/scheduler/DeploymentOption.java | 12 ++++++------ .../flink/runtime/scheduler/SchedulerBase.java | 6 +++--- .../flink/runtime/scheduler/SchedulerNG.java | 2 +- .../rpc/RpcResultPartitionConsumableNotifier.java | 7 ++++--- ...bleNotifyingResultPartitionWriterDecorator.java | 4 ++-- .../ResultPartitionDeploymentDescriptorTest.java | 2 +- .../ExecutionGraphPartitionReleaseTest.java | 2 +- .../ExecutionGraphVariousFailuesTest.java | 8 ++++---- .../ExecutionVertexDeploymentTest.java | 2 +- .../io/network/partition/ResultPartitionTest.java | 8 ++++---- .../jobmaster/utils/TestingJobMasterGateway.java | 10 +++++----- .../utils/TestingJobMasterGatewayBuilder.java | 8 ++++---- .../runtime/scheduler/TestingSchedulerNG.java | 2 +- .../taskexecutor/TaskExecutorSubmissionTest.java | 14 +++++++------- 19 files changed, 63 insertions(+), 60 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index 072019158a3af..2bd12f49e18d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -45,19 +45,19 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { private final int maxParallelism; - /** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */ - private final boolean sendScheduleOrUpdateConsumersMessage; + /** Flag whether the result partition should notify master when its data is available. */ + private final boolean notifyPartitionDataAvailable; public ResultPartitionDeploymentDescriptor( PartitionDescriptor partitionDescriptor, ShuffleDescriptor shuffleDescriptor, int maxParallelism, - boolean sendScheduleOrUpdateConsumersMessage) { + boolean notifyPartitionDataAvailable) { this.partitionDescriptor = checkNotNull(partitionDescriptor); this.shuffleDescriptor = checkNotNull(shuffleDescriptor); KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism); this.maxParallelism = maxParallelism; - this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage; + this.notifyPartitionDataAvailable = notifyPartitionDataAvailable; } public IntermediateDataSetID getResultId() { @@ -88,8 +88,8 @@ public ShuffleDescriptor getShuffleDescriptor() { return shuffleDescriptor; } - public boolean sendScheduleOrUpdateConsumersMessage() { - return sendScheduleOrUpdateConsumersMessage; + public boolean notifyPartitionDataAvailable() { + return notifyPartitionDataAvailable; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 98d428caf7238..df60708dd4139 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -397,12 +397,12 @@ public CompletableFuture getReleaseFuture() { public CompletableFuture registerProducedPartitions( TaskManagerLocation location, - boolean sendScheduleOrUpdateConsumersMessage) { + boolean notifyPartitionDataAvailable) { assertRunningInJobMasterMainThread(); return FutureUtils.thenApplyAsyncIfNotDone( - registerProducedPartitions(vertex, location, attemptId, sendScheduleOrUpdateConsumersMessage), + registerProducedPartitions(vertex, location, attemptId, notifyPartitionDataAvailable), vertex.getExecutionGraph().getJobMasterMainThreadExecutor(), producedPartitionsCache -> { producedPartitions = producedPartitionsCache; @@ -436,7 +436,7 @@ static CompletableFuture requestPartitionState( } @Override - public CompletableFuture scheduleOrUpdateConsumers( + public CompletableFuture notifyPartitionDataAvailable( final ResultPartitionID partitionID, final Time timeout) { - schedulerNG.scheduleOrUpdateConsumers(partitionID); + schedulerNG.notifyPartitionDataAvailable(partitionID); return CompletableFuture.completedFuture(Acknowledge.get()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 26c337c22e1d7..0798d9775a573 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -118,9 +118,9 @@ CompletableFuture requestPartitionState( * * @param partitionID The partition which has already produced data * @param timeout before the rpc call fails - * @return Future acknowledge of the schedule or update operation + * @return Future acknowledge of the notification */ - CompletableFuture scheduleOrUpdateConsumers( + CompletableFuture notifyPartitionDataAvailable( final ResultPartitionID partitionID, @RpcTimeout final Time timeout); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 2dc99873b3d75..7f4a4bdef1e5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -313,7 +313,7 @@ private CompletableFuture cancelExecutionVertex(final ExecutionVertexID execu } @Override - protected void scheduleOrUpdateConsumersInternal(final IntermediateResultPartitionID partitionId) { + protected void notifyPartitionDataAvailableInternal(final IntermediateResultPartitionID partitionId) { schedulingStrategy.onPartitionConsumable(partitionId); } @@ -442,10 +442,12 @@ private BiFunction assignResourceOrHandleError(fin if (throwable == null) { final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId); - final boolean sendScheduleOrUpdateConsumerMessage = deploymentHandle.getDeploymentOption().sendScheduleOrUpdateConsumerMessage(); + final boolean notifyPartitionDataAvailable = deploymentHandle + .getDeploymentOption() + .notifyPartitionDataAvailable(); executionVertex .getCurrentExecutionAttempt() - .registerProducedPartitions(logicalSlot.getTaskManagerLocation(), sendScheduleOrUpdateConsumerMessage); + .registerProducedPartitions(logicalSlot.getTaskManagerLocation(), notifyPartitionDataAvailable); executionVertex.tryAssignResource(logicalSlot); } else { handleTaskDeploymentFailure(executionVertexId, maybeWrapWithNoResourceAvailableException(throwable)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java index 9fb9aced49d3a..40892ad59dc86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java @@ -19,17 +19,17 @@ package org.apache.flink.runtime.scheduler; /** - * Deployment option which indicates whether the task should send scheduleOrUpdateConsumer message to master. + * Deployment option which indicates whether the task should notify master when its data is available. */ public class DeploymentOption { - private final boolean sendScheduleOrUpdateConsumerMessage; + private final boolean notifyPartitionDataAvailable; - public DeploymentOption(boolean sendScheduleOrUpdateConsumerMessage) { - this.sendScheduleOrUpdateConsumerMessage = sendScheduleOrUpdateConsumerMessage; + public DeploymentOption(boolean notifyPartitionDataAvailable) { + this.notifyPartitionDataAvailable = notifyPartitionDataAvailable; } - public boolean sendScheduleOrUpdateConsumerMessage() { - return sendScheduleOrUpdateConsumerMessage; + public boolean notifyPartitionDataAvailable() { + return notifyPartitionDataAvailable; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index a5738cb86c692..ffcd76353e861 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -685,15 +685,15 @@ public ExecutionState requestPartitionState( } @Override - public final void scheduleOrUpdateConsumers(final ResultPartitionID partitionId) { + public final void notifyPartitionDataAvailable(final ResultPartitionID partitionId) { mainThreadExecutor.assertRunningInMainThread(); executionGraph.notifyPartitionDataAvailable(partitionId); - scheduleOrUpdateConsumersInternal(partitionId.getPartitionId()); + notifyPartitionDataAvailableInternal(partitionId.getPartitionId()); } - protected void scheduleOrUpdateConsumersInternal(IntermediateResultPartitionID resultPartitionId) { + protected void notifyPartitionDataAvailableInternal(IntermediateResultPartitionID resultPartitionId) { } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java index ff877db763db4..6430522af729b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java @@ -95,7 +95,7 @@ default boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) ExecutionState requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException; - void scheduleOrUpdateConsumers(ResultPartitionID partitionID); + void notifyPartitionDataAvailable(ResultPartitionID partitionID); ArchivedExecutionGraph requestJob(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java index 82a6fbccbe863..4be083f2ef580 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java @@ -48,16 +48,17 @@ public RpcResultPartitionConsumableNotifier( this.executor = Preconditions.checkNotNull(executor); this.timeout = Preconditions.checkNotNull(timeout); } + @Override public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) { - CompletableFuture acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout); + CompletableFuture acknowledgeFuture = jobMasterGateway.notifyPartitionDataAvailable(partitionId, timeout); acknowledgeFuture.whenCompleteAsync( (Acknowledge ack, Throwable throwable) -> { if (throwable != null) { - LOG.error("Could not schedule or update consumers at the JobManager.", throwable); + LOG.error("Could not notify partition data available to JobManager.", throwable); - taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable)); + taskActions.failExternally(new RuntimeException("Could not notify partition data available to JobManager.", throwable)); } }, executor); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java index 221245515ae01..660e4f6699c38 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java @@ -49,7 +49,7 @@ public class ConsumableNotifyingResultPartitionWriterDecorator { /** * Optionally decorate the ResultPartitionWriter to call * {@link ResultPartitionConsumableNotifier#notifyPartitionConsumable(JobID, ResultPartitionID, TaskActions)} - * on the first record, iff {@link ResultPartitionDeploymentDescriptor#sendScheduleOrUpdateConsumersMessage()} + * on the first record, iff {@link ResultPartitionDeploymentDescriptor#notifyPartitionDataAvailable()} * is true. */ public static ResultPartitionWriter[] decorate( @@ -62,7 +62,7 @@ public static ResultPartitionWriter[] decorate( ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length]; int counter = 0; for (ResultPartitionDeploymentDescriptor desc : descs) { - if (desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined()) { + if (desc.notifyPartitionDataAvailable() && desc.getPartitionType().isPipelined()) { consumableNotifyingPartitionWriters[counter] = new ConsumableNotifyingResultPartitionWriter( taskActions, jobId, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 040a055cec6f2..9e4404e63016d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -121,6 +121,6 @@ private static void verifyResultPartitionDeploymentDescriptorCopy(ResultPartitio assertThat(copy.getPartitionId(), is(partitionId)); assertThat(copy.getPartitionType(), is(partitionType)); assertThat(copy.getNumberOfSubpartitions(), is(numberOfSubpartitions)); - assertThat(copy.sendScheduleOrUpdateConsumersMessage(), is(true)); + assertThat(copy.notifyPartitionDataAvailable(), is(true)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java index b37af38a86505..8691316ea9132 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java @@ -142,7 +142,7 @@ public void testStrategyNotifiedOfUnFinishedVertices() throws Exception { final Execution operator1Execution = getCurrentExecution(operator1Vertex, executionGraph); // finish o1 and schedule the consumers (o2,o3); this should not result in any release calls since not all operators of the pipelined region are finished for (final IntermediateResultPartitionID partitionId : operator1Execution.getVertex().getProducedPartitions().keySet()) { - scheduler.scheduleOrUpdateConsumers(new ResultPartitionID(partitionId, operator1Execution.getAttemptId())); + scheduler.notifyPartitionDataAvailable(new ResultPartitionID(partitionId, operator1Execution.getAttemptId())); } scheduler.updateTaskExecutionState(new TaskExecutionState(executionGraph.getJobID(), operator1Execution.getAttemptId(), ExecutionState.FINISHED)); assertThat(releasedPartitions, empty()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java index 493415ba18d5a..ebecbe1b397ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java @@ -37,11 +37,11 @@ public class ExecutionGraphVariousFailuesTest extends TestLogger { /** - * Tests that a failing scheduleOrUpdateConsumers call with a non-existing execution attempt + * Tests that a failing notifyPartitionDataAvailable call with a non-existing execution attempt * id, will not fail the execution graph. */ @Test - public void testFailingScheduleOrUpdateConsumers() throws Exception { + public void testFailingNotifyPartitionDataAvailable() throws Exception { final SchedulerBase scheduler = SchedulerTestingUtils.newSchedulerBuilder(new JobGraph()).build(); scheduler.initialize(ComponentMainThreadExecutorServiceAdapter.forMainThread()); scheduler.startScheduling(); @@ -55,11 +55,11 @@ public void testFailingScheduleOrUpdateConsumers() throws Exception { ExecutionAttemptID producerId = new ExecutionAttemptID(); ResultPartitionID resultPartitionId = new ResultPartitionID(intermediateResultPartitionId, producerId); - // The execution attempt id does not exist and thus the scheduleOrUpdateConsumers call + // The execution attempt id does not exist and thus the notifyPartitionDataAvailable call // should fail try { - scheduler.scheduleOrUpdateConsumers(resultPartitionId); + scheduler.notifyPartitionDataAvailable(resultPartitionId); fail("Error expected."); } catch (IllegalStateException e) { // we've expected this exception to occur diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 269c935772ac3..44e6cce439ed2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -294,7 +294,7 @@ public void testTddProducedPartitionsLazyScheduling() throws Exception { assertEquals(1, producedPartitions.size()); ResultPartitionDeploymentDescriptor desc = producedPartitions.iterator().next(); - assertEquals(scheduleMode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage()); + assertEquals(scheduleMode.allowLazyDeployment(), desc.notifyPartitionDataAvailable()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 6fd82781b694e..6743b2082e575 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -99,10 +99,10 @@ public void testResultSubpartitionInfo() { } /** - * Tests the schedule or update consumers message sending behaviour depending on the relevant flags. + * Tests notifyPartitionDataAvailable behaviour depending on the relevant flags. */ @Test - public void testSendScheduleOrUpdateConsumersMessage() throws Exception { + public void testNotifyPartitionDataAvailable() throws Exception { FutureConsumerWithException[] notificationCalls = new FutureConsumerWithException[] { writer -> ((ResultPartitionWriter) writer).finish(), writer -> ((ResultPartitionWriter) writer).emitRecord(ByteBuffer.allocate(bufferSize), 0), @@ -111,11 +111,11 @@ public void testSendScheduleOrUpdateConsumersMessage() throws Exception { }; for (FutureConsumerWithException notificationCall: notificationCalls) { - testSendScheduleOrUpdateConsumersMessage(notificationCall); + testNotifyPartitionDataAvailable(notificationCall); } } - private void testSendScheduleOrUpdateConsumersMessage( + private void testNotifyPartitionDataAvailable( FutureConsumerWithException notificationCall) throws Exception { JobID jobId = new JobID(); TaskActions taskActions = new NoOpTaskActions(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index fc119d6236557..f403425b240e5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -95,7 +95,7 @@ public class TestingJobMasterGateway implements JobMasterGateway { private final BiFunction> requestPartitionStateFunction; @Nonnull - private final Function> scheduleOrUpdateConsumersFunction; + private final Function> notifyPartitionDataAvailableFunction; @Nonnull private final Function> disconnectTaskManagerFunction; @@ -172,7 +172,7 @@ public TestingJobMasterGateway( @Nonnull Function> updateTaskExecutionStateFunction, @Nonnull BiFunction> requestNextInputSplitFunction, @Nonnull BiFunction> requestPartitionStateFunction, - @Nonnull Function> scheduleOrUpdateConsumersFunction, + @Nonnull Function> notifyPartitionDataAvailableFunction, @Nonnull Function> disconnectTaskManagerFunction, @Nonnull Consumer disconnectResourceManagerConsumer, @Nonnull BiFunction, CompletableFuture>> offerSlotsFunction, @@ -202,7 +202,7 @@ public TestingJobMasterGateway( this.updateTaskExecutionStateFunction = updateTaskExecutionStateFunction; this.requestNextInputSplitFunction = requestNextInputSplitFunction; this.requestPartitionStateFunction = requestPartitionStateFunction; - this.scheduleOrUpdateConsumersFunction = scheduleOrUpdateConsumersFunction; + this.notifyPartitionDataAvailableFunction = notifyPartitionDataAvailableFunction; this.disconnectTaskManagerFunction = disconnectTaskManagerFunction; this.disconnectResourceManagerConsumer = disconnectResourceManagerConsumer; this.offerSlotsFunction = offerSlotsFunction; @@ -249,8 +249,8 @@ public CompletableFuture requestPartitionState(IntermediateDataS } @Override - public CompletableFuture scheduleOrUpdateConsumers(ResultPartitionID partitionID, Time timeout) { - return scheduleOrUpdateConsumersFunction.apply(partitionID); + public CompletableFuture notifyPartitionDataAvailable(ResultPartitionID partitionID, Time timeout) { + return notifyPartitionDataAvailableFunction.apply(partitionID); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java index a55b4ca683ba6..f10e3d7cb0d24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java @@ -83,7 +83,7 @@ public class TestingJobMasterGatewayBuilder { private Function> updateTaskExecutionStateFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); private BiFunction> requestNextInputSplitFunction = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(new SerializedInputSplit(null)); private BiFunction> requestPartitionStateFunction = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(ExecutionState.RUNNING); - private Function> scheduleOrUpdateConsumersFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); + private Function> notifyPartitionDataAvailableFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); private Function> disconnectTaskManagerFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); private Consumer disconnectResourceManagerConsumer = ignored -> {}; private BiFunction, CompletableFuture>> offerSlotsFunction = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Collections.emptyList()); @@ -138,8 +138,8 @@ public TestingJobMasterGatewayBuilder setRequestPartitionStateFunction(BiFunctio return this; } - public TestingJobMasterGatewayBuilder setScheduleOrUpdateConsumersFunction(Function> scheduleOrUpdateConsumersFunction) { - this.scheduleOrUpdateConsumersFunction = scheduleOrUpdateConsumersFunction; + public TestingJobMasterGatewayBuilder setNotifyPartitionDataAvailableFunction(Function> notifyPartitionDataAvailableFunction) { + this.notifyPartitionDataAvailableFunction = notifyPartitionDataAvailableFunction; return this; } @@ -266,7 +266,7 @@ public TestingJobMasterGateway build() { updateTaskExecutionStateFunction, requestNextInputSplitFunction, requestPartitionStateFunction, - scheduleOrUpdateConsumersFunction, + notifyPartitionDataAvailableFunction, disconnectTaskManagerFunction, disconnectResourceManagerConsumer, offerSlotsFunction, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java index 2675a846d86e9..676ddb915f5bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java @@ -131,7 +131,7 @@ public ExecutionState requestPartitionState( } @Override - public void scheduleOrUpdateConsumers(ResultPartitionID partitionID) { + public void notifyPartitionDataAvailable(ResultPartitionID partitionID) { failOperation(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java index 36ebd46d8eacd..2d7629069d572 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java @@ -269,7 +269,7 @@ public void testRunJobWithForwardChannel() throws Exception { TestingJobMasterGateway testingJobMasterGateway = new TestingJobMasterGatewayBuilder() .setFencingTokenSupplier(() -> jobMasterId) - .setScheduleOrUpdateConsumersFunction( + .setNotifyPartitionDataAvailableFunction( resultPartitionID -> CompletableFuture.completedFuture(Acknowledge.get())) .build(); @@ -501,7 +501,7 @@ public void testLocalPartitionNotFound() throws Exception { } /** - * Test that a failing schedule or update consumers call leads to the failing of the respective + * Test that a failing notifyPartitionDataAvailable call leads to the failing of the respective * task. * *

IMPORTANT: We have to make sure that the invokable's cancel method is called, because only @@ -509,14 +509,14 @@ public void testLocalPartitionNotFound() throws Exception { * the invokable to fill one memory segment. The completed memory segment will trigger the * scheduling of the downstream operator since it is in pipeline mode. After we've filled the * memory segment, we'll block the invokable and wait for the task failure due to the failed - * schedule or update consumers call. + * notifyPartitionDataAvailable call. */ @Test(timeout = TEST_TIMEOUT) - public void testFailingScheduleOrUpdateConsumers() throws Exception { + public void testFailingNotifyPartitionDataAvailable() throws Exception { final Configuration configuration = new Configuration(); // set the memory segment to the smallest size possible, because we have to fill one - // memory buffer to trigger the schedule or update consumers message to the downstream + // memory buffer to trigger notifyPartitionDataAvailable to the downstream // operators configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096")); @@ -527,13 +527,13 @@ public void testFailingScheduleOrUpdateConsumers() throws Exception { final CompletableFuture taskRunningFuture = new CompletableFuture<>(); - final Exception exception = new Exception("Failed schedule or update consumers"); + final Exception exception = new Exception("Failed notifyPartitionDataAvailable"); final JobMasterId jobMasterId = JobMasterId.generate(); TestingJobMasterGateway testingJobMasterGateway = new TestingJobMasterGatewayBuilder() .setFencingTokenSupplier(() -> jobMasterId) - .setUpdateTaskExecutionStateFunction(resultPartitionID -> FutureUtils.completedExceptionally(exception)) + .setNotifyPartitionDataAvailableFunction(resultPartitionID -> FutureUtils.completedExceptionally(exception)) .build(); try (TaskSubmissionTestEnvironment env =