Skip to content

Commit

Permalink
[FLINK-15656][k8s] Support to overwrite and merge some K8s fields from pod template and config options
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyang0918 authored and tillrohrmann committed Mar 1, 2021
1 parent 4957736 commit 1552131
Show file tree
Hide file tree
Showing 17 changed files with 813 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
</tr>
<tr>
<td><h5>kubernetes.jobmanager.service-account</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td style="word-wrap: break-word;">"default"</td>
<td>String</td>
<td>Service account that is used by jobmanager within kubernetes cluster. The job manager uses this service account when requesting taskmanager pods from the API server. If not explicitly configured, config option 'kubernetes.service-account' will be used.</td>
</tr>
Expand Down Expand Up @@ -202,7 +202,7 @@
</tr>
<tr>
<td><h5>kubernetes.taskmanager.service-account</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td style="word-wrap: break-word;">"default"</td>
<td>String</td>
<td>Service account that is used by taskmanager within kubernetes cluster. The task manager uses this service account when watching config maps on the API server to retrieve leader address of jobmanager and resourcemanager. If not explicitly configured, config option 'kubernetes.service-account' will be used.</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public class KubernetesConfigOptions {
public static final ConfigOption<String> JOB_MANAGER_SERVICE_ACCOUNT =
key("kubernetes.jobmanager.service-account")
.stringType()
.noDefaultValue()
.defaultValue("default")
.withFallbackKeys(KUBERNETES_SERVICE_ACCOUNT_KEY)
.withDescription(
"Service account that is used by jobmanager within kubernetes cluster. "
+ "The job manager uses this service account when requesting taskmanager pods from the API server. "
Expand All @@ -72,7 +73,8 @@ public class KubernetesConfigOptions {
public static final ConfigOption<String> TASK_MANAGER_SERVICE_ACCOUNT =
key("kubernetes.taskmanager.service-account")
.stringType()
.noDefaultValue()
.defaultValue("default")
.withFallbackKeys(KUBERNETES_SERVICE_ACCOUNT_KEY)
.withDescription(
"Service account that is used by taskmanager within kubernetes cluster. "
+ "The task manager uses this service account when watching config maps on the API server to retrieve "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.kubernetes.kubeclient.FlinkPod;

import io.fabric8.kubernetes.api.model.HasMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -32,6 +34,8 @@
*/
public abstract class AbstractKubernetesStepDecorator implements KubernetesStepDecorator {

protected final Logger logger = LoggerFactory.getLogger(this.getClass());

/**
* Apply transformations on the given FlinkPod in accordance with this feature. Note that we
* should return a FlinkPod that keeps all of the properties of the passed FlinkPod object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesToleration;
Expand All @@ -31,7 +33,6 @@
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.ResourceRequirements;

Expand All @@ -49,64 +50,105 @@
public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {

private final KubernetesJobManagerParameters kubernetesJobManagerParameters;
private final Configuration flinkConfig;

public InitJobManagerDecorator(KubernetesJobManagerParameters kubernetesJobManagerParameters) {
this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters);
this.flinkConfig = checkNotNull(kubernetesJobManagerParameters.getFlinkConfiguration());
}

@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final Pod basicPod =
new PodBuilder(flinkPod.getPodWithoutMainContainer())
.withApiVersion(API_VERSION)
.editOrNewMetadata()
.withLabels(kubernetesJobManagerParameters.getLabels())
.withAnnotations(kubernetesJobManagerParameters.getAnnotations())
.endMetadata()
.editOrNewSpec()
.withServiceAccountName(kubernetesJobManagerParameters.getServiceAccount())
.withImagePullSecrets(kubernetesJobManagerParameters.getImagePullSecrets())
.withNodeSelector(kubernetesJobManagerParameters.getNodeSelector())
.withTolerations(
kubernetesJobManagerParameters.getTolerations().stream()
.map(
e ->
KubernetesToleration.fromMap(e)
.getInternalResource())
.collect(Collectors.toList()))
.endSpec()
.build();
final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());

// Overwrite fields
final String serviceAccountName =
KubernetesUtils.resolveUserDefinedValue(
flinkConfig,
KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT,
kubernetesJobManagerParameters.getServiceAccount(),
KubernetesUtils.getServiceAccount(flinkPod),
"service account");
if (flinkPod.getPodWithoutMainContainer().getSpec().getRestartPolicy() != null) {
logger.info(
"The restart policy of JobManager pod will be overwritten to 'always' "
+ "since it is controlled by the Kubernetes deployment.");
}
basicPodBuilder
.withApiVersion(API_VERSION)
.editOrNewSpec()
.withServiceAccount(serviceAccountName)
.withServiceAccountName(serviceAccountName)
.endSpec();

// Merge fields
basicPodBuilder
.editOrNewMetadata()
.addToLabels(kubernetesJobManagerParameters.getLabels())
.addToAnnotations(kubernetesJobManagerParameters.getAnnotations())
.endMetadata()
.editOrNewSpec()
.addToImagePullSecrets(kubernetesJobManagerParameters.getImagePullSecrets())
.addToNodeSelector(kubernetesJobManagerParameters.getNodeSelector())
.addAllToTolerations(
kubernetesJobManagerParameters.getTolerations().stream()
.map(e -> KubernetesToleration.fromMap(e).getInternalResource())
.collect(Collectors.toList()))
.endSpec();

final Container basicMainContainer = decorateMainContainer(flinkPod.getMainContainer());

return new FlinkPod.Builder(flinkPod)
.withPod(basicPod)
.withPod(basicPodBuilder.build())
.withMainContainer(basicMainContainer)
.build();
}

private Container decorateMainContainer(Container container) {
final ContainerBuilder mainContainerBuilder = new ContainerBuilder(container);
// Overwrite fields
final String image =
KubernetesUtils.resolveUserDefinedValue(
flinkConfig,
KubernetesConfigOptions.CONTAINER_IMAGE,
kubernetesJobManagerParameters.getImage(),
container.getImage(),
"main container image");
final String imagePullPolicy =
KubernetesUtils.resolveUserDefinedValue(
flinkConfig,
KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
kubernetesJobManagerParameters.getImagePullPolicy().name(),
container.getImagePullPolicy(),
"main container image pull policy");
final ResourceRequirements requirementsInPodTemplate =
container.getResources() == null
? new ResourceRequirements()
: container.getResources();
final ResourceRequirements requirements =
KubernetesUtils.getResourceRequirements(
requirementsInPodTemplate,
kubernetesJobManagerParameters.getJobManagerMemoryMB(),
kubernetesJobManagerParameters.getJobManagerCPU(),
Collections.emptyMap());

return new ContainerBuilder(container)
mainContainerBuilder
.withName(Constants.MAIN_CONTAINER_NAME)
.withImage(kubernetesJobManagerParameters.getImage())
.withImagePullPolicy(kubernetesJobManagerParameters.getImagePullPolicy().name())
.withResources(requirements)
.withPorts(getContainerPorts())
.withEnv(getCustomizedEnvs())
.withImage(image)
.withImagePullPolicy(imagePullPolicy)
.withResources(requirements);

// Merge fields
mainContainerBuilder
.addAllToPorts(getContainerPorts())
.addAllToEnv(getCustomizedEnvs())
.addNewEnv()
.withName(ENV_FLINK_POD_IP_ADDRESS)
.withValueFrom(
new EnvVarSourceBuilder()
.withNewFieldRef(API_VERSION, POD_IP_FIELD_PATH)
.build())
.endEnv()
.build();
.endEnv();
return mainContainerBuilder.build();
}

private List<ContainerPort> getContainerPorts() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesToleration;
Expand All @@ -28,7 +30,6 @@
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.ResourceRequirements;

Expand All @@ -41,64 +42,109 @@
public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator {

private final KubernetesTaskManagerParameters kubernetesTaskManagerParameters;
private final Configuration flinkConfig;

public InitTaskManagerDecorator(
KubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
this.kubernetesTaskManagerParameters = checkNotNull(kubernetesTaskManagerParameters);
this.flinkConfig = checkNotNull(kubernetesTaskManagerParameters.getFlinkConfiguration());
}

@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final Pod basicPod =
new PodBuilder(flinkPod.getPodWithoutMainContainer())
.withApiVersion(Constants.API_VERSION)
.editOrNewMetadata()
.withName(kubernetesTaskManagerParameters.getPodName())
.withLabels(kubernetesTaskManagerParameters.getLabels())
.withAnnotations(kubernetesTaskManagerParameters.getAnnotations())
.endMetadata()
.editOrNewSpec()
.withServiceAccountName(kubernetesTaskManagerParameters.getServiceAccount())
.withRestartPolicy(Constants.RESTART_POLICY_OF_NEVER)
.withImagePullSecrets(kubernetesTaskManagerParameters.getImagePullSecrets())
.withNodeSelector(kubernetesTaskManagerParameters.getNodeSelector())
.withTolerations(
kubernetesTaskManagerParameters.getTolerations().stream()
.map(
e ->
KubernetesToleration.fromMap(e)
.getInternalResource())
.collect(Collectors.toList()))
.endSpec()
.build();
final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());

// Overwrite fields
final String serviceAccountName =
KubernetesUtils.resolveUserDefinedValue(
flinkConfig,
KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT,
kubernetesTaskManagerParameters.getServiceAccount(),
KubernetesUtils.getServiceAccount(flinkPod),
"service account");
if (flinkPod.getPodWithoutMainContainer().getSpec().getRestartPolicy() != null) {
logger.info(
"The restart policy of TaskManager pod will be overwritten to 'never' "
+ "since it should not be restarted.");
}
basicPodBuilder
.withApiVersion(Constants.API_VERSION)
.editOrNewMetadata()
.withName(kubernetesTaskManagerParameters.getPodName())
.endMetadata()
.editOrNewSpec()
.withServiceAccount(serviceAccountName)
.withServiceAccountName(serviceAccountName)
.withRestartPolicy(Constants.RESTART_POLICY_OF_NEVER)
.endSpec();

// Merge fields
basicPodBuilder
.editOrNewMetadata()
.addToLabels(kubernetesTaskManagerParameters.getLabels())
.addToAnnotations(kubernetesTaskManagerParameters.getAnnotations())
.endMetadata()
.editOrNewSpec()
.addToImagePullSecrets(kubernetesTaskManagerParameters.getImagePullSecrets())
.addToNodeSelector(kubernetesTaskManagerParameters.getNodeSelector())
.addAllToTolerations(
kubernetesTaskManagerParameters.getTolerations().stream()
.map(e -> KubernetesToleration.fromMap(e).getInternalResource())
.collect(Collectors.toList()))
.endSpec();

final Container basicMainContainer = decorateMainContainer(flinkPod.getMainContainer());

return new FlinkPod.Builder(flinkPod)
.withPod(basicPod)
.withPod(basicPodBuilder.build())
.withMainContainer(basicMainContainer)
.build();
}

private Container decorateMainContainer(Container container) {
final ContainerBuilder mainContainerBuilder = new ContainerBuilder(container);

// Overwrite fields
final ResourceRequirements requirementsInPodTemplate =
container.getResources() == null
? new ResourceRequirements()
: container.getResources();
final ResourceRequirements resourceRequirements =
KubernetesUtils.getResourceRequirements(
requirementsInPodTemplate,
kubernetesTaskManagerParameters.getTaskManagerMemoryMB(),
kubernetesTaskManagerParameters.getTaskManagerCPU(),
kubernetesTaskManagerParameters.getTaskManagerExternalResources());

return new ContainerBuilder(container)
final String image =
KubernetesUtils.resolveUserDefinedValue(
flinkConfig,
KubernetesConfigOptions.CONTAINER_IMAGE,
kubernetesTaskManagerParameters.getImage(),
container.getImage(),
"main container image");
final String imagePullPolicy =
KubernetesUtils.resolveUserDefinedValue(
flinkConfig,
KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
kubernetesTaskManagerParameters.getImagePullPolicy().name(),
container.getImagePullPolicy(),
"main container image pull policy");
mainContainerBuilder
.withName(Constants.MAIN_CONTAINER_NAME)
.withImage(kubernetesTaskManagerParameters.getImage())
.withImagePullPolicy(kubernetesTaskManagerParameters.getImagePullPolicy().name())
.withResources(resourceRequirements)
.withPorts(
.withImage(image)
.withImagePullPolicy(imagePullPolicy)
.withResources(resourceRequirements);

// Merge fields
mainContainerBuilder
.addToPorts(
new ContainerPortBuilder()
.withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
.withContainerPort(kubernetesTaskManagerParameters.getRPCPort())
.build())
.withEnv(getCustomizedEnvs())
.build();
.addAllToEnv(getCustomizedEnvs());

return mainContainerBuilder.build();
}

private List<EnvVar> getCustomizedEnvs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ public int getBlobServerPort() {
}

public String getServiceAccount() {
return flinkConfig
.getOptional(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT)
.orElse(flinkConfig.getString(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
return flinkConfig.get(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT);
}

public String getEntrypointClass() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ public double getTaskManagerCPU() {
}

public String getServiceAccount() {
return flinkConfig
.getOptional(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT)
.orElse(flinkConfig.getString(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
return flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT);
}

public Map<String, Long> getTaskManagerExternalResources() {
Expand Down
Loading

0 comments on commit 1552131

Please sign in to comment.