Skip to content

Commit

Permalink
[FLINK-10406][tests] Port JobManagerTest#testSavepointRestoreSettings
Browse files Browse the repository at this point in the history
Port JobManagerTest#testSavepointRestoreSettings to JobMasterTest#testRestoringModifiedJobFromSavepoint.
  • Loading branch information
tillrohrmann committed Oct 7, 2018
1 parent 129adef commit 0590935
Showing 1 changed file with 97 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
Expand Down Expand Up @@ -70,6 +72,7 @@
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
Expand All @@ -91,7 +94,9 @@
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
Expand Down Expand Up @@ -399,6 +404,67 @@ public void testRestoringFromSavepoint() throws Exception {
}
}

/**
 * Verifies that creating a JobMaster for a modified JobGraph from a savepoint fails with an
 * {@link IllegalStateException} while non restored state is disallowed, and that the savepoint
 * ends up in the completed checkpoint store once non restored state is allowed.
 */
@Test
public void testRestoringModifiedJobFromSavepoint() throws Exception {
	// write a savepoint which carries state for a freshly generated operator id
	final long expectedSavepointId = 42L;
	final OperatorID savepointOperatorId = new OperatorID();
	final File savepoint = createSavepointWithOperatorState(expectedSavepointId, savepointOperatorId);
	final String savepointPath = savepoint.getAbsolutePath();

	// the job under test consists of a single, newly created vertex
	final JobVertex newOperator = new JobVertex("New operator");
	newOperator.setInvokableClass(NoOpInvokable.class);

	// first attempt: restore settings which do NOT allow non restored state
	final SavepointRestoreSettings strictSettings = SavepointRestoreSettings.forPath(savepointPath, false);
	final JobGraph modifiedJobGraph = createJobGraphFromJobVerticesWithCheckpointing(strictSettings, newOperator);

	// register a checkpoint store we can inspect after the JobMaster has been created
	final StandaloneCompletedCheckpointStore checkpointStore = new StandaloneCompletedCheckpointStore(1);
	haServices.setCheckpointRecoveryFactory(
		new TestingCheckpointRecoveryFactory(checkpointStore, new StandaloneCheckpointIDCounter()));

	try {
		createJobMaster(
			configuration,
			modifiedJobGraph,
			haServices,
			new TestingJobManagerSharedServicesBuilder().build());
		fail("Should fail because we cannot resume the changed JobGraph from the savepoint.");
	} catch (IllegalStateException expected) {
		// expected: the savepoint state cannot be mapped onto the modified job
	}

	// second attempt: same savepoint, but non restored state is now allowed
	modifiedJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));

	final JobMaster lenientJobMaster = createJobMaster(
		configuration,
		modifiedJobGraph,
		haServices,
		new TestingJobManagerSharedServicesBuilder().build());

	try {
		// creating the JobMaster is expected to have put the savepoint into the store
		final CompletedCheckpoint restored = checkpointStore.getLatestCheckpoint();

		assertThat(restored, Matchers.notNullValue());
		assertThat(restored.getCheckpointID(), is(expectedSavepointId));
	} finally {
		RpcUtils.terminateRpcEndpoint(lenientJobMaster, testingTimeout);
	}
}

/**
* Tests that in a streaming use case where checkpointing is enabled, a
* fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
Expand Down Expand Up @@ -1313,8 +1379,13 @@ private JobGraph producerConsumerJobGraph() {
}

/**
 * Creates a savepoint file for the given id by delegating to
 * {@link #createSavepointWithOperatorState(long, OperatorID...)} without passing any operator ids.
 *
 * @param savepointId id handed through to the delegate
 * @return the file returned by the delegate
 * @throws IOException if the delegate throws it
 */
private File createSavepoint(long savepointId) throws IOException {
return createSavepointWithOperatorState(savepointId);
}

private File createSavepointWithOperatorState(long savepointId, OperatorID... operatorIds) throws IOException {
final File savepointFile = temporaryFolder.newFile();
final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());
final Collection<OperatorState> operatorStates = createOperatorState(operatorIds);
final SavepointV2 savepoint = new SavepointV2(savepointId, operatorStates, Collections.emptyList());

try (FileOutputStream fileOutputStream = new FileOutputStream(savepointFile)) {
Checkpoints.storeCheckpointMetadata(savepoint, fileOutputStream);
Expand All @@ -1323,9 +1394,33 @@ private File createSavepoint(long savepointId) throws IOException {
return savepointFile;
}

/**
 * Builds one {@link OperatorState} per given operator id.
 *
 * <p>Every operator state is constructed with the arguments {@code (id, 1, 42)} and receives a
 * single {@link OperatorSubtaskState} at index 0. That subtask state wraps an
 * {@link OperatorStreamStateHandle} over an empty map and an empty byte handle named "foobar";
 * its remaining three constructor arguments are {@code null}.
 *
 * @param operatorIds ids to create operator states for; may be empty
 * @return the created operator states, in the order of the given ids
 */
private Collection<OperatorState> createOperatorState(OperatorID... operatorIds) {
	final Collection<OperatorState> result = new ArrayList<>(operatorIds.length);

	for (int i = 0; i < operatorIds.length; i++) {
		// zero-length payload; only the presence of a handle matters here
		final ByteStreamStateHandle emptyBytes = new ByteStreamStateHandle("foobar", new byte[0]);
		final OperatorStreamStateHandle streamHandle = new OperatorStreamStateHandle(
			Collections.emptyMap(),
			emptyBytes);

		final OperatorSubtaskState onlySubtask = new OperatorSubtaskState(
			streamHandle,
			null,
			null,
			null);

		final OperatorState stateForOperator = new OperatorState(operatorIds[i], 1, 42);
		stateForOperator.putState(0, onlySubtask);

		result.add(stateForOperator);
	}

	return result;
}

/**
 * Creates a {@link JobGraph} without any job vertices by delegating to
 * {@link #createJobGraphFromJobVerticesWithCheckpointing(SavepointRestoreSettings, JobVertex...)}.
 *
 * @param savepointRestoreSettings settings handed through to the delegate
 * @return the job graph produced by the delegate
 */
@Nonnull
private JobGraph createJobGraphWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings) {
	// No local JobGraph is created here: the delegate builds the instance that is returned.
	return createJobGraphFromJobVerticesWithCheckpointing(savepointRestoreSettings);
}

@Nonnull
private JobGraph createJobGraphFromJobVerticesWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings, JobVertex... jobVertices) {
final JobGraph jobGraph = new JobGraph(jobVertices);

// enable checkpointing which is required to resume from a savepoint
final CheckpointCoordinatorConfiguration checkpoinCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(
Expand Down

0 comments on commit 0590935

Please sign in to comment.