Skip to content

Commit

Permalink
[FLINK-17707][k8s] Support configuring replicas of JobManager deploym…
Browse files Browse the repository at this point in the history
…ent when HA enabled

This closes apache#15286
  • Loading branch information
wangyang0918 authored and xintongsong committed Jun 7, 2021
1 parent 163690a commit 5fe3f5b
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ For more details see the [official Kubernetes documentation](https://kubernetes.

For high availability on Kubernetes, you can use the [existing high availability services]({{< ref "docs/deployment/ha/overview" >}}).

Configure the value of <a href="{{< ref "docs/deployment/config" >}}#kubernetes-jobmanager-replicas">kubernetes.jobmanager.replicas</a> to greater than 1 to start standby JobManagers.
It will help to achieve faster recovery.
Notice that high availability should be enabled when starting standby JobManagers.

### Manual Resource Cleanup

Flink uses [Kubernetes OwnerReference's](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/) to clean up all cluster components.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ For more details see the [official Kubernetes documentation](https://kubernetes.

For high availability on Kubernetes, you can use the [existing high availability services]({{< ref "docs/deployment/ha/overview" >}}).

Configure the value of <a href="{{< ref "docs/deployment/config" >}}#kubernetes-jobmanager-replicas">kubernetes.jobmanager.replicas</a> to greater than 1 to start standby JobManagers.
It will help to achieve faster recovery.
Notice that high availability should be enabled when starting standby JobManagers.

### Manual Resource Cleanup

Flink uses [Kubernetes OwnerReference's](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/) to clean up all cluster components.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@
<td>List&lt;Map&gt;</td>
<td>The user-specified <a href="https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/native_kubernetes.html#manual-resource-cleanup">Owner References</a> to be set to the JobManager Deployment. When all the owner resources are deleted, the JobManager Deployment will be deleted automatically, which also deletes all the resources created by this Flink cluster. The value should be formatted as a semicolon-separated list of owner references, where each owner reference is a comma-separated list of `key:value` pairs. E.g., apiVersion:v1,blockOwnerDeletion:true,controller:true,kind:FlinkApplication,name:flink-app-name,uid:flink-app-uid;apiVersion:v1,kind:Deployment,name:deploy-name,uid:deploy-uid</td>
</tr>
<tr>
<td><h5>kubernetes.jobmanager.replicas</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Specify how many JobManager pods will be started simultaneously. Configure the value to greater than 1 to start standby JobManagers. It will help to achieve faster recovery. Notice that high availability should be enabled when starting standby JobManagers.</td>
</tr>
<tr>
<td><h5>kubernetes.jobmanager.service-account</h5></td>
<td style="word-wrap: break-word;">"default"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,16 @@ public class KubernetesConfigOptions {
+ "(e.g. start/stop TaskManager pods, update leader related ConfigMaps, etc.). "
+ "Increasing the pool size allows to run more IO operations concurrently.");

public static final ConfigOption<Integer> KUBERNETES_JOBMANAGER_REPLICAS =
key("kubernetes.jobmanager.replicas")
.intType()
.defaultValue(1)
.withDescription(
"Specify how many JobManager pods will be started simultaneously. "
+ "Configure the value to greater than 1 to start standby JobManagers. "
+ "It will help to achieve faster recovery. "
+ "Notice that high availability should be enabled when starting standby JobManagers.");

private static String getDefaultFlinkImage() {
// The default container image that ties to the exact needed versions of both Flink and
// Scala.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private static Deployment createJobManagerDeployment(
.collect(Collectors.toList()))
.endMetadata()
.editOrNewSpec()
.withReplicas(1)
.withReplicas(kubernetesJobManagerParameters.getReplicas())
.editOrNewTemplate()
.withMetadata(resolvedPod.getMetadata())
.withSpec(resolvedPod.getSpec())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
Expand Down Expand Up @@ -160,4 +161,20 @@ public KubernetesConfigOptions.ServiceExposedType getRestServiceExposedType() {
public boolean isInternalServiceEnabled() {
return !HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig);
}

public int getReplicas() {
final int replicas =
flinkConfig.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS);
if (replicas < 1) {
throw new IllegalConfigurationException(
String.format(
"'%s' should not be configured less than one.",
KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS.key()));
} else if (replicas > 1
&& !HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
throw new IllegalConfigurationException(
"High availability should be enabled when starting standby JobManagers.");
}
return replicas;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.flink.kubernetes.kubeclient.factory;

import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.kubernetes.KubernetesTestUtils;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
Expand Down Expand Up @@ -60,6 +62,7 @@
import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -89,6 +92,8 @@ public class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBas
"testapp",
"e3c9aa3f-cc42-4178-814a-64aa15c82373"));

private static final int JOBMANAGER_REPLICAS = 2;

private final FlinkPod flinkPod = new FlinkPod.Builder().build();

protected KubernetesJobManagerSpecification kubernetesJobManagerSpecification;
Expand Down Expand Up @@ -462,4 +467,19 @@ public void testEmptyHadoopConfDirectory() throws IOException {
.getHadoopConfConfigMapName(
CLUSTER_ID))));
}

@Test
public void testSetJobManagerDeploymentReplicas() throws Exception {
flinkConfig.set(
HighAvailabilityOptions.HA_MODE,
KubernetesHaServicesFactory.class.getCanonicalName());
flinkConfig.set(
KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, JOBMANAGER_REPLICAS);
kubernetesJobManagerSpecification =
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
flinkPod, kubernetesJobManagerParameters);
assertThat(
kubernetesJobManagerSpecification.getDeployment().getSpec().getReplicas(),
is(JOBMANAGER_REPLICAS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.KubernetesTestBase;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.FlinkRuntimeException;

Expand Down Expand Up @@ -213,4 +216,25 @@ public void testPrioritizeBuiltInLabels() {
expectedLabels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER);
assertThat(kubernetesJobManagerParameters.getLabels(), is(equalTo(expectedLabels)));
}

@Test(expected = IllegalConfigurationException.class)
public void testGetReplicasWithTwoShouldFailWhenHAIsNotEnabled() {
flinkConfig.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2);
kubernetesJobManagerParameters.getReplicas();
}

@Test(expected = IllegalConfigurationException.class)
public void testGetReplicasWithInvalidValue() {
flinkConfig.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 0);
kubernetesJobManagerParameters.getReplicas();
}

@Test
public void testGetReplicas() {
flinkConfig.set(
HighAvailabilityOptions.HA_MODE,
KubernetesHaServicesFactory.class.getCanonicalName());
flinkConfig.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2);
assertThat(kubernetesJobManagerParameters.getReplicas(), is(2));
}
}

0 comments on commit 5fe3f5b

Please sign in to comment.