Skip to content

Commit

Permalink
[FLINK-26093][tests] Adjust SavepointFormatITCase for ChangelogStateB…
Browse files Browse the repository at this point in the history
…ackend
  • Loading branch information
rkhachatryan committed Feb 14, 2022
1 parent 8bec0ad commit 746acb4
Showing 1 changed file with 148 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
Expand All @@ -41,6 +42,7 @@
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SavepointKeyedStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -59,11 +61,13 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.Arrays.asList;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -78,138 +82,151 @@ public class SavepointFormatITCase {
LoggerAuditingExtension loggerAuditingExtension =
new LoggerAuditingExtension(SavepointFormatITCase.class, Level.INFO);

private static Stream<Arguments> parameters() {
return Stream.of(
Arguments.of(
SavepointFormatType.CANONICAL,
HEAP,
(Consumer<KeyedStateHandle>)
keyedState ->
assertThat(
keyedState,
instanceOf(SavepointKeyedStateHandle.class))),
Arguments.of(
SavepointFormatType.NATIVE,
HEAP,
(Consumer<KeyedStateHandle>)
keyedState ->
assertThat(
keyedState,
instanceOf(KeyGroupsStateHandle.class))),
Arguments.of(
SavepointFormatType.CANONICAL,
ROCKSDB_FULL_SNAPSHOTS,
(Consumer<KeyedStateHandle>)
keyedState ->
assertThat(
keyedState,
instanceOf(SavepointKeyedStateHandle.class))),
Arguments.of(
SavepointFormatType.NATIVE,
ROCKSDB_FULL_SNAPSHOTS,
(Consumer<KeyedStateHandle>)
keyedState ->
assertThat(
keyedState,
instanceOf(KeyGroupsStateHandle.class))),
Arguments.of(
SavepointFormatType.CANONICAL,
ROCKSDB_INCREMENTAL_SNAPSHOTS,
(Consumer<KeyedStateHandle>)
keyedState ->
assertThat(
keyedState,
instanceOf(SavepointKeyedStateHandle.class))),
Arguments.of(
SavepointFormatType.NATIVE,
ROCKSDB_INCREMENTAL_SNAPSHOTS,
(Consumer<KeyedStateHandle>)
keyedState ->
assertThat(
keyedState,
instanceOf(
IncrementalRemoteKeyedStateHandle.class))));
private static List<Arguments> parameters() {
// iterate through all combinations of backends, isIncremental, isChangelogEnabled
List<Arguments> result = new LinkedList<>();
for (BiFunction<Boolean, Boolean, StateBackendConfig> builder :
StateBackendConfig.builders) {
for (boolean incremental : new boolean[] {true, false}) {
for (boolean changelog : new boolean[] {true, false}) {
for (SavepointFormatType formatType : SavepointFormatType.values()) {
result.add(Arguments.of(formatType, builder.apply(incremental, changelog)));
}
}
}
}
return result;
}

private void validateState(
KeyedStateHandle state,
SavepointFormatType formatType,
StateBackendConfig backendConfig) {
if (formatType == SavepointFormatType.CANONICAL) {
assertThat(state, instanceOf(SavepointKeyedStateHandle.class));
} else if (backendConfig.isChangelogEnabled()) {
assertThat(state, instanceOf(ChangelogStateBackendHandle.class));
for (KeyedStateHandle nestedState :
((ChangelogStateBackendHandle) state).getMaterializedStateHandles()) {
validateNativeNonChangelogState(nestedState, backendConfig);
}
} else {
validateNativeNonChangelogState(state, backendConfig);
}
}

private void validateNativeNonChangelogState(
KeyedStateHandle state, StateBackendConfig backendConfig) {
if (backendConfig.isIncremental()) {
assertThat(state, instanceOf(IncrementalRemoteKeyedStateHandle.class));
} else {
assertThat(state, instanceOf(KeyGroupsStateHandle.class));
}
}

private abstract static class StateBackendConfig {
protected final boolean changelogEnabled;
protected final boolean incremental;

protected StateBackendConfig(boolean changelogEnabled, boolean incremental) {
this.changelogEnabled = changelogEnabled;
this.incremental = incremental;
}

public abstract String getName();

public abstract Configuration getConfiguration();
public Configuration getConfiguration() {
Configuration stateBackendConfig = new Configuration();
stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, getConfigName());
stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
stateBackendConfig.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, changelogEnabled);
return stateBackendConfig;
}

public int getCheckpointsBeforeSavepoint() {
return 0;
}

protected abstract String getConfigName();

@Override
public final String toString() {
return getName();
return String.format(
"%s, incremental: %b, changelog: %b", getName(), incremental, changelogEnabled);
}
}

