Skip to content

Commit

Permalink
[FLINK-16194][k8s] Introduce Factories that chain the decorators toge…
Browse files Browse the repository at this point in the history
…ther to construct all the client/cluster-side Kubernetes resources
  • Loading branch information
zhengcanbin authored and tisonkun committed Mar 5, 2020
1 parent d29628c commit 10f5329
Show file tree
Hide file tree
Showing 12 changed files with 514 additions and 11 deletions.
8 changes: 4 additions & 4 deletions docs/ops/deployment/native_kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ $ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId>

#### Manual Resource Cleanup

Flink uses [Kubernetes ownerReference's](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/) to cleanup all cluster components.
All the Flink created resources, including `ConfigMap`, `Service`, `Deployment`, `Pod`, have been set the ownerReference to `service/<ClusterId>`.
When the service is deleted, all other resource will be deleted automatically.
Flink uses [Kubernetes OwnerReference's](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/) to cleanup all cluster components.
All the Flink created resources, including `ConfigMap`, `Service`, `Pod`, have been set the OwnerReference to `deployment/<ClusterId>`.
When the deployment is deleted, all other resources will be deleted automatically.

{% highlight bash %}
$ kubectl delete service/<ClusterID>
$ kubectl delete deployment/<ClusterID>
{% endhighlight %}

## Log Files
Expand Down
8 changes: 4 additions & 4 deletions docs/ops/deployment/native_kubernetes.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ $ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId>

#### Manual Resource Cleanup

Flink uses [Kubernetes ownerReference's](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/) to cleanup all cluster components.
All the Flink created resources, including `ConfigMap`, `Service`, `Deployment`, `Pod`, have been set the ownerReference to `service/<ClusterId>`.
When the service is deleted, all other resource will be deleted automatically.
Flink uses [Kubernetes OwnerReference's](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/) to cleanup all cluster components.
All the Flink created resources, including `ConfigMap`, `Service`, `Pod`, have been set the OwnerReference to `deployment/<ClusterId>`.
When the deployment is deleted, all other resources will be deleted automatically.

{% highlight bash %}
$ kubectl delete service/<ClusterID>
$ kubectl delete deployment/<ClusterID>
{% endhighlight %}

## Log Files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ OUTPUT_PATH="/tmp/wc_out"
ARGS="--output ${OUTPUT_PATH}"

function cleanup {
kubectl delete service ${CLUSTER_ID}
kubectl delete deployment ${CLUSTER_ID}
kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
stop_kubernetes
}
Expand Down
1 change: 1 addition & 0 deletions flink-kubernetes/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ under the License.
<include>io.fabric8:kubernetes-client</include>
<include>io.fabric8:kubernetes-model</include>
<include>io.fabric8:kubernetes-model-common</include>
<include>io.fabric8:zjsonpatch</include>

<!-- Shade all the dependencies of kubernetes client -->
<include>com.fasterxml.jackson.core:jackson-core</include>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.apps.Deployment;

import java.util.List;

/**
* Composition of the created Kubernetes components that represents a Flink application.
*/
public class KubernetesJobManagerSpecification {

private Deployment deployment;

private List<HasMetadata> accompanyingResources;

public KubernetesJobManagerSpecification(Deployment deployment, List<HasMetadata> accompanyingResources) {
this.deployment = deployment;
this.accompanyingResources = accompanyingResources;
}

public Deployment getDeployment() {
return deployment;
}

public List<HasMetadata> getAccompanyingResources() {
return accompanyingResources;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.factory;

import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InitJobManagerDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.JavaCmdJobManagerDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Utility class for constructing all the Kubernetes components on the client-side. This can
* include the Deployment, the ConfigMap(s), and the Service(s).
*/
public class KubernetesJobManagerFactory {

public static KubernetesJobManagerSpecification createJobManagerComponent(
KubernetesJobManagerParameters kubernetesJobManagerParameters) throws IOException {
FlinkPod flinkPod = new FlinkPod.Builder().build();
List<HasMetadata> accompanyingResources = new ArrayList<>();

final KubernetesStepDecorator[] stepDecorators = new KubernetesStepDecorator[] {
new InitJobManagerDecorator(kubernetesJobManagerParameters),
new JavaCmdJobManagerDecorator(kubernetesJobManagerParameters),
new InternalServiceDecorator(kubernetesJobManagerParameters),
new ExternalServiceDecorator(kubernetesJobManagerParameters),
new FlinkConfMountDecorator(kubernetesJobManagerParameters)};

for (KubernetesStepDecorator stepDecorator: stepDecorators) {
flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources());
}

final Deployment deployment = createJobManagerDeployment(flinkPod, kubernetesJobManagerParameters);

return new KubernetesJobManagerSpecification(deployment, accompanyingResources);
}

private static Deployment createJobManagerDeployment(
FlinkPod flinkPod,
KubernetesJobManagerParameters kubernetesJobManagerParameters) {
final Container resolvedMainContainer = flinkPod.getMainContainer();

final Pod resolvedPod = new PodBuilder(flinkPod.getPod())
.editOrNewSpec()
.addToContainers(resolvedMainContainer)
.endSpec()
.build();

final Map<String, String> labels = resolvedPod.getMetadata().getLabels();

return new DeploymentBuilder()
.withApiVersion(Constants.APPS_API_VERSION)
.editOrNewMetadata()
.withName(KubernetesUtils.getDeploymentName(kubernetesJobManagerParameters.getClusterId()))
.withLabels(kubernetesJobManagerParameters.getLabels())
.endMetadata()
.editOrNewSpec()
.withReplicas(1)
.editOrNewTemplate()
.editOrNewMetadata()
.withLabels(labels)
.endMetadata()
.withSpec(resolvedPod.getSpec())
.endTemplate()
.editOrNewSelector()
.addToMatchLabels(labels)
.endSelector()
.endSpec()
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.factory;

import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InitTaskManagerDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.JavaCmdTaskManagerDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;

/**
* Utility class for constructing the TaskManager Pod on the JobManager.
*/
public class KubernetesTaskManagerFactory {

public static KubernetesPod buildTaskManagerComponent(KubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
FlinkPod flinkPod = new FlinkPod.Builder().build();

final KubernetesStepDecorator[] stepDecorators = new KubernetesStepDecorator[] {
new InitTaskManagerDecorator(kubernetesTaskManagerParameters),
new JavaCmdTaskManagerDecorator(kubernetesTaskManagerParameters),
new FlinkConfMountDecorator(kubernetesTaskManagerParameters)};

for (KubernetesStepDecorator stepDecorator: stepDecorators) {
flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
}

final Pod resolvedPod = new PodBuilder(flinkPod.getPod())
.editOrNewSpec()
.addToContainers(flinkPod.getMainContainer())
.endSpec()
.build();

return new KubernetesPod(resolvedPod);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.util.FlinkRuntimeException;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,7 +46,9 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -170,6 +176,13 @@ public static String getRestServiceName(String clusterId) {
return clusterId + Constants.FLINK_REST_SERVICE_SUFFIX;
}

/**
* Generate name of the Deployment.
*/
public static String getDeploymentName(String clusterId) {
return clusterId;
}

/**
* Get task manager labels for the current Flink cluster. They could be used to watch the pods status.
*
Expand Down
1 change: 1 addition & 0 deletions flink-kubernetes/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This project bundles the following dependencies under the Apache Software Licens
- io.fabric8:kubernetes-client:4.5.2
- io.fabric8:kubernetes-model:4.5.2
- io.fabric8:kubernetes-model-common:4.5.2
- io.fabric8:zjsonpatch:0.3.0
- org.yaml:snakeyaml:1.23

This project bundles the following dependencies under the BSD License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
Expand Down Expand Up @@ -110,8 +111,7 @@ public void setup() throws Exception {

final Deployment mockDeployment = new DeploymentBuilder()
.editOrNewMetadata()
.withName(CLUSTER_ID)
.withUid(CLUSTER_ID)
.withName(KubernetesUtils.getDeploymentName(CLUSTER_ID))
.endMetadata()
.build();
kubeClient.apps().deployments().inNamespace(NAMESPACE).create(mockDeployment);
Expand Down
Loading

0 comments on commit 10f5329

Please sign in to comment.