Skip to content

Commit

Permalink
[FLINK-16624][k8s] Support user-specified annotations for the rest Se…
Browse files Browse the repository at this point in the history
…rvice

This closes apache#12105 .
  • Loading branch information
zhengcanbin authored and tisonkun committed May 16, 2020
1 parent bb11993 commit da16f9e
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 1 deletion.
6 changes: 6 additions & 0 deletions docs/_includes/generated/kubernetes_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@
<td>String</td>
<td>The namespace that will be used for running the jobmanager and taskmanager pods.</td>
</tr>
<tr>
<td><h5>kubernetes.rest-service.annotations</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Map</td>
<td>The user-specified annotations that are set to the rest Service. The value should be in the form of a1:v1,a2:v2</td>
</tr>
<tr>
<td><h5>kubernetes.rest-service.exposed.type</h5></td>
<td style="word-wrap: break-word;">LoadBalancer</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ public class KubernetesConfigOptions {
"in the form of key:key1,operator:Equal,value:value1,effect:NoSchedule;" +
"key:key2,operator:Exists,effect:NoExecute,tolerationSeconds:6000");

public static final ConfigOption<Map<String, String>> REST_SERVICE_ANNOTATIONS =
key("kubernetes.rest-service.annotations")
.mapType()
.noDefaultValue()
.withDescription("The user-specified annotations that are set to the rest Service. The value should be " +
"in the form of a1:v1,a2:v2");

/**
* The flink rest service exposed type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOExcepti
.withNewMetadata()
.withName(serviceName)
.withLabels(kubernetesJobManagerParameters.getCommonLabels())
.withAnnotations(kubernetesJobManagerParameters.getRestServiceAnnotations())
.endMetadata()
.withNewSpec()
.withType(kubernetesJobManagerParameters.getRestServiceExposedType().name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public List<Map<String, String>> getTolerations() {
return flinkConfig.getOptional(KubernetesConfigOptions.JOB_MANAGER_TOLERATIONS).orElse(Collections.emptyList());
}

public Map<String, String> getRestServiceAnnotations() {
return flinkConfig.getOptional(KubernetesConfigOptions.REST_SERVICE_ANNOTATIONS).orElse(Collections.emptyMap());
}

public String getJobManagerMainContainerName() {
return JOB_MANAGER_MAIN_CONTAINER_NAME;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

/**
* General tests for the {@link ExternalServiceDecorator}.
Expand All @@ -44,9 +48,18 @@ public class ExternalServiceDecoratorTest extends KubernetesJobManagerTestBase {

private ExternalServiceDecorator externalServiceDecorator;

private Map<String, String> customizedAnnotations = new HashMap<String, String>() {
{
put("annotation1", "annotation-value1");
put("annotation2", "annotation-value2");
}
};

@Before
public void setup() throws Exception {
super.setup();

this.flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_ANNOTATIONS, customizedAnnotations);
this.externalServiceDecorator = new ExternalServiceDecorator(this.kubernetesJobManagerParameters);
}

Expand Down Expand Up @@ -77,6 +90,9 @@ public void testBuildAccompanyingKubernetesResources() throws IOException {
expectedLabels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER);
expectedLabels.putAll(userLabels);
assertEquals(expectedLabels, restService.getSpec().getSelector());

final Map<String, String> resultAnnotations = restService.getMetadata().getAnnotations();
assertThat(resultAnnotations, is(equalTo(customizedAnnotations)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void testGetEmptyAnnotations() {
}

@Test
public void testGetAnnotations() {
public void testGetJobManagerAnnotations() {
final Map<String, String> expectedAnnotations = new HashMap<>();
expectedAnnotations.put("a1", "v1");
expectedAnnotations.put("a2", "v2");
Expand All @@ -90,6 +90,19 @@ public void testGetAnnotations() {
assertThat(resultAnnotations, is(equalTo(expectedAnnotations)));
}

@Test
public void testGetServiceAnnotations() {
final Map<String, String> expectedAnnotations = new HashMap<>();
expectedAnnotations.put("a1", "v1");
expectedAnnotations.put("a2", "v2");

flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_ANNOTATIONS, expectedAnnotations);

final Map<String, String> resultAnnotations = kubernetesJobManagerParameters.getRestServiceAnnotations();

assertThat(resultAnnotations, is(equalTo(expectedAnnotations)));
}

@Test
public void testGetJobManagerMemoryMB() {
assertEquals(JOB_MANAGER_MEMORY, kubernetesJobManagerParameters.getJobManagerMemoryMB());
Expand Down

0 comments on commit da16f9e

Please sign in to comment.