Skip to content

Commit

Permalink
[FLINK-22054][k8s] Using a shared watcher for ConfigMap watching
Browse files Browse the repository at this point in the history
Previously, a separate connection was started for each ConfigMap watch.
This change introduces KubernetesConfigMapSharedWatcher, a single watcher
shared by all ConfigMap watches.

Internally, this change provides an implementation of the shared watcher
interface, KubernetesConfigMapSharedInformer, which is based on
SharedIndexInformer.
  • Loading branch information
yittg authored and wangyang0918 committed Jul 23, 2021
1 parent b19a515 commit 9c7e300
Show file tree
Hide file tree
Showing 22 changed files with 974 additions and 303 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ The Kubernetes-specific configuration options are listed on the [configuration p
Flink uses [Fabric8 Kubernetes client](https://github.com/fabric8io/kubernetes-client) to communicate with the Kubernetes APIServer to create/delete Kubernetes resources (e.g. Deployment, Pod, ConfigMap, Service, etc.), as well as to watch the Pods and ConfigMaps.
In addition to the above Flink config options, some [expert options](https://github.com/fabric8io/kubernetes-client#configuring-the-client) of the Fabric8 Kubernetes client can be configured via system properties or environment variables.

For example, users could use the following Flink config options to set the concurrent max requests, which allows running more jobs in a session cluster when [Kubernetes HA Services]({{< ref "docs/deployment/ha/kubernetes_ha" >}}) are used.
Please note that, each Flink job will consume `3` concurrent requests.
For example, users can use the following Flink config option to set the maximum number of concurrent requests.

```yaml
containerized.master.env.KUBERNETES_MAX_CONCURRENT_REQUESTS: 200
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ The Kubernetes-specific configuration options are listed on the [configuration p
Flink uses [Fabric8 Kubernetes client](https://github.com/fabric8io/kubernetes-client) to communicate with the Kubernetes APIServer to create/delete Kubernetes resources (e.g. Deployment, Pod, ConfigMap, Service, etc.), as well as to watch the Pods and ConfigMaps.
In addition to the above Flink config options, some [expert options](https://github.com/fabric8io/kubernetes-client#configuring-the-client) of the Fabric8 Kubernetes client can be configured via system properties or environment variables.

For example, users could use the following Flink config options to set the concurrent max requests, which allows running more jobs in a session cluster when [Kubernetes HA Services]({{< ref "docs/deployment/ha/kubernetes_ha" >}}) are used.
Please note that, each Flink job will consume `3` concurrent requests.
For example, users can use the following Flink config option to set the maximum number of concurrent requests.

```yaml
containerized.master.env.KUBERNETES_MAX_CONCURRENT_REQUESTS: 200
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ cd $END_TO_END_DIR/../flink-kubernetes
# Run the ITCases
run_mvn_test org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClientITCase
run_mvn_test org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElectorITCase
run_mvn_test org.apache.flink.kubernetes.kubeclient.resources.KubernetesSharedInformerITCase
run_mvn_test org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionAndRetrievalITCase
run_mvn_test org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreITCase
run_mvn_test org.apache.flink.kubernetes.highavailability.KubernetesHighAvailabilityRecoverFromSavepointITCase
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
Expand All @@ -33,9 +34,14 @@
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
import static org.apache.flink.kubernetes.utils.Constants.NAME_SEPARATOR;
Expand Down Expand Up @@ -63,6 +69,9 @@ public class KubernetesHaServices extends AbstractHaServices {
/** Kubernetes client. */
private final FlinkKubeClient kubeClient;

private final KubernetesConfigMapSharedWatcher configMapSharedWatcher;
private final ExecutorService watchExecutorService;

private static final String RESOURCE_MANAGER_NAME = "resourcemanager";

private static final String DISPATCHER_NAME = "dispatcher";
Expand All @@ -89,6 +98,15 @@ public class KubernetesHaServices extends AbstractHaServices {
this.kubeClient = checkNotNull(kubeClient);
this.clusterId = checkNotNull(config.get(KubernetesConfigOptions.CLUSTER_ID));

this.configMapSharedWatcher =
this.kubeClient.createConfigMapSharedWatcher(
KubernetesUtils.getConfigMapLabels(
clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
this.watchExecutorService =
Executors.newCachedThreadPool(
new ExecutorThreadFactory("config-map-watch-handler"));
this.configMapSharedWatcher.run();

lockIdentity = UUID.randomUUID().toString();
}

Expand All @@ -97,13 +115,15 @@ public LeaderElectionService createLeaderElectionService(String leaderName) {
final KubernetesLeaderElectionConfiguration leaderConfig =
new KubernetesLeaderElectionConfiguration(leaderName, lockIdentity, configuration);
return new DefaultLeaderElectionService(
new KubernetesLeaderElectionDriverFactory(kubeClient, leaderConfig));
new KubernetesLeaderElectionDriverFactory(
kubeClient, configMapSharedWatcher, watchExecutorService, leaderConfig));
}

@Override
public LeaderRetrievalService createLeaderRetrievalService(String leaderName) {
return new DefaultLeaderRetrievalService(
new KubernetesLeaderRetrievalDriverFactory(kubeClient, leaderName));
new KubernetesLeaderRetrievalDriverFactory(
kubeClient, configMapSharedWatcher, watchExecutorService, leaderName));
}

@Override
Expand All @@ -130,7 +150,9 @@ public RunningJobsRegistry createRunningJobsRegistry() {

/**
 * Tears down the Kubernetes-specific resources held by these HA services: closes the shared
 * ConfigMap watcher, then the Kubernetes client, and finally shuts down the executor that was
 * handed to the leader election/retrieval drivers for running watch callbacks.
 */
@Override
public void internalClose() {
// Close the shared watcher first; it was created from kubeClient.
configMapSharedWatcher.close();
kubeClient.close();
// Shut down the watch callback executor, passing a 5 second timeout to the shutdown helper.
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.watchExecutorService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher.Watch;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
import org.apache.flink.runtime.leaderelection.LeaderElectionEventHandler;
Expand All @@ -35,12 +35,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
Expand All @@ -60,8 +59,6 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {

private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElectionDriver.class);

private final Object watchLock = new Object();

private final FlinkKubeClient kubeClient;

private final String configMapName;
Expand All @@ -79,11 +76,12 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {

private volatile boolean running;

@GuardedBy("watchLock")
private KubernetesWatch kubernetesWatch;
private final Watch kubernetesWatch;

public KubernetesLeaderElectionDriver(
FlinkKubeClient kubeClient,
KubernetesConfigMapSharedWatcher configMapSharedWatcher,
ExecutorService watchExecutorService,
KubernetesLeaderElectionConfiguration leaderConfig,
LeaderElectionEventHandler leaderElectionEventHandler,
FatalErrorHandler fatalErrorHandler) {
Expand All @@ -105,7 +103,11 @@ public KubernetesLeaderElectionDriver(
running = true;
leaderElector.run();
kubernetesWatch =
kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl());
checkNotNull(configMapSharedWatcher, "ConfigMap Shared Informer")
.watch(
configMapName,
new ConfigMapCallbackHandlerImpl(),
watchExecutorService);
}

@Override
Expand All @@ -118,11 +120,7 @@ public void close() {
LOG.info("Closing {}.", this);
leaderElector.stop();

synchronized (watchLock) {
if (kubernetesWatch != null) {
kubernetesWatch.close();
}
}
kubernetesWatch.close();
}

@Override
Expand Down Expand Up @@ -246,23 +244,9 @@ public void onError(List<KubernetesConfigMap> configMaps) {

@Override
public void handleError(Throwable throwable) {
if (throwable instanceof KubernetesTooOldResourceVersionException) {
synchronized (watchLock) {
if (running) {
if (kubernetesWatch != null) {
kubernetesWatch.close();
}
LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
kubernetesWatch =
kubeClient.watchConfigMaps(
configMapName, new ConfigMapCallbackHandlerImpl());
}
}
} else {
fatalErrorHandler.onFatalError(
new LeaderElectionException(
"Error while watching the ConfigMap " + configMapName, throwable));
}
fatalErrorHandler.onFatalError(
new LeaderElectionException(
"Error while watching the ConfigMap " + configMapName, throwable));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,29 @@

import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
import org.apache.flink.runtime.leaderelection.LeaderElectionEventHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;

import java.util.concurrent.ExecutorService;

/** {@link LeaderElectionDriverFactory} implementation for Kubernetes. */
public class KubernetesLeaderElectionDriverFactory implements LeaderElectionDriverFactory {

private final FlinkKubeClient kubeClient;
private final KubernetesConfigMapSharedWatcher configMapSharedWatcher;
private final ExecutorService watchExecutorService;
private final KubernetesLeaderElectionConfiguration leaderConfig;

public KubernetesLeaderElectionDriverFactory(
FlinkKubeClient kubeClient, KubernetesLeaderElectionConfiguration leaderConfig) {
FlinkKubeClient kubeClient,
KubernetesConfigMapSharedWatcher configMapSharedWatcher,
ExecutorService watchExecutorService,
KubernetesLeaderElectionConfiguration leaderConfig) {
this.kubeClient = kubeClient;
this.configMapSharedWatcher = configMapSharedWatcher;
this.watchExecutorService = watchExecutorService;
this.leaderConfig = leaderConfig;
}

Expand All @@ -42,6 +52,11 @@ public KubernetesLeaderElectionDriver createLeaderElectionDriver(
FatalErrorHandler fatalErrorHandler,
String leaderContenderDescription) {
return new KubernetesLeaderElectionDriver(
kubeClient, leaderConfig, leaderEventHandler, fatalErrorHandler);
kubeClient,
configMapSharedWatcher,
watchExecutorService,
leaderConfig,
leaderEventHandler,
fatalErrorHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.flink.kubernetes.highavailability;

import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher.Watch;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriver;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalEventHandler;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
Expand All @@ -30,9 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;

import java.util.List;
import java.util.concurrent.ExecutorService;

import static org.apache.flink.kubernetes.utils.KubernetesUtils.checkConfigMaps;
import static org.apache.flink.kubernetes.utils.KubernetesUtils.getLeaderInformationFromConfigMap;
Expand All @@ -49,8 +48,6 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver {
private static final Logger LOG =
LoggerFactory.getLogger(KubernetesLeaderRetrievalDriver.class);

private final Object watchLock = new Object();

private final FlinkKubeClient kubeClient;

private final String configMapName;
Expand All @@ -61,11 +58,12 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver {

private volatile boolean running;

@GuardedBy("watchLock")
private KubernetesWatch kubernetesWatch;
private final Watch kubernetesWatch;

public KubernetesLeaderRetrievalDriver(
FlinkKubeClient kubeClient,
KubernetesConfigMapSharedWatcher configMapSharedWatcher,
ExecutorService watchExecutorService,
String configMapName,
LeaderRetrievalEventHandler leaderRetrievalEventHandler,
FatalErrorHandler fatalErrorHandler) {
Expand All @@ -76,7 +74,11 @@ public KubernetesLeaderRetrievalDriver(
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

kubernetesWatch =
kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl());
checkNotNull(configMapSharedWatcher, "ConfigMap Shared Informer")
.watch(
configMapName,
new ConfigMapCallbackHandlerImpl(),
watchExecutorService);

running = true;
}
Expand All @@ -90,11 +92,7 @@ public void close() {

LOG.info("Stopping {}.", this);

synchronized (watchLock) {
if (kubernetesWatch != null) {
kubernetesWatch.close();
}
}
kubernetesWatch.close();
}

private class ConfigMapCallbackHandlerImpl
Expand Down Expand Up @@ -128,23 +126,9 @@ public void onError(List<KubernetesConfigMap> configMaps) {

@Override
public void handleError(Throwable throwable) {
if (throwable instanceof KubernetesTooOldResourceVersionException) {
synchronized (watchLock) {
if (running) {
if (kubernetesWatch != null) {
kubernetesWatch.close();
}
LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
kubernetesWatch =
kubeClient.watchConfigMaps(
configMapName, new ConfigMapCallbackHandlerImpl());
}
}
} else {
fatalErrorHandler.onFatalError(
new LeaderRetrievalException(
"Error while watching the ConfigMap " + configMapName, throwable));
}
fatalErrorHandler.onFatalError(
new LeaderRetrievalException(
"Error while watching the ConfigMap " + configMapName, throwable));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,43 @@
package org.apache.flink.kubernetes.highavailability;

import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriverFactory;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalEventHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;

import java.util.concurrent.ExecutorService;

/** {@link LeaderRetrievalDriverFactory} implementation for Kubernetes. */
public class KubernetesLeaderRetrievalDriverFactory implements LeaderRetrievalDriverFactory {

private final FlinkKubeClient kubeClient;

private final KubernetesConfigMapSharedWatcher configMapSharedWatcher;
private final ExecutorService watchExecutorService;

private final String configMapName;

public KubernetesLeaderRetrievalDriverFactory(
FlinkKubeClient kubeClient, String configMapName) {
FlinkKubeClient kubeClient,
KubernetesConfigMapSharedWatcher configMapSharedWatcher,
ExecutorService watchExecutorService,
String configMapName) {
this.kubeClient = kubeClient;
this.configMapSharedWatcher = configMapSharedWatcher;
this.watchExecutorService = watchExecutorService;
this.configMapName = configMapName;
}

@Override
public KubernetesLeaderRetrievalDriver createLeaderRetrievalDriver(
LeaderRetrievalEventHandler leaderEventHandler, FatalErrorHandler fatalErrorHandler) {
return new KubernetesLeaderRetrievalDriver(
kubeClient, configMapName, leaderEventHandler, fatalErrorHandler);
kubeClient,
configMapSharedWatcher,
watchExecutorService,
configMapName,
leaderEventHandler,
fatalErrorHandler);
}
}
Loading

0 comments on commit 9c7e300

Please sign in to comment.