Skip to content

Commit

Permalink
[FLINK-10935][kubernetes] Add config options and cli options for kube…
Browse files Browse the repository at this point in the history
…rnetes.

All the basic and necessary config options has been added. Such as, image name, service account, taskmanager resources, etc.
  • Loading branch information
wangyang0918 authored and tisonkun committed Dec 5, 2019
1 parent 36cb5d4 commit 1274395
Show file tree
Hide file tree
Showing 6 changed files with 409 additions and 1 deletion.
96 changes: 96 additions & 0 deletions docs/_includes/generated/kubernetes_config_configuration.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>kubernetes.cluster-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The cluster id that will be used for flink cluster. If it's not set, the client will generate a random UUID name.</td>
</tr>
<tr>
<td><h5>kubernetes.config.file</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The kubernetes config file will be used to create the client. The default is located at ~/.kube/config</td>
</tr>
<tr>
<td><h5>kubernetes.container-start-command-template</h5></td>
<td style="word-wrap: break-word;">"%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%"</td>
<td>String</td>
<td>Template for the kubernetes jobmanager and taskmanager container start invocation.</td>
</tr>
<tr>
<td><h5>kubernetes.container.image</h5></td>
<td style="word-wrap: break-word;">"flink:latest"</td>
<td>String</td>
<td>Image to use for Flink containers.</td>
</tr>
<tr>
<td><h5>kubernetes.container.image.pull-policy</h5></td>
<td style="word-wrap: break-word;">"IfNotPresent"</td>
<td>String</td>
<td>Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent. The default policy is IfNotPresent to avoid putting pressure to image repository.</td>
</tr>
<tr>
<td><h5>kubernetes.entry.path</h5></td>
<td style="word-wrap: break-word;">"/opt/flink/bin/kubernetes-entry.sh"</td>
<td>String</td>
<td>The entrypoint script of kubernetes in the image. It will be used as command for jobmanager and taskmanager container.</td>
</tr>
<tr>
<td><h5>kubernetes.flink.conf.dir</h5></td>
<td style="word-wrap: break-word;">"/opt/flink/conf"</td>
<td>String</td>
<td>The flink conf directory that will be mounted in pod. The flink-conf.yaml, log4j.properties, logback.xml in this path will be overwritten from config map.</td>
</tr>
<tr>
<td><h5>kubernetes.flink.log.dir</h5></td>
<td style="word-wrap: break-word;">"/opt/flink/log"</td>
<td>String</td>
<td>The directory that logs of jobmanager and taskmanager be saved in the pod.</td>
</tr>
<tr>
<td><h5>kubernetes.jobmanager.cpu</h5></td>
<td style="word-wrap: break-word;">1.0</td>
<td>Double</td>
<td>The number of cpu used by job manager</td>
</tr>
<tr>
<td><h5>kubernetes.jobmanager.service-account</h5></td>
<td style="word-wrap: break-word;">"default"</td>
<td>String</td>
<td>Service account that is used by jobmanager within kubernetes cluster. The job manager uses this service account when requesting taskmanager pods from the API server.</td>
</tr>
<tr>
<td><h5>kubernetes.namespace</h5></td>
<td style="word-wrap: break-word;">"default"</td>
<td>String</td>
<td>The namespace that will be used for running the jobmanager and taskmanager pods.</td>
</tr>
<tr>
<td><h5>kubernetes.rest-service.exposed.type</h5></td>
<td style="word-wrap: break-word;">"LoadBalancer"</td>
<td>String</td>
<td>It could be ClusterIP/NodePort/LoadBalancer(default). When set to ClusterIP, the rest servicewill not be created.</td>
</tr>
<tr>
<td><h5>kubernetes.service.create-timeout</h5></td>
<td style="word-wrap: break-word;">"1 min"</td>
<td>String</td>
<td>Timeout used for creating the service. The timeout value requires a time-unit specifier (ms/s/min/h/d).</td>
</tr>
<tr>
<td><h5>kubernetes.taskmanager.cpu</h5></td>
<td style="word-wrap: break-word;">-1.0</td>
<td>Double</td>
<td>The number of cpu used by task manager. By default, the cpu is set to the number of slots per TaskManager</td>
</tr>
</tbody>
</table>
5 changes: 5 additions & 0 deletions flink-docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ under the License.
<artifactId>flink-mesos_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus"),
new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state"),
new OptionsClassLocation("flink-table/flink-table-api-java", "org.apache.flink.table.api.config"),
new OptionsClassLocation("flink-python", "org.apache.flink.python")
new OptionsClassLocation("flink-python", "org.apache.flink.python"),
new OptionsClassLocation("flink-kubernetes", "org.apache.flink.kubernetes.configuration")
};

