Skip to content

Commit

Permalink
[FLINK-22494][ha] Introduces PossibleInconsistentState to StateHandle…
Browse files Browse the repository at this point in the history
…Store
  • Loading branch information
XComp authored and tillrohrmann committed May 18, 2021
1 parent cc59ad5 commit 417cf78
Show file tree
Hide file tree
Showing 8 changed files with 673 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.StringResourceVersion;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +48,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.flink.runtime.util.StateHandleStoreUtils.deserialize;
import static org.apache.flink.runtime.util.StateHandleStoreUtils.serializeOrDiscard;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -114,21 +116,28 @@ public KubernetesStateHandleStore(
* @param key Key in ConfigMap
* @param state State to be added
* @throws AlreadyExistException if the name already exists
* @throws PossibleInconsistentStateException if the write-to-Kubernetes operation failed. This
* indicates that it's not clear whether the new state was successfully written to
* Kubernetes or not. No state was discarded. Proper error handling has to be applied on the
* caller's side.
* @throws Exception if persisting state or writing state handle failed
*/
@Override
public RetrievableStateHandle<T> addAndLock(String key, T state) throws Exception {
public RetrievableStateHandle<T> addAndLock(String key, T state)
throws PossibleInconsistentStateException, Exception {
checkNotNull(key, "Key in ConfigMap.");
checkNotNull(state, "State.");

final RetrievableStateHandle<T> storeHandle = storage.store(state);

boolean success = false;
final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);

// initialize flag to serve the failure case
boolean discardState = true;
try {
final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
success =
kubeClient
// a successful operation will result in the state not being discarded
discardState =
!kubeClient
.checkAndUpdateConfigMap(
configMapName,
c -> {
Expand All @@ -151,14 +160,20 @@ public RetrievableStateHandle<T> addAndLock(String key, T state) throws Exceptio
.get();
return storeHandle;
} catch (Exception ex) {
final Optional<PossibleInconsistentStateException> possibleInconsistentStateException =
ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class);
if (possibleInconsistentStateException.isPresent()) {
// it's unclear whether the state handle metadata was written to the ConfigMap -
// hence, we don't discard the data
discardState = false;
throw possibleInconsistentStateException.get();
}

throw ExceptionUtils.findThrowable(ex, AlreadyExistException.class)
.orElseThrow(() -> ex);
} finally {
if (!success) {
// Cleanup the state handle if it was not written to ConfigMap.
if (storeHandle != null) {
storeHandle.discardState();
}
if (discardState) {
storeHandle.discardState();
}
}
}
Expand All @@ -173,6 +188,9 @@ public RetrievableStateHandle<T> addAndLock(String key, T state) throws Exceptio
* @param resourceVersion resource version when checking existence via {@link #exists}.
* @param state State to be added
* @throws NotExistException if the name does not exist
* @throws PossibleInconsistentStateException if a failure occurred during the update operation.
* It's unclear whether the operation actually succeeded or not. No state was discarded. The
* method's caller should handle this case properly.
* @throws Exception if persisting state or writing state handle failed
*/
@Override
Expand All @@ -185,11 +203,13 @@ public void replace(String key, StringResourceVersion resourceVersion, T state)

final RetrievableStateHandle<T> newStateHandle = storage.store(state);

boolean success = false;
final byte[] serializedStateHandle = serializeOrDiscard(newStateHandle);

// initialize flags to serve the failure case
boolean discardOldState = false;
boolean discardNewState = true;
try {
final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(newStateHandle);
success =
boolean success =
kubeClient
.checkAndUpdateConfigMap(
configMapName,
Expand All @@ -202,7 +222,7 @@ public void replace(String key, StringResourceVersion resourceVersion, T state)
.put(
key,
encodeStateHandle(
serializedStoreHandle));
serializedStateHandle));
} else {
throw new CompletionException(
getKeyNotExistException(key));
Expand All @@ -212,14 +232,29 @@ public void replace(String key, StringResourceVersion resourceVersion, T state)
return Optional.empty();
})
.get();

// swap subject for deletion in case of success
discardOldState = success;
discardNewState = !success;
} catch (Exception ex) {
final Optional<PossibleInconsistentStateException> possibleInconsistentStateException =
ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class);
if (possibleInconsistentStateException.isPresent()) {
// it's unclear whether the state handle metadata was written to the ConfigMap -
// hence, we don't discard any data
discardNewState = false;
throw possibleInconsistentStateException.get();
}

throw ExceptionUtils.findThrowable(ex, NotExistException.class).orElseThrow(() -> ex);
} finally {
if (success) {
oldStateHandle.discardState();
} else {
if (discardNewState) {
newStateHandle.discardState();
}

if (discardOldState) {
oldStateHandle.discardState();
}
}
}

Expand Down Expand Up @@ -476,8 +511,7 @@ private RetrievableStateHandle<T> deserializeObject(String content) throws IOExc
final byte[] data = Base64.getDecoder().decode(content);

