Skip to content

Commit

Permalink
[FLINK-23587][k8s] Set the annotations on JobManager deployment when …
Browse files Browse the repository at this point in the history
…using native kubernetes

This closes apache#16697.
  • Loading branch information
KarlManong authored and wangyang0918 committed Aug 5, 2021
1 parent 0b4d17e commit aa21c11
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ private static Deployment createJobManagerDeployment(
.withName(
KubernetesUtils.getDeploymentName(
kubernetesJobManagerParameters.getClusterId()))
.withAnnotations(kubernetesJobManagerParameters.getAnnotations())
.withLabels(kubernetesJobManagerParameters.getLabels())
.withOwnerReferences(
kubernetesJobManagerParameters.getOwnerReference().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ protected void setupFlinkConfig() {
this.flinkConfig.setString(
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + k, v));
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_LABELS, userLabels);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, userAnnotations);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, nodeSelector);
this.flinkConfig.set(
JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public class KubernetesPodTestBase extends KubernetesTestBase {
}
};

protected final Map<String, String> userAnnotations =
new HashMap<String, String>() {
{
put("annotation1", "value1");
put("annotation2", "value2");
}
};

protected final Map<String, String> nodeSelector =
new HashMap<String, String>() {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -139,6 +140,8 @@ public void testDeploymentMetadata() throws IOException {
expectedLabels.putAll(userLabels);
assertEquals(expectedLabels, resultDeployment.getMetadata().getLabels());

assertThat(resultDeployment.getMetadata().getAnnotations(), equalTo(userAnnotations));

assertThat(
resultDeployment.getMetadata().getOwnerReferences(),
Matchers.containsInAnyOrder(OWNER_REFERENCES.toArray()));
Expand All @@ -161,6 +164,10 @@ public void testDeploymentSpec() throws IOException {
assertEquals(expectedLabels, resultDeploymentSpec.getTemplate().getMetadata().getLabels());
assertEquals(expectedLabels, resultDeploymentSpec.getSelector().getMatchLabels());

assertThat(
resultDeploymentSpec.getTemplate().getMetadata().getAnnotations(),
equalTo(userAnnotations));

assertNotNull(resultDeploymentSpec.getTemplate().getSpec());
}

Expand Down

0 comments on commit aa21c11

Please sign in to comment.