Skip to content

Commit

Permalink
[FLINK-16194][k8s] Introduce decorator for mounting configuration fil…
Browse files Browse the repository at this point in the history
…es such as flink-conf.yaml, log4j.properties, and logback.xml
  • Loading branch information
zhengcanbin authored and tisonkun committed Mar 5, 2020
1 parent 4d8281a commit d29628c
Show file tree
Hide file tree
Showing 3 changed files with 406 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
import org.apache.flink.kubernetes.utils.Constants;

import org.apache.flink.shaded.guava18.com.google.common.io.Files;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KeyToPath;
import io.fabric8.kubernetes.api.model.KeyToPathBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeBuilder;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX;
import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Mounts the log4j.properties, logback.xml, and flink-conf.yaml configuration on the JobManager or TaskManager pod.
*/
public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {

private final AbstractKubernetesParameters kubernetesComponentConf;

public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) {
this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf);
}

@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final Pod mountedPod = decoratePod(flinkPod.getPod());

final Container mountedMainContainer = new ContainerBuilder(flinkPod.getMainContainer())
.addNewVolumeMount()
.withName(FLINK_CONF_VOLUME)
.withMountPath(kubernetesComponentConf.getFlinkConfDirInPod())
.endVolumeMount()
.build();

return new FlinkPod.Builder(flinkPod)
.withPod(mountedPod)
.withMainContainer(mountedMainContainer)
.build();
}

private Pod decoratePod(Pod pod) {
final List<KeyToPath> keyToPaths = getLocalLogConfFiles().stream()
.map(file -> new KeyToPathBuilder()
.withKey(file.getName())
.withPath(file.getName())
.build())
.collect(Collectors.toList());
keyToPaths.add(new KeyToPathBuilder()
.withKey(FLINK_CONF_FILENAME)
.withPath(FLINK_CONF_FILENAME)
.build());

final Volume flinkConfVolume = new VolumeBuilder()
.withName(FLINK_CONF_VOLUME)
.withNewConfigMap()
.withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId()))
.withItems(keyToPaths)
.endConfigMap()
.build();

return new PodBuilder(pod)
.editSpec()
.addNewVolumeLike(flinkConfVolume)
.endVolume()
.endSpec()
.build();
}

@Override
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
final String clusterId = kubernetesComponentConf.getClusterId();

final Map<String, String> data = new HashMap<>();

final List<File> localLogFiles = getLocalLogConfFiles();
for (File file : localLogFiles) {
data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8));
}

final Map<String, String> propertiesMap = getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration());
data.put(FLINK_CONF_FILENAME, getFlinkConfData(propertiesMap));

final ConfigMap flinkConfConfigMap = new ConfigMapBuilder()
.withApiVersion(Constants.API_VERSION)
.withNewMetadata()
.withName(getFlinkConfConfigMapName(clusterId))
.withLabels(kubernetesComponentConf.getCommonLabels())
.endMetadata()
.addToData(data)
.build();

return Collections.singletonList(flinkConfConfigMap);
}

/**
* Get properties map for the cluster-side after removal of some keys.
*/
private Map<String, String> getClusterSidePropertiesMap(Configuration flinkConfig) {
final Map<String, String> propertiesMap = flinkConfig.toMap();

// remove kubernetes.config.file
propertiesMap.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
return propertiesMap;
}

@VisibleForTesting
String getFlinkConfData(Map<String, String> propertiesMap) throws IOException {
try (StringWriter sw = new StringWriter();
PrintWriter out = new PrintWriter(sw)) {
propertiesMap.forEach((k, v) -> {
out.print(k);
out.print(": ");
out.println(v);
});

return sw.toString();
}
}

private List<File> getLocalLogConfFiles() {
final String confDir = CliFrontend.getConfigurationDirectoryFromEnv();
final File logbackFile = new File(confDir, CONFIG_FILE_LOGBACK_NAME);
final File log4jFile = new File(confDir, CONFIG_FILE_LOG4J_NAME);

List<File> localLogConfFiles = new ArrayList<>();
if (logbackFile.exists()) {
localLogConfFiles.add(logbackFile);
}
if (log4jFile.exists()) {
localLogConfFiles.add(log4jFile);
}

return localLogConfFiles;
}

@VisibleForTesting
String getFlinkConfConfigMapName(String clusterId) {
return CONFIG_MAP_PREFIX + clusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,9 @@
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.util.FlinkRuntimeException;

import io.fabric8.kubernetes.api.model.ConfigMapVolumeSourceBuilder;
import io.fabric8.kubernetes.api.model.KeyToPath;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeMount;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,16 +42,9 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX;
import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -168,70 +156,6 @@ public static String getTaskManagerStartCommand(
);
}

/**
* Get config map volume for job manager and task manager pod.
*
* @param clusterId Cluster id.
* @param hasLogback Uses logback?
* @param hasLog4j Uses log4j?
* @return Config map volume.
*/
public static Volume getConfigMapVolume(String clusterId, boolean hasLogback, boolean hasLog4j) {
final Volume configMapVolume = new Volume();
configMapVolume.setName(FLINK_CONF_VOLUME);

final List<KeyToPath> items = new ArrayList<>();
items.add(new KeyToPath(FLINK_CONF_FILENAME, null, FLINK_CONF_FILENAME));

if (hasLogback) {
items.add(new KeyToPath(CONFIG_FILE_LOGBACK_NAME, null, CONFIG_FILE_LOGBACK_NAME));
}

if (hasLog4j) {
items.add(new KeyToPath(CONFIG_FILE_LOG4J_NAME, null, CONFIG_FILE_LOG4J_NAME));
}

configMapVolume.setConfigMap(new ConfigMapVolumeSourceBuilder()
.withName(CONFIG_MAP_PREFIX + clusterId)
.withItems(items)
.build());
return configMapVolume;
}

/**
* Get config map volume for job manager and task manager pod.
*
* @param flinkConfDirInPod Flink conf directory that will be mounted in the pod.
* @param hasLogback Uses logback?
* @param hasLog4j Uses log4j?
* @return Volume mount list.
*/
public static List<VolumeMount> getConfigMapVolumeMount(String flinkConfDirInPod, boolean hasLogback, boolean hasLog4j) {
final List<VolumeMount> volumeMounts = new ArrayList<>();
volumeMounts.add(new VolumeMountBuilder()
.withName(FLINK_CONF_VOLUME)
.withMountPath(new File(flinkConfDirInPod, FLINK_CONF_FILENAME).getPath())
.withSubPath(FLINK_CONF_FILENAME).build());

if (hasLogback) {
volumeMounts.add(new VolumeMountBuilder()
.withName(FLINK_CONF_VOLUME)
.withMountPath(new File(flinkConfDirInPod, CONFIG_FILE_LOGBACK_NAME).getPath())
.withSubPath(CONFIG_FILE_LOGBACK_NAME)
.build());
}

if (hasLog4j) {
volumeMounts.add(new VolumeMountBuilder()
.withName(FLINK_CONF_VOLUME)
.withMountPath(new File(flinkConfDirInPod, CONFIG_FILE_LOG4J_NAME).getPath())
.withSubPath(CONFIG_FILE_LOG4J_NAME)
.build());
}

return volumeMounts;
}

/**
* Generate name of the internal Service.
*/
Expand Down
Loading

0 comments on commit d29628c

Please sign in to comment.