Skip to content

Commit

Permalink
[FLINK-22494][kubernetes] Introduces PossibleInconsistentStateException
Browse files Browse the repository at this point in the history
We experienced cases where the ConfigMap was updated but the corresponding HTTP
request failed due to connectivity issues. PossibleInconsistentStateException
is used to reflect cases where it's not clear whether the data was actually
written or not.
  • Loading branch information
XComp authored and tillrohrmann committed May 18, 2021
1 parent 15eafd9 commit 9d2e2d9
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkRuntimeException;
Expand Down Expand Up @@ -290,13 +291,17 @@ public CompletableFuture<Boolean> checkAndUpdateConfigMap(
Throwable
throwable) {
LOG.debug(
"Failed to update ConfigMap {} with data {} because of concurrent "
+ "modifications. Trying again.",
"Failed to update ConfigMap {} with data {}. Trying again.",
configMap
.getName(),
configMap
.getData());
throw throwable;
// the
// client
// implementation does not expose the different kind of error causes to a degree that we could do a more fine-grained error handling here
throw new CompletionException(
new PossibleInconsistentStateException(
throwable));
}
return true;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;

import java.io.File;
import java.util.List;
Expand Down Expand Up @@ -151,7 +152,12 @@ KubernetesLeaderElector createLeaderElector(
* one. If the returned optional is empty, we will not do the update.
* @return Return the ConfigMap update future. The boolean result indicates whether the
* ConfigMap is updated. The returned future will be completed exceptionally if the
* ConfigMap does not exist.
* ConfigMap does not exist. A failure during the update operation will result in the future
* failing with a {@link PossibleInconsistentStateException} indicating that no clear
* decision can be made on whether the update was successful or not. The {@code
* PossibleInconsistentStateException} not being present indicates that the failure happened
* before writing the updated ConfigMap to Kubernetes. For the latter case, it can be
* assumed that the ConfigMap was not updated.
*/
CompletableFuture<Boolean> checkAndUpdateConfigMap(
String configMapName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@
import io.fabric8.kubernetes.api.model.ServicePortBuilder;
import io.fabric8.kubernetes.api.model.ServiceStatus;
import io.fabric8.kubernetes.api.model.ServiceStatusBuilder;
import io.fabric8.mockwebserver.dsl.DelayPathable;
import io.fabric8.mockwebserver.dsl.HttpMethodable;
import io.fabric8.mockwebserver.dsl.MockServerExpectation;
import io.fabric8.mockwebserver.dsl.ReturnOrWebsocketable;
import io.fabric8.mockwebserver.dsl.TimesOnceableOrHttpHeaderable;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.function.Function;

/**
* Base class for {@link KubernetesClusterDescriptorTest} and {@link
Expand All @@ -53,14 +59,35 @@ protected void mockExpectedServiceFromServerSide(Service expectedService) {
}

protected void mockCreateConfigMapAlreadyExisting(ConfigMap configMap) {
final String path = String.format("/api/v1/namespaces/%s/configmaps", NAMESPACE);
final String path =
String.format(
"/api/%s/namespaces/%s/configmaps",
configMap.getApiVersion(), configMap.getMetadata().getNamespace());
server.expect().post().withPath(path).andReturn(500, configMap).always();
}

protected void mockGetConfigMapFailed(ConfigMap configMap) {
mockConfigMapRequest(configMap, HttpMethodable::get);
}

protected void mockReplaceConfigMapFailed(ConfigMap configMap) {
final String name = configMap.getMetadata().getName();
final String path = String.format("/api/v1/namespaces/%s/configmaps/%s", NAMESPACE, name);
server.expect().put().withPath(path).andReturn(500, configMap).always();
mockConfigMapRequest(configMap, HttpMethodable::put);
}

private void mockConfigMapRequest(
ConfigMap configMap,
Function<
MockServerExpectation,
DelayPathable<
ReturnOrWebsocketable<TimesOnceableOrHttpHeaderable<Void>>>>
methodTypeSetter) {
final String path =
String.format(
"/api/%s/namespaces/%s/configmaps/%s",
configMap.getApiVersion(),
configMap.getMetadata().getNamespace(),
configMap.getMetadata().getName());
methodTypeSetter.apply(server.expect()).withPath(path).andReturn(500, configMap).always();
}

protected Service buildExternalServiceWithLoadBalancer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public MixedKubernetesServer(boolean https, boolean crudMode) {
}

public void before() {
HashMap<ServerRequest, Queue<ServerResponse>> response = new HashMap<>();
final HashMap<ServerRequest, Queue<ServerResponse>> response = new HashMap<>();
mock =
crudMode
? new KubernetesMockServer(
Expand All @@ -77,6 +77,10 @@ public RecordedRequest takeRequest(long timeout, TimeUnit unit) throws Exception
return mockWebServer.takeRequest(timeout, unit);
}

public int getRequestCount() {
return mockWebServer.getRequestCount();
}

public MockServerExpectation expect() {
return mock.expect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.NoOpWatchCallbackHandler;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.ExceptionUtils;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
Expand Down Expand Up @@ -435,6 +437,44 @@ public void testCheckAndUpdateConfigMapWhenConfigMapNotExist() {
}
}

@Test
public void testCheckAndUpdateConfigMapWhenGetConfigMapFailed() throws Exception {
final int configuredRetries =
flinkConfig.getInteger(
KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES);
final KubernetesConfigMap configMap = buildTestingConfigMap();
this.flinkKubeClient.createConfigMap(configMap).get();

mockGetConfigMapFailed(configMap.getInternalResource());

final int initialRequestCount = server.getRequestCount();
try {
this.flinkKubeClient
.checkAndUpdateConfigMap(
TESTING_CONFIG_MAP_NAME,
c -> {
throw new AssertionError(
"The replace operation should have never been triggered.");
})
.get();
fail(
"checkAndUpdateConfigMap should fail without a PossibleInconsistentStateException being the cause when number of retries has been exhausted.");
} catch (Exception ex) {
assertThat(
ex,
FlinkMatchers.containsMessage(
"Could not complete the "
+ "operation. Number of retries has been exhausted."));
final int actualRetryCount = server.getRequestCount() - initialRequestCount;
assertThat(actualRetryCount, is(configuredRetries + 1));
assertThat(
"An error while retrieving the ConfigMap should not cause a PossibleInconsistentStateException.",
ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class)
.isPresent(),
is(false));
}
}

@Test
public void testCheckAndUpdateConfigMapWhenReplaceConfigMapFailed() throws Exception {
final int configuredRetries =
Expand All @@ -456,14 +496,20 @@ public void testCheckAndUpdateConfigMapWhenReplaceConfigMapFailed() throws Excep
})
.get();
fail(
"CheckAndUpdateConfigMap should fail with exception when number of retries has been exhausted.");
"checkAndUpdateConfigMap should fail due to a PossibleInconsistentStateException when number of retries has been exhausted.");
} catch (Exception ex) {
assertThat(
ex,
FlinkMatchers.containsMessage(
"Could not complete the "
+ "operation. Number of retries has been exhausted."));
assertThat(retries.get(), is(configuredRetries + 1));

assertThat(
"An error while replacing the ConfigMap should cause an PossibleInconsistentStateException.",
ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class)
.isPresent(),
is(true));
}
}

Expand Down Expand Up @@ -504,6 +550,7 @@ private KubernetesConfigMap buildTestingConfigMap() {
.withNewMetadata()
.withName(TESTING_CONFIG_MAP_NAME)
.withLabels(TESTING_LABELS)
.withNamespace(NAMESPACE)
.endMetadata()
.withData(data)
.build());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.persistence;

import org.apache.flink.util.FlinkException;

/**
* {@code PossibleInconsistentStateException} represents errors that might have lead to an
* inconsistent state within the HA resources.
*/
public class PossibleInconsistentStateException extends FlinkException {

private static final long serialVersionUID = 364105635349022882L;

public PossibleInconsistentStateException(String message, Throwable cause) {
super(message, cause);
}

public PossibleInconsistentStateException(Throwable cause) {
super(cause);
}
}

0 comments on commit 9d2e2d9

Please sign in to comment.