Skip to content

Commit

Permalink
[FLINK-16438][runtime] Remove unused TaskExecutorProcessSpec from Act…
Browse files Browse the repository at this point in the history
…iveResourceManager.
  • Loading branch information
xintongsong authored and tillrohrmann committed Apr 25, 2020
1 parent f019356 commit 03da1cf
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
Expand Down Expand Up @@ -321,11 +320,6 @@ private void removePodAndTryRestartIfRequired(KubernetesPod pod) {
}
}

@Override
protected double getCpuCores(Configuration configuration) {
return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, KubernetesConfigOptions.TASK_MANAGER_CPU);
}

private void internalStopPod(String podName) {
final ResourceID resourceId = new ResourceID(podName);
final boolean isPendingWorkerOfCurrentAttempt = isPendingWorkerOfCurrentAttempt(podName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.kubernetes.entrypoint;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
Expand All @@ -39,7 +40,8 @@ public WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configur
return workerResourceSpecFromConfigAndCpu(configuration, getDefaultCpus(configuration));
}

private static CPUResource getDefaultCpus(Configuration configuration) {
@VisibleForTesting
static CPUResource getDefaultCpus(Configuration configuration) {
double fallback = configuration.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU);
return TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, fallback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,45 +305,6 @@ public void testGetWorkerNodesFromPreviousAttempts() throws Exception {
}};
}

@Test
public void testGetCpuCoresCommonOption() throws Exception {
new Context() {{
runTest(() -> {
final Configuration configuration = new Configuration();
configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0);
configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);

assertThat(resourceManager.getCpuCores(configuration), is(1.0));
});
}};
}

@Test
public void testGetCpuCoresKubernetesOption() throws Exception {
new Context() {{
runTest(() -> {
final Configuration configuration = new Configuration();
configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);

assertThat(resourceManager.getCpuCores(configuration), is(2.0));
});
}};
}

@Test
public void testGetCpuCoresNumSlots() throws Exception {
new Context() {{
runTest(() -> {
final Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);

assertThat(resourceManager.getCpuCores(configuration), is(3.0));
});
}};
}

@Test
public void testCreateTaskManagerPodFailedAndRetry() throws Exception {
new Context() {{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.entrypoint;

import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

/**
* Tests for {@link KubernetesWorkerResourceSpecFactory}.
*/
public class KubernetesWorkerResourceSpecFactoryTest extends TestLogger {

@Test
public void testGetCpuCoresCommonOption() {
final Configuration configuration = new Configuration();
configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0);
configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);

assertThat(KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration),
is(new CPUResource(1.0)));
}

@Test
public void testGetCpuCoresKubernetesOption() {
final Configuration configuration = new Configuration();
configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);

assertThat(KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration),
is(new CPUResource(2.0)));
}

@Test
public void testGetCpuCoresNumSlots() {
final Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);

assertThat(KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration),
is(new CPUResource(3.0)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.concurrent.FutureUtils;
Expand Down Expand Up @@ -53,10 +51,6 @@ public abstract class ActiveResourceManager <WorkerType extends ResourceIDRetrie
/** The process environment variables. */
protected final Map<String, String> env;

protected final TaskExecutorProcessSpec defaultTaskExecutorProcessSpec;

protected final int defaultMemoryMB;

/**
* The updated Flink configuration. The client uploaded configuration may be updated before passed on to
* {@link ResourceManager}. For example, {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}.
Expand Down Expand Up @@ -97,13 +91,6 @@ public ActiveResourceManager(
this.flinkConfig = flinkConfig;
this.env = env;

double defaultCpus = getCpuCores(flinkConfig);
this.defaultTaskExecutorProcessSpec = TaskExecutorProcessUtils
.newProcessSpecBuilder(flinkConfig)
.withCpuCores(defaultCpus)
.build();
this.defaultMemoryMB = defaultTaskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes();

// Load the flink config uploaded by flink client
this.flinkClientConfig = loadClientConfiguration();

Expand All @@ -123,8 +110,6 @@ protected CompletableFuture<Void> getStopTerminationFutureOrCompletedExceptional

protected abstract Configuration loadClientConfiguration();

protected abstract double getCpuCores(final Configuration configuration);

protected int getNumPendingWorkers() {
return pendingWorkerCounter.getTotalNum();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
Expand Down Expand Up @@ -658,27 +657,4 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(
.put(ENV_FLINK_NODE_ID, host);
return taskExecutorLaunchContext;
}

@Override
protected double getCpuCores(final Configuration configuration) {
int fallback = configuration.getInteger(YarnConfigOptions.VCORES);
double cpuCoresDouble = TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, fallback).getValue().doubleValue();
@SuppressWarnings("NumericCastThatLosesPrecision")
long cpuCoresLong = Math.max((long) Math.ceil(cpuCoresDouble), 1L);
//noinspection FloatingPointEquality
if (cpuCoresLong != cpuCoresDouble) {
log.info(
"The amount of cpu cores must be a positive integer on Yarn. Rounding {} up to the closest positive integer {}.",
cpuCoresDouble,
cpuCoresLong);
}
if (cpuCoresLong > Integer.MAX_VALUE) {
throw new IllegalConfigurationException(String.format(
"The amount of cpu cores %d cannot exceed Integer.MAX_VALUE: %d",
cpuCoresLong,
Integer.MAX_VALUE));
}
//noinspection NumericCastThatLosesPrecision
return cpuCoresLong;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.yarn.entrypoint;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
Expand Down Expand Up @@ -46,7 +47,8 @@ public WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configur
return workerResourceSpecFromConfigAndCpu(configuration, getDefaultCpus(configuration));
}

private static CPUResource getDefaultCpus(final Configuration configuration) {
@VisibleForTesting
static CPUResource getDefaultCpus(final Configuration configuration) {
int fallback = configuration.getInteger(YarnConfigOptions.VCORES);
double cpuCoresDouble = TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, fallback).getValue().doubleValue();
@SuppressWarnings("NumericCastThatLosesPrecision")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
Expand Down Expand Up @@ -61,7 +60,6 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.entrypoint.YarnWorkerResourceSpecFactory;

import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -554,59 +552,6 @@ public void testOnStartContainerError() throws Exception {
}};
}

@Test
public void testGetCpuCoresCommonOption() throws Exception {
final Configuration configuration = new Configuration();
configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0);
configuration.setInteger(YarnConfigOptions.VCORES, 2);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);

new Context() {{
runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(1.0)));
}};
}

@Test
public void testGetCpuCoresYarnOption() throws Exception {
final Configuration configuration = new Configuration();
configuration.setInteger(YarnConfigOptions.VCORES, 2);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);

new Context() {{
runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(2.0)));
}};
}

@Test
public void testGetCpuCoresNumSlots() throws Exception {
final Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);

new Context() {{
runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(3.0)));
}};
}

@Test
public void testGetCpuRoundUp() throws Exception {
final Configuration configuration = new Configuration();
configuration.setDouble(TaskManagerOptions.CPU_CORES, 0.5);

new Context() {{
runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(1.0)));
}};
}

@Test(expected = IllegalConfigurationException.class)
public void testGetCpuExceedMaxInt() throws Exception {
final Configuration configuration = new Configuration();
configuration.setDouble(TaskManagerOptions.CPU_CORES, Double.MAX_VALUE);

new Context() {{
resourceManager.getCpuCores(configuration);
}};
}

@Test
public void testStartWorkerVariousSpec_SameContainerResource() throws Exception{
final WorkerResourceSpec workerResourceSpec1 = new WorkerResourceSpec.Builder()
Expand Down
Loading

0 comments on commit 03da1cf

Please sign in to comment.