Skip to content

Commit

Permalink
[FLINK-16194][k8s] Introduce the monadic step decorator design and th…
Browse files Browse the repository at this point in the history
…e InitJobManagerDecorator and the TaskManagerDecorator
  • Loading branch information
zhengcanbin authored and tisonkun committed Mar 5, 2020
1 parent 22735e3 commit a50435d
Show file tree
Hide file tree
Showing 11 changed files with 919 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A collection of variables that composes a JobManager/TaskManager Pod. This can include
* the Pod, the main Container, and the InitContainer, etc.
*/
public class FlinkPod {

private Pod pod;

private Container mainContainer;

public FlinkPod(Pod pod, Container mainContainer) {
this.pod = pod;
this.mainContainer = mainContainer;
}

public Pod getPod() {
return pod;
}

public void setPod(Pod pod) {
this.pod = pod;
}

public Container getMainContainer() {
return mainContainer;
}

public void setMainContainer(Container mainContainer) {
this.mainContainer = mainContainer;
}

/**
* Builder for creating a {@link FlinkPod}.
*/
public static class Builder {

private Pod pod;
private Container mainContainer;

public Builder() {
this.pod = new PodBuilder()
.withNewMetadata()
.endMetadata()
.withNewSpec()
.endSpec()
.build();

this.mainContainer = new ContainerBuilder().build();
}

public Builder(FlinkPod flinkPod) {
checkNotNull(flinkPod);
this.pod = checkNotNull(flinkPod.getPod());
this.mainContainer = checkNotNull(flinkPod.getMainContainer());
}

public Builder withPod(Pod pod) {
this.pod = checkNotNull(pod);
return this;
}

public Builder withMainContainer(Container mainContainer) {
this.mainContainer = checkNotNull(mainContainer);
return this;
}

public FlinkPod build() {
return new FlinkPod(this.pod, this.mainContainer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.kubernetes.kubeclient.FlinkPod;

import io.fabric8.kubernetes.api.model.HasMetadata;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* An abstract {@link KubernetesStepDecorator} contains common implementations for different plug-in features.
*/
public abstract class AbstractKubernetesStepDecorator implements KubernetesStepDecorator {

/**
* Apply transformations on the given FlinkPod in accordance to this feature.
* Note that we should return a FlinkPod that keeps all of the properties of the passed FlinkPod object.
*
* <p>So this is correct:
*
* <pre>
* {@code
*
* Pod decoratedPod = new PodBuilder(pod) // Keeps the original state
* ...
* .build()
*
* Container decoratedContainer = new ContainerBuilder(container) // Keeps the original state
* ...
* .build()
*
* FlinkPod decoratedFlinkPod = new FlinkPodBuilder(flinkPod) // Keeps the original state
* ...
* .build()
*
* }
* </pre>
*
* <p>And this is the incorrect:
*
* <pre>
* {@code
*
* Pod decoratedPod = new PodBuilder() // Loses the original state
* ...
* .build()
*
* Container decoratedContainer = new ContainerBuilder() // Loses the original state
* ...
* .build()
*
* FlinkPod decoratedFlinkPod = new FlinkPodBuilder() // Loses the original state
* ...
* .build()
*
* }
* </pre>
*/
@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
return flinkPod;
}

/**
* Note that the method could have a side effect of modifying the Flink Configuration object, such as
* update the JobManager address.
*/
@Override
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.KubernetesUtils;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.ResourceRequirements;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.kubernetes.utils.Constants.API_VERSION;
import static org.apache.flink.kubernetes.utils.Constants.ENV_FLINK_POD_IP_ADDRESS;
import static org.apache.flink.kubernetes.utils.Constants.POD_IP_FIELD_PATH;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* An initializer for the JobManager {@link org.apache.flink.kubernetes.kubeclient.FlinkPod}.
*/
public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {

private final KubernetesJobManagerParameters kubernetesJobManagerParameters;

public InitJobManagerDecorator(KubernetesJobManagerParameters kubernetesJobManagerParameters) {
this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters);
}

@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final Pod basicPod = new PodBuilder(flinkPod.getPod())
.withApiVersion(API_VERSION)
.editOrNewMetadata()
.withLabels(kubernetesJobManagerParameters.getLabels())
.endMetadata()
.editOrNewSpec()
.withServiceAccountName(kubernetesJobManagerParameters.getServiceAccount())
.withImagePullSecrets(kubernetesJobManagerParameters.getImagePullSecrets())
.endSpec()
.build();

final Container basicMainContainer = decorateMainContainer(flinkPod.getMainContainer());

return new FlinkPod.Builder(flinkPod)
.withPod(basicPod)
.withMainContainer(basicMainContainer)
.build();
}

private Container decorateMainContainer(Container container) {
final ResourceRequirements requirements = KubernetesUtils.getResourceRequirements(
kubernetesJobManagerParameters.getJobManagerMemoryMB(),
kubernetesJobManagerParameters.getJobManagerCPU());

return new ContainerBuilder(container)
.withName(kubernetesJobManagerParameters.getJobManagerMainContainerName())
.withImage(kubernetesJobManagerParameters.getImage())
.withImagePullPolicy(kubernetesJobManagerParameters.getImagePullPolicy())
.withResources(requirements)
.withPorts(getContainerPorts())
.withEnv(getCustomizedEnvs())
.addNewEnv()
.withName(ENV_FLINK_POD_IP_ADDRESS)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef(API_VERSION, POD_IP_FIELD_PATH)
.build())
.endEnv()
.build();
}

private List<ContainerPort> getContainerPorts() {
return Arrays.asList(
new ContainerPortBuilder()
.withContainerPort(kubernetesJobManagerParameters.getRestPort())
.build(),
new ContainerPortBuilder()
.withContainerPort(kubernetesJobManagerParameters.getRPCPort())
.build(),
new ContainerPortBuilder().
withContainerPort(kubernetesJobManagerParameters.getBlobServerPort())
.build());
}

private List<EnvVar> getCustomizedEnvs() {
return kubernetesJobManagerParameters.getEnvironments()
.entrySet()
.stream()
.map(kv -> new EnvVarBuilder()
.withName(kv.getKey())
.withValue(kv.getValue())
.build())
.collect(Collectors.toList());
}
}
Loading

0 comments on commit a50435d

Please sign in to comment.