private static final StateBackendConfig HEAP =
new StateBackendConfig() {
@Override
public String getName() {
return "HEAP";
}

@Override
public Configuration getConfiguration() {
Configuration stateBackendConfig = new Configuration();
stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
stateBackendConfig.set(
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
return stateBackendConfig;
}
};

private static final StateBackendConfig ROCKSDB_FULL_SNAPSHOTS =
new StateBackendConfig() {
@Override
public String getName() {
return "ROCKSDB_FULL_SNAPSHOTS";
}
private static final List<BiFunction<Boolean, Boolean, StateBackendConfig>> builders =
asList(SavepointFormatITCase::getRocksdb, SavepointFormatITCase::heap);

@Override
public Configuration getConfiguration() {
Configuration stateBackendConfig = new Configuration();
stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
stateBackendConfig.set(
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, false);
return stateBackendConfig;
}
};
public abstract boolean isIncremental();

private static final StateBackendConfig ROCKSDB_INCREMENTAL_SNAPSHOTS =
new StateBackendConfig() {
@Override
public String getName() {
return "ROCKSDB_INCREMENTAL_SNAPSHOTS";
}
private boolean isChangelogEnabled() {
return changelogEnabled;
}
}

@Override
public int getCheckpointsBeforeSavepoint() {
return 1;
}
private static StateBackendConfig heap(boolean incremental, boolean changelogEnabled) {
return new StateBackendConfig(changelogEnabled, incremental /* ignored for now */) {
@Override
public String getName() {
return "HEAP";
}

@Override
public Configuration getConfiguration() {
Configuration stateBackendConfig = super.getConfiguration();
stateBackendConfig.set(
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
return stateBackendConfig;
}

@Override
protected String getConfigName() {
return "filesystem";
}

@Override
public boolean isIncremental() {
return false;
}
};
}

@Override
public Configuration getConfiguration() {
Configuration stateBackendConfig = new Configuration();
stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");
stateBackendConfig.set(
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
return stateBackendConfig;
}
};
private static StateBackendConfig getRocksdb(boolean incremental, boolean changelogEnabled) {
return new StateBackendConfig(changelogEnabled, incremental) {
@Override
public String getName() {
return "ROCKSDB";
}

@Override
public int getCheckpointsBeforeSavepoint() {
return 1;
}

@Override
public boolean isIncremental() {
return this.incremental;
}

@Override
public Configuration getConfiguration() {
Configuration stateBackendConfig = super.getConfiguration();
stateBackendConfig.set(
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
return stateBackendConfig;
}

protected String getConfigName() {
return "rocksdb";
}
};
}

@ParameterizedTest(name = "[{index}] {0}, {1}")
@MethodSource("parameters")
public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath(
SavepointFormatType formatType,
StateBackendConfig stateBackendConfig,
Consumer<KeyedStateHandle> stateHandleVerification)
SavepointFormatType formatType, StateBackendConfig stateBackendConfig)
throws Exception {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
Expand All @@ -231,20 +248,16 @@ public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBase
submitJobAndTakeSavepoint(
miniClusterResource,
formatType,
stateBackendConfig.getCheckpointsBeforeSavepoint());
stateBackendConfig.getCheckpointsBeforeSavepoint(),
config);
final CheckpointMetadata metadata = loadCheckpointMetadata(savepointPath);

final OperatorState operatorState =
metadata.getOperatorStates().stream().filter(hasKeyedState()).findFirst().get();
operatorState
.getStates()
.forEach(
subtaskState -> {
subtaskState
.getManagedKeyedState()
.forEach(stateHandleVerification);
});
relocateAndVerify(miniClusterResource, savepointPath, renamedSavepointDir);
operatorState.getStates().stream()
.flatMap(subtaskState -> subtaskState.getManagedKeyedState().stream())
.forEach(handle -> validateState(handle, formatType, stateBackendConfig));
relocateAndVerify(miniClusterResource, savepointPath, renamedSavepointDir, config);
} finally {
miniClusterResource.after();
}
Expand Down Expand Up @@ -272,14 +285,17 @@ private CheckpointMetadata loadCheckpointMetadata(String savepointPath) throws I
}

private void relocateAndVerify(
MiniClusterWithClientResource cluster, String savepointPath, Path renamedSavepointDir)
MiniClusterWithClientResource cluster,
String savepointPath,
Path renamedSavepointDir,
Configuration config)
throws Exception {
final org.apache.flink.core.fs.Path oldPath =
new org.apache.flink.core.fs.Path(savepointPath);
final org.apache.flink.core.fs.Path newPath =
new org.apache.flink.core.fs.Path(renamedSavepointDir.toUri().toString());
(new org.apache.flink.core.fs.Path(savepointPath).getFileSystem()).rename(oldPath, newPath);
final JobGraph jobGraph = createJobGraph();
final JobGraph jobGraph = createJobGraph(config);
jobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(
renamedSavepointDir.toUri().toString(), false, RestoreMode.CLAIM));
Expand All @@ -293,9 +309,10 @@ private void relocateAndVerify(
private String submitJobAndTakeSavepoint(
MiniClusterWithClientResource cluster,
SavepointFormatType formatType,
int checkpointBeforeSavepoint)
int checkpointBeforeSavepoint,
Configuration config)
throws Exception {
final JobGraph jobGraph = createJobGraph();
final JobGraph jobGraph = createJobGraph(config);

final JobID jobId = jobGraph.getJobID();
ClusterClient<?> client = cluster.getClusterClient();
Expand All @@ -311,8 +328,11 @@ private String submitJobAndTakeSavepoint(
.get();
}

private static JobGraph createJobGraph() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
private static JobGraph createJobGraph(Configuration config) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(
/* pass configuration to prevent any conflicting randomization*/
config);
env.setParallelism(4);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.disableOperatorChaining();
Expand Down

0 comments on commit 746acb4

Please sign in to comment.