From 453d8f957dbfea7e5baa69f7098298e4c22a9ea2 Mon Sep 17 00:00:00 2001 From: felixzheng Date: Thu, 21 May 2020 16:31:56 +0800 Subject: [PATCH 1/4] [FLINK-17230][k8s] Fix incorrect returned address of Endpoint for the external Service of ClusterIP type --- .../kubernetes/kubeclient/Fabric8FlinkKubeClient.java | 4 ++-- .../org/apache/flink/kubernetes/utils/KubernetesUtils.java | 7 +++++++ .../kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java | 4 ++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java index 4ed7450b88e7b..95d9f178057c0 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java @@ -150,10 +150,10 @@ public Optional getRestEndpoint(String clusterId) { final KubernetesConfigOptions.ServiceExposedType serviceExposedType = KubernetesConfigOptions.ServiceExposedType.valueOf(service.getSpec().getType()); - // Return the service.namespace directly when use ClusterIP. + // Return the external service.namespace directly when using ClusterIP. if (serviceExposedType == KubernetesConfigOptions.ServiceExposedType.ClusterIP) { return Optional.of( - new Endpoint(KubernetesUtils.getInternalServiceName(clusterId) + "." + nameSpace, restPort)); + new Endpoint(KubernetesUtils.getNamespacedExternalServiceName(clusterId, nameSpace), restPort)); } return getRestEndPointFromService(service, restPort); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java index 3abc0a22ae376..f5084efd7879e 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java @@ -105,6 +105,13 @@ public static String getRestServiceName(String clusterId) { return clusterId + Constants.FLINK_REST_SERVICE_SUFFIX; } + /** + * Generate namespaced name of the external Service. + */ + public static String getNamespacedExternalServiceName(String clusterId, String namespace) { + return getRestServiceName(clusterId) + "." + namespace; + } + /** * Generate name of the Deployment. */ diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java index bb7a0b612756c..b08cbb4c2d0e3 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java @@ -30,6 +30,7 @@ import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.utils.KubernetesUtils; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -219,6 +220,9 @@ public void testClusterIPService() { final Optional resultEndpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID); assertThat(resultEndpoint.isPresent(), is(true)); + assertThat( + resultEndpoint.get().getAddress(), + is(KubernetesUtils.getNamespacedExternalServiceName(CLUSTER_ID, NAMESPACE))); assertThat(resultEndpoint.get().getPort(), is(REST_PORT)); } From 5243ce453108f0773cf81831e3a6c01c29a37607 Mon Sep 17 00:00:00 2001 From: felixzheng Date: Thu, 21 May 2020 16:33:44 +0800 Subject: [PATCH 2/4] [hotfix][k8s] Fix code style --- .../kubeclient/Fabric8FlinkKubeClient.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java index 95d9f178057c0..c70f6148b6f36 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java @@ -62,7 +62,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { private final KubernetesClient internalClient; private final String clusterId; - private final String nameSpace; + private final String namespace; private final ExecutorService kubeClientExecutorService; @@ -73,7 +73,7 @@ public Fabric8FlinkKubeClient( this.internalClient = checkNotNull(client); this.clusterId = checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)); - this.nameSpace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE); + this.namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE); this.kubeClientExecutorService = asyncExecutorFactory.get(); } @@ -88,7 +88,7 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet final Deployment createdDeployment = this.internalClient .apps() .deployments() - .inNamespace(this.nameSpace) + .inNamespace(this.namespace) .create(deployment); // Note that we should use the uid of the created Deployment for the OwnerReference. @@ -96,7 +96,7 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet this.internalClient .resourceList(accompanyingResources) - .inNamespace(this.nameSpace) + .inNamespace(this.namespace) .createOrReplace(); } @@ -107,13 +107,13 @@ public CompletableFuture createTaskManagerPod(KubernetesPod kubernetesPod) final Deployment masterDeployment = this.internalClient .apps() .deployments() - .inNamespace(this.nameSpace) + .inNamespace(this.namespace) .withName(KubernetesUtils.getDeploymentName(clusterId)) .get(); if (masterDeployment == null) { throw new RuntimeException( - "Failed to find Deployment named " + clusterId + " in namespace " + this.nameSpace); + "Failed to find Deployment named " + clusterId + " in namespace " + this.namespace); } // Note that we should use the uid of the master Deployment for the OwnerReference. @@ -125,7 +125,7 @@ public CompletableFuture createTaskManagerPod(KubernetesPod kubernetesPod) this.internalClient .pods() - .inNamespace(this.nameSpace) + .inNamespace(this.namespace) .create(kubernetesPod.getInternalResource()); }, kubeClientExecutorService); @@ -153,7 +153,7 @@ public Optional getRestEndpoint(String clusterId) { // Return the external service.namespace directly when using ClusterIP. if (serviceExposedType == KubernetesConfigOptions.ServiceExposedType.ClusterIP) { return Optional.of( - new Endpoint(KubernetesUtils.getNamespacedExternalServiceName(clusterId, nameSpace), restPort)); + new Endpoint(KubernetesUtils.getNamespacedExternalServiceName(clusterId, namespace), restPort)); } return getRestEndPointFromService(service, restPort); @@ -178,7 +178,7 @@ public void stopAndCleanupCluster(String clusterId) { this.internalClient .apps() .deployments() - .inNamespace(this.nameSpace) + .inNamespace(this.namespace) .withName(KubernetesUtils.getDeploymentName(clusterId)) .cascading(true) .delete(); @@ -195,7 +195,7 @@ public Optional getRestService(String clusterId) { final Service service = this.internalClient .services() - .inNamespace(nameSpace) + .inNamespace(namespace) .withName(serviceName) .fromServer() .get(); From 36249fcfccb161948fd90b6846adda0d1e645faa Mon Sep 17 00:00:00 2001 From: felixzheng Date: Thu, 21 May 2020 16:44:21 +0800 Subject: [PATCH 3/4] [hotfix][k8s] Port KubernetesUtils.getInternalServiceName to InternalServiceDecorator --- .../decorators/InternalServiceDecorator.java | 22 ++++++++++++++++--- .../kubernetes/utils/KubernetesUtils.java | 7 ------ .../KubernetesClusterDescriptorTest.java | 7 +++--- .../InternalServiceDecoratorTest.java | 5 ++--- .../KubernetesJobManagerFactoryTest.java | 3 ++- 5 files changed, 27 insertions(+), 17 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java index 6c9377005f86e..141b1d57364e5 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java @@ -21,7 +21,6 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Service; @@ -53,7 +52,7 @@ public List buildAccompanyingKubernetesResources() throws IOExcepti return Collections.emptyList(); } - final String serviceName = KubernetesUtils.getInternalServiceName(kubernetesJobManagerParameters.getClusterId()); + final String serviceName = getInternalServiceName(kubernetesJobManagerParameters.getClusterId()); final Service headlessService = new ServiceBuilder() .withApiVersion(Constants.API_VERSION) @@ -77,10 +76,27 @@ public List buildAccompanyingKubernetesResources() throws IOExcepti // Set job manager address to namespaced service name final String namespace = kubernetesJobManagerParameters.getNamespace(); - kubernetesJobManagerParameters.getFlinkConfiguration().setString(JobManagerOptions.ADDRESS, serviceName + "." + namespace); + kubernetesJobManagerParameters.getFlinkConfiguration().setString( + JobManagerOptions.ADDRESS, + getNamespacedInternalServiceName(serviceName, namespace)); return Collections.singletonList(headlessService); } + + + /** + * Generate name of the internal Service. + */ + public static String getInternalServiceName(String clusterId) { + return clusterId; + } + + /** + * Generate namespaced name of the internal Service. + */ + public static String getNamespacedInternalServiceName(String clusterId, String namespace) { + return getInternalServiceName(clusterId) + "." + namespace; + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java index f5084efd7879e..d0ef437a52f9f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java @@ -91,13 +91,6 @@ public static Integer parsePort(Configuration flinkConfig, ConfigOption } } - /** - * Generate name of the internal Service. - */ - public static String getInternalServiceName(String clusterId) { - return clusterId; - } - /** * Generate name of the external Service. */ diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java index 6fab70194d57e..64f9f62444d50 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java @@ -30,8 +30,8 @@ import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; +import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import io.fabric8.kubernetes.api.model.Container; @@ -191,8 +191,9 @@ private void checkUpdatedConfigAndResourceSetting() { // Check updated flink config options assertEquals(String.valueOf(Constants.BLOB_SERVER_PORT), flinkConfig.getString(BlobServerOptions.PORT)); assertEquals(String.valueOf(Constants.TASK_MANAGER_RPC_PORT), flinkConfig.getString(TaskManagerOptions.RPC_PORT)); - assertEquals(KubernetesUtils.getInternalServiceName(CLUSTER_ID) + "." + - NAMESPACE, flinkConfig.getString(JobManagerOptions.ADDRESS)); + assertEquals( + InternalServiceDecorator.getNamespacedInternalServiceName(CLUSTER_ID, NAMESPACE), + flinkConfig.getString(JobManagerOptions.ADDRESS)); final Deployment jmDeployment = kubeClient .apps() diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java index 1bafc133cc9e4..0b39e00f3727b 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -59,14 +58,14 @@ public void testBuildAccompanyingKubernetesResources() throws IOException { assertEquals(1, resources.size()); assertEquals( - KubernetesUtils.getInternalServiceName(CLUSTER_ID) + "." + NAMESPACE, + InternalServiceDecorator.getNamespacedInternalServiceName(CLUSTER_ID, NAMESPACE), this.flinkConfig.getString(JobManagerOptions.ADDRESS)); final Service internalService = (Service) resources.get(0); assertEquals(Constants.API_VERSION, internalService.getApiVersion()); - assertEquals(KubernetesUtils.getInternalServiceName(CLUSTER_ID), internalService.getMetadata().getName()); + assertEquals(InternalServiceDecorator.getInternalServiceName(CLUSTER_ID), internalService.getMetadata().getName()); final Map expectedLabels = getCommonLabels(); assertEquals(expectedLabels, internalService.getMetadata().getLabels()); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java index 96a3f73c8732a..fd9432af1fdfb 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java @@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.kubernetes.utils.KubernetesUtils; @@ -171,7 +172,7 @@ public void testServices() { final List internalServiceCandidates = resultServices .stream() - .filter(x -> x.getMetadata().getName().equals(KubernetesUtils.getInternalServiceName(CLUSTER_ID))) + .filter(x -> x.getMetadata().getName().equals(InternalServiceDecorator.getInternalServiceName(CLUSTER_ID))) .collect(Collectors.toList()); assertEquals(1, internalServiceCandidates.size()); From 689aafe09dd533f85d746915b404d126e051dd6b Mon Sep 17 00:00:00 2001 From: felixzheng Date: Thu, 21 May 2020 16:48:21 +0800 Subject: [PATCH 4/4] [hotfix][k8s] Port KubernetesUtils.getExternalServiceName to ExternalServiceDecorator --- .../kubeclient/Fabric8FlinkKubeClient.java | 7 ++++--- .../decorators/ExternalServiceDecorator.java | 17 +++++++++++++++-- .../flink/kubernetes/utils/KubernetesUtils.java | 14 -------------- .../kubernetes/KubernetesClientTestBase.java | 4 ++-- .../kubeclient/Fabric8FlinkKubeClientTest.java | 4 ++-- .../ExternalServiceDecoratorTest.java | 3 +-- .../KubernetesJobManagerFactoryTest.java | 3 ++- 7 files changed, 26 insertions(+), 26 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java index c70f6148b6f36..397350c0b612f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPodsWatcher; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService; @@ -153,7 +154,7 @@ public Optional getRestEndpoint(String clusterId) { // Return the external service.namespace directly when using ClusterIP. if (serviceExposedType == KubernetesConfigOptions.ServiceExposedType.ClusterIP) { return Optional.of( - new Endpoint(KubernetesUtils.getNamespacedExternalServiceName(clusterId, namespace), restPort)); + new Endpoint(ExternalServiceDecorator.getNamespacedExternalServiceName(clusterId, namespace), restPort)); } return getRestEndPointFromService(service, restPort); @@ -191,7 +192,7 @@ public void handleException(Exception e) { @Override public Optional getRestService(String clusterId) { - final String serviceName = KubernetesUtils.getRestServiceName(clusterId); + final String serviceName = ExternalServiceDecorator.getExternalServiceName(clusterId); final Service service = this.internalClient .services() @@ -246,7 +247,7 @@ private int getRestPortFromExternalService(Service externalService) { if (servicePortCandidates.isEmpty()) { throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" + - KubernetesUtils.getRestServiceName(this.clusterId) + "\""); + ExternalServiceDecorator.getExternalServiceName(this.clusterId) + "\""); } final ServicePort externalServicePort = servicePortCandidates.get(0); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java index 6e02ef6d4f290..943b6cf247cef 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java @@ -20,7 +20,6 @@ import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Service; @@ -46,7 +45,7 @@ public ExternalServiceDecorator(KubernetesJobManagerParameters kubernetesJobMana @Override public List buildAccompanyingKubernetesResources() throws IOException { final String serviceName = - KubernetesUtils.getRestServiceName(kubernetesJobManagerParameters.getClusterId()); + getExternalServiceName(kubernetesJobManagerParameters.getClusterId()); final Service externalService = new ServiceBuilder() .withApiVersion(Constants.API_VERSION) @@ -68,4 +67,18 @@ public List buildAccompanyingKubernetesResources() throws IOExcepti return Collections.singletonList(externalService); } + + /** + * Generate name of the external Service. + */ + public static String getExternalServiceName(String clusterId) { + return clusterId + Constants.FLINK_REST_SERVICE_SUFFIX; + } + + /** + * Generate namespaced name of the external Service. + */ + public static String getNamespacedExternalServiceName(String clusterId, String namespace) { + return getExternalServiceName(clusterId) + "." + namespace; + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java index d0ef437a52f9f..b2d301b0a4445 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java @@ -91,20 +91,6 @@ public static Integer parsePort(Configuration flinkConfig, ConfigOption } } - /** - * Generate name of the external Service. - */ - public static String getRestServiceName(String clusterId) { - return clusterId + Constants.FLINK_REST_SERVICE_SUFFIX; - } - - /** - * Generate namespaced name of the external Service. - */ - public static String getNamespacedExternalServiceName(String clusterId, String namespace) { - return getRestServiceName(clusterId) + "." + namespace; - } - /** * Generate name of the Deployment. */ diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java index 6459a88133d0d..abd1390748881 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java @@ -19,8 +19,8 @@ package org.apache.flink.kubernetes; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import io.fabric8.kubernetes.api.model.LoadBalancerIngress; import io.fabric8.kubernetes.api.model.LoadBalancerStatus; @@ -107,7 +107,7 @@ private Service buildExternalService( @Nullable ServiceStatus serviceStatus) { final ServiceBuilder serviceBuilder = new ServiceBuilder() .editOrNewMetadata() - .withName(KubernetesUtils.getRestServiceName(CLUSTER_ID)) + .withName(ExternalServiceDecorator.getExternalServiceName(CLUSTER_ID)) .endMetadata() .editOrNewSpec() .withType(serviceExposedType.name()) diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java index b08cbb4c2d0e3..006ef54bee8b6 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java @@ -27,10 +27,10 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -222,7 +222,7 @@ public void testClusterIPService() { assertThat(resultEndpoint.isPresent(), is(true)); assertThat( resultEndpoint.get().getAddress(), - is(KubernetesUtils.getNamespacedExternalServiceName(CLUSTER_ID, NAMESPACE))); + is(ExternalServiceDecorator.getNamespacedExternalServiceName(CLUSTER_ID, NAMESPACE))); assertThat(resultEndpoint.get().getPort(), is(REST_PORT)); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java index 66ed88e14608d..98ae2fb959f3f 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java @@ -21,7 +21,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Service; @@ -71,7 +70,7 @@ public void testBuildAccompanyingKubernetesResources() throws IOException { assertEquals(Constants.API_VERSION, restService.getApiVersion()); - assertEquals(KubernetesUtils.getRestServiceName(CLUSTER_ID), restService.getMetadata().getName()); + assertEquals(ExternalServiceDecorator.getExternalServiceName(CLUSTER_ID), restService.getMetadata().getName()); final Map expectedLabels = getCommonLabels(); assertEquals(expectedLabels, restService.getMetadata().getLabels()); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java index fd9432af1fdfb..cf373149fe6b2 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java @@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator; @@ -178,7 +179,7 @@ public void testServices() { final List restServiceCandidates = resultServices .stream() - .filter(x -> x.getMetadata().getName().equals(KubernetesUtils.getRestServiceName(CLUSTER_ID))) + .filter(x -> x.getMetadata().getName().equals(ExternalServiceDecorator.getExternalServiceName(CLUSTER_ID))) .collect(Collectors.toList()); assertEquals(1, restServiceCandidates.size());