Skip to content

Commit

Permalink
[FLINK-16194][k8s] Introduce the internal and the external Service de…
Browse files Browse the repository at this point in the history
…corator
  • Loading branch information
zhengcanbin authored and tisonkun committed Mar 5, 2020
1 parent 7f19c9c commit 4d8281a
Show file tree
Hide file tree
Showing 6 changed files with 427 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.Constants;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.ServicePortBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* An abstract class containing some common implementations for the internal/external Services.
*/
public abstract class AbstractServiceDecorator extends AbstractKubernetesStepDecorator {

protected final KubernetesJobManagerParameters kubernetesJobManagerParameters;

public AbstractServiceDecorator(KubernetesJobManagerParameters kubernetesJobManagerParameters) {
this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters);
}

@Override
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
final Service service = new ServiceBuilder()
.withApiVersion(Constants.API_VERSION)
.withNewMetadata()
.withName(getServiceName())
.withLabels(kubernetesJobManagerParameters.getCommonLabels())
.endMetadata()
.withNewSpec()
.withType(getServiceType())
.withPorts(getServicePorts())
.withSelector(kubernetesJobManagerParameters.getLabels())
.endSpec()
.build();

return Collections.singletonList(service);
}

protected abstract String getServiceType();

protected abstract String getServiceName();

protected List<ServicePort> getServicePorts() {
final List<ServicePort> servicePorts = new ArrayList<>();

servicePorts.add(getServicePort(
getPortName(RestOptions.PORT.key()),
kubernetesJobManagerParameters.getRestPort()));

return servicePorts;
}

protected static ServicePort getServicePort(String name, int port) {
return new ServicePortBuilder()
.withName(name)
.withPort(port)
.build();
}

protected static String getPortName(String portName){
return portName.replace('.', '-');
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.KubernetesUtils;

import io.fabric8.kubernetes.api.model.HasMetadata;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Creates an external Service to expose the rest port of the Flink JobManager(s).
*/
public class ExternalServiceDecorator extends AbstractServiceDecorator {

public ExternalServiceDecorator(KubernetesJobManagerParameters kubernetesJobManagerParameters) {
super(kubernetesJobManagerParameters);
}

@Override
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
if (kubernetesJobManagerParameters.getRestServiceExposedType().equals(
KubernetesConfigOptions.ServiceExposedType.ClusterIP.name())) {
return Collections.emptyList();
}

return super.buildAccompanyingKubernetesResources();
}

@Override
protected String getServiceType() {
return kubernetesJobManagerParameters.getRestServiceExposedType();
}

@Override
protected String getServiceName() {
return KubernetesUtils.getRestServiceName(kubernetesJobManagerParameters.getClusterId());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.KubernetesUtils;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ServicePort;

import java.io.IOException;
import java.util.List;

/**
* Creates an internal Service which forwards the requests from the TaskManager(s) to the
* active JobManager.
* Note that only the non-HA scenario relies on this Service for internal communication, since
* in the HA mode, the TaskManager(s) directly connects to the JobManager via IP address.
*/
public class InternalServiceDecorator extends AbstractServiceDecorator {

public InternalServiceDecorator(KubernetesJobManagerParameters kubernetesJobManagerParameters) {
super(kubernetesJobManagerParameters);
}

@Override
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
final String serviceName = getServiceName();

// Set job manager address to namespaced service name
final String namespace = kubernetesJobManagerParameters.getNamespace();
kubernetesJobManagerParameters.getFlinkConfiguration()
.setString(JobManagerOptions.ADDRESS, serviceName + "." + namespace);

return super.buildAccompanyingKubernetesResources();
}

@Override
protected List<ServicePort> getServicePorts() {
final List<ServicePort> servicePorts = super.getServicePorts();

servicePorts.add(getServicePort(
getPortName(JobManagerOptions.PORT.key()),
kubernetesJobManagerParameters.getRPCPort()));
servicePorts.add(getServicePort(
getPortName(BlobServerOptions.PORT.key()),
kubernetesJobManagerParameters.getBlobServerPort()));

return servicePorts;
}

@Override
protected String getServiceType() {
return KubernetesConfigOptions.ServiceExposedType.ClusterIP.name();
}

@Override
protected String getServiceName() {
return KubernetesUtils.getInternalServiceName(kubernetesJobManagerParameters.getClusterId());
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,20 @@ public static List<VolumeMount> getConfigMapVolumeMount(String flinkConfDirInPod
return volumeMounts;
}

/**
* Generate name of the internal Service.
*/
public static String getInternalServiceName(String clusterId) {
return clusterId;
}

/**
* Generate name of the external Service.
*/
public static String getRestServiceName(String clusterId) {
return clusterId + Constants.FLINK_REST_SERVICE_SUFFIX;
}

/**
* Get task manager labels for the current Flink cluster. They could be used to watch the pods status.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.ServicePortBuilder;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;

/**
* General tests for the {@link ExternalServiceDecorator}.
*/
public class ExternalServiceDecoratorTest extends KubernetesJobManagerTestBase {

private ExternalServiceDecorator externalServiceDecorator;

@Before
public void setup() throws Exception {
super.setup();
this.externalServiceDecorator = new ExternalServiceDecorator(this.kubernetesJobManagerParameters);
}

@Test
public void testBuildAccompanyingKubernetesResources() throws IOException {
final List<HasMetadata> resources = this.externalServiceDecorator.buildAccompanyingKubernetesResources();
assertEquals(1, resources.size());

final Service restService = (Service) resources.get(0);

assertEquals(Constants.API_VERSION, restService.getApiVersion());

assertEquals(KubernetesUtils.getRestServiceName(CLUSTER_ID), restService.getMetadata().getName());

final Map<String, String> expectedLabels = getCommonLabels();
assertEquals(expectedLabels, restService.getMetadata().getLabels());

assertEquals("LoadBalancer", restService.getSpec().getType());

List<ServicePort> expectedServicePorts = Collections.singletonList(
new ServicePortBuilder()
.withName("rest-port")
.withPort(REST_PORT)
.build());
assertEquals(expectedServicePorts, restService.getSpec().getPorts());

expectedLabels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER);
assertEquals(expectedLabels, restService.getSpec().getSelector());
}

@Test
public void testSetServiceExposedType() throws IOException {
this.flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, "NodePort");
List<HasMetadata> resources = this.externalServiceDecorator.buildAccompanyingKubernetesResources();
assertEquals("NodePort", ((Service) resources.get(0)).getSpec().getType());

this.flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, "ClusterIP");
assertTrue(this.externalServiceDecorator.buildAccompanyingKubernetesResources().isEmpty());
}
}
Loading

0 comments on commit 4d8281a

Please sign in to comment.