Skip to content

Commit

Permalink
[FLINK-20439][runtime] Rename scheduleOrUpdateConsumers to `notifyP…
Browse files Browse the repository at this point in the history
…artitionDataAvailable` to avoid confusion
  • Loading branch information
zhuzhurk committed Dec 23, 2020
1 parent 2c6521a commit 1aa9074
Show file tree
Hide file tree
Showing 19 changed files with 63 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,19 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {

private final int maxParallelism;

/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
private final boolean sendScheduleOrUpdateConsumersMessage;
/** Flag whether the result partition should notify master when its data is available. */
private final boolean notifyPartitionDataAvailable;

public ResultPartitionDeploymentDescriptor(
PartitionDescriptor partitionDescriptor,
ShuffleDescriptor shuffleDescriptor,
int maxParallelism,
boolean sendScheduleOrUpdateConsumersMessage) {
boolean notifyPartitionDataAvailable) {
this.partitionDescriptor = checkNotNull(partitionDescriptor);
this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
this.maxParallelism = maxParallelism;
this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
this.notifyPartitionDataAvailable = notifyPartitionDataAvailable;
}

public IntermediateDataSetID getResultId() {
Expand Down Expand Up @@ -88,8 +88,8 @@ public ShuffleDescriptor getShuffleDescriptor() {
return shuffleDescriptor;
}

public boolean sendScheduleOrUpdateConsumersMessage() {
return sendScheduleOrUpdateConsumersMessage;
public boolean notifyPartitionDataAvailable() {
return notifyPartitionDataAvailable;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,12 @@ public CompletableFuture<?> getReleaseFuture() {

public CompletableFuture<Execution> registerProducedPartitions(
TaskManagerLocation location,
boolean sendScheduleOrUpdateConsumersMessage) {
boolean notifyPartitionDataAvailable) {

assertRunningInJobMasterMainThread();

return FutureUtils.thenApplyAsyncIfNotDone(
registerProducedPartitions(vertex, location, attemptId, sendScheduleOrUpdateConsumersMessage),
registerProducedPartitions(vertex, location, attemptId, notifyPartitionDataAvailable),
vertex.getExecutionGraph().getJobMasterMainThreadExecutor(),
producedPartitionsCache -> {
producedPartitions = producedPartitionsCache;
Expand Down Expand Up @@ -436,7 +436,7 @@ static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeplo
ExecutionVertex vertex,
TaskManagerLocation location,
ExecutionAttemptID attemptId,
boolean sendScheduleOrUpdateConsumersMessage) {
boolean notifyPartitionDataAvailable) {

ProducerDescriptor producerDescriptor = ProducerDescriptor.create(location, attemptId);

Expand All @@ -460,7 +460,7 @@ static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeplo
partitionDescriptor,
shuffleDescriptor,
maxParallelism,
sendScheduleOrUpdateConsumersMessage));
notifyPartitionDataAvailable));
partitionRegistrations.add(partitionRegistration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,11 +453,11 @@ public CompletableFuture<ExecutionState> requestPartitionState(
}

@Override
public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(
public CompletableFuture<Acknowledge> notifyPartitionDataAvailable(
final ResultPartitionID partitionID,
final Time timeout) {

schedulerNG.scheduleOrUpdateConsumers(partitionID);
schedulerNG.notifyPartitionDataAvailable(partitionID);
return CompletableFuture.completedFuture(Acknowledge.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ CompletableFuture<ExecutionState> requestPartitionState(
*
* @param partitionID The partition which has already produced data
* @param timeout before the rpc call fails
* @return Future acknowledge of the schedule or update operation
* @return Future acknowledge of the notification
*/
CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(
CompletableFuture<Acknowledge> notifyPartitionDataAvailable(
final ResultPartitionID partitionID,
@RpcTimeout final Time timeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID execu
}

@Override
protected void scheduleOrUpdateConsumersInternal(final IntermediateResultPartitionID partitionId) {
protected void notifyPartitionDataAvailableInternal(final IntermediateResultPartitionID partitionId) {
schedulingStrategy.onPartitionConsumable(partitionId);
}

Expand Down Expand Up @@ -442,10 +442,12 @@ private BiFunction<LogicalSlot, Throwable, Void> assignResourceOrHandleError(fin

if (throwable == null) {
final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
final boolean sendScheduleOrUpdateConsumerMessage = deploymentHandle.getDeploymentOption().sendScheduleOrUpdateConsumerMessage();
final boolean notifyPartitionDataAvailable = deploymentHandle
.getDeploymentOption()
.notifyPartitionDataAvailable();
executionVertex
.getCurrentExecutionAttempt()
.registerProducedPartitions(logicalSlot.getTaskManagerLocation(), sendScheduleOrUpdateConsumerMessage);
.registerProducedPartitions(logicalSlot.getTaskManagerLocation(), notifyPartitionDataAvailable);
executionVertex.tryAssignResource(logicalSlot);
} else {
handleTaskDeploymentFailure(executionVertexId, maybeWrapWithNoResourceAvailableException(throwable));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
package org.apache.flink.runtime.scheduler;

/**
* Deployment option which indicates whether the task should send scheduleOrUpdateConsumer message to master.
* Deployment option which indicates whether the task should notify master when its data is available.
*/
public class DeploymentOption {

private final boolean sendScheduleOrUpdateConsumerMessage;
private final boolean notifyPartitionDataAvailable;

public DeploymentOption(boolean sendScheduleOrUpdateConsumerMessage) {
this.sendScheduleOrUpdateConsumerMessage = sendScheduleOrUpdateConsumerMessage;
public DeploymentOption(boolean notifyPartitionDataAvailable) {
this.notifyPartitionDataAvailable = notifyPartitionDataAvailable;
}

public boolean sendScheduleOrUpdateConsumerMessage() {
return sendScheduleOrUpdateConsumerMessage;
public boolean notifyPartitionDataAvailable() {
return notifyPartitionDataAvailable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -685,15 +685,15 @@ public ExecutionState requestPartitionState(
}

@Override
public final void scheduleOrUpdateConsumers(final ResultPartitionID partitionId) {
public final void notifyPartitionDataAvailable(final ResultPartitionID partitionId) {
mainThreadExecutor.assertRunningInMainThread();

executionGraph.notifyPartitionDataAvailable(partitionId);

scheduleOrUpdateConsumersInternal(partitionId.getPartitionId());
notifyPartitionDataAvailableInternal(partitionId.getPartitionId());
}

protected void scheduleOrUpdateConsumersInternal(IntermediateResultPartitionID resultPartitionId) {
protected void notifyPartitionDataAvailableInternal(IntermediateResultPartitionID resultPartitionId) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ default boolean updateTaskExecutionState(TaskExecutionState taskExecutionState)

ExecutionState requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException;

void scheduleOrUpdateConsumers(ResultPartitionID partitionID);
void notifyPartitionDataAvailable(ResultPartitionID partitionID);

ArchivedExecutionGraph requestJob();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,17 @@ public RpcResultPartitionConsumableNotifier(
this.executor = Preconditions.checkNotNull(executor);
this.timeout = Preconditions.checkNotNull(timeout);
}

@Override
public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);
CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.notifyPartitionDataAvailable(partitionId, timeout);

acknowledgeFuture.whenCompleteAsync(
(Acknowledge ack, Throwable throwable) -> {
if (throwable != null) {
LOG.error("Could not schedule or update consumers at the JobManager.", throwable);
LOG.error("Could not notify partition data available to JobManager.", throwable);

taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
taskActions.failExternally(new RuntimeException("Could not notify partition data available to JobManager.", throwable));
}
},
executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ConsumableNotifyingResultPartitionWriterDecorator {
/**
* Optionally decorate the ResultPartitionWriter to call
* {@link ResultPartitionConsumableNotifier#notifyPartitionConsumable(JobID, ResultPartitionID, TaskActions)}
* on the first record, iff {@link ResultPartitionDeploymentDescriptor#sendScheduleOrUpdateConsumersMessage()}
* on the first record, iff {@link ResultPartitionDeploymentDescriptor#notifyPartitionDataAvailable()}
* is true.
*/
public static ResultPartitionWriter[] decorate(
Expand All @@ -62,7 +62,7 @@ public static ResultPartitionWriter[] decorate(
ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length];
int counter = 0;
for (ResultPartitionDeploymentDescriptor desc : descs) {
if (desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined()) {
if (desc.notifyPartitionDataAvailable() && desc.getPartitionType().isPipelined()) {
consumableNotifyingPartitionWriters[counter] = new ConsumableNotifyingResultPartitionWriter(
taskActions,
jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ private static void verifyResultPartitionDeploymentDescriptorCopy(ResultPartitio
assertThat(copy.getPartitionId(), is(partitionId));
assertThat(copy.getPartitionType(), is(partitionType));
assertThat(copy.getNumberOfSubpartitions(), is(numberOfSubpartitions));
assertThat(copy.sendScheduleOrUpdateConsumersMessage(), is(true));
assertThat(copy.notifyPartitionDataAvailable(), is(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
final Execution operator1Execution = getCurrentExecution(operator1Vertex, executionGraph);
// finish o1 and schedule the consumers (o2,o3); this should not result in any release calls since not all operators of the pipelined region are finished
for (final IntermediateResultPartitionID partitionId : operator1Execution.getVertex().getProducedPartitions().keySet()) {
scheduler.scheduleOrUpdateConsumers(new ResultPartitionID(partitionId, operator1Execution.getAttemptId()));
scheduler.notifyPartitionDataAvailable(new ResultPartitionID(partitionId, operator1Execution.getAttemptId()));
}
scheduler.updateTaskExecutionState(new TaskExecutionState(executionGraph.getJobID(), operator1Execution.getAttemptId(), ExecutionState.FINISHED));
assertThat(releasedPartitions, empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
public class ExecutionGraphVariousFailuesTest extends TestLogger {

/**
* Tests that a failing scheduleOrUpdateConsumers call with a non-existing execution attempt
* Tests that a failing notifyPartitionDataAvailable call with a non-existing execution attempt
* id, will not fail the execution graph.
*/
@Test
public void testFailingScheduleOrUpdateConsumers() throws Exception {
public void testFailingNotifyPartitionDataAvailable() throws Exception {
final SchedulerBase scheduler = SchedulerTestingUtils.newSchedulerBuilder(new JobGraph()).build();
scheduler.initialize(ComponentMainThreadExecutorServiceAdapter.forMainThread());
scheduler.startScheduling();
Expand All @@ -55,11 +55,11 @@ public void testFailingScheduleOrUpdateConsumers() throws Exception {
ExecutionAttemptID producerId = new ExecutionAttemptID();
ResultPartitionID resultPartitionId = new ResultPartitionID(intermediateResultPartitionId, producerId);

// The execution attempt id does not exist and thus the scheduleOrUpdateConsumers call
// The execution attempt id does not exist and thus the notifyPartitionDataAvailable call
// should fail

try {
scheduler.scheduleOrUpdateConsumers(resultPartitionId);
scheduler.notifyPartitionDataAvailable(resultPartitionId);
fail("Error expected.");
} catch (IllegalStateException e) {
// we've expected this exception to occur
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void testTddProducedPartitionsLazyScheduling() throws Exception {

assertEquals(1, producedPartitions.size());
ResultPartitionDeploymentDescriptor desc = producedPartitions.iterator().next();
assertEquals(scheduleMode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage());
assertEquals(scheduleMode.allowLazyDeployment(), desc.notifyPartitionDataAvailable());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ public void testResultSubpartitionInfo() {
}

/**
* Tests the schedule or update consumers message sending behaviour depending on the relevant flags.
* Tests notifyPartitionDataAvailable behaviour depending on the relevant flags.
*/
@Test
public void testSendScheduleOrUpdateConsumersMessage() throws Exception {
public void testNotifyPartitionDataAvailable() throws Exception {
FutureConsumerWithException[] notificationCalls = new FutureConsumerWithException[] {
writer -> ((ResultPartitionWriter) writer).finish(),
writer -> ((ResultPartitionWriter) writer).emitRecord(ByteBuffer.allocate(bufferSize), 0),
Expand All @@ -111,11 +111,11 @@ public void testSendScheduleOrUpdateConsumersMessage() throws Exception {
};

for (FutureConsumerWithException notificationCall: notificationCalls) {
testSendScheduleOrUpdateConsumersMessage(notificationCall);
testNotifyPartitionDataAvailable(notificationCall);
}
}

private void testSendScheduleOrUpdateConsumersMessage(
private void testNotifyPartitionDataAvailable(
FutureConsumerWithException<ResultPartitionWriter, Exception> notificationCall) throws Exception {
JobID jobId = new JobID();
TaskActions taskActions = new NoOpTaskActions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
private final BiFunction<IntermediateDataSetID, ResultPartitionID, CompletableFuture<ExecutionState>> requestPartitionStateFunction;

@Nonnull
private final Function<ResultPartitionID, CompletableFuture<Acknowledge>> scheduleOrUpdateConsumersFunction;
private final Function<ResultPartitionID, CompletableFuture<Acknowledge>> notifyPartitionDataAvailableFunction;

@Nonnull
private final Function<ResourceID, CompletableFuture<Acknowledge>> disconnectTaskManagerFunction;
Expand Down Expand Up @@ -172,7 +172,7 @@ public TestingJobMasterGateway(
@Nonnull Function<TaskExecutionState, CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction,
@Nonnull BiFunction<JobVertexID, ExecutionAttemptID, CompletableFuture<SerializedInputSplit>> requestNextInputSplitFunction,
@Nonnull BiFunction<IntermediateDataSetID, ResultPartitionID, CompletableFuture<ExecutionState>> requestPartitionStateFunction,
@Nonnull Function<ResultPartitionID, CompletableFuture<Acknowledge>> scheduleOrUpdateConsumersFunction,
@Nonnull Function<ResultPartitionID, CompletableFuture<Acknowledge>> notifyPartitionDataAvailableFunction,
@Nonnull Function<ResourceID, CompletableFuture<Acknowledge>> disconnectTaskManagerFunction,
@Nonnull Consumer<ResourceManagerId> disconnectResourceManagerConsumer,
@Nonnull BiFunction<ResourceID, Collection<SlotOffer>, CompletableFuture<Collection<SlotOffer>>> offerSlotsFunction,
Expand Down Expand Up @@ -202,7 +202,7 @@ public TestingJobMasterGateway(
this.updateTaskExecutionStateFunction = updateTaskExecutionStateFunction;
this.requestNextInputSplitFunction = requestNextInputSplitFunction;
this.requestPartitionStateFunction = requestPartitionStateFunction;
this.scheduleOrUpdateConsumersFunction = scheduleOrUpdateConsumersFunction;
this.notifyPartitionDataAvailableFunction = notifyPartitionDataAvailableFunction;
this.disconnectTaskManagerFunction = disconnectTaskManagerFunction;
this.disconnectResourceManagerConsumer = disconnectResourceManagerConsumer;
this.offerSlotsFunction = offerSlotsFunction;
Expand Down Expand Up @@ -249,8 +249,8 @@ public CompletableFuture<ExecutionState> requestPartitionState(IntermediateDataS
}

@Override
public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(ResultPartitionID partitionID, Time timeout) {
return scheduleOrUpdateConsumersFunction.apply(partitionID);
public CompletableFuture<Acknowledge> notifyPartitionDataAvailable(ResultPartitionID partitionID, Time timeout) {
return notifyPartitionDataAvailableFunction.apply(partitionID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class TestingJobMasterGatewayBuilder {
private Function<TaskExecutionState, CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
private BiFunction<JobVertexID, ExecutionAttemptID, CompletableFuture<SerializedInputSplit>> requestNextInputSplitFunction = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(new SerializedInputSplit(null));
private BiFunction<IntermediateDataSetID, ResultPartitionID, CompletableFuture<ExecutionState>> requestPartitionStateFunction = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(ExecutionState.RUNNING);
private Function<ResultPartitionID, CompletableFuture<Acknowledge>> scheduleOrUpdateConsumersFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
private Function<ResultPartitionID, CompletableFuture<Acknowledge>> notifyPartitionDataAvailableFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
private Function<ResourceID, CompletableFuture<Acknowledge>> disconnectTaskManagerFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
private Consumer<ResourceManagerId> disconnectResourceManagerConsumer = ignored -> {};
private BiFunction<ResourceID, Collection<SlotOffer>, CompletableFuture<Collection<SlotOffer>>> offerSlotsFunction = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Collections.emptyList());
Expand Down Expand Up @@ -138,8 +138,8 @@ public TestingJobMasterGatewayBuilder setRequestPartitionStateFunction(BiFunctio
return this;
}

public TestingJobMasterGatewayBuilder setScheduleOrUpdateConsumersFunction(Function<ResultPartitionID, CompletableFuture<Acknowledge>> scheduleOrUpdateConsumersFunction) {
this.scheduleOrUpdateConsumersFunction = scheduleOrUpdateConsumersFunction;
public TestingJobMasterGatewayBuilder setNotifyPartitionDataAvailableFunction(Function<ResultPartitionID, CompletableFuture<Acknowledge>> notifyPartitionDataAvailableFunction) {
this.notifyPartitionDataAvailableFunction = notifyPartitionDataAvailableFunction;
return this;
}

Expand Down Expand Up @@ -266,7 +266,7 @@ public TestingJobMasterGateway build() {
updateTaskExecutionStateFunction,
requestNextInputSplitFunction,
requestPartitionStateFunction,
scheduleOrUpdateConsumersFunction,
notifyPartitionDataAvailableFunction,
disconnectTaskManagerFunction,
disconnectResourceManagerConsumer,
offerSlotsFunction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public ExecutionState requestPartitionState(
}

@Override
public void scheduleOrUpdateConsumers(ResultPartitionID partitionID) {
public void notifyPartitionDataAvailable(ResultPartitionID partitionID) {
failOperation();
}

Expand Down
Loading

0 comments on commit 1aa9074

Please sign in to comment.