Skip to content

Commit

Permalink
[FLINK-21128][k8s] Introduce dedicated scripts for native K8s integra…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
wangyang0918 authored and zentol committed Mar 4, 2021
1 parent f73b7e7 commit a26388c
Show file tree
Hide file tree
Showing 22 changed files with 428 additions and 954 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@
<td>String</td>
<td>The kubernetes config file will be used to create the client. The default is located at ~/.kube/config</td>
</tr>
<tr>
<td><h5>kubernetes.container-start-command-template</h5></td>
<td style="word-wrap: break-word;">"%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"</td>
<td>String</td>
<td>Template for the kubernetes jobmanager and taskmanager container start invocation.</td>
</tr>
<tr>
<td><h5>kubernetes.container.image</h5></td>
<td style="word-wrap: break-word;">The default value depends on the actually running version. In general it looks like "flink:&lt;FLINK_VERSION&gt;-scala_&lt;SCALA_VERSION&gt;"</td>
Expand Down
14 changes: 13 additions & 1 deletion flink-dist/src/main/flink-bin/bin/flink-console.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]"

SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as array
Expand Down Expand Up @@ -50,6 +50,18 @@ case $SERVICE in
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
;;

(kubernetes-session)
CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
;;

(kubernetes-application)
CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
;;

(kubernetes-taskmanager)
CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner
;;

(*)
echo "Unknown service '${SERVICE}'. $USAGE."
exit 1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start a Flink JobManager for native Kubernetes.
# NOTE: This script is not meant to be started manually. It will be used by native Kubernetes integration.

USAGE="Usage: kubernetes-jobmanager.sh kubernetes-session|kubernetes-application [args]"

ENTRY_POINT_NAME=$1

ARGS=("${@:2}")

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# Add JobManager specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
parseJmArgsAndExportLogs "${ARGS[@]}"

if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
fi

exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start a Flink TaskManager for native Kubernetes.
# NOTE: This script is not meant to be started manually. It will be used by native Kubernetes integration.

USAGE="Usage: kubernetes-taskmanager.sh [args]"

ENTRYPOINT=kubernetes-taskmanager

ARGS=("${@:1}")

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# if no other JVM options are set, set the GC to G1
if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
fi

# Add TaskManager specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
parseTmArgsAndExportLogs "${ARGS[@]}"

# Do not need to add ${DYNAMIC_PARAMETERS[@]} to ${ARGS[@]} since it is already done in native Kubernetes integration.

ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")

exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,6 @@ public class KubernetesConfigOptions {
.withDescription(
"The namespace that will be used for running the jobmanager and taskmanager pods.");

public static final ConfigOption<String> CONTAINER_START_COMMAND_TEMPLATE =
key("kubernetes.container-start-command-template")
.stringType()
.defaultValue("%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%")
.withDescription(
"Template for the kubernetes jobmanager and taskmanager container start invocation.");

public static final ConfigOption<Map<String, String>> JOB_MANAGER_LABELS =
key("kubernetes.jobmanager.labels")
.mapType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;

import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Attach the command and args to the main container for running the JobManager. */
public class CmdJobManagerDecorator extends AbstractKubernetesStepDecorator {

private final KubernetesJobManagerParameters kubernetesJobManagerParameters;

public CmdJobManagerDecorator(KubernetesJobManagerParameters kubernetesJobManagerParameters) {
this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters);
}

@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final Container mainContainerWithStartCmd =
new ContainerBuilder(flinkPod.getMainContainer())
.withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
.withArgs(getJobManagerStartCommand())
.build();

return new FlinkPod.Builder(flinkPod).withMainContainer(mainContainerWithStartCmd).build();
}

private List<String> getJobManagerStartCommand() {
final KubernetesDeploymentTarget deploymentTarget =
KubernetesDeploymentTarget.fromConfig(
kubernetesJobManagerParameters.getFlinkConfiguration());
return KubernetesUtils.getStartCommandWithBashWrapper(
Constants.KUBERNETES_JOB_MANAGER_SCRIPT_PATH + " " + deploymentTarget.getName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;

import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Attach the command and args to the main container for running the TaskManager. */
public class CmdTaskManagerDecorator extends AbstractKubernetesStepDecorator {

private final KubernetesTaskManagerParameters kubernetesTaskManagerParameters;

public CmdTaskManagerDecorator(
KubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
this.kubernetesTaskManagerParameters = checkNotNull(kubernetesTaskManagerParameters);
}

@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final Container mainContainerWithStartCmd =
new ContainerBuilder(flinkPod.getMainContainer())
.withCommand(kubernetesTaskManagerParameters.getContainerEntrypoint())
.withArgs(getTaskManagerStartCommand())
.build();

return new FlinkPod.Builder(flinkPod).withMainContainer(mainContainerWithStartCmd).build();
}

private List<String> getTaskManagerStartCommand() {
final String resourceArgs =
TaskExecutorProcessUtils.generateDynamicConfigsStr(
kubernetesTaskManagerParameters
.getContaineredTaskManagerParameters()
.getTaskExecutorProcessSpec());

final String dynamicProperties = kubernetesTaskManagerParameters.getDynamicProperties();

return KubernetesUtils.getStartCommandWithBashWrapper(
Constants.KUBERNETES_TASK_MANAGER_SCRIPT_PATH
+ " "
+ dynamicProperties
+ " "
+ resourceArgs);
}
}

This file was deleted.

Loading

0 comments on commit a26388c

Please sign in to comment.