diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java index 4ed7450b88e7b..397350c0b612f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPodsWatcher; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService; @@ -62,7 +63,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { private final KubernetesClient internalClient; private final String clusterId; - private final String nameSpace; + private final String namespace; private final ExecutorService kubeClientExecutorService; @@ -73,7 +74,7 @@ public Fabric8FlinkKubeClient( this.internalClient = checkNotNull(client); this.clusterId = checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)); - this.nameSpace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE); + this.namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE); this.kubeClientExecutorService = asyncExecutorFactory.get(); } @@ -88,7 +89,7 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet final Deployment createdDeployment = this.internalClient .apps() .deployments() - .inNamespace(this.nameSpace) + .inNamespace(this.namespace) .create(deployment); // Note that we should use the uid of the created Deployment for the OwnerReference. @@ -96,7 +97,7 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet this.internalClient .resourceList(accompanyingResources) - .inNamespace(this.nameSpace) + .inNamespace(this.namespace) .createOrReplace(); } @@ -107,13 +108,13 @@ public CompletableFuture createTaskManagerPod(KubernetesPod kubernetesPod) final Deployment masterDeployment = this.internalClient .apps() .deployments() - .inNamespace(this.nameSpace) + .inNamespace(this.namespace) .withName(KubernetesUtils.getDeploymentName(clusterId)) .get(); if (masterDeployment == null) { throw new RuntimeException( - "Failed to find Deployment named " + clusterId + " in namespace " + this.nameSpace); + "Failed to find Deployment named " + clusterId + " in namespace " + this.namespace); } // Note that we should use the uid of the master Deployment for the OwnerReference. @@ -125,7 +126,7 @@ public CompletableFuture createTaskManagerPod(KubernetesPod kubernetesPod) this.internalClient .pods() - .inNamespace(this.nameSpace) + .inNamespace(this.namespace) .create(kubernetesPod.getInternalResource()); }, kubeClientExecutorService); @@ -150,10 +151,10 @@ public Optional getRestEndpoint(String clusterId) { final KubernetesConfigOptions.ServiceExposedType serviceExposedType = KubernetesConfigOptions.ServiceExposedType.valueOf(service.getSpec().getType()); - // Return the service.namespace directly when use ClusterIP. + // Return the external service.namespace directly when using ClusterIP. if (serviceExposedType == KubernetesConfigOptions.ServiceExposedType.ClusterIP) { return Optional.of( - new Endpoint(KubernetesUtils.getInternalServiceName(clusterId) + "." + nameSpace, restPort)); + new Endpoint(ExternalServiceDecorator.getNamespacedExternalServiceName(clusterId, namespace), restPort)); } return getRestEndPointFromService(service, restPort); @@ -178,7 +179,7 @@ public void stopAndCleanupCluster(String clusterId) { this.internalClient .apps() .deployments() - .inNamespace(this.nameSpace) + .inNamespace(this.namespace) .withName(KubernetesUtils.getDeploymentName(clusterId)) .cascading(true) .delete(); @@ -191,11 +192,11 @@ public void handleException(Exception e) { @Override public Optional getRestService(String clusterId) { - final String serviceName = KubernetesUtils.getRestServiceName(clusterId); + final String serviceName = ExternalServiceDecorator.getExternalServiceName(clusterId); final Service service = this.internalClient .services() - .inNamespace(nameSpace) + .inNamespace(namespace) .withName(serviceName) .fromServer() .get(); @@ -246,7 +247,7 @@ private int getRestPortFromExternalService(Service externalService) { if (servicePortCandidates.isEmpty()) { throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" + - KubernetesUtils.getRestServiceName(this.clusterId) + "\""); + ExternalServiceDecorator.getExternalServiceName(this.clusterId) + "\""); } final ServicePort externalServicePort = servicePortCandidates.get(0); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java index 6e02ef6d4f290..943b6cf247cef 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java @@ -20,7 +20,6 @@ import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Service; @@ -46,7 +45,7 @@ public ExternalServiceDecorator(KubernetesJobManagerParameters kubernetesJobMana @Override public List buildAccompanyingKubernetesResources() throws IOException { final String serviceName = - KubernetesUtils.getRestServiceName(kubernetesJobManagerParameters.getClusterId()); + getExternalServiceName(kubernetesJobManagerParameters.getClusterId()); final Service externalService = new ServiceBuilder() .withApiVersion(Constants.API_VERSION) @@ -68,4 +67,18 @@ public List buildAccompanyingKubernetesResources() throws IOExcepti return Collections.singletonList(externalService); } + + /** + * Generate name of the external Service. + */ + public static String getExternalServiceName(String clusterId) { + return clusterId + Constants.FLINK_REST_SERVICE_SUFFIX; + } + + /** + * Generate namespaced name of the external Service. + */ + public static String getNamespacedExternalServiceName(String clusterId, String namespace) { + return getExternalServiceName(clusterId) + "." + namespace; + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java index 6c9377005f86e..141b1d57364e5 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java @@ -21,7 +21,6 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Service; @@ -53,7 +52,7 @@ public List buildAccompanyingKubernetesResources() throws IOExcepti return Collections.emptyList(); } - final String serviceName = KubernetesUtils.getInternalServiceName(kubernetesJobManagerParameters.getClusterId()); + final String serviceName = getInternalServiceName(kubernetesJobManagerParameters.getClusterId()); final Service headlessService = new ServiceBuilder() .withApiVersion(Constants.API_VERSION) @@ -77,10 +76,27 @@ public List buildAccompanyingKubernetesResources() throws IOExcepti // Set job manager address to namespaced service name final String namespace = kubernetesJobManagerParameters.getNamespace(); - kubernetesJobManagerParameters.getFlinkConfiguration().setString(JobManagerOptions.ADDRESS, serviceName + "." + namespace); + kubernetesJobManagerParameters.getFlinkConfiguration().setString( + JobManagerOptions.ADDRESS, + getNamespacedInternalServiceName(serviceName, namespace)); return Collections.singletonList(headlessService); } + + + /** + * Generate name of the internal Service. + */ + public static String getInternalServiceName(String clusterId) { + return clusterId; + } + + /** + * Generate namespaced name of the internal Service. + */ + public static String getNamespacedInternalServiceName(String clusterId, String namespace) { + return getInternalServiceName(clusterId) + "." + namespace; + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java index 3abc0a22ae376..b2d301b0a4445 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java @@ -91,20 +91,6 @@ public static Integer parsePort(Configuration flinkConfig, ConfigOption } } - /** - * Generate name of the internal Service. - */ - public static String getInternalServiceName(String clusterId) { - return clusterId; - } - - /** - * Generate name of the external Service. - */ - public static String getRestServiceName(String clusterId) { - return clusterId + Constants.FLINK_REST_SERVICE_SUFFIX; - } - /** * Generate name of the Deployment. */ diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java index 6459a88133d0d..abd1390748881 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java @@ -19,8 +19,8 @@ package org.apache.flink.kubernetes; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import io.fabric8.kubernetes.api.model.LoadBalancerIngress; import io.fabric8.kubernetes.api.model.LoadBalancerStatus; @@ -107,7 +107,7 @@ private Service buildExternalService( @Nullable ServiceStatus serviceStatus) { final ServiceBuilder serviceBuilder = new ServiceBuilder() .editOrNewMetadata() - .withName(KubernetesUtils.getRestServiceName(CLUSTER_ID)) + .withName(ExternalServiceDecorator.getExternalServiceName(CLUSTER_ID)) .endMetadata() .editOrNewSpec() .withType(serviceExposedType.name()) diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java index 6fab70194d57e..64f9f62444d50 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java @@ -30,8 +30,8 @@ import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; +import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import io.fabric8.kubernetes.api.model.Container; @@ -191,8 +191,9 @@ private void checkUpdatedConfigAndResourceSetting() { // Check updated flink config options assertEquals(String.valueOf(Constants.BLOB_SERVER_PORT), flinkConfig.getString(BlobServerOptions.PORT)); assertEquals(String.valueOf(Constants.TASK_MANAGER_RPC_PORT), flinkConfig.getString(TaskManagerOptions.RPC_PORT)); - assertEquals(KubernetesUtils.getInternalServiceName(CLUSTER_ID) + "." + - NAMESPACE, flinkConfig.getString(JobManagerOptions.ADDRESS)); + assertEquals( + InternalServiceDecorator.getNamespacedInternalServiceName(CLUSTER_ID, NAMESPACE), + flinkConfig.getString(JobManagerOptions.ADDRESS)); final Deployment jmDeployment = kubeClient .apps() diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java index bb7a0b612756c..006ef54bee8b6 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java @@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; @@ -219,6 +220,9 @@ public void testClusterIPService() { final Optional resultEndpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID); assertThat(resultEndpoint.isPresent(), is(true)); + assertThat( + resultEndpoint.get().getAddress(), + is(ExternalServiceDecorator.getNamespacedExternalServiceName(CLUSTER_ID, NAMESPACE))); assertThat(resultEndpoint.get().getPort(), is(REST_PORT)); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java index 66ed88e14608d..98ae2fb959f3f 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java @@ -21,7 +21,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Service; @@ -71,7 +70,7 @@ public void testBuildAccompanyingKubernetesResources() throws IOException { assertEquals(Constants.API_VERSION, restService.getApiVersion()); - assertEquals(KubernetesUtils.getRestServiceName(CLUSTER_ID), restService.getMetadata().getName()); + assertEquals(ExternalServiceDecorator.getExternalServiceName(CLUSTER_ID), restService.getMetadata().getName()); final Map expectedLabels = getCommonLabels(); assertEquals(expectedLabels, restService.getMetadata().getLabels()); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java index 1bafc133cc9e4..0b39e00f3727b 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecoratorTest.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; import org.apache.flink.kubernetes.utils.Constants; -import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -59,14 +58,14 @@ public void testBuildAccompanyingKubernetesResources() throws IOException { assertEquals(1, resources.size()); assertEquals( - KubernetesUtils.getInternalServiceName(CLUSTER_ID) + "." + NAMESPACE, + InternalServiceDecorator.getNamespacedInternalServiceName(CLUSTER_ID, NAMESPACE), this.flinkConfig.getString(JobManagerOptions.ADDRESS)); final Service internalService = (Service) resources.get(0); assertEquals(Constants.API_VERSION, internalService.getApiVersion()); - assertEquals(KubernetesUtils.getInternalServiceName(CLUSTER_ID), internalService.getMetadata().getName()); + assertEquals(InternalServiceDecorator.getInternalServiceName(CLUSTER_ID), internalService.getMetadata().getName()); final Map expectedLabels = getCommonLabels(); assertEquals(expectedLabels, internalService.getMetadata().getLabels()); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java index 96a3f73c8732a..cf373149fe6b2 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java @@ -24,8 +24,10 @@ import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.kubernetes.utils.KubernetesUtils; @@ -171,13 +173,13 @@ public void testServices() { final List internalServiceCandidates = resultServices .stream() - .filter(x -> x.getMetadata().getName().equals(KubernetesUtils.getInternalServiceName(CLUSTER_ID))) + .filter(x -> x.getMetadata().getName().equals(InternalServiceDecorator.getInternalServiceName(CLUSTER_ID))) .collect(Collectors.toList()); assertEquals(1, internalServiceCandidates.size()); final List restServiceCandidates = resultServices .stream() - .filter(x -> x.getMetadata().getName().equals(KubernetesUtils.getRestServiceName(CLUSTER_ID))) + .filter(x -> x.getMetadata().getName().equals(ExternalServiceDecorator.getExternalServiceName(CLUSTER_ID))) .collect(Collectors.toList()); assertEquals(1, restServiceCandidates.size());