Skip to content

Commit

Permalink
[FLINK-16439][runtime] Introduce PendingWorkerCounter for counting pe…
Browse files Browse the repository at this point in the history
…nding workers per WorkerResourceSpec in ActiveResourceManager.
  • Loading branch information
xintongsong authored and tillrohrmann committed Apr 25, 2020
1 parent 83b7138 commit 78603b3
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
Expand All @@ -35,9 +36,11 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Expand All @@ -63,6 +66,8 @@ public abstract class ActiveResourceManager <WorkerType extends ResourceIDRetrie
/** Flink configuration uploaded by client. */
protected final Configuration flinkClientConfig;

private final PendingWorkerCounter pendingWorkerCounter;

public ActiveResourceManager(
Configuration flinkConfig,
Map<String, String> env,
Expand Down Expand Up @@ -101,6 +106,8 @@ public ActiveResourceManager(

// Load the flink config uploaded by flink client
this.flinkClientConfig = loadClientConfiguration();

pendingWorkerCounter = new PendingWorkerCounter();
}

protected CompletableFuture<Void> getStopTerminationFutureOrCompletedExceptionally(@Nullable Throwable exception) {
Expand All @@ -117,4 +124,76 @@ protected CompletableFuture<Void> getStopTerminationFutureOrCompletedExceptional
protected abstract Configuration loadClientConfiguration();

protected abstract double getCpuCores(final Configuration configuration);

protected int getNumPendingWorkers() {
return pendingWorkerCounter.getTotalNum();
}

protected int getNumPendingWorkersFor(WorkerResourceSpec workerResourceSpec) {
return pendingWorkerCounter.getNum(workerResourceSpec);
}

/**
* Notify that a worker with the given resource spec has been requested.
* @param workerResourceSpec resource spec of the requested worker
* @return updated number of pending workers for the given resource spec
*/
protected int notifyNewWorkerRequested(WorkerResourceSpec workerResourceSpec) {
return pendingWorkerCounter.increaseAndGet(workerResourceSpec);
}

/**
* Notify that a worker with the given resource spec has been allocated.
* @param workerResourceSpec resource spec of the requested worker
* @return updated number of pending workers for the given resource spec
*/
protected int notifyNewWorkerAllocated(WorkerResourceSpec workerResourceSpec) {
return pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
}

/**
* Notify that allocation of a worker with the given resource spec has failed.
* @param workerResourceSpec resource spec of the requested worker
* @return updated number of pending workers for the given resource spec
*/
protected int notifyNewWorkerAllocationFailed(WorkerResourceSpec workerResourceSpec) {
return pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
}

/**
* Utility class for counting pending workers per {@link WorkerResourceSpec}.
*/
@VisibleForTesting
static class PendingWorkerCounter {
private final Map<WorkerResourceSpec, Integer> pendingWorkerNums;

PendingWorkerCounter() {
pendingWorkerNums = new HashMap<>();
}

int getTotalNum() {
return pendingWorkerNums.values().stream().reduce(0, Integer::sum);
}

int getNum(final WorkerResourceSpec workerResourceSpec) {
return pendingWorkerNums.getOrDefault(Preconditions.checkNotNull(workerResourceSpec), 0);
}

int increaseAndGet(final WorkerResourceSpec workerResourceSpec) {
return pendingWorkerNums.compute(
Preconditions.checkNotNull(workerResourceSpec),
(ignored, num) -> num != null ? num + 1 : 1);
}

int decreaseAndGet(final WorkerResourceSpec workerResourceSpec) {
final Integer newValue = pendingWorkerNums.compute(
Preconditions.checkNotNull(workerResourceSpec),
(ignored, num) -> {
Preconditions.checkState(num != null && num > 0,
"Cannot decrease, no pending worker of spec %s.", workerResourceSpec);
return num == 1 ? null : num - 1;
});
return newValue != null ? newValue : 0;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.util.TestLogger;

import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

/**
* Tests for {@link ActiveResourceManager}.
*/
public class ActiveResourceManagerTest extends TestLogger {

@Test
public void testPendingWorkerCounterIncreaseAndDecrease() {
final WorkerResourceSpec spec1 = new WorkerResourceSpec.Builder().setCpuCores(1.0).build();
final WorkerResourceSpec spec2 = new WorkerResourceSpec.Builder().setCpuCores(2.0).build();

final ActiveResourceManager.PendingWorkerCounter counter = new ActiveResourceManager.PendingWorkerCounter();
assertThat(counter.getTotalNum(), is(0));
assertThat(counter.getNum(spec1), is(0));
assertThat(counter.getNum(spec2), is(0));

assertThat(counter.increaseAndGet(spec1), is(1));
assertThat(counter.getTotalNum(), is(1));
assertThat(counter.getNum(spec1), is(1));
assertThat(counter.getNum(spec2), is(0));

assertThat(counter.increaseAndGet(spec1), is(2));
assertThat(counter.getTotalNum(), is(2));
assertThat(counter.getNum(spec1), is(2));
assertThat(counter.getNum(spec2), is(0));

assertThat(counter.increaseAndGet(spec2), is(1));
assertThat(counter.getTotalNum(), is(3));
assertThat(counter.getNum(spec1), is(2));
assertThat(counter.getNum(spec2), is(1));

assertThat(counter.decreaseAndGet(spec1), is(1));
assertThat(counter.getTotalNum(), is(2));
assertThat(counter.getNum(spec1), is(1));
assertThat(counter.getNum(spec2), is(1));

assertThat(counter.decreaseAndGet(spec2), is(0));
assertThat(counter.getTotalNum(), is(1));
assertThat(counter.getNum(spec1), is(1));
assertThat(counter.getNum(spec2), is(0));
}

@Test(expected = IllegalStateException.class)
public void testPendingWorkerCounterDecreaseOnZero() {
final WorkerResourceSpec spec = new WorkerResourceSpec.Builder().build();
final ActiveResourceManager.PendingWorkerCounter counter = new ActiveResourceManager.PendingWorkerCounter();
counter.decreaseAndGet(spec);
}
}

0 comments on commit 78603b3

Please sign in to comment.