try {
return InstantiationUtil.deserializeObject(
data, Thread.currentThread().getContextClassLoader());
return deserialize(data);
} catch (IOException | ClassNotFoundException e) {
throw new IOException(
"Failed to deserialize state handle from ConfigMap data " + content + '.', e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.StringResourceVersion;
import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper;
Expand Down Expand Up @@ -126,6 +128,44 @@ public void testAddAlreadyExistingKey() throws Exception {
};
}

@Test
public void testAddWithPossiblyInconsistentStateHandling() throws Exception {
new Context() {
{
runTest(
() -> {
leaderCallbackGrantLeadership();

final FlinkKubeClient anotherFlinkKubeClient =
createFlinkKubeClientBuilder()
.setCheckAndUpdateConfigMapFunction(
(configMapName, function) ->
FutureUtils.completedExceptionally(
new PossibleInconsistentStateException()))
.build();
final KubernetesStateHandleStore<
TestingLongStateHandleHelper.LongStateHandle>
store =
new KubernetesStateHandleStore<>(
anotherFlinkKubeClient,
LEADER_CONFIGMAP_NAME,
longStateStorage,
filter,
LOCK_IDENTITY);

try {
store.addAndLock(key, state);
fail("PossibleInconsistentStateException should have been thrown.");
} catch (PossibleInconsistentStateException ex) {
// PossibleInconsistentStateException is expected
}
assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(1));
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
});
}
};
}

@Test
public void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception {
new Context() {
Expand Down Expand Up @@ -346,6 +386,75 @@ public void testReplaceFailedAndDiscardState() throws Exception {
};
}

@Test
public void testReplaceFailedWithPossiblyInconsistentState() throws Exception {
final PossibleInconsistentStateException updateException =
new PossibleInconsistentStateException();
new Context() {
{
runTest(
() -> {
leaderCallbackGrantLeadership();

final KubernetesStateHandleStore<
TestingLongStateHandleHelper.LongStateHandle>
store =
new KubernetesStateHandleStore<>(
flinkKubeClient,
LEADER_CONFIGMAP_NAME,
longStateStorage,
filter,
LOCK_IDENTITY);
store.addAndLock(key, state);

final FlinkKubeClient anotherFlinkKubeClient =
createFlinkKubeClientBuilder()
.setCheckAndUpdateConfigMapFunction(
(configMapName, function) ->
FutureUtils.completedExceptionally(
updateException))
.build();
final KubernetesStateHandleStore<
TestingLongStateHandleHelper.LongStateHandle>
anotherStore =
new KubernetesStateHandleStore<>(
anotherFlinkKubeClient,
LEADER_CONFIGMAP_NAME,
longStateStorage,
filter,
LOCK_IDENTITY);

final StringResourceVersion resourceVersion = anotherStore.exists(key);
assertThat(resourceVersion.isExisting(), is(true));
try {
anotherStore.replace(
key,
resourceVersion,
new TestingLongStateHandleHelper.LongStateHandle(23456L));
fail(
"An exception having a PossibleInconsistentStateException as its cause should have been thrown.");
} catch (Exception ex) {
assertThat(ex, is(updateException));
}
assertThat(anotherStore.getAllAndLock().size(), is(1));
// The state does not change
assertThat(anotherStore.getAndLock(key).retrieveState(), is(state));

assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(2));
// no state was discarded
assertThat(
TestingLongStateHandleHelper
.getDiscardCallCountForStateHandleByIndex(0),
is(0));
assertThat(
TestingLongStateHandleHelper
.getDiscardCallCountForStateHandleByIndex(1),
is(0));
});
}
};
}

@Test
public void testGetAndExist() throws Exception {
new Context() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.persistence;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FlinkException;

/**
Expand All @@ -28,6 +29,11 @@ public class PossibleInconsistentStateException extends FlinkException {

private static final long serialVersionUID = 364105635349022882L;

@VisibleForTesting
public PossibleInconsistentStateException() {
super("The system might be in an inconsistent state.");
}

public PossibleInconsistentStateException(String message, Throwable cause) {
super(message, cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ public interface StateHandleStore<T extends Serializable, R extends ResourceVers
* @param name Key name in ConfigMap or child path name in ZooKeeper
* @param state State to be added
* @throws AlreadyExistException if the name already exists
* @throws PossibleInconsistentStateException if the write operation failed. This indicates that
* it's not clear whether the new state was successfully written to distributed coordination
* system or not. No state was discarded. Proper error handling has to be applied on the
* caller's side.
* @throws Exception if persisting state or writing state handle failed
*/
RetrievableStateHandle<T> addAndLock(String name, T state) throws Exception;
RetrievableStateHandle<T> addAndLock(String name, T state)
throws PossibleInconsistentStateException, Exception;

/**
* Replaces a state handle in the distributed coordination system and discards the old state
Expand All @@ -64,9 +69,13 @@ public interface StateHandleStore<T extends Serializable, R extends ResourceVers
* operation snuck in.
* @param state State to be replace with
* @throws NotExistException if the name does not exist
* @throws PossibleInconsistentStateException if a failure occurred during the update operation
* for which it's unclear whether the operation actually succeeded or not. No state was
* discarded. The method's caller should handle this case properly.
* @throws Exception if persisting state or writing state handle failed
*/
void replace(String name, R resourceVersion, T state) throws Exception;
void replace(String name, R resourceVersion, T state)
throws PossibleInconsistentStateException, Exception;

/**
* Returns resource version of state handle with specific name on the underlying storage.
Expand Down
Loading

0 comments on commit 417cf78

Please sign in to comment.