static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.cli;

import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

import org.apache.commons.cli.Option;

/**
* Options for kubernetes cli and entrypoint.
*/
public class KubernetesCliOptions {

public static Option getOptionWithPrefix(Option option, String shortPrefix, String longPrefix) {
if (shortPrefix.isEmpty() && longPrefix.isEmpty()) {
return option;
}
return Option.builder(shortPrefix + option.getOpt())
.longOpt(option.getLongOpt() == null ? null : longPrefix + option.getLongOpt())
.required(option.isRequired())
.hasArg(option.hasArg())
.numberOfArgs(option.getArgs())
.argName(option.getArgName())
.desc(option.getDescription())
.valueSeparator()
.build();
}

public static final Option CLUSTER_ID_OPTION = Option.builder("id")
.longOpt("clusterId")
.required(false)
.hasArg(true)
.desc(KubernetesConfigOptions.CLUSTER_ID.description().toString())
.build();

public static final Option IMAGE_OPTION = Option.builder("i")
.longOpt("image")
.required(false)
.hasArg(true)
.argName("image-name")
.desc(KubernetesConfigOptions.CONTAINER_IMAGE.description().toString())
.build();

public static final Option JOB_MANAGER_MEMORY_OPTION = Option.builder("jm")
.longOpt("jobManagerMemory")
.required(false)
.hasArg(true)
.desc("Memory for JobManager Container with optional unit (default: MB)")
.build();

public static final Option TASK_MANAGER_MEMORY_OPTION = Option.builder("tm")
.longOpt("taskManagerMemory")
.required(false)
.hasArg(true)
.desc("Memory per TaskManager Container with optional unit (default: MB)")
.build();

public static final Option TASK_MANAGER_SLOTS_OPTION = Option.builder("s")
.longOpt("slots")
.required(false)
.hasArg(true)
.desc("Number of slots per TaskManager")
.build();

public static final Option DYNAMIC_PROPERTY_OPTION = Option.builder("D")
.argName("property=value")
.numberOfArgs(2)
.valueSeparator()
.desc("use value for given property")
.build();

public static final Option HELP_OPTION = Option.builder("h")
.longOpt("help")
.hasArg(false)
.desc("Help for Kubernetes session CLI.")
.build();

public static final Option JOB_CLASS_NAME_OPTION = Option.builder("jc")
.longOpt("job-classname")
.required(false)
.hasArg(true)
.argName("job class name")
.desc("Class name of the job to run.")
.build();

public static final Option JOB_ID_OPTION = Option.builder("jid")
.longOpt("job-id")
.required(false)
.hasArg(true)
.argName("job id")
.desc("Job ID of the job to run.")
.build();

/** This class is not meant to be instantiated. */
private KubernetesCliOptions() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.configuration;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

/**
* This class holds configuration constants used by Flink's kubernetes runners.
*/
@PublicEvolving
public class KubernetesConfigOptions {

public static final ConfigOption<String> REST_SERVICE_EXPOSED_TYPE =
key("kubernetes.rest-service.exposed.type")
.stringType()
.defaultValue(ServiceExposedType.LoadBalancer.toString())
.withDescription("It could be ClusterIP/NodePort/LoadBalancer(default). When set to ClusterIP, the rest service" +
"will not be created.");

public static final ConfigOption<String> JOB_MANAGER_SERVICE_ACCOUNT =
key("kubernetes.jobmanager.service-account")
.stringType()
.defaultValue("default")
.withDescription("Service account that is used by jobmanager within kubernetes cluster. " +
"The job manager uses this service account when requesting taskmanager pods from the API server.");

public static final ConfigOption<Double> JOB_MANAGER_CPU =
key("kubernetes.jobmanager.cpu")
.doubleType()
.defaultValue(1.0)
.withDescription("The number of cpu used by job manager");

public static final ConfigOption<Double> TASK_MANAGER_CPU =
key("kubernetes.taskmanager.cpu")
.doubleType()
.defaultValue(-1.0)
.withDescription("The number of cpu used by task manager. By default, the cpu is set " +
"to the number of slots per TaskManager");

public static final ConfigOption<String> CONTAINER_IMAGE_PULL_POLICY =
key("kubernetes.container.image.pull-policy")
.stringType()
.defaultValue("IfNotPresent")
.withDescription("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent. " +
"The default policy is IfNotPresent to avoid putting pressure to image repository.");

public static final ConfigOption<String> KUBE_CONFIG_FILE =
key("kubernetes.config.file")
.stringType()
.noDefaultValue()
.withDescription("The kubernetes config file will be used to create the client. The default " +
"is located at ~/.kube/config");

public static final ConfigOption<String> NAMESPACE =
key("kubernetes.namespace")
.stringType()
.defaultValue("default")
.withDescription("The namespace that will be used for running the jobmanager and taskmanager pods.");

public static final ConfigOption<String> CONTAINER_START_COMMAND_TEMPLATE =
key("kubernetes.container-start-command-template")
.stringType()
.defaultValue("%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%")
.withDescription("Template for the kubernetes jobmanager and taskmanager container start invocation.");

public static final ConfigOption<String> SERVICE_CREATE_TIMEOUT =
key("kubernetes.service.create-timeout")
.stringType()
.defaultValue("1 min")
.withDescription("Timeout used for creating the service. The timeout value requires a time-unit " +
"specifier (ms/s/min/h/d).");

// ---------------------------------------------------------------------------------
// The following config options could be overridden by KubernetesCliOptions.
// ---------------------------------------------------------------------------------

public static final ConfigOption<String> CLUSTER_ID =
key("kubernetes.cluster-id")
.stringType()
.noDefaultValue()
.withDescription("The cluster id that will be used for flink cluster. If it's not set, " +
"the client will generate a random UUID name.");

public static final ConfigOption<String> CONTAINER_IMAGE =
key("kubernetes.container.image")
.stringType()
.defaultValue("flink:latest")
.withDescription("Image to use for Flink containers.");

/**
* The following config options need to be set according to the image.
*/
public static final ConfigOption<String> KUBERNETES_ENTRY_PATH =
key("kubernetes.entry.path")
.stringType()
.defaultValue("/opt/flink/bin/kubernetes-entry.sh")
.withDescription("The entrypoint script of kubernetes in the image. It will be used as command for jobmanager " +
"and taskmanager container.");

public static final ConfigOption<String> FLINK_CONF_DIR =
key("kubernetes.flink.conf.dir")
.stringType()
.defaultValue("/opt/flink/conf")
.withDescription("The flink conf directory that will be mounted in pod. The flink-conf.yaml, log4j.properties, " +
"logback.xml in this path will be overwritten from config map.");

public static final ConfigOption<String> FLINK_LOG_DIR =
key("kubernetes.flink.log.dir")
.stringType()
.defaultValue("/opt/flink/log")
.withDescription("The directory that logs of jobmanager and taskmanager be saved in the pod.");

/**
* The flink rest service exposed type.
*/
public enum ServiceExposedType {
ClusterIP,
NodePort,
LoadBalancer
}

/** This class is not meant to be instantiated. */
private KubernetesConfigOptions() {}
}
Loading

0 comments on commit 1274395

Please sign in to comment.