Skip to content

Commit

Permalink
[FLINK-16192][checkpointing] Remove remaining bits of "legacy state" …
Browse files Browse the repository at this point in the history
…and Savepoint 1.2 compatibility

"Legacy State" refers to the state originally created by the old "Checkpointed" interface, before state was re-scalable.
Because some of the tests for 1.3 compatibility used checkpoints with that legacy state, some 1.3 migration
tests had to be removed as well.
  • Loading branch information
StephanEwen committed Feb 25, 2020
1 parent f575a82 commit b8277f9
Show file tree
Hide file tree
Showing 39 changed files with 13 additions and 774 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public static void verifyOS() {
@Parameterized.Parameters(name = "Migration Savepoint / Bucket Files Prefix: {0}")
public static Collection<Tuple2<MigrationVersion, String>> parameters () {
return Arrays.asList(
Tuple2.of(MigrationVersion.v1_2, "/var/folders/v_/ry2wp5fx0y7c1rvr41xy9_700000gn/T/junit9160378385359106772/junit479663758539998903/1970-01-01--01/part-0-"),
Tuple2.of(MigrationVersion.v1_3, "/var/folders/tv/b_1d8fvx23dgk1_xs8db_95h0000gn/T/junit4273542175898623023/junit3801102997056424640/1970-01-01--01/part-0-"),
Tuple2.of(MigrationVersion.v1_4, "/var/folders/tv/b_1d8fvx23dgk1_xs8db_95h0000gn/T/junit3198043255809479705/junit8947526563966405708/1970-01-01--01/part-0-"),
Tuple2.of(MigrationVersion.v1_5, "/tmp/junit4927100426019463155/junit2465610012100182280/1970-01-01--00/part-0-"),
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ public class FlinkKafkaConsumerBaseMigrationTest {
@Parameterized.Parameters(name = "Migration Savepoint: {0}")
public static Collection<MigrationVersion> parameters () {
return Arrays.asList(
MigrationVersion.v1_2,
MigrationVersion.v1_3,
MigrationVersion.v1_4,
MigrationVersion.v1_5,
Expand Down Expand Up @@ -260,7 +259,7 @@ public void testRestoreFromEmptyStateWithPartitions() throws Exception {

testHarness.open();

// the expected state in "kafka-consumer-migration-test-flink1.2-snapshot-empty-state";
// the expected state in "kafka-consumer-migration-test-flink1.x-snapshot-empty-state";
// all new partitions after the snapshot are considered as partitions that were created while the
// consumer wasn't running, and should start from the earliest offset.
final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
Expand Down Expand Up @@ -332,7 +331,7 @@ public void testRestore() throws Exception {
*/
@Test
public void testRestoreFailsWithNonEmptyPreFlink13StatesIfDiscoveryEnabled() throws Exception {
assumeTrue(testMigrateVersion == MigrationVersion.v1_3 || testMigrateVersion == MigrationVersion.v1_2);
assumeTrue(testMigrateVersion == MigrationVersion.v1_3);

final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());

Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public enum MigrationVersion {

// NOTE: the version strings must not change,
// as they are used to locate snapshot file paths
v1_2("1.2"),
v1_3("1.3"),
v1_4("1.4"),
v1_5("1.5"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public class ContinuousFileProcessingMigrationTest {
@Parameterized.Parameters(name = "Migration Savepoint / Mod Time: {0}")
public static Collection<Tuple2<MigrationVersion, Long>> parameters () {
return Arrays.asList(
Tuple2.of(MigrationVersion.v1_2, 1493116191000L),
Tuple2.of(MigrationVersion.v1_3, 1496532000000L),
Tuple2.of(MigrationVersion.v1_4, 1516897628000L),
Tuple2.of(MigrationVersion.v1_5, 1533639934000L),
Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ public static Savepoint loadCheckpointMetadata(DataInputStream in, ClassLoader c
}
}

@SuppressWarnings("deprecation")
public static CompletedCheckpoint loadAndValidateCheckpoint(
JobID jobId,
Map<JobVertexID, ExecutionJobVertex> tasks,
Expand All @@ -130,16 +129,12 @@ public static CompletedCheckpoint loadAndValidateCheckpoint(
final String checkpointPointer = location.getExternalPointer();

// (1) load the savepoint
final Savepoint rawCheckpointMetadata;
final Savepoint checkpointMetadata;
try (InputStream in = metadataHandle.openInputStream()) {
DataInputStream dis = new DataInputStream(in);
rawCheckpointMetadata = loadCheckpointMetadata(dis, classLoader);
checkpointMetadata = loadCheckpointMetadata(dis, classLoader);
}

final Savepoint checkpointMetadata = rawCheckpointMetadata.getTaskStates() == null ?
rawCheckpointMetadata :
SavepointV2.convertToOperatorStateSavepointV2(tasks, rawCheckpointMetadata);

// generate mapping from operator to task
Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
for (ExecutionJobVertex task : tasks.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.util.Disposable;

import java.util.Collection;
Expand Down Expand Up @@ -50,17 +49,6 @@ public interface Savepoint extends Disposable, Versioned {
*/
long getCheckpointId();

/**
* Returns the snapshotted task states.
*
* <p>These are used to restore the snapshot state.
*
* @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future.
* @return Snapshotted task states
*/
@Deprecated
Collection<TaskState> getTaskStates();

/**
* Gets the checkpointed states generated by the master.
*/
Expand All @@ -74,5 +62,4 @@ public interface Savepoint extends Disposable, Versioned {
* @return Snapshotted operator states
*/
Collection<OperatorState> getOperatorStates();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.runtime.checkpoint.savepoint;

import org.apache.flink.annotation.VisibleForTesting;

import java.util.HashMap;
import java.util.Map;

Expand All @@ -30,9 +28,6 @@
*/
public class SavepointSerializers {

/** If this flag is true, restoring a savepoint fails if it contains legacy state (<= Flink 1.1 format). */
static boolean failWhenLegacyStateDetected = true;

private static final Map<Integer, SavepointSerializer> SERIALIZERS = new HashMap<>(2);

static {
Expand Down Expand Up @@ -61,13 +56,4 @@ public static SavepointSerializer getSerializer(int version) {
throw new IllegalArgumentException("Unrecognized checkpoint version number: " + version);
}
}

/**
* This is only visible as a temporary solution to keep the stateful job migration it cases working from binary
* savepoints that still contain legacy state (<= Flink 1.1).
*/
@VisibleForTesting
public static void setFailWhenLegacyStateDetected(boolean fail) {
failWhenLegacyStateDetected = fail;
}
}
Loading

0 comments on commit b8277f9

Please sign in to comment.