Skip to content

Commit

Permalink
[FLINK-9427] Fix registration and request slot race condition in Task…
Browse files Browse the repository at this point in the history
…Executor

This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
it could happen that the ResourceManager sends requestSlots message before the TaskExecutor
registration was completed. Due to this, the TaskExecutor did not have all information it needed
to accept task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration time. Due to this, t
he SlotManager could already assign these slots to pending slot requests. With this commit, the
registration protocol changes such that the TaskExecutor first registers at the ResourceManager
and only after completing this step, it will announce the available slots to the SlotManager.

This closes apache#6067.
  • Loading branch information
tillrohrmann committed May 24, 2018
1 parent a5966fd commit 47dc699
Show file tree
Hide file tree
Showing 19 changed files with 730 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.registration.RegistrationResponse;
Expand Down Expand Up @@ -665,9 +666,15 @@ public void testWorkerStarted() throws Exception {
final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
// send registration message
CompletableFuture<RegistrationResponse> successfulFuture =
resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, slotReport, dataPort, hardwareDescription, timeout);
resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, dataPort, hardwareDescription, timeout);
RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);
final TaskExecutorRegistrationSuccess registrationResponse = (TaskExecutorRegistrationSuccess) response;

final CompletableFuture<Acknowledge> initialSlotReportFuture = resourceManager.sendSlotReport(task1Executor.resourceID, registrationResponse.getRegistrationId(), slotReport, timeout);

// check for errors
initialSlotReportFuture.get();

// verify the internal state
assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1557,7 +1557,8 @@ protected CompletableFuture<RegistrationResponse> invokeRegistration(
@Override
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
runAsync(() -> {
// filter out replace connections
// filter out outdated connections
//noinspection ObjectEquality
if (this == resourceManagerConnection) {
establishResourceManagerConnection(success);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
* Classes which want to be notified about the registration result by the {@link RegisteredRpcConnection}
* have to implement this interface.
*/
public interface RegistrationConnectionListener<Success extends RegistrationResponse.Success> {
public interface RegistrationConnectionListener<T extends RegisteredRpcConnection<?, ?, S>, S extends RegistrationResponse.Success> {

/**
* This method is called by the {@link RegisteredRpcConnection} when the registration is success.
*
* @param success The concrete response information for successful registration.
* @param connection The instance which established the connection
*/
void onRegistrationSuccess(Success success);
void onRegistrationSuccess(T connection, S success);

/**
* This method is called by the {@link RegisteredRpcConnection} when the registration fails.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ public CompletableFuture<RegistrationResponse> registerJobManager(
public CompletableFuture<RegistrationResponse> registerTaskExecutor(
final String taskExecutorAddress,
final ResourceID taskExecutorResourceId,
final SlotReport slotReport,
final int dataPort,
final HardwareDescription hardwareDescription,
final Time timeout) {
Expand All @@ -354,14 +353,25 @@ public CompletableFuture<RegistrationResponse> registerTaskExecutor(
taskExecutorGateway,
taskExecutorAddress,
taskExecutorResourceId,
slotReport,
dataPort,
hardwareDescription);
}
},
getMainThreadExecutor());
}

@Override
public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);

if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
}
}

@Override
public void heartbeatFromTaskManager(final ResourceID resourceID, final SlotReport slotReport) {
taskManagerHeartbeatManager.receiveHeartbeat(resourceID, slotReport);
Expand Down Expand Up @@ -669,7 +679,6 @@ public void requestHeartbeat(ResourceID resourceID, Void payload) {
* @param taskExecutorGateway to communicate with the registering TaskExecutor
* @param taskExecutorAddress address of the TaskExecutor
* @param taskExecutorResourceId ResourceID of the TaskExecutor
* @param slotReport initial slot report from the TaskExecutor
* @param dataPort port used for data transfer
* @param hardwareDescription of the registering TaskExecutor
* @return RegistrationResponse
Expand All @@ -678,7 +687,6 @@ private RegistrationResponse registerTaskExecutorInternal(
TaskExecutorGateway taskExecutorGateway,
String taskExecutorAddress,
ResourceID taskExecutorResourceId,
SlotReport slotReport,
int dataPort,
HardwareDescription hardwareDescription) {
WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
Expand All @@ -702,8 +710,6 @@ private RegistrationResponse registerTaskExecutorInternal(

taskExecutors.put(taskExecutorResourceId, registration);

slotManager.registerTaskManager(registration, slotReport);

taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ CompletableFuture<Acknowledge> requestSlot(
*
* @param taskExecutorAddress The address of the TaskExecutor that registers
* @param resourceId The resource ID of the TaskExecutor that registers
* @param slotReport The slot report containing free and allocated task slots
* @param dataPort port used for data communication between TaskExecutors
* @param hardwareDescription of the registering TaskExecutor
* @param timeout The timeout for the response.
Expand All @@ -102,11 +101,24 @@ CompletableFuture<Acknowledge> requestSlot(
CompletableFuture<RegistrationResponse> registerTaskExecutor(
String taskExecutorAddress,
ResourceID resourceId,
SlotReport slotReport,
int dataPort,
HardwareDescription hardwareDescription,
@RpcTimeout Time timeout);

/**
* Sends the given {@link SlotReport} to the ResourceManager.
*
* @param taskManagerRegistrationId id identifying the sending TaskManager
* @param slotReport which is sent to the ResourceManager
* @param timeout for the operation
* @return Future which is completed with {@link Acknowledge} once the slot report has been received.
*/
CompletableFuture<Acknowledge> sendSlotReport(
ResourceID taskManagerResourceId,
InstanceID taskManagerRegistrationId,
SlotReport slotReport,
@RpcTimeout Time timeout);

/**
* Sent by the TaskExecutor to notify the ResourceManager that a slot has become available.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;

import javax.annotation.Nonnull;

/**
* Container for the resource manager connection instances used by the
* {@link TaskExecutor}.
*/
class EstablishedResourceManagerConnection {

@Nonnull
private final ResourceManagerGateway resourceManagerGateway;

@Nonnull
private final ResourceID resourceManagerResourceId;

@Nonnull
private final InstanceID taskExecutorRegistrationId;

EstablishedResourceManagerConnection(@Nonnull ResourceManagerGateway resourceManagerGateway, @Nonnull ResourceID resourceManagerResourceId, @Nonnull InstanceID taskExecutorRegistrationId) {
this.resourceManagerGateway = resourceManagerGateway;
this.resourceManagerResourceId = resourceManagerResourceId;
this.taskExecutorRegistrationId = taskExecutorRegistrationId;
}

@Nonnull
public ResourceManagerGateway getResourceManagerGateway() {
return resourceManagerGateway;
}

@Nonnull
public ResourceID getResourceManagerResourceId() {
return resourceManagerResourceId;
}

@Nonnull
public InstanceID getTaskExecutorRegistrationId() {
return taskExecutorRegistrationId;
}
}
Loading

0 comments on commit 47dc699

Please sign in to comment.