Skip to content

Commit

Permalink
[FLINK-15632][kubernetes] Set JobManagerOptions#ADDRESS to the pod ip…
Browse files Browse the repository at this point in the history
… address when HA mode enabled

For non-HA cluster, JobManagerOptions#ADDRESS has be set to Kubernetes service name on client side. See KubernetesClusterDescriptor#deployClusterInternal. So the TaskManager will use service address to contact with JobManager.
For HA cluster, JobManagerOptions#ADDRESS will be set to the pod ip address. The TaskManager uses Zookeeper or other high-availability service to find the address of JobManager.
  • Loading branch information
wangyang0918 authored and zhuzhurk committed Jan 22, 2020
1 parent 003ceb9 commit be4b713
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.entrypoint;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.Preconditions;

/**
* This class contains utility methods for the {@link KubernetesSessionClusterEntrypoint}.
*/
class KubernetesEntrypointUtils {

/**
* For non-HA cluster, {@link JobManagerOptions#ADDRESS} has be set to Kubernetes service name on client side. See
* {@link KubernetesClusterDescriptor#deployClusterInternal}. So the TaskManager will use service address to contact
* with JobManager.
* For HA cluster, {@link JobManagerOptions#ADDRESS} will be set to the pod ip address. The TaskManager use Zookeeper
* or other high-availability service to find the address of JobManager.
*
* @return Updated configuration
*/
static Configuration loadConfiguration() {
final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
Preconditions.checkNotNull(
configDir,
"Flink configuration directory (%s) in environment should not be null!",
ConfigConstants.ENV_FLINK_CONF_DIR);

final Configuration configuration = GlobalConfiguration.loadConfiguration(configDir);

if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
final String ipAddress = System.getenv().get(Constants.ENV_FLINK_POD_IP_ADDRESS);
Preconditions.checkState(
ipAddress != null,
"JobManager ip address environment variable %s not set",
Constants.ENV_FLINK_POD_IP_ADDRESS);
configuration.setString(JobManagerOptions.ADDRESS, ipAddress);
configuration.setString(RestOptions.ADDRESS, ipAddress);
}

return configuration;
}

private KubernetesEntrypointUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@

package org.apache.flink.kubernetes.entrypoint;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.Preconditions;

/**
* Entry point for a Kubernetes session cluster.
Expand All @@ -51,14 +48,8 @@ public static void main(String[] args) {
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);

final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
Preconditions.checkNotNull(
configDir,
"Flink configuration directory (%s) in environment should not be null!",
ConfigConstants.ENV_FLINK_CONF_DIR);
final Configuration configuration = GlobalConfiguration.loadConfiguration(configDir);

ClusterEntrypoint entrypoint = new KubernetesSessionClusterEntrypoint(configuration);
final ClusterEntrypoint entrypoint = new KubernetesSessionClusterEntrypoint(
KubernetesEntrypointUtils.loadConfiguration());
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.PodSpecBuilder;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
Expand All @@ -45,9 +47,13 @@

import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.kubernetes.utils.Constants.API_VERSION;
import static org.apache.flink.kubernetes.utils.Constants.ENV_FLINK_POD_IP_ADDRESS;
import static org.apache.flink.kubernetes.utils.Constants.POD_IP_FIELD_PATH;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -138,13 +144,21 @@ private Container createJobManagerContainer(
new ContainerPortBuilder().withContainerPort(flinkConfig.getInteger(RestOptions.PORT)).build(),
new ContainerPortBuilder().withContainerPort(flinkConfig.getInteger(JobManagerOptions.PORT)).build(),
new ContainerPortBuilder().withContainerPort(blobServerPort).build()))
.withEnv(
BootstrapTools.getEnvironmentVariables(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, flinkConfig)
.entrySet()
.stream()
.map(kv -> new EnvVar(kv.getKey(), kv.getValue(), null))
.collect(Collectors.toList()))
.withEnv(buildEnvForContainer(flinkConfig))
.withVolumeMounts(KubernetesUtils.getConfigMapVolumeMount(flinkConfDirInPod, hasLogback, hasLog4j))
.build();
}

private List<EnvVar> buildEnvForContainer(Configuration flinkConfig) {
List<EnvVar> envList =
BootstrapTools.getEnvironmentVariables(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, flinkConfig)
.entrySet()
.stream()
.map(kv -> new EnvVar(kv.getKey(), kv.getValue(), null)).collect(Collectors.toList());
envList.add(new EnvVarBuilder()
.withName(ENV_FLINK_POD_IP_ADDRESS)
.withValueFrom(new EnvVarSourceBuilder().withNewFieldRef(API_VERSION, POD_IP_FIELD_PATH).build())
.build());
return envList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,8 @@ public class Constants {
public static final String ENV_FLINK_CLASSPATH = "FLINK_CLASSPATH";

public static final String ENV_FLINK_POD_NAME = "_FLINK_POD_NAME";

public static final String ENV_FLINK_POD_IP_ADDRESS = "_POD_IP_ADDRESS";

public static final String POD_IP_FIELD_PATH = "status.podIP";
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.junit.Test;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.kubernetes.utils.Constants.ENV_FLINK_POD_IP_ADDRESS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* Tests for the {@link KubernetesClusterDescriptor}.
Expand Down Expand Up @@ -93,6 +97,25 @@ public void testDeployHighAvailabilitySessionCluster() throws ClusterDeploymentE

final ClusterClient<String> clusterClient = deploySessionCluster();

final KubernetesClient kubeClient = server.getClient();
final Container jmContainer = kubeClient
.apps()
.deployments()
.list()
.getItems()
.get(0)
.getSpec()
.getTemplate()
.getSpec()
.getContainers()
.get(0);
assertTrue(
"Environment " + ENV_FLINK_POD_IP_ADDRESS + " should be set.",
jmContainer.getEnv().stream()
.map(EnvVar::getName)
.collect(Collectors.toList())
.contains(ENV_FLINK_POD_IP_ADDRESS));

clusterClient.close();
}

Expand Down

0 comments on commit be4b713

Please sign in to comment.