Skip to content

Commit

Permalink
[FLINK-15667][k8s] Support to mount custom Hadoop Configurations
Browse files Browse the repository at this point in the history
This closes apache#11415 .
  • Loading branch information
zhengcanbin authored and tisonkun committed Mar 23, 2020
1 parent cdcc25c commit f56a075
Show file tree
Hide file tree
Showing 13 changed files with 507 additions and 7 deletions.
6 changes: 6 additions & 0 deletions docs/_includes/generated/kubernetes_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@
<td>String</td>
<td>The directory in which the logs of the jobmanager and taskmanager are saved in the pod.</td>
</tr>
<tr>
<td><h5>kubernetes.hadoop.conf.config-map.name</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify the name of an existing ConfigMap that contains custom Hadoop configuration to be mounted on the JobManager(s) and TaskManagers.</td>
</tr>
<tr>
<td><h5>kubernetes.jobmanager.cpu</h5></td>
<td style="word-wrap: break-word;">1.0</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ public class KubernetesConfigOptions {
.defaultValue("/opt/flink/log")
.withDescription("The directory that logs of jobmanager and taskmanager be saved in the pod.");

/**
 * The name of an existing ConfigMap that contains custom Hadoop configuration to be mounted
 * on the JobManager(s) and TaskManagers. No default: when unset, a dedicated ConfigMap may be
 * created from a local Hadoop conf directory instead.
 */
public static final ConfigOption<String> HADOOP_CONF_CONFIG_MAP =
	key("kubernetes.hadoop.conf.config-map.name")
	.stringType()
	.noDefaultValue()
	.withDescription("Specify the name of an existing ConfigMap that contains custom Hadoop configuration " +
	"to be mounted on the JobManager(s) and TaskManagers.");

/**
* The flink rest service exposed type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private List<File> getLocalLogConfFiles() {
}

@VisibleForTesting
String getFlinkConfConfigMapName(String clusterId) {
public static String getFlinkConfConfigMapName(String clusterId) {
return CONFIG_MAP_PREFIX + clusterId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.FileUtils;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KeyToPath;
import io.fabric8.kubernetes.api.model.KeyToPathBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Mounts the custom Hadoop Configuration to the JobManager(s)/TaskManagers. We provide two options:
 * 1. Mount an existing ConfigMap containing custom Hadoop Configuration.
 * 2. Create and mount a dedicated ConfigMap containing the custom Hadoop configuration from a local directory
 *    specified via the HADOOP_CONF_DIR or HADOOP_HOME environment variable.
 */
public class HadoopConfMountDecorator extends AbstractKubernetesStepDecorator {

	private static final Logger LOG = LoggerFactory.getLogger(HadoopConfMountDecorator.class);

	/** Only these well-known Hadoop configuration files are shipped from the local conf directory. */
	private static final List<String> EXPECTED_HADOOP_CONF_FILE_NAMES =
		Collections.unmodifiableList(Arrays.asList("core-site.xml", "hdfs-site.xml"));

	private final AbstractKubernetesParameters kubernetesParameters;

	public HadoopConfMountDecorator(AbstractKubernetesParameters kubernetesParameters) {
		this.kubernetesParameters = checkNotNull(kubernetesParameters);
	}

	@Override
	public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
		final Volume hadoopConfVolume;

		final Optional<String> existingConfigMap = kubernetesParameters.getExistingHadoopConfigurationConfigMap();
		if (existingConfigMap.isPresent()) {
			// Option 1: mount the user-provided ConfigMap as-is.
			hadoopConfVolume = new VolumeBuilder()
				.withName(Constants.HADOOP_CONF_VOLUME)
				.withNewConfigMap()
					.withName(existingConfigMap.get())
					.endConfigMap()
				.build();
		} else {
			final Optional<String> localHadoopConfigurationDirectory = kubernetesParameters.getLocalHadoopConfigurationDirectory();
			if (!localHadoopConfigurationDirectory.isPresent()) {
				// Neither an existing ConfigMap nor a local Hadoop conf directory: nothing to mount.
				return flinkPod;
			}

			final List<File> hadoopConfigurationFileItems = getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
			if (hadoopConfigurationFileItems.isEmpty()) {
				LOG.warn("Found 0 files in directory {}, skip to mount the Hadoop Configuration ConfigMap.",
					localHadoopConfigurationDirectory.get());
				return flinkPod;
			}

			// Option 2: mount the dedicated ConfigMap created in buildAccompanyingKubernetesResources(),
			// projecting each config file to a key of the same name.
			final List<KeyToPath> keyToPaths = hadoopConfigurationFileItems.stream()
				.map(file -> new KeyToPathBuilder()
					.withKey(file.getName())
					.withPath(file.getName())
					.build())
				.collect(Collectors.toList());

			hadoopConfVolume = new VolumeBuilder()
				.withName(Constants.HADOOP_CONF_VOLUME)
				.withNewConfigMap()
					.withName(getHadoopConfConfigMapName(kubernetesParameters.getClusterId()))
					.withItems(keyToPaths)
					.endConfigMap()
				.build();
		}

		final Pod podWithHadoopConf = new PodBuilder(flinkPod.getPod())
			.editOrNewSpec()
				.addNewVolumeLike(hadoopConfVolume)
					.endVolume()
				.endSpec()
			.build();

		// Mount the volume and export HADOOP_CONF_DIR so Hadoop clients in the container pick it up.
		final Container containerWithHadoopConf = new ContainerBuilder(flinkPod.getMainContainer())
			.addNewVolumeMount()
				.withName(Constants.HADOOP_CONF_VOLUME)
				.withMountPath(Constants.HADOOP_CONF_DIR_IN_POD)
				.endVolumeMount()
			.addNewEnv()
				.withName(Constants.ENV_HADOOP_CONF_DIR)
				.withValue(Constants.HADOOP_CONF_DIR_IN_POD)
				.endEnv()
			.build();

		return new FlinkPod.Builder(flinkPod)
			.withPod(podWithHadoopConf)
			.withMainContainer(containerWithHadoopConf)
			.build();
	}

	@Override
	public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
		// An existing ConfigMap is mounted directly; no extra resource has to be created.
		if (kubernetesParameters.getExistingHadoopConfigurationConfigMap().isPresent()) {
			return Collections.emptyList();
		}

		final Optional<String> localHadoopConfigurationDirectory = kubernetesParameters.getLocalHadoopConfigurationDirectory();
		if (!localHadoopConfigurationDirectory.isPresent()) {
			return Collections.emptyList();
		}

		final List<File> hadoopConfigurationFileItems = getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
		if (hadoopConfigurationFileItems.isEmpty()) {
			LOG.warn("Found 0 files in directory {}, skip to create the Hadoop Configuration ConfigMap.", localHadoopConfigurationDirectory.get());
			return Collections.emptyList();
		}

		final Map<String, String> data = new HashMap<>();
		for (File file : hadoopConfigurationFileItems) {
			data.put(file.getName(), FileUtils.readFileUtf8(file));
		}

		final ConfigMap hadoopConfigMap = new ConfigMapBuilder()
			.withApiVersion(Constants.API_VERSION)
			.withNewMetadata()
				.withName(getHadoopConfConfigMapName(kubernetesParameters.getClusterId()))
				.withLabels(kubernetesParameters.getCommonLabels())
				.endMetadata()
			.addToData(data)
			.build();

		return Collections.singletonList(hadoopConfigMap);
	}

	/** Collects the well-known Hadoop configuration files from the given local directory. */
	private List<File> getHadoopConfigurationFileItems(String localHadoopConfigurationDirectory) {
		final File directory = new File(localHadoopConfigurationDirectory);
		if (!directory.exists() || !directory.isDirectory()) {
			return Collections.emptyList();
		}

		// listFiles() may return null on an I/O error even after isDirectory() returned true.
		final File[] files = directory.listFiles();
		if (files == null) {
			return Collections.emptyList();
		}

		return Arrays.stream(files)
			.filter(file -> file.isFile() && EXPECTED_HADOOP_CONF_FILE_NAMES.contains(file.getName()))
			.collect(Collectors.toList());
	}

	/** The name of the dedicated ConfigMap that carries the Hadoop configuration for the given cluster. */
	public static String getHadoopConfConfigMapName(String clusterId) {
		return Constants.HADOOP_CONF_CONFIG_MAP_PREFIX + clusterId;
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InitJobManagerDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.JavaCmdJobManagerDecorator;
Expand Down Expand Up @@ -58,6 +59,7 @@ public static KubernetesJobManagerSpecification createJobManagerComponent(
new JavaCmdJobManagerDecorator(kubernetesJobManagerParameters),
new InternalServiceDecorator(kubernetesJobManagerParameters),
new ExternalServiceDecorator(kubernetesJobManagerParameters),
new HadoopConfMountDecorator(kubernetesJobManagerParameters),
new FlinkConfMountDecorator(kubernetesJobManagerParameters)};

for (KubernetesStepDecorator stepDecorator: stepDecorators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InitTaskManagerDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.JavaCmdTaskManagerDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
Expand All @@ -40,6 +41,7 @@ public static KubernetesPod createTaskManagerComponent(KubernetesTaskManagerPara
final KubernetesStepDecorator[] stepDecorators = new KubernetesStepDecorator[] {
new InitTaskManagerDecorator(kubernetesTaskManagerParameters),
new JavaCmdTaskManagerDecorator(kubernetesTaskManagerParameters),
new HadoopConfMountDecorator(kubernetesTaskManagerParameters),
new FlinkConfMountDecorator(kubernetesTaskManagerParameters)};

for (KubernetesStepDecorator stepDecorator: stepDecorators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.flink.runtime.clusterframework.BootstrapTools;

import io.fabric8.kubernetes.api.model.LocalObjectReference;
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
Expand Down Expand Up @@ -134,6 +136,33 @@ public boolean hasLog4j() {
return log4jFile.exists();
}

@Override
public Optional<String> getExistingHadoopConfigurationConfigMap() {
	// An all-whitespace or missing value means "no existing ConfigMap configured".
	final String configMapName = flinkConfig.getString(KubernetesConfigOptions.HADOOP_CONF_CONFIG_MAP);
	return StringUtils.isBlank(configMapName)
		? Optional.empty()
		: Optional.of(configMapName.trim());
}

/**
 * Locates the local Hadoop configuration directory.
 *
 * <p>Resolution order: the HADOOP_CONF_DIR environment variable, then
 * {@code $HADOOP_HOME/etc/hadoop} (Hadoop 2.2+ layout), then {@code $HADOOP_HOME/conf}
 * (legacy layout).
 *
 * <p>Bug fix: the previous version concatenated {@code System.getenv(...)} without a null
 * check, so an unset HADOOP_HOME produced the non-blank literal path "null/etc/hadoop",
 * which was returned instead of {@link Optional#empty()}. It also made the "/conf"
 * fallback unreachable, since "$HADOOP_HOME/etc/hadoop" is always non-blank.
 *
 * @return the resolved directory, or empty if no Hadoop environment variable is set
 */
@Override
public Optional<String> getLocalHadoopConfigurationDirectory() {
	final String hadoopConfDirEnv = System.getenv(Constants.ENV_HADOOP_CONF_DIR);
	if (StringUtils.isNotBlank(hadoopConfDirEnv)) {
		return Optional.of(hadoopConfDirEnv);
	}

	final String hadoopHomeEnv = System.getenv(Constants.ENV_HADOOP_HOME);
	if (StringUtils.isNotBlank(hadoopHomeEnv)) {
		final String hadoop2ConfDir = hadoopHomeEnv + "/etc/hadoop"; // hadoop 2.2+
		if (new File(hadoop2ConfDir).exists()) {
			return Optional.of(hadoop2ConfDir);
		}
		final String hadoop1ConfDir = hadoopHomeEnv + "/conf"; // legacy hadoop
		if (new File(hadoop1ConfDir).exists()) {
			return Optional.of(hadoop1ConfDir);
		}
	}

	return Optional.empty();
}

/**
* Extract container customized environment variable properties with a given name prefix.
* @param envPrefix the given property name prefix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.fabric8.kubernetes.api.model.LocalObjectReference;

import java.util.Map;
import java.util.Optional;

/**
* A common collection of parameters that is used to construct the JobManager/TaskManager Pods,
Expand Down Expand Up @@ -80,4 +81,14 @@ public interface KubernetesParameters {
* Whether the log4j.properties is located.
*/
boolean hasLog4j();

/**
 * The name of an existing ConfigMap containing the custom Hadoop configuration to mount,
 * or {@link Optional#empty()} if none is configured.
 */
Optional<String> getExistingHadoopConfigurationConfigMap();

/**
 * The local directory containing the custom Hadoop configuration files,
 * or {@link Optional#empty()} if it cannot be located.
 */
Optional<String> getLocalHadoopConfigurationDirectory();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ public class Constants {
public static final String APPS_API_VERSION = "apps/v1";

public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";

public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";

public static final String FLINK_CONF_VOLUME = "flink-config-volume";

public static final String CONFIG_MAP_PREFIX = "flink-config-";

/** Name of the pod volume that carries the Hadoop configuration files. */
public static final String HADOOP_CONF_VOLUME = "hadoop-config-volume";
/** Prefix of the dedicated Hadoop configuration ConfigMap; the cluster id is appended. */
public static final String HADOOP_CONF_CONFIG_MAP_PREFIX = "hadoop-config-";
/** Mount path of the Hadoop configuration inside the JobManager/TaskManager pods. */
public static final String HADOOP_CONF_DIR_IN_POD = "/opt/hadoop/conf";
/** Environment variable pointing to the Hadoop configuration directory. */
public static final String ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
/** Environment variable pointing to the Hadoop installation directory. */
public static final String ENV_HADOOP_HOME = "HADOOP_HOME";

public static final String FLINK_REST_SERVICE_SUFFIX = "-rest";

public static final String NAME_SEPARATOR = "-";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
Expand Down Expand Up @@ -57,6 +58,8 @@ public class KubernetesTestBase extends TestLogger {

protected File flinkConfDir;

protected File hadoopConfDir;

protected final Configuration flinkConfig = new Configuration();

protected KubernetesClient kubeClient;
Expand All @@ -71,6 +74,8 @@ public void setup() throws Exception {
flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, CONTAINER_IMAGE_PULL_POLICY);

flinkConfDir = temporaryFolder.newFolder().getAbsoluteFile();
hadoopConfDir = temporaryFolder.newFolder().getAbsoluteFile();

writeFlinkConfiguration();

Map<String, String> map = new HashMap<>();
Expand All @@ -91,4 +96,15 @@ protected Map<String, String> getCommonLabels() {
labels.put(Constants.LABEL_APP_KEY, CLUSTER_ID);
return labels;
}

// Points the HADOOP_CONF_DIR environment variable at the test's Hadoop conf folder.
protected void setHadoopConfDirEnv() {
	final Map<String, String> newEnv = new HashMap<>();
	newEnv.put(Constants.ENV_HADOOP_CONF_DIR, hadoopConfDir.toString());
	// NOTE(review): assuming 'false' means the existing environment is kept — confirm CommonTestUtils.setEnv semantics.
	CommonTestUtils.setEnv(newEnv, false);
}

// Writes dummy core-site.xml/hdfs-site.xml files into hadoopConfDir so tests have files to pick up.
protected void generateHadoopConfFileItems() throws IOException {
	KubernetesTestUtils.createTemporyFile("some data", hadoopConfDir, "core-site.xml");
	KubernetesTestUtils.createTemporyFile("some data", hadoopConfDir, "hdfs-site.xml");
}
}
Loading

0 comments on commit f56a075

Please sign in to comment.