Skip to content

Commit

Permalink
[FLINK-16602][k8s] Rework the internal & external Service
Browse files Browse the repository at this point in the history
1.The REST service serves REST traffic while the internal service serves internal requests from TMs to JM.
2.The REST service is always created but the internal service is only created in non-high availability setup.

Co-authored-by: felixzheng <[email protected]>

This closes apache#11456 .
  • Loading branch information
zhengcanbin committed Apr 8, 2020
1 parent 86e9ec0 commit 562e771
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private int run(String[] args) throws FlinkException, CliArgsException {
final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);

// Retrieve or create a session cluster.
if (clusterId != null && kubeClient.getInternalService(clusterId) != null) {
if (clusterId != null && kubeClient.getRestService(clusterId) != null) {
clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();
} else {
clusterClient = kubernetesClusterDescriptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ public void stopPod(String podName) {
@Nullable
public Endpoint getRestEndpoint(String clusterId) {
int restPort = this.flinkConfig.getInteger(RestOptions.PORT);
final KubernetesConfigOptions.ServiceExposedType serviceExposedType =
flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
final KubernetesConfigOptions.ServiceExposedType serviceExposedType = flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);

// Return the service.namespace directly when use ClusterIP.
if (serviceExposedType == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
Expand Down Expand Up @@ -193,16 +192,24 @@ public void handleException(Exception e) {
LOG.error("A Kubernetes exception occurred.", e);
}

@Nullable
@Override
public KubernetesService getInternalService(String clusterId) {
return getService(KubernetesUtils.getInternalServiceName(clusterId));
}

@Override
@Nullable
public KubernetesService getRestService(String clusterId) {
return getService(KubernetesUtils.getRestServiceName(clusterId));
final String serviceName = KubernetesUtils.getRestServiceName(clusterId);

final Service service = this.internalClient
.services()
.inNamespace(nameSpace)
.withName(serviceName)
.fromServer()
.get();

if (service == null) {
LOG.debug("Service {} does not exist", serviceName);
return null;
}

return new KubernetesService(service);
}

@Override
Expand Down Expand Up @@ -256,23 +263,6 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
resource.getMetadata().setOwnerReferences(Collections.singletonList(deploymentOwnerReference)));
}

private KubernetesService getService(String serviceName) {
final Service service = this
.internalClient
.services()
.inNamespace(nameSpace)
.withName(serviceName)
.fromServer()
.get();

if (service == null) {
LOG.debug("Service {} does not exist", serviceName);
return null;
}

return new KubernetesService(service);
}

/**
* To get nodePort of configured ports.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,6 @@ public interface FlinkKubeClient extends AutoCloseable {
*/
void stopAndCleanupCluster(String clusterId);

/**
* Get the kubernetes internal service of the given flink clusterId.
*
* @param clusterId cluster id
* @return Return the internal service of the specified cluster id. Return null if the service does not exist.
*/
@Nullable
KubernetesService getInternalService(String clusterId);

/**
* Get the kubernetes rest service of the given flink clusterId.
*
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,52 @@

package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Creates an external Service to expose the rest port of the Flink JobManager(s).
*/
public class ExternalServiceDecorator extends AbstractServiceDecorator {
public class ExternalServiceDecorator extends AbstractKubernetesStepDecorator {

private final KubernetesJobManagerParameters kubernetesJobManagerParameters;

public ExternalServiceDecorator(KubernetesJobManagerParameters kubernetesJobManagerParameters) {
super(kubernetesJobManagerParameters);
this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters);
}

@Override
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
if (kubernetesJobManagerParameters.getRestServiceExposedType() == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
return Collections.emptyList();
}
final String serviceName =
KubernetesUtils.getRestServiceName(kubernetesJobManagerParameters.getClusterId());

return super.buildAccompanyingKubernetesResources();
}
final Service externalService = new ServiceBuilder()
.withApiVersion(Constants.API_VERSION)
.withNewMetadata()
.withName(serviceName)
.withLabels(kubernetesJobManagerParameters.getCommonLabels())
.endMetadata()
.withNewSpec()
.withType(kubernetesJobManagerParameters.getRestServiceExposedType().name())
.withSelector(kubernetesJobManagerParameters.getLabels())
.addNewPort()
.withName(Constants.REST_PORT_NAME)
.withPort(kubernetesJobManagerParameters.getRestPort())
.endPort()
.endSpec()
.build();

@Override
protected KubernetesConfigOptions.ServiceExposedType getServiceType() {
return kubernetesJobManagerParameters.getRestServiceExposedType();
}

@Override
protected String getServiceName() {
return KubernetesUtils.getRestServiceName(kubernetesJobManagerParameters.getClusterId());
return Collections.singletonList(externalService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,63 +19,67 @@
package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Creates an internal Service which forwards the requests from the TaskManager(s) to the
* active JobManager.
* Note that only the non-HA scenario relies on this Service for internal communication, since
* in the HA mode, the TaskManager(s) directly connects to the JobManager via IP address.
*/
public class InternalServiceDecorator extends AbstractServiceDecorator {
public class InternalServiceDecorator extends AbstractKubernetesStepDecorator {

private final KubernetesJobManagerParameters kubernetesJobManagerParameters;

public InternalServiceDecorator(KubernetesJobManagerParameters kubernetesJobManagerParameters) {
super(kubernetesJobManagerParameters);
this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters);
}

@Override
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
final String serviceName = getServiceName();
if (!kubernetesJobManagerParameters.isInternalServiceEnabled()) {
return Collections.emptyList();
}

final String serviceName = KubernetesUtils.getInternalServiceName(kubernetesJobManagerParameters.getClusterId());

final Service headlessService = new ServiceBuilder()
.withApiVersion(Constants.API_VERSION)
.withNewMetadata()
.withName(serviceName)
.withLabels(kubernetesJobManagerParameters.getCommonLabels())
.endMetadata()
.withNewSpec()
.withClusterIP(Constants.HEADLESS_SERVICE_CLUSTER_IP)
.withSelector(kubernetesJobManagerParameters.getLabels())
.addNewPort()
.withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
.withPort(kubernetesJobManagerParameters.getRPCPort())
.endPort()
.addNewPort()
.withName(Constants.BLOB_SERVER_PORT_NAME)
.withPort(kubernetesJobManagerParameters.getBlobServerPort())
.endPort()
.endSpec()
.build();

// Set job manager address to namespaced service name
final String namespace = kubernetesJobManagerParameters.getNamespace();
kubernetesJobManagerParameters.getFlinkConfiguration()
.setString(JobManagerOptions.ADDRESS, serviceName + "." + namespace);

return super.buildAccompanyingKubernetesResources();
}

@Override
protected List<ServicePort> getServicePorts() {
final List<ServicePort> servicePorts = super.getServicePorts();

servicePorts.add(getServicePort(
Constants.JOB_MANAGER_RPC_PORT_NAME,
kubernetesJobManagerParameters.getRPCPort()));
servicePorts.add(getServicePort(
Constants.BLOB_SERVER_PORT_NAME,
kubernetesJobManagerParameters.getBlobServerPort()));
kubernetesJobManagerParameters.getFlinkConfiguration().setString(JobManagerOptions.ADDRESS, serviceName + "." + namespace);

return servicePorts;
}

@Override
protected KubernetesConfigOptions.ServiceExposedType getServiceType() {
return KubernetesConfigOptions.ServiceExposedType.ClusterIP;
}

@Override
protected String getServiceName() {
return KubernetesUtils.getInternalServiceName(kubernetesJobManagerParameters.getClusterId());
return Collections.singletonList(headlessService);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -118,4 +119,8 @@ public String getEntrypointClass() {
public KubernetesConfigOptions.ServiceExposedType getRestServiceExposedType() {
return flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
}

public boolean isInternalServiceEnabled() {
return !HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,6 @@ public class Constants {
public static final String ENV_FLINK_POD_IP_ADDRESS = "_POD_IP_ADDRESS";

public static final String POD_IP_FIELD_PATH = "status.podIP";

public static final String HEADLESS_SERVICE_CLUSTER_IP = "None";
}
Loading

0 comments on commit 562e771

Please sign in to comment.