[FLINK-16439][k8s] KubernetesResourceManager starts workers with resources requested by SlotManager.

This means KubernetesResourceManager no longer:
- needs to be aware of the default task executor resources
- assumes that all workers are identical

This closes apache#11323.
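The core of the change: pending pod requests are now tracked per requested WorkerResourceSpec (through the new podWorkerResources map plus the notifyNewWorkerRequested/notifyNewWorkerAllocated accounting) instead of a single numPendingPodRequests counter, so each pod is created with exactly the resources the SlotManager asked for. Below is a minimal, self-contained sketch of that per-spec bookkeeping, using hypothetical simplified stand-in types rather than Flink's actual classes.

import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified sketch of the per-spec bookkeeping; WorkerSpec and the
 * method names below are stand-ins, not Flink's actual classes or API.
 */
final class PendingPodTracker {

    /** Simplified stand-in for Flink's WorkerResourceSpec. */
    record WorkerSpec(double cpuCores, int memoryMB) {}

    /** Pod name -> the spec it was requested with (mirrors podWorkerResources in this commit). */
    private final Map<String, WorkerSpec> podWorkerResources = new HashMap<>();

    /** Requested-but-not-yet-added pods, counted per spec (replaces the single numPendingPodRequests). */
    private final Map<WorkerSpec, Integer> pendingWorkersPerSpec = new HashMap<>();

    private int podCounter = 0;

    /** Called when the SlotManager asks for a worker with a specific resource profile. */
    String requestPod(WorkerSpec spec) {
        final String podName = "taskmanager-" + (++podCounter);
        podWorkerResources.put(podName, spec);
        pendingWorkersPerSpec.merge(spec, 1, Integer::sum);
        return podName;
    }

    /** Called when Kubernetes reports the pod as added; decrements the counter of the matching spec. */
    void onPodAdded(String podName) {
        final WorkerSpec spec = podWorkerResources.get(podName);
        if (spec == null) {
            return; // unknown pod or pod from a previous attempt; ignored in this sketch
        }
        pendingWorkersPerSpec.merge(spec, -1, Integer::sum);
    }

    int pendingFor(WorkerSpec spec) {
        return pendingWorkersPerSpec.getOrDefault(spec, 0);
    }

    public static void main(String[] args) {
        final PendingPodTracker tracker = new PendingPodTracker();
        final WorkerSpec small = new WorkerSpec(1.0, 1024);
        final WorkerSpec large = new WorkerSpec(4.0, 8192);
        final String firstPod = tracker.requestPod(small);
        tracker.requestPod(large);
        tracker.onPodAdded(firstPod);
        System.out.println(tracker.pendingFor(small)); // prints 0
        System.out.println(tracker.pendingFor(large)); // prints 1
    }
}

In the commit itself this accounting lives in the resource manager base class (notifyNewWorkerRequested, notifyNewWorkerAllocated, getNumPendingWorkersFor); the sketch only illustrates the idea, not Flink's API.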
xintongsong authored and tillrohrmann committed Apr 25, 2020
1 parent 78603b3 commit 98e8b0e
Showing 4 changed files with 332 additions and 75 deletions.
@@ -19,6 +19,7 @@
package org.apache.flink.kubernetes;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -31,6 +32,7 @@
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -56,7 +58,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
@@ -72,8 +73,6 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW

private final Map<ResourceID, KubernetesWorkerNode> workerNodes = new HashMap<>();

private final double defaultCpus;

/** When ResourceManager failover, the max attempt should recover. */
private long currentMaxAttemptId = 0;

@@ -84,12 +83,10 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW

private final FlinkKubeClient kubeClient;

private final ContaineredTaskManagerParameters taskManagerParameters;

private final KubernetesResourceManagerConfiguration configuration;

/** The number of pods requested, but not yet granted. */
private int numPendingPodRequests = 0;
/** Map from pod name to worker resource. */
private final Map<String, WorkerResourceSpec> podWorkerResources;

public KubernetesResourceManager(
RpcService rpcService,
@@ -119,12 +116,9 @@ public KubernetesResourceManager(
fatalErrorHandler,
resourceManagerMetricGroup);
this.clusterId = configuration.getClusterId();
this.defaultCpus = taskExecutorProcessSpec.getCpuCores().getValue().doubleValue();

this.kubeClient = kubeClient;

this.taskManagerParameters = ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
this.configuration = configuration;
this.podWorkerResources = new HashMap<>();
}

@Override
@@ -164,11 +158,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul

@Override
public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
Preconditions.checkArgument(Objects.equals(
workerResourceSpec,
WorkerResourceSpec.fromTaskExecutorProcessSpec(taskExecutorProcessSpec)));
LOG.info("Starting new worker with worker resource spec, {}", workerResourceSpec);
requestKubernetesPod();
requestKubernetesPod(workerResourceSpec);
return true;
}

@@ -179,25 +170,41 @@ protected KubernetesWorkerNode workerStarted(ResourceID resourceID) {

@Override
public boolean stopWorker(final KubernetesWorkerNode worker) {
LOG.info("Stopping Worker {}.", worker.getResourceID());
workerNodes.remove(worker.getResourceID());
internalStopPod(worker.getResourceID().toString());
final ResourceID resourceId = worker.getResourceID();
LOG.info("Stopping Worker {}.", resourceId);
internalStopPod(resourceId.toString());
return true;
}

@Override
public void onAdded(List<KubernetesPod> pods) {
runAsync(() -> {
int duplicatePodNum = 0;
for (KubernetesPod pod : pods) {
if (numPendingPodRequests > 0) {
numPendingPodRequests--;
final KubernetesWorkerNode worker = new KubernetesWorkerNode(new ResourceID(pod.getName()));
workerNodes.putIfAbsent(worker.getResourceID(), worker);
final String podName = pod.getName();
final ResourceID resourceID = new ResourceID(podName);

if (workerNodes.containsKey(resourceID)) {
log.debug("Ignore TaskManager pod that is already added: {}", podName);
++duplicatePodNum;
continue;
}

log.info("Received new TaskManager pod: {} - Remaining pending pod requests: {}",
pod.getName(), numPendingPodRequests);
final WorkerResourceSpec workerResourceSpec = Preconditions.checkNotNull(
podWorkerResources.get(podName),
"Unrecognized pod {}. Pods from previous attempt should have already been added.", podName);

final int pendingNum = getNumPendingWorkersFor(workerResourceSpec);
Preconditions.checkState(pendingNum > 0, "Should not receive more workers than requested.");

notifyNewWorkerAllocated(workerResourceSpec);
final KubernetesWorkerNode worker = new KubernetesWorkerNode(resourceID);
workerNodes.put(resourceID, worker);

log.info("Received new TaskManager pod: {}", podName);
}
log.info("Received {} new TaskManager pods. Remaining pending pod requests: {}",
pods.size() - duplicatePodNum, getNumPendingWorkers());
});
}

@@ -237,70 +244,80 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
++currentMaxAttemptId);
}

private void requestKubernetesPod() {
numPendingPodRequests++;
private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
final KubernetesTaskManagerParameters parameters =
createKubernetesTaskManagerParameters(workerResourceSpec);

podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec);

log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
defaultMemoryMB,
defaultCpus,
numPendingPodRequests);
parameters.getTaskManagerMemoryMB(),
parameters.getTaskManagerCPU(),
pendingWorkerNum);

final KubernetesPod taskManagerPod =
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
kubeClient.createTaskManagerPod(taskManagerPod)
.whenCompleteAsync(
(ignore, throwable) -> {
if (throwable != null) {
final Time retryInterval = configuration.getPodCreationRetryInterval();
log.warn("Could not start TaskManager in pod {}, retry in {}. ",
taskManagerPod.getName(), retryInterval, throwable);
podWorkerResources.remove(parameters.getPodName());
notifyNewWorkerAllocationFailed(workerResourceSpec);
scheduleRunAsync(
this::requestKubernetesPodIfRequired,
retryInterval);
} else {
log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec);
}
},
getMainThreadExecutor());
}

private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
final TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);

final String podName = String.format(
TASK_MANAGER_POD_FORMAT,
clusterId,
currentMaxAttemptId,
++currentMaxPodId);

final ContaineredTaskManagerParameters taskManagerParameters =
ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);

final String dynamicProperties =
BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);

final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
return new KubernetesTaskManagerParameters(
flinkConfig,
podName,
dynamicProperties,
taskManagerParameters);

final KubernetesPod taskManagerPod =
KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);

log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec);
kubeClient.createTaskManagerPod(taskManagerPod)
.whenComplete(
(ignore, throwable) -> {
if (throwable != null) {
log.error("Could not start TaskManager in pod {}.", podName, throwable);
scheduleRunAsync(
this::decreasePendingAndRequestKubernetesPodIfRequired,
configuration.getPodCreationRetryInterval());
}
}
);
}

private void decreasePendingAndRequestKubernetesPodIfRequired() {
validateRunsInMainThread();
numPendingPodRequests--;
requestKubernetesPodIfRequired();
}

/**
* Request new pod if pending pods cannot satisfy pending slot requests.
*/
private void requestKubernetesPodIfRequired() {
final int requiredTaskManagers = getNumberRequiredTaskManagers();
for (Map.Entry<WorkerResourceSpec, Integer> entry : getRequiredResources().entrySet()) {
final WorkerResourceSpec workerResourceSpec = entry.getKey();
final int requiredTaskManagers = entry.getValue();

while (requiredTaskManagers > numPendingPodRequests) {
requestKubernetesPod();
while (requiredTaskManagers > getNumPendingWorkersFor(workerResourceSpec)) {
requestKubernetesPod(workerResourceSpec);
}
}
}

private void removePodIfTerminated(KubernetesPod pod) {
if (pod.isTerminated()) {
internalStopPod(pod.getName());
final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(new ResourceID(pod.getName()));
if (kubernetesWorkerNode != null) {
requestKubernetesPodIfRequired();
}
requestKubernetesPodIfRequired();
}
}

@@ -310,13 +327,30 @@ protected double getCpuCores(Configuration configuration) {
}

private void internalStopPod(String podName) {
final ResourceID resourceId = new ResourceID(podName);
final boolean isPendingWorkerOfCurrentAttempt = isPendingWorkerOfCurrentAttempt(podName);

kubeClient.stopPod(podName)
.whenComplete(
(ignore, throwable) -> {
if (throwable != null) {
log.error("Could not stop TaskManager in pod {}.", podName, throwable);
log.warn("Could not stop TaskManager in pod {}.", podName, throwable);
}
}
);

final WorkerResourceSpec workerResourceSpec = podWorkerResources.remove(podName);
workerNodes.remove(resourceId);

if (isPendingWorkerOfCurrentAttempt) {
notifyNewWorkerAllocationFailed(
Preconditions.checkNotNull(workerResourceSpec,
"Worker resource spec of current attempt pending worker should be known."));
}
}

private boolean isPendingWorkerOfCurrentAttempt(String podName) {
return podWorkerResources.containsKey(podName) &&
!workerNodes.containsKey(new ResourceID(podName));
}
}