Skip to content

Commit

Permalink
[FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to c…
Browse files Browse the repository at this point in the history
…onfig map

DeploymentOptionsInternal#CONF_DIR is an internal option and stores the client config path. It should not be added to config map and used by JobManager pod. Instead, KubernetesConfigOptions#FLINK_CONF_DIR will be used.

This closes apache#12501.
  • Loading branch information
wangyang0918 authored and tillrohrmann committed Jun 8, 2020
1 parent c27a7a6 commit a4a99ba
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
Expand Down Expand Up @@ -144,11 +145,11 @@ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOExcepti
* Get properties map for the cluster-side after removal of some keys.
*/
private Map<String, String> getClusterSidePropertiesMap(Configuration flinkConfig) {
final Map<String, String> propertiesMap = flinkConfig.toMap();

// remove kubernetes.config.file
propertiesMap.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
return propertiesMap;
final Configuration clusterSideConfig = flinkConfig.clone();
// Remove some configuration options that should not be taken to cluster side.
clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
return clusterSideConfig.toMap();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ public Configuration getFlinkConfiguration() {

@Override
public String getConfigDirectory() {
final String configDir = flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
final String configDir = flinkConfig.getOptional(DeploymentOptionsInternal.CONF_DIR).orElse(
flinkConfig.getString(KubernetesConfigOptions.FLINK_CONF_DIR));

checkNotNull(configDir);
return configDir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

package org.apache.flink.kubernetes;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.shaded.guava18.com.google.common.io.Files;

import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -32,4 +36,15 @@ public class KubernetesTestUtils {
public static void createTemporyFile(String data, File directory, String fileName) throws IOException {
Files.write(data, new File(directory, fileName), StandardCharsets.UTF_8);
}

public static Configuration loadConfigurationFromString(String content) {
final Configuration configuration = new Configuration();
for (String line : content.split(System.lineSeparator())) {
final String[] splits = line.split(":");
if (splits.length >= 2) {
configuration.setString(splits[0].trim(), StringUtils.substringAfter(line, ":").trim());
}
}
return configuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.KubernetesTestUtils;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
Expand All @@ -42,9 +44,12 @@
import java.util.Map;

import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
import static org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator.getFlinkConfConfigMapName;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThat;

/**
* General tests for the {@link FlinkConfMountDecorator}.
Expand Down Expand Up @@ -88,15 +93,20 @@ public void testConfigMap() throws IOException {

assertEquals(Constants.API_VERSION, resultConfigMap.getApiVersion());

assertEquals(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID),
assertEquals(getFlinkConfConfigMapName(CLUSTER_ID),
resultConfigMap.getMetadata().getName());
assertEquals(getCommonLabels(), resultConfigMap.getMetadata().getLabels());

Map<String, String> resultDatas = resultConfigMap.getData();
assertEquals("some data", resultDatas.get("logback.xml"));
assertEquals("some data", resultDatas.get("log4j.properties"));
assertTrue(resultDatas.get(FLINK_CONF_FILENAME).contains(KubernetesConfigOptions.FLINK_CONF_DIR.key() +
": " + FLINK_CONF_DIR_IN_POD));

final Configuration resultFlinkConfig = KubernetesTestUtils.loadConfigurationFromString(
resultDatas.get(FLINK_CONF_FILENAME));
assertThat(resultFlinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR), is(FLINK_CONF_DIR_IN_POD));
// The following config options should not be added to config map
assertThat(resultFlinkConfig.get(KubernetesConfigOptions.KUBE_CONFIG_FILE), is(nullValue()));
assertThat(resultFlinkConfig.get(DeploymentOptionsInternal.CONF_DIR), is(nullValue()));
}

@Test
Expand All @@ -112,7 +122,7 @@ public void testDecoratedFlinkPodWithoutLog4jAndLogback() {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
.withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
Expand Down Expand Up @@ -145,7 +155,7 @@ public void testDecoratedFlinkPodWithLog4j() throws IOException {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
.withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
Expand All @@ -171,7 +181,7 @@ public void testDecoratedFlinkPodWithLogback() throws IOException {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
.withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
Expand Down Expand Up @@ -202,7 +212,7 @@ public void testDecoratedFlinkPodWithLog4jAndLogback() throws IOException {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
.withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.kubernetes.kubeclient.parameters;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.StringUtils;
Expand All @@ -31,6 +32,8 @@
import java.util.Random;

import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/**
* General tests for the {@link AbstractKubernetesParameters}.
Expand Down Expand Up @@ -62,6 +65,19 @@ public void testClusterIdLengthLimitation() {
);
}

@Test
public void getConfigDirectory() {
final String confDir = "/path/of/flink-conf";
flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, confDir);
assertThat(testingKubernetesParameters.getConfigDirectory(), is(confDir));
}

@Test
public void getConfigDirectoryFallbackToPodConfDir() {
final String confDirInPod = flinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR);
assertThat(testingKubernetesParameters.getConfigDirectory(), is(confDirInPod));
}

private class TestingKubernetesParameters extends AbstractKubernetesParameters {

public TestingKubernetesParameters(Configuration flinkConfig) {
Expand Down

0 comments on commit a4a99ba

Please sign in to comment.