From a26388c4d51cd793f65ee81465d8bb00c3394273 Mon Sep 17 00:00:00 2001 From: wangyang0918 Date: Sat, 27 Feb 2021 20:34:38 +0800 Subject: [PATCH] [FLINK-21128][k8s] Introduce dedicated scripts for native K8s integration --- .../kubernetes_config_configuration.html | 6 - .../src/main/flink-bin/bin/flink-console.sh | 14 +- .../kubernetes-bin/kubernetes-jobmanager.sh | 42 +++ .../kubernetes-bin/kubernetes-taskmanager.sh | 47 +++ .../KubernetesConfigOptions.java | 7 - .../decorators/CmdJobManagerDecorator.java | 61 ++++ .../decorators/CmdTaskManagerDecorator.java | 71 ++++ .../JavaCmdJobManagerDecorator.java | 108 ------ .../JavaCmdTaskManagerDecorator.java | 115 ------- .../factory/KubernetesJobManagerFactory.java | 4 +- .../factory/KubernetesTaskManagerFactory.java | 4 +- .../flink/kubernetes/utils/Constants.java | 6 +- .../kubernetes/utils/KubernetesUtils.java | 89 +---- .../KubernetesClusterDescriptorTest.java | 3 + .../Fabric8FlinkKubeClientTest.java | 3 + .../KubernetesTaskManagerTestBase.java | 2 +- .../CmdJobManagerDecoratorTest.java | 89 +++++ .../CmdTaskManagerDecoratorTest.java | 82 +++++ .../JavaCmdJobManagerDecoratorTest.java | 314 ------------------ .../JavaCmdTaskManagerDecoratorTest.java | 309 ----------------- ...ernetesFactoryWithPodTemplateTestBase.java | 3 + .../KubernetesJobManagerFactoryTest.java | 3 + 22 files changed, 428 insertions(+), 954 deletions(-) create mode 100644 flink-dist/src/main/flink-bin/kubernetes-bin/kubernetes-jobmanager.sh create mode 100755 flink-dist/src/main/flink-bin/kubernetes-bin/kubernetes-taskmanager.sh create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecorator.java create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdTaskManagerDecorator.java delete mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java delete mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdTaskManagerDecorator.java create mode 100644 flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecoratorTest.java create mode 100644 flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdTaskManagerDecoratorTest.java delete mode 100644 flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java delete mode 100644 flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdTaskManagerDecoratorTest.java diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html index b4e88a2df2c6f..9d639fad1db7c 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html @@ -26,12 +26,6 @@ String The kubernetes config file will be used to create the client. The default is located at ~/.kube/config - -
kubernetes.container-start-command-template
- "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" - String - Template for the kubernetes jobmanager and taskmanager container start invocation. -
kubernetes.container.image
The default value depends on the actually running version. In general it looks like "flink:<FLINK_VERSION>-scala_<SCALA_VERSION>" diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh index 8b02682b208ad..6ebe2ac73367c 100755 --- a/flink-dist/src/main/flink-bin/bin/flink-console.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh @@ -19,7 +19,7 @@ # Start a Flink service as a console application. Must be stopped with Ctrl-C # or with SIGTERM by kill or the controlling process. -USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]" +USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]" SERVICE=$1 ARGS=("${@:2}") # get remaining arguments as array @@ -50,6 +50,18 @@ case $SERVICE in CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint ;; + (kubernetes-session) + CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint + ;; + + (kubernetes-application) + CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint + ;; + + (kubernetes-taskmanager) + CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner + ;; + (*) echo "Unknown service '${SERVICE}'. $USAGE." exit 1 diff --git a/flink-dist/src/main/flink-bin/kubernetes-bin/kubernetes-jobmanager.sh b/flink-dist/src/main/flink-bin/kubernetes-bin/kubernetes-jobmanager.sh new file mode 100644 index 0000000000000..05131237d560f --- /dev/null +++ b/flink-dist/src/main/flink-bin/kubernetes-bin/kubernetes-jobmanager.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start a Flink JobManager for native Kubernetes. +# NOTE: This script is not meant to be started manually. It will be used by native Kubernetes integration. + +USAGE="Usage: kubernetes-jobmanager.sh kubernetes-session|kubernetes-application [args]" + +ENTRY_POINT_NAME=$1 + +ARGS=("${@:2}") + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# Add JobManager specific JVM options +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" +parseJmArgsAndExportLogs "${ARGS[@]}" + +if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then + ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}") +fi + +exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}" diff --git a/flink-dist/src/main/flink-bin/kubernetes-bin/kubernetes-taskmanager.sh b/flink-dist/src/main/flink-bin/kubernetes-bin/kubernetes-taskmanager.sh new file mode 100755 index 0000000000000..f3418ce64f0e7 --- /dev/null +++ b/flink-dist/src/main/flink-bin/kubernetes-bin/kubernetes-taskmanager.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start a Flink TaskManager for native Kubernetes. +# NOTE: This script is not meant to be started manually. It will be used by native Kubernetes integration. + +USAGE="Usage: kubernetes-taskmanager.sh [args]" + +ENTRYPOINT=kubernetes-taskmanager + +ARGS=("${@:1}") + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# if no other JVM options are set, set the GC to G1 +if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then + export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC" +fi + +# Add TaskManager specific JVM options +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" +parseTmArgsAndExportLogs "${ARGS[@]}" + +# Do not need to add ${DYNAMIC_PARAMETERS[@]} to ${ARGS[@]} since it is already done in native Kubernetes integration. + +ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}") + +exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}" diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index 0be8388956456..6ed95b54ec9cb 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -160,13 +160,6 @@ public class KubernetesConfigOptions { .withDescription( "The namespace that will be used for running the jobmanager and taskmanager pods."); - public static final ConfigOption CONTAINER_START_COMMAND_TEMPLATE = - key("kubernetes.container-start-command-template") - .stringType() - .defaultValue("%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%") - .withDescription( - "Template for the kubernetes jobmanager and taskmanager container start invocation."); - public static final ConfigOption> JOB_MANAGER_LABELS = key("kubernetes.jobmanager.labels") .mapType() diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecorator.java new file mode 100644 index 0000000000000..9abeb224fa794 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecorator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient.decorators; + +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Attach the command and args to the main container for running the JobManager. */ +public class CmdJobManagerDecorator extends AbstractKubernetesStepDecorator { + + private final KubernetesJobManagerParameters kubernetesJobManagerParameters; + + public CmdJobManagerDecorator(KubernetesJobManagerParameters kubernetesJobManagerParameters) { + this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters); + } + + @Override + public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + final Container mainContainerWithStartCmd = + new ContainerBuilder(flinkPod.getMainContainer()) + .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint()) + .withArgs(getJobManagerStartCommand()) + .build(); + + return new FlinkPod.Builder(flinkPod).withMainContainer(mainContainerWithStartCmd).build(); + } + + private List getJobManagerStartCommand() { + final KubernetesDeploymentTarget deploymentTarget = + KubernetesDeploymentTarget.fromConfig( + kubernetesJobManagerParameters.getFlinkConfiguration()); + return KubernetesUtils.getStartCommandWithBashWrapper( + Constants.KUBERNETES_JOB_MANAGER_SCRIPT_PATH + " " + deploymentTarget.getName()); + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdTaskManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdTaskManagerDecorator.java new file mode 100644 index 0000000000000..a293f119a0165 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdTaskManagerDecorator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient.decorators; + +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Attach the command and args to the main container for running the TaskManager. */ +public class CmdTaskManagerDecorator extends AbstractKubernetesStepDecorator { + + private final KubernetesTaskManagerParameters kubernetesTaskManagerParameters; + + public CmdTaskManagerDecorator( + KubernetesTaskManagerParameters kubernetesTaskManagerParameters) { + this.kubernetesTaskManagerParameters = checkNotNull(kubernetesTaskManagerParameters); + } + + @Override + public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + final Container mainContainerWithStartCmd = + new ContainerBuilder(flinkPod.getMainContainer()) + .withCommand(kubernetesTaskManagerParameters.getContainerEntrypoint()) + .withArgs(getTaskManagerStartCommand()) + .build(); + + return new FlinkPod.Builder(flinkPod).withMainContainer(mainContainerWithStartCmd).build(); + } + + private List getTaskManagerStartCommand() { + final String resourceArgs = + TaskExecutorProcessUtils.generateDynamicConfigsStr( + kubernetesTaskManagerParameters + .getContaineredTaskManagerParameters() + .getTaskExecutorProcessSpec()); + + final String dynamicProperties = kubernetesTaskManagerParameters.getDynamicProperties(); + + return KubernetesUtils.getStartCommandWithBashWrapper( + Constants.KUBERNETES_TASK_MANAGER_SCRIPT_PATH + + " " + + dynamicProperties + + " " + + resourceArgs); + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java deleted file mode 100644 index ac71438eec9b8..0000000000000 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecorator.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.kubernetes.kubeclient.decorators; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.kubernetes.kubeclient.FlinkPod; -import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; -import org.apache.flink.kubernetes.utils.KubernetesUtils; -import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; -import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils; - -import io.fabric8.kubernetes.api.model.Container; -import io.fabric8.kubernetes.api.model.ContainerBuilder; - -import java.util.List; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** Attach the jvm command and args to the main container for running the JobManager code. */ -public class JavaCmdJobManagerDecorator extends AbstractKubernetesStepDecorator { - - private final KubernetesJobManagerParameters kubernetesJobManagerParameters; - - public JavaCmdJobManagerDecorator( - KubernetesJobManagerParameters kubernetesJobManagerParameters) { - this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters); - } - - @Override - public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { - final JobManagerProcessSpec processSpec = - JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( - kubernetesJobManagerParameters.getFlinkConfiguration(), - JobManagerOptions.TOTAL_PROCESS_MEMORY); - final List startCommand = - getJobManagerStartCommand( - kubernetesJobManagerParameters.getFlinkConfiguration(), - processSpec, - kubernetesJobManagerParameters.getFlinkConfDirInPod(), - kubernetesJobManagerParameters.getFlinkLogDirInPod(), - kubernetesJobManagerParameters.hasLogback(), - kubernetesJobManagerParameters.hasLog4j(), - kubernetesJobManagerParameters.getEntrypointClass()); - - final Container mainContainerWithStartCmd = - new ContainerBuilder(flinkPod.getMainContainer()) - .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint()) - .withArgs(startCommand) - .build(); - - return new FlinkPod.Builder(flinkPod).withMainContainer(mainContainerWithStartCmd).build(); - } - - /** - * Generates the shell command to start a jobmanager for kubernetes. - * - * @param flinkConfig The Flink configuration. - * @param jobManagerProcessSpec JobManager process memory spec. - * @param configDirectory The configuration directory for the flink-conf.yaml - * @param logDirectory The log directory. - * @param hasLogback Uses logback? - * @param hasLog4j Uses log4j? - * @param mainClass The main class to start with. - * @return A String containing the job manager startup command. - */ - private static List getJobManagerStartCommand( - Configuration flinkConfig, - JobManagerProcessSpec jobManagerProcessSpec, - String configDirectory, - String logDirectory, - boolean hasLogback, - boolean hasLog4j, - String mainClass) { - final String jvmMemOpts = - JobManagerProcessUtils.generateJvmParametersStr(jobManagerProcessSpec, flinkConfig); - final String dynamicParameters = - JobManagerProcessUtils.generateDynamicConfigsStr(jobManagerProcessSpec); - final String javaCommand = - KubernetesUtils.getCommonStartCommand( - flinkConfig, - KubernetesUtils.ClusterComponent.JOB_MANAGER, - jvmMemOpts, - configDirectory, - logDirectory, - hasLogback, - hasLog4j, - mainClass, - dynamicParameters); - return KubernetesUtils.getStartCommandWithBashWrapper(javaCommand); - } -} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdTaskManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdTaskManagerDecorator.java deleted file mode 100644 index b3edf647d765a..0000000000000 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdTaskManagerDecorator.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.kubernetes.kubeclient.decorators; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.kubeclient.FlinkPod; -import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; -import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; -import org.apache.flink.kubernetes.utils.KubernetesUtils; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; -import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; -import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; -import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; -import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils; - -import io.fabric8.kubernetes.api.model.Container; -import io.fabric8.kubernetes.api.model.ContainerBuilder; - -import java.util.List; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** Attach the jvm command and args to the main container for running the TaskManager code. */ -public class JavaCmdTaskManagerDecorator extends AbstractKubernetesStepDecorator { - - private final KubernetesTaskManagerParameters kubernetesTaskManagerParameters; - - public JavaCmdTaskManagerDecorator( - KubernetesTaskManagerParameters kubernetesTaskManagerParameters) { - this.kubernetesTaskManagerParameters = checkNotNull(kubernetesTaskManagerParameters); - } - - @Override - public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { - final Container mainContainerWithStartCmd = - new ContainerBuilder(flinkPod.getMainContainer()) - .withCommand(kubernetesTaskManagerParameters.getContainerEntrypoint()) - .withArgs(getTaskManagerStartCommand()) - .build(); - - return new FlinkPod.Builder(flinkPod).withMainContainer(mainContainerWithStartCmd).build(); - } - - private List getTaskManagerStartCommand() { - final String confDirInPod = kubernetesTaskManagerParameters.getFlinkConfDirInPod(); - - final String logDirInPod = kubernetesTaskManagerParameters.getFlinkLogDirInPod(); - - final String mainClassArgs = - "--" - + CommandLineOptions.CONFIG_DIR_OPTION.getLongOpt() - + " " - + confDirInPod - + " " - + kubernetesTaskManagerParameters.getDynamicProperties(); - - final String javaCommand = - getTaskManagerStartCommand( - kubernetesTaskManagerParameters.getFlinkConfiguration(), - kubernetesTaskManagerParameters.getContaineredTaskManagerParameters(), - confDirInPod, - logDirInPod, - kubernetesTaskManagerParameters.hasLogback(), - kubernetesTaskManagerParameters.hasLog4j(), - KubernetesTaskExecutorRunner.class.getCanonicalName(), - mainClassArgs); - return KubernetesUtils.getStartCommandWithBashWrapper(javaCommand); - } - - private static String getTaskManagerStartCommand( - Configuration flinkConfig, - ContaineredTaskManagerParameters tmParams, - String configDirectory, - String logDirectory, - boolean hasLogback, - boolean hasLog4j, - String mainClass, - String mainArgs) { - final TaskExecutorProcessSpec taskExecutorProcessSpec = - tmParams.getTaskExecutorProcessSpec(); - final String jvmMemOpts = - ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec); - String args = TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec); - if (mainArgs != null) { - args += " " + mainArgs; - } - - return KubernetesUtils.getCommonStartCommand( - flinkConfig, - KubernetesUtils.ClusterComponent.TASK_MANAGER, - jvmMemOpts, - configDirectory, - logDirectory, - hasLogback, - hasLog4j, - mainClass, - args); - } -} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java index d353d3d3d2cc9..abdab96b7aa29 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java @@ -20,13 +20,13 @@ import org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; +import org.apache.flink.kubernetes.kubeclient.decorators.CmdJobManagerDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.InitJobManagerDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator; -import org.apache.flink.kubernetes.kubeclient.decorators.JavaCmdJobManagerDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator; @@ -67,7 +67,7 @@ public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecifi new InitJobManagerDecorator(kubernetesJobManagerParameters), new EnvSecretsDecorator(kubernetesJobManagerParameters), new MountSecretsDecorator(kubernetesJobManagerParameters), - new JavaCmdJobManagerDecorator(kubernetesJobManagerParameters), + new CmdJobManagerDecorator(kubernetesJobManagerParameters), new InternalServiceDecorator(kubernetesJobManagerParameters), new ExternalServiceDecorator(kubernetesJobManagerParameters), new HadoopConfMountDecorator(kubernetesJobManagerParameters), diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java index 221dd9ce73adf..3a323a831a7fb 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java @@ -19,11 +19,11 @@ package org.apache.flink.kubernetes.kubeclient.factory; import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.decorators.CmdTaskManagerDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.InitTaskManagerDecorator; -import org.apache.flink.kubernetes.kubeclient.decorators.JavaCmdTaskManagerDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator; @@ -46,7 +46,7 @@ public static KubernetesPod buildTaskManagerKubernetesPod( new InitTaskManagerDecorator(kubernetesTaskManagerParameters), new EnvSecretsDecorator(kubernetesTaskManagerParameters), new MountSecretsDecorator(kubernetesTaskManagerParameters), - new JavaCmdTaskManagerDecorator(kubernetesTaskManagerParameters), + new CmdTaskManagerDecorator(kubernetesTaskManagerParameters), new HadoopConfMountDecorator(kubernetesTaskManagerParameters), new KerberosMountDecorator(kubernetesTaskManagerParameters), new FlinkConfMountDecorator(kubernetesTaskManagerParameters) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java index 43674d93e446c..ce55f771fd95e 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java @@ -76,8 +76,6 @@ public class Constants { public static final String RESOURCE_UNIT_MB = "Mi"; - public static final String ENV_FLINK_CLASSPATH = "FLINK_CLASSPATH"; - public static final String ENV_FLINK_POD_IP_ADDRESS = "_POD_IP_ADDRESS"; public static final String POD_IP_FIELD_PATH = "status.podIP"; @@ -105,4 +103,8 @@ public class Constants { public static final String POD_TEMPLATE_DIR_IN_POD = "/opt/flink/pod-template"; public static final String POD_TEMPLATE_CONFIG_MAP_PREFIX = "pod-template-"; public static final String POD_TEMPLATE_VOLUME = "pod-template-volume"; + + // Kubernetes start scripts + public static final String KUBERNETES_JOB_MANAGER_SCRIPT_PATH = "kubernetes-jobmanager.sh"; + public static final String KUBERNETES_TASK_MANAGER_SCRIPT_PATH = "kubernetes-taskmanager.sh"; } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java index 07056645cacc5..00b4d1acaf678 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java @@ -21,9 +21,7 @@ import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.highavailability.KubernetesCheckpointStoreUtil; import org.apache.flink.kubernetes.highavailability.KubernetesJobGraphStoreUtil; import org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore; @@ -34,7 +32,6 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore; -import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore; @@ -73,8 +70,6 @@ import static org.apache.flink.kubernetes.utils.Constants.CHECKPOINT_ID_KEY_PREFIX; import static org.apache.flink.kubernetes.utils.Constants.COMPLETED_CHECKPOINT_FILE_SUFFIX; -import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME; -import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME; import static org.apache.flink.kubernetes.utils.Constants.JOB_GRAPH_STORE_KEY_PREFIX; import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY; import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY; @@ -345,52 +340,8 @@ public static ResourceRequirements getResourceRequirements( return resourceRequirementsBuilder.build(); } - public static String getCommonStartCommand( - Configuration flinkConfig, - ClusterComponent mode, - String jvmMemOpts, - String configDirectory, - String logDirectory, - boolean hasLogback, - boolean hasLog4j, - String mainClass, - @Nullable String mainArgs) { - final Map startCommandValues = new HashMap<>(); - startCommandValues.put("java", "$JAVA_HOME/bin/java"); - startCommandValues.put("classpath", "-classpath " + "$" + Constants.ENV_FLINK_CLASSPATH); - - startCommandValues.put("jvmmem", jvmMemOpts); - - final String opts; - final String logFileName; - if (mode == ClusterComponent.JOB_MANAGER) { - opts = getJavaOpts(flinkConfig, CoreOptions.FLINK_JM_JVM_OPTIONS); - logFileName = "jobmanager"; - } else { - opts = getJavaOpts(flinkConfig, CoreOptions.FLINK_TM_JVM_OPTIONS); - logFileName = "taskmanager"; - } - startCommandValues.put("jvmopts", opts); - - startCommandValues.put( - "logging", - getLogging( - logDirectory + "/" + logFileName + ".log", - configDirectory, - hasLogback, - hasLog4j)); - - startCommandValues.put("class", mainClass); - - startCommandValues.put("args", mainArgs != null ? mainArgs : ""); - - final String commandTemplate = - flinkConfig.getString(KubernetesConfigOptions.CONTAINER_START_COMMAND_TEMPLATE); - return BootstrapTools.getStartCommand(commandTemplate, startCommandValues); - } - - public static List getStartCommandWithBashWrapper(String javaCommand) { - return Arrays.asList("bash", "-c", javaCommand); + public static List getStartCommandWithBashWrapper(String command) { + return Arrays.asList("bash", "-c", command); } public static List checkJarFileForApplicationMode(Configuration configuration) { @@ -513,42 +464,6 @@ public static String tryToGetPrettyPrintYaml(KubernetesResource kubernetesResour } } - private static String getJavaOpts( - Configuration flinkConfig, ConfigOption configOption) { - String baseJavaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS); - - if (flinkConfig.getString(configOption).length() > 0) { - return baseJavaOpts + " " + flinkConfig.getString(configOption); - } else { - return baseJavaOpts; - } - } - - private static String getLogging( - String logFile, String confDir, boolean hasLogback, boolean hasLog4j) { - StringBuilder logging = new StringBuilder(); - if (hasLogback || hasLog4j) { - logging.append("-Dlog.file=").append(logFile); - if (hasLogback) { - logging.append(" -Dlogback.configurationFile=file:") - .append(confDir) - .append("/") - .append(CONFIG_FILE_LOGBACK_NAME); - } - if (hasLog4j) { - logging.append(" -Dlog4j.configuration=file:") - .append(confDir) - .append("/") - .append(CONFIG_FILE_LOG4J_NAME) - .append(" -Dlog4j.configurationFile=file:") - .append(confDir) - .append("/") - .append(CONFIG_FILE_LOG4J_NAME); - } - } - return logging.toString(); - } - /** Cluster components. */ public enum ClusterComponent { JOB_MANAGER, diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java index b1ac2f9e50ea7..0c8049dba4e24 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java @@ -78,6 +78,7 @@ protected void onSetup() throws Exception { @Test public void testDeploySessionCluster() throws Exception { + flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName()); final ClusterClient clusterClient = deploySessionCluster().getClusterClient(); checkClusterClient(clusterClient); checkUpdatedConfigAndResourceSetting(); @@ -86,6 +87,7 @@ public void testDeploySessionCluster() throws Exception { @Test public void testDeployHighAvailabilitySessionCluster() throws ClusterDeploymentException { + flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName()); flinkConfig.setString( HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.toString()); final ClusterClient clusterClient = deploySessionCluster().getClusterClient(); @@ -115,6 +117,7 @@ public void testDeployHighAvailabilitySessionCluster() throws ClusterDeploymentE @Test public void testKillCluster() throws Exception { + flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName()); deploySessionCluster(); assertEquals(2, kubeClient.services().list().getItems().size()); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java index af8acae835fa7..ebfef893c8387 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java @@ -20,6 +20,7 @@ import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.testutils.FlinkMatchers; @@ -27,6 +28,7 @@ import org.apache.flink.kubernetes.KubernetesTestUtils; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory; @@ -101,6 +103,7 @@ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase { protected void setupFlinkConfig() { super.setupFlinkConfig(); + flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName()); flinkConfig.set( KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, CONTAINER_IMAGE_PULL_POLICY); flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, ENTRY_POINT_CLASS); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java index 0d8ea25d8bade..eb444d160dda4 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java @@ -34,7 +34,7 @@ public class KubernetesTaskManagerTestBase extends KubernetesPodTestBase { protected static final int RPC_PORT = 12345; protected static final String POD_NAME = "taskmanager-pod-1"; - private static final String DYNAMIC_PROPERTIES = ""; + protected static final String DYNAMIC_PROPERTIES = ""; protected static final int TOTAL_PROCESS_MEMORY = 1184; protected static final double TASK_MANAGER_CPU = 2.0; diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecoratorTest.java new file mode 100644 index 0000000000000..3c605e45e0248 --- /dev/null +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecoratorTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient.decorators; + +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; + +import org.junit.Test; + +import java.util.List; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +/** General tests for the {@link CmdJobManagerDecorator}. */ +public class CmdJobManagerDecoratorTest extends KubernetesJobManagerTestBase { + + private CmdJobManagerDecorator cmdJobManagerDecorator; + + @Override + protected void onSetup() throws Exception { + super.onSetup(); + + this.cmdJobManagerDecorator = new CmdJobManagerDecorator(kubernetesJobManagerParameters); + } + + @Test + public void testContainerIsDecorated() { + flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName()); + final FlinkPod resultFlinkPod = cmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod); + assertThat( + resultFlinkPod.getPodWithoutMainContainer(), + is(equalTo(baseFlinkPod.getPodWithoutMainContainer()))); + assertThat( + resultFlinkPod.getMainContainer(), not(equalTo(baseFlinkPod.getMainContainer()))); + } + + @Test + public void testSessionClusterCommandsAndArgs() { + testJobManagerCommandsAndArgs(KubernetesDeploymentTarget.SESSION.getName()); + } + + @Test + public void testApplicationClusterCommandsAndArgs() { + testJobManagerCommandsAndArgs(KubernetesDeploymentTarget.APPLICATION.getName()); + } + + @Test(expected = IllegalArgumentException.class) + public void testUnsupportedDeploymentTargetShouldFail() { + testJobManagerCommandsAndArgs("unsupported-deployment-target"); + } + + private void testJobManagerCommandsAndArgs(String target) { + flinkConfig.set(DeploymentOptions.TARGET, target); + final FlinkPod resultFlinkPod = cmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod); + final String entryCommand = flinkConfig.get(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH); + assertThat( + resultFlinkPod.getMainContainer().getCommand(), containsInAnyOrder(entryCommand)); + List flinkCommands = + KubernetesUtils.getStartCommandWithBashWrapper( + Constants.KUBERNETES_JOB_MANAGER_SCRIPT_PATH + " " + target); + assertThat(resultFlinkPod.getMainContainer().getArgs(), contains(flinkCommands.toArray())); + } +} diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdTaskManagerDecoratorTest.java new file mode 100644 index 0000000000000..fbbe17e8faea6 --- /dev/null +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdTaskManagerDecoratorTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient.decorators; + +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesTaskManagerTestBase; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; + +import org.junit.Test; + +import java.util.List; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +/** General tests for the{@link CmdTaskManagerDecorator}. */ +public class CmdTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase { + + private String mainClassArgs; + + private CmdTaskManagerDecorator cmdTaskManagerDecorator; + + @Override + public void onSetup() throws Exception { + super.onSetup(); + + this.mainClassArgs = + TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec); + + this.cmdTaskManagerDecorator = + new CmdTaskManagerDecorator(this.kubernetesTaskManagerParameters); + } + + @Test + public void testContainerIsDecorated() { + final FlinkPod resultFlinkPod = cmdTaskManagerDecorator.decorateFlinkPod(this.baseFlinkPod); + assertThat( + resultFlinkPod.getPodWithoutMainContainer(), + is(equalTo(baseFlinkPod.getPodWithoutMainContainer()))); + assertThat( + resultFlinkPod.getMainContainer(), not(equalTo(baseFlinkPod.getMainContainer()))); + } + + @Test + public void testTaskManagerStartCommandsAndArgs() { + final FlinkPod resultFlinkPod = cmdTaskManagerDecorator.decorateFlinkPod(baseFlinkPod); + final String entryCommand = flinkConfig.get(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH); + assertThat( + resultFlinkPod.getMainContainer().getCommand(), containsInAnyOrder(entryCommand)); + List flinkCommands = + KubernetesUtils.getStartCommandWithBashWrapper( + Constants.KUBERNETES_TASK_MANAGER_SCRIPT_PATH + + " " + + DYNAMIC_PROPERTIES + + " " + + mainClassArgs); + assertThat(resultFlinkPod.getMainContainer().getArgs(), contains(flinkCommands.toArray())); + } +} diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java deleted file mode 100644 index 9e0971cd83c26..0000000000000 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdJobManagerDecoratorTest.java +++ /dev/null @@ -1,314 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.kubernetes.kubeclient.decorators; - -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.kubernetes.KubernetesTestUtils; -import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; -import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; -import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; -import org.apache.flink.kubernetes.kubeclient.FlinkPod; -import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; -import org.apache.flink.kubernetes.utils.KubernetesUtils; -import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; - -import io.fabric8.kubernetes.api.model.Container; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME; -import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME; -import static org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec; -import static org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.generateDynamicConfigsStr; -import static org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.generateJvmParametersStr; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -/** General tests for the {@link JavaCmdJobManagerDecorator}. */ -public class JavaCmdJobManagerDecoratorTest extends KubernetesJobManagerTestBase { - - private static final String KUBERNETES_ENTRY_PATH = "/opt/bin/start.sh"; - private static final String FLINK_CONF_DIR_IN_POD = "/opt/flink/flink-conf-"; - private static final String FLINK_LOG_DIR_IN_POD = "/opt/flink/flink-log-"; - private static final String ENTRY_POINT_CLASS = - KubernetesSessionClusterEntrypoint.class.getCanonicalName(); - - private static final String java = "$JAVA_HOME/bin/java"; - private static final String classpath = "-classpath $FLINK_CLASSPATH"; - private static final String jvmOpts = "-Djvm"; - - // Logging variables - private static final String logback = - String.format( - "-Dlogback.configurationFile=file:%s/%s", - FLINK_CONF_DIR_IN_POD, CONFIG_FILE_LOGBACK_NAME); - private static final String log4j = - String.format( - "-Dlog4j.configuration=file:%s/%s -Dlog4j.configurationFile=file:%s/%s", - FLINK_CONF_DIR_IN_POD, - CONFIG_FILE_LOG4J_NAME, - FLINK_CONF_DIR_IN_POD, - CONFIG_FILE_LOG4J_NAME); - private static final String jmLogfile = - String.format("-Dlog.file=%s/jobmanager.log", FLINK_LOG_DIR_IN_POD); - - // Memory variables - private static final JobManagerProcessSpec JOB_MANAGER_PROCESS_SPEC = - createDefaultJobManagerProcessSpec(JOB_MANAGER_MEMORY); - - private final String jmJvmMem = generateJvmParametersStr(JOB_MANAGER_PROCESS_SPEC, flinkConfig); - private final String jmDynamicProperties = generateDynamicConfigsStr(JOB_MANAGER_PROCESS_SPEC); - - private JavaCmdJobManagerDecorator javaCmdJobManagerDecorator; - - @Override - protected void setupFlinkConfig() { - super.setupFlinkConfig(); - - flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, FLINK_CONF_DIR_IN_POD); - flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, FLINK_LOG_DIR_IN_POD); - flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, ENTRY_POINT_CLASS); - flinkConfig.set(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH, KUBERNETES_ENTRY_PATH); - } - - @Override - protected void onSetup() throws Exception { - super.onSetup(); - - this.javaCmdJobManagerDecorator = - new JavaCmdJobManagerDecorator(kubernetesJobManagerParameters); - } - - @Test - public void testWhetherContainerOrPodIsReplaced() { - final FlinkPod resultFlinkPod = javaCmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod); - assertEquals( - baseFlinkPod.getPodWithoutMainContainer(), - resultFlinkPod.getPodWithoutMainContainer()); - assertNotEquals(baseFlinkPod.getMainContainer(), resultFlinkPod.getMainContainer()); - } - - @Test - public void testStartCommandWithoutLog4jAndLogback() { - final Container resultMainContainer = - javaCmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = getJobManagerExpectedCommand("", ""); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testStartCommandWithLog4j() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - - final Container resultMainContainer = - javaCmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = getJobManagerExpectedCommand("", log4j); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testStartCommandWithLogback() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - - final Container resultMainContainer = - javaCmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = getJobManagerExpectedCommand("", logback); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testStartCommandWithLog4jAndLogback() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - - final Container resultMainContainer = - javaCmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = getJobManagerExpectedCommand("", logback + " " + log4j); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testStartCommandWithLogAndJVMOpts() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - - flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts); - final Container resultMainContainer = - javaCmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = getJobManagerExpectedCommand(jvmOpts, logback + " " + log4j); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testStartCommandWithLogAndJMOpts() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - - flinkConfig.set(CoreOptions.FLINK_JM_JVM_OPTIONS, jvmOpts); - final Container resultMainContainer = - javaCmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - final String expectedCommand = getJobManagerExpectedCommand(jvmOpts, logback + " " + log4j); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testContainerStartCommandTemplate1() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - - final String containerStartCommandTemplate = - "%java% 1 %classpath% 2 %jvmmem% %jvmopts% %logging% %class% %args%"; - this.flinkConfig.set( - KubernetesConfigOptions.CONTAINER_START_COMMAND_TEMPLATE, - containerStartCommandTemplate); - - final String jmJvmOpts = "-DjmJvm"; - this.flinkConfig.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts); - this.flinkConfig.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts); - - final Container resultMainContainer = - javaCmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = - java - + " 1 " - + classpath - + " 2 " - + jmJvmMem - + " " - + jvmOpts - + " " - + jmJvmOpts - + " " - + jmLogfile - + " " - + logback - + " " - + log4j - + " " - + ENTRY_POINT_CLASS - + " " - + jmDynamicProperties; - - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testContainerStartCommandTemplate2() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - - final String containerStartCommandTemplate = - "%java% %jvmmem% %logging% %jvmopts% %class% %args%"; - this.flinkConfig.set( - KubernetesConfigOptions.CONTAINER_START_COMMAND_TEMPLATE, - containerStartCommandTemplate); - - final String jmJvmOpts = "-DjmJvm"; - this.flinkConfig.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts); - this.flinkConfig.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts); - - final Container resultMainContainer = - javaCmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = - java - + " " - + jmJvmMem - + " " - + jmLogfile - + " " - + logback - + " " - + log4j - + " " - + jvmOpts - + " " - + jmJvmOpts - + " " - + ENTRY_POINT_CLASS - + " " - + jmDynamicProperties; - - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - private String getJobManagerExpectedCommand(String jvmAllOpts, String logging) { - return java - + " " - + classpath - + " " - + jmJvmMem - + (jvmAllOpts.isEmpty() ? "" : " " + jvmAllOpts) - + (logging.isEmpty() ? "" : " " + jmLogfile + " " + logging) - + " " - + ENTRY_POINT_CLASS - + " " - + jmDynamicProperties; - } -} diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdTaskManagerDecoratorTest.java deleted file mode 100644 index bb9113786186d..0000000000000 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/JavaCmdTaskManagerDecoratorTest.java +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.kubernetes.kubeclient.decorators; - -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.kubernetes.KubernetesTestUtils; -import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; -import org.apache.flink.kubernetes.kubeclient.FlinkPod; -import org.apache.flink.kubernetes.kubeclient.KubernetesTaskManagerTestBase; -import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; -import org.apache.flink.kubernetes.utils.KubernetesUtils; -import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; - -import io.fabric8.kubernetes.api.model.Container; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME; -import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -/** General tests for the{@link JavaCmdTaskManagerDecorator}. */ -public class JavaCmdTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase { - - private static final String KUBERNETES_ENTRY_PATH = "/opt/flink/bin/start.sh"; - private static final String FLINK_CONF_DIR_IN_POD = "/opt/flink/flink-conf-"; - private static final String FLINK_LOG_DIR_IN_POD = "/opt/flink/flink-log-"; - - private static final String java = "$JAVA_HOME/bin/java"; - private static final String classpath = "-classpath $FLINK_CLASSPATH"; - private static final String jvmOpts = "-Djvm"; - - private static final String tmJvmMem = - "-Xmx251658235 -Xms251658235 -XX:MaxDirectMemorySize=211392922 -XX:MaxMetaspaceSize=268435456"; - - private static final String mainClass = KubernetesTaskExecutorRunner.class.getCanonicalName(); - private String mainClassArgs; - - // Logging variables - private static final String logback = - String.format( - "-Dlogback.configurationFile=file:%s/%s", - FLINK_CONF_DIR_IN_POD, CONFIG_FILE_LOGBACK_NAME); - private static final String log4j = - String.format( - "-Dlog4j.configuration=file:%s/%s -Dlog4j.configurationFile=file:%s/%s", - FLINK_CONF_DIR_IN_POD, - CONFIG_FILE_LOG4J_NAME, - FLINK_CONF_DIR_IN_POD, - CONFIG_FILE_LOG4J_NAME); - private static final String tmLogfile = - String.format("-Dlog.file=%s/taskmanager.log", FLINK_LOG_DIR_IN_POD); - - private JavaCmdTaskManagerDecorator javaCmdTaskManagerDecorator; - - @Override - protected void setupFlinkConfig() { - super.setupFlinkConfig(); - - flinkConfig.setString(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH, KUBERNETES_ENTRY_PATH); - flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, FLINK_CONF_DIR_IN_POD); - flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, FLINK_LOG_DIR_IN_POD); - } - - @Override - public void onSetup() throws Exception { - super.onSetup(); - - this.mainClassArgs = - String.format( - "%s --configDir %s", - TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec), - FLINK_CONF_DIR_IN_POD); - - this.javaCmdTaskManagerDecorator = - new JavaCmdTaskManagerDecorator(this.kubernetesTaskManagerParameters); - } - - @Test - public void testWhetherContainerOrPodIsUpdated() { - final FlinkPod resultFlinkPod = - javaCmdTaskManagerDecorator.decorateFlinkPod(this.baseFlinkPod); - assertEquals( - this.baseFlinkPod.getPodWithoutMainContainer(), - resultFlinkPod.getPodWithoutMainContainer()); - assertNotEquals(this.baseFlinkPod.getMainContainer(), resultFlinkPod.getMainContainer()); - } - - @Test - public void testStartCommandWithoutLog4jAndLogback() { - final Container resultMainContainer = - javaCmdTaskManagerDecorator.decorateFlinkPod(this.baseFlinkPod).getMainContainer(); - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = getTaskManagerExpectedCommand("", ""); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testStartCommandWithLog4j() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - - final Container resultMainContainer = - javaCmdTaskManagerDecorator.decorateFlinkPod(this.baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = getTaskManagerExpectedCommand("", log4j); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testStartCommandWithLogback() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - - final Container resultMainContainer = - javaCmdTaskManagerDecorator.decorateFlinkPod(this.baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = getTaskManagerExpectedCommand("", logback); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testStartCommandWithLog4jAndLogback() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - - final Container resultMainContainer = - javaCmdTaskManagerDecorator.decorateFlinkPod(this.baseFlinkPod).getMainContainer(); - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = getTaskManagerExpectedCommand("", logback + " " + log4j); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testStartCommandWithLogAndJVMOpts() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - - flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts); - final Container resultMainContainer = - javaCmdTaskManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = - getTaskManagerExpectedCommand(jvmOpts, logback + " " + log4j); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testStartCommandWithLogAndJMOpts() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - - flinkConfig.set(CoreOptions.FLINK_TM_JVM_OPTIONS, jvmOpts); - final Container resultMainContainer = - javaCmdTaskManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - final String expectedCommand = - getTaskManagerExpectedCommand(jvmOpts, logback + " " + log4j); - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testContainerStartCommandTemplate1() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - - final String containerStartCommandTemplate = - "%java% 1 %classpath% 2 %jvmmem% %jvmopts% %logging% %class% %args%"; - this.flinkConfig.set( - KubernetesConfigOptions.CONTAINER_START_COMMAND_TEMPLATE, - containerStartCommandTemplate); - - final String tmJvmOpts = "-DjmJvm"; - this.flinkConfig.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts); - this.flinkConfig.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts); - - final Container resultMainContainer = - javaCmdTaskManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = - java - + " 1 " - + classpath - + " 2 " - + tmJvmMem - + " " - + jvmOpts - + " " - + tmJvmOpts - + " " - + tmLogfile - + " " - + logback - + " " - + log4j - + " " - + mainClass - + " " - + mainClassArgs; - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - @Test - public void testContainerStartCommandTemplate2() throws IOException { - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOG4J_NAME); - KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, CONFIG_FILE_LOGBACK_NAME); - - final String containerStartCommandTemplate = - "%java% %jvmmem% %logging% %jvmopts% %class% %args%"; - this.flinkConfig.set( - KubernetesConfigOptions.CONTAINER_START_COMMAND_TEMPLATE, - containerStartCommandTemplate); - - final String tmJvmOpts = "-DjmJvm"; - this.flinkConfig.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts); - this.flinkConfig.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts); - - final Container resultMainContainer = - javaCmdTaskManagerDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer(); - - assertEquals( - Collections.singletonList(KUBERNETES_ENTRY_PATH), resultMainContainer.getCommand()); - - final String expectedCommand = - java - + " " - + tmJvmMem - + " " - + tmLogfile - + " " - + logback - + " " - + log4j - + " " - + jvmOpts - + " " - + tmJvmOpts - + " " - + mainClass - + " " - + mainClassArgs; - final List expectedArgs = - KubernetesUtils.getStartCommandWithBashWrapper(expectedCommand); - assertEquals(expectedArgs, resultMainContainer.getArgs()); - } - - private String getTaskManagerExpectedCommand(String jvmAllOpts, String logging) { - return java - + " " - + classpath - + " " - + tmJvmMem - + (jvmAllOpts.isEmpty() ? "" : " " + jvmAllOpts) - + (logging.isEmpty() ? "" : " " + tmLogfile + " " + logging) - + " " - + mainClass - + " " - + mainClassArgs; - } -} diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesFactoryWithPodTemplateTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesFactoryWithPodTemplateTestBase.java index fffb579afafad..efec92cfa0159 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesFactoryWithPodTemplateTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesFactoryWithPodTemplateTestBase.java @@ -19,12 +19,14 @@ package org.apache.flink.kubernetes.kubeclient.factory; import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.KubernetesPodTemplateTestUtils; import org.apache.flink.kubernetes.KubernetesTestBase; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; import org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.utils.Constants; @@ -61,6 +63,7 @@ public abstract class KubernetesFactoryWithPodTemplateTestBase extends Kubernete @Override protected void setupFlinkConfig() { super.setupFlinkConfig(); + flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName()); flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, ENTRY_POINT_CLASS); // Set fixed ports diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java index 25869c66e5ad2..6ad6200dd00f5 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java @@ -18,10 +18,12 @@ package org.apache.flink.kubernetes.kubeclient.factory; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.kubernetes.KubernetesTestUtils; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; import org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; @@ -95,6 +97,7 @@ public class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBas protected void setupFlinkConfig() { super.setupFlinkConfig(); + flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName()); flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, ENTRY_POINT_CLASS); flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME); flinkConfig.set(