Skip to content

Commit

Permalink
[FLINK-24315][k8s] Add retry logic when watching pods
Browse files Browse the repository at this point in the history
A fatal error will be thrown if the retry fails.

This closes apache#17321.
  • Loading branch information
KarmaGYZ authored and wangyang0918 committed Sep 23, 2021
1 parent 815191a commit 36ff71f
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ private void stopPod(String podName) {
});
}

private Optional<KubernetesWatch> watchTaskManagerPods() {
private Optional<KubernetesWatch> watchTaskManagerPods() throws Exception {
return Optional.of(
flinkKubeClient.watchPodsAndDoCallback(
KubernetesUtils.getTaskManagerLabels(clusterId),
Expand Down Expand Up @@ -368,7 +368,11 @@ public void handleError(Throwable throwable) {
if (running) {
podsWatchOpt.ifPresent(KubernetesWatch::close);
log.info("Creating a new watch on TaskManager pods.");
podsWatchOpt = watchTaskManagerPods();
try {
podsWatchOpt = watchTaskManagerPods();
} catch (Exception e) {
getResourceEventHandler().onError(e);
}
}
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,26 @@ public Optional<KubernetesService> getRestService(String clusterId) {

@Override
public KubernetesWatch watchPodsAndDoCallback(
Map<String, String> labels, WatchCallbackHandler<KubernetesPod> podCallbackHandler) {
return new KubernetesWatch(
this.internalClient
.pods()
.withLabels(labels)
.watch(new KubernetesPodsWatcher(podCallbackHandler)));
Map<String, String> labels, WatchCallbackHandler<KubernetesPod> podCallbackHandler)
throws Exception {
return FutureUtils.retry(
() ->
CompletableFuture.supplyAsync(
() ->
new KubernetesWatch(
this.internalClient
.pods()
.withLabels(labels)
.watch(
new KubernetesPodsWatcher(
podCallbackHandler))),
kubeClientExecutorService),
maxRetryAttempts,
t ->
ExceptionUtils.findThrowable(t, KubernetesClientException.class)
.isPresent(),
kubeClientExecutorService)
.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public interface FlinkKubeClient extends AutoCloseable {
* @return Return a watch for pods. It needs to be closed after use.
*/
KubernetesWatch watchPodsAndDoCallback(
Map<String, String> labels, WatchCallbackHandler<KubernetesPod> podCallbackHandler);
Map<String, String> labels, WatchCallbackHandler<KubernetesPod> podCallbackHandler)
throws Exception;

/**
* Create a leader elector service based on Kubernetes api.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
import org.apache.flink.runtime.testutils.CommonTestUtils;
Expand Down Expand Up @@ -223,6 +224,48 @@ public void testNewWatchCreationWhenKubernetesTooOldResourceVersionException()
};
}

/**
 * Checks the failure path where the Kubernetes client cannot create the pods watch at all.
 *
 * <p>The testing client is configured so that every watch-creation call throws an {@link
 * ExpectedTestException}. The test passes only if that exception type escapes this method, as
 * declared by the {@code expected} attribute. NOTE(review): the exception presumably surfaces
 * while the driver is initialized inside {@code runTest} - confirm against the test base.
 */
@Test(expected = ExpectedTestException.class)
public void testThrowExceptionWhenWatchPodsFailInInitializing() throws Exception {
    new Context() {
        {
            // Neither argument matters: the mock fails unconditionally.
            flinkKubeClientBuilder.setWatchPodsAndDoCallbackFunction(
                    (labels, callbackHandler) -> {
                        throw new ExpectedTestException();
                    });
            // The test body is intentionally empty; only the set-up is exercised.
            runTest(() -> {});
        }
    };
}

/**
 * Checks that a failure to re-create the pods watch is reported to the resource event handler.
 *
 * <p>The testing client lets the first watch-creation call succeed (publishing the callback
 * handler to the context) and makes every later call throw {@code testingError}. The test body
 * then feeds {@code testingError} into the pod callback handler's {@code handleError} and
 * asserts that the resource event handler's {@code onError} consumer receives that same
 * instance within {@code TIMEOUT_SEC} seconds.
 *
 * <p>Note that the same exception instance is used both as the handled error and as the
 * watch-creation failure, so the assertion does not distinguish between the two origins.
 */
@Test
public void testThrowExceptionWhenWatchPodsFailInHandlingError() throws Exception {
    new Context() {
        {
            final ExpectedTestException testingError = new ExpectedTestException();
            // Completed once the initial watch has been handed out.
            final CompletableFuture<Void> initWatchFuture = new CompletableFuture<>();
            // Captures whatever throwable is passed to the resource event handler.
            final CompletableFuture<Throwable> onErrorFuture = new CompletableFuture<>();

            resourceEventHandlerBuilder.setOnErrorConsumer(onErrorFuture::complete);
            flinkKubeClientBuilder.setWatchPodsAndDoCallbackFunction(
                    (labels, callbackHandler) -> {
                        if (!initWatchFuture.isDone()) {
                            // First call: behave like a working client.
                            initWatchFuture.complete(null);
                            getSetWatchPodsAndDoCallbackFuture().complete(callbackHandler);
                            return new TestingFlinkKubeClient.MockKubernetesWatch();
                        }
                        // Any subsequent call: creating a new watch fails.
                        throw testingError;
                    });

            runTest(
                    () -> {
                        getPodCallbackHandler().handleError(testingError);
                        final Throwable reported =
                                onErrorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                        assertThat(reported, is(testingError));
                    });
        }
    };
}

@Override
protected ResourceManagerDriverTestBase<KubernetesWorkerNode>.Context createContext() {
return new Context();
Expand Down Expand Up @@ -293,6 +336,11 @@ List<TestingFlinkKubeClient.MockKubernetesWatch> getPodsWatches() {
return podsWatches;
}

CompletableFuture<WatchCallbackHandler<KubernetesPod>>
getSetWatchPodsAndDoCallbackFuture() {