Skip to content

Commit

Permalink
[FLINK-16493][k8s] Use enum type instead of string type for Kubernete…
Browse files Browse the repository at this point in the history
…sConfigOptions.REST_SERVICE_EXPOSED_TYPE

This closes apache#11346 .
  • Loading branch information
zhengcanbin committed Mar 12, 2020
1 parent d723d00 commit 663af45
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 25 deletions.
6 changes: 3 additions & 3 deletions docs/_includes/generated/kubernetes_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@
</tr>
<tr>
<td><h5>kubernetes.rest-service.exposed.type</h5></td>
<td style="word-wrap: break-word;">"LoadBalancer"</td>
<td>String</td>
<td>It could be ClusterIP/NodePort/LoadBalancer(default). When set to ClusterIP, the rest servicewill not be created.</td>
<td style="word-wrap: break-word;">LoadBalancer</td>
<td><p>Enum</p>Possible values: [ClusterIP, NodePort, LoadBalancer]</td>
<td>The type of the rest service (ClusterIP or NodePort or LoadBalancer). When set to ClusterIP, the rest service will not be created.</td>
</tr>
<tr>
<td><h5>kubernetes.service.create-timeout</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public class KubernetesConfigOptions {
"for interacting with the cluster. This could be helpful if one has multiple contexts configured and " +
"wants to administrate different Flink clusters on different Kubernetes clusters/contexts.");

public static final ConfigOption<String> REST_SERVICE_EXPOSED_TYPE =
public static final ConfigOption<ServiceExposedType> REST_SERVICE_EXPOSED_TYPE =
key("kubernetes.rest-service.exposed.type")
.stringType()
.defaultValue(ServiceExposedType.LoadBalancer.toString())
.withDescription("It could be ClusterIP/NodePort/LoadBalancer(default). When set to ClusterIP, the rest service" +
"will not be created.");
.enumType(ServiceExposedType.class)
.defaultValue(ServiceExposedType.LoadBalancer)
.withDescription("The type of the rest service (ClusterIP or NodePort or LoadBalancer). " +
"When set to ClusterIP, the rest service will not be created.");

public static final ConfigOption<String> JOB_MANAGER_SERVICE_ACCOUNT =
key("kubernetes.jobmanager.service-account")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,11 @@ public void stopPod(String podName) {
@Nullable
public Endpoint getRestEndpoint(String clusterId) {
int restPort = this.flinkConfig.getInteger(RestOptions.PORT);
String serviceExposedType = flinkConfig.getString(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
final KubernetesConfigOptions.ServiceExposedType serviceExposedType =
flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);

// Return the service.namespace directly when use ClusterIP.
if (serviceExposedType.equals(KubernetesConfigOptions.ServiceExposedType.ClusterIP.toString())) {
if (serviceExposedType == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
return new Endpoint(KubernetesUtils.getInternalServiceName(clusterId) + "." + nameSpace, restPort);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.Constants;

Expand Down Expand Up @@ -55,7 +56,7 @@ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOExcepti
.withLabels(kubernetesJobManagerParameters.getCommonLabels())
.endMetadata()
.withNewSpec()
.withType(getServiceType())
.withType(getServiceType().name())
.withPorts(getServicePorts())
.withSelector(kubernetesJobManagerParameters.getLabels())
.endSpec()
Expand All @@ -64,7 +65,7 @@ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOExcepti
return Collections.singletonList(service);
}

protected abstract String getServiceType();
protected abstract KubernetesConfigOptions.ServiceExposedType getServiceType();

protected abstract String getServiceName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,15 @@ public ExternalServiceDecorator(KubernetesJobManagerParameters kubernetesJobMana

@Override
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
if (kubernetesJobManagerParameters.getRestServiceExposedType().equals(
KubernetesConfigOptions.ServiceExposedType.ClusterIP.name())) {
if (kubernetesJobManagerParameters.getRestServiceExposedType() == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
return Collections.emptyList();
}

return super.buildAccompanyingKubernetesResources();
}

@Override
protected String getServiceType() {
protected KubernetesConfigOptions.ServiceExposedType getServiceType() {
return kubernetesJobManagerParameters.getRestServiceExposedType();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ protected List<ServicePort> getServicePorts() {
}

@Override
protected String getServiceType() {
return KubernetesConfigOptions.ServiceExposedType.ClusterIP.name();
protected KubernetesConfigOptions.ServiceExposedType getServiceType() {
return KubernetesConfigOptions.ServiceExposedType.ClusterIP;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ public String getEntrypointClass() {
return entrypointClass;
}

public String getRestServiceExposedType() {
final String exposedType = flinkConfig.getString(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
return KubernetesConfigOptions.ServiceExposedType.valueOf(exposedType).name();
public KubernetesConfigOptions.ServiceExposedType getRestServiceExposedType() {
return flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,14 @@ public void testBuildAccompanyingKubernetesResources() throws IOException {

@Test
public void testSetServiceExposedType() throws IOException {
this.flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, "NodePort");
this.flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
KubernetesConfigOptions.ServiceExposedType.NodePort);
List<HasMetadata> resources = this.externalServiceDecorator.buildAccompanyingKubernetesResources();
assertEquals("NodePort", ((Service) resources.get(0)).getSpec().getType());
assertEquals(KubernetesConfigOptions.ServiceExposedType.NodePort.name(),
((Service) resources.get(0)).getSpec().getType());

this.flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, "ClusterIP");
this.flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
KubernetesConfigOptions.ServiceExposedType.ClusterIP);
assertTrue(this.externalServiceDecorator.buildAccompanyingKubernetesResources().isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ public void testGetEntrypointMainClass() {
@Test
public void testGetRestServiceExposedType() {
flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
KubernetesConfigOptions.ServiceExposedType.NodePort.name());
assertEquals(KubernetesConfigOptions.ServiceExposedType.NodePort.name(),
KubernetesConfigOptions.ServiceExposedType.NodePort);
assertEquals(KubernetesConfigOptions.ServiceExposedType.NodePort,
kubernetesJobManagerParameters.getRestServiceExposedType());
}
}

0 comments on commit 663af45

Please sign in to comment.