Skip to content

Commit

Permalink
[FLINK-8531] [checkpoints] (part 4) rename forCheckpoint() to forChec…
Browse files Browse the repository at this point in the history
…kpointWithDefaultLocation()
  • Loading branch information
StephanEwen committed Feb 1, 2018
1 parent 9903c8c commit 5cc5093
Show file tree
Hide file tree
Showing 35 changed files with 128 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void declineCheckpoint(
}
}

task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forCheckpoint());
task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forCheckpointWithDefaultLocation());

testHarness.processElement(new StreamRecord<>("Wohoo", 0));

Expand Down Expand Up @@ -317,7 +317,7 @@ public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointS

task.triggerCheckpoint(
new CheckpointMetaData(42, 17),
CheckpointOptions.forCheckpoint());
CheckpointOptions.forCheckpointWithDefaultLocation());

testHarness.processElement(new StreamRecord<>("Wohoo", 0));
blockerCheckpointStreamFactory.getWaiterLatch().await();
Expand Down Expand Up @@ -388,7 +388,7 @@ public void testCleanupOfSnapshotsInFailureCase() throws Exception {
new ValueStateDescriptor<>("foobar", String.class));

RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot(
checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forCheckpoint());
checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

try {
FutureUtil.runIfNotDoneAndGet(snapshotFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public void testReleasingSnapshotAfterBackendClosed() throws Exception {

try {
RunnableFuture<KeyedStateHandle> snapshot =
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

RocksDB spyDB = keyedStateBackend.db;

Expand Down Expand Up @@ -287,7 +287,7 @@ public void testDismissingSnapshot() throws Exception {
setupRocksKeyedStateBackend();
try {
RunnableFuture<KeyedStateHandle> snapshot =
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
snapshot.cancel(true);
verifyRocksObjectsReleased();
} finally {
Expand All @@ -301,7 +301,7 @@ public void testDismissingSnapshotNotRunnable() throws Exception {
setupRocksKeyedStateBackend();
try {
RunnableFuture<KeyedStateHandle> snapshot =
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
snapshot.cancel(true);
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
Expand All @@ -324,7 +324,7 @@ public void testCompletingSnapshot() throws Exception {
setupRocksKeyedStateBackend();
try {
RunnableFuture<KeyedStateHandle> snapshot =
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
waiter.await(); // wait for snapshot to run
Expand All @@ -349,7 +349,7 @@ public void testCompletingSnapshot() throws Exception {
public void testCancelRunningSnapshot() throws Exception {
setupRocksKeyedStateBackend();
try {
RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
waiter.await(); // wait for snapshot to run
Expand Down Expand Up @@ -429,7 +429,7 @@ public void testSharedIncrementalStateDeRegistration() throws Exception {
checkpointId,
checkpointId,
createStreamFactory(),
CheckpointOptions.forCheckpoint());
CheckpointOptions.forCheckpointWithDefaultLocation());

snapshot.run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ else if (!props.forceCheckpoint()) {

CheckpointOptions checkpointOptions;
if (!props.isSavepoint()) {
checkpointOptions = CheckpointOptions.forCheckpoint();
checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
} else {
checkpointOptions = CheckpointOptions.forSavepoint(checkpointStorageLocation.getLocationAsPointer());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public String toString() {

private static final CheckpointOptions CHECKPOINT = new CheckpointOptions(CheckpointType.CHECKPOINT, null);

public static CheckpointOptions forCheckpoint() {
public static CheckpointOptions forCheckpointWithDefaultLocation() {
return CHECKPOINT;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ else if (type == CHECKPOINT_BARRIER_EVENT) {
CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];

if (checkpointType == CheckpointType.CHECKPOINT) {
checkpointOptions = CheckpointOptions.forCheckpoint();
checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
} else if (checkpointType == CheckpointType.SAVEPOINT) {
int len = buffer.getInt();
byte[] bytes = new byte[len];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@ public void testTriggerAndDeclineCheckpointSimple() {
assertFalse(checkpoint.isFullyAcknowledged());

// check that the vertices received the trigger checkpoint message
verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpoint());
verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpoint());
verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());

// acknowledge from one of the tasks
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class CheckpointOptionsTest {

@Test
public void testFullCheckpoint() throws Exception {
CheckpointOptions options = CheckpointOptions.forCheckpoint();
CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
assertEquals(CheckpointType.CHECKPOINT, options.getCheckpointType());
assertNull(options.getTargetLocation());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void testSerialization() throws Exception {
long id = Integer.MAX_VALUE + 123123L;
long timestamp = Integer.MAX_VALUE + 1228L;

CheckpointOptions options = CheckpointOptions.forCheckpoint();
CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testCheckpointBarrierSerialization() throws Exception {
long id = Integer.MAX_VALUE + 123123L;
long timestamp = Integer.MAX_VALUE + 1228L;

CheckpointOptions checkpoint = CheckpointOptions.forCheckpoint();
CheckpointOptions checkpoint = CheckpointOptions.forCheckpointWithDefaultLocation();
testCheckpointBarrierSerialization(id, timestamp, checkpoint);

CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
Expand All @@ -68,7 +68,7 @@ public void testSerializeDeserializeEvent() throws Exception {
AbstractEvent[] events = {
EndOfPartitionEvent.INSTANCE,
EndOfSuperstepEvent.INSTANCE,
new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forCheckpoint()),
new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forCheckpointWithDefaultLocation()),
new TestTaskEvent(Math.random(), 12361231273L),
new CancelCheckpointMarker(287087987329842L)
};
Expand Down Expand Up @@ -119,7 +119,7 @@ public void testIsEvent() throws Exception {
AbstractEvent[] events = {
EndOfPartitionEvent.INSTANCE,
EndOfSuperstepEvent.INSTANCE,
new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forCheckpoint()),
new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forCheckpointWithDefaultLocation()),
new TestTaskEvent(Math.random(), 12361231273L),
new CancelCheckpointMarker(287087987329842L)
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public void testBroadcastEventNoRecords() throws Exception {

ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forCheckpoint());
CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forCheckpointWithDefaultLocation());

// No records emitted yet, broadcast should not request a buffer
writer.broadcastEvent(barrier);
Expand Down Expand Up @@ -380,7 +380,7 @@ public void testBroadcastEventMixedRecords() throws Exception {

ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forCheckpoint());
CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forCheckpointWithDefaultLocation());

// Emit records on some channels first (requesting buffers), then
// broadcast the event. The record buffers should be emitted first, then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testTriggerAndConfirmCheckpoint() {
NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
testSerializabilityEqualsHashCode(cc);

TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L, CheckpointOptions.forCheckpoint());
TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L, CheckpointOptions.forCheckpointWithDefaultLocation());
testSerializabilityEqualsHashCode(tc);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() thro

CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
RunnableFuture<OperatorStateHandle> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);

try {
Expand Down Expand Up @@ -273,7 +273,7 @@ public void testKeyedStateRestoreFailsIfSerializerDeserializationFails() throws
682375462378L,
2,
streamFactory,
CheckpointOptions.forCheckpoint()));
CheckpointOptions.forCheckpointWithDefaultLocation()));

backend.dispose();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {

CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
RunnableFuture<OperatorStateHandle> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
FutureUtil.runIfNotDoneAndGet(runnableFuture);

// make sure that the copy method has been called
Expand Down Expand Up @@ -355,7 +355,7 @@ public void testSnapshotEmpty() throws Exception {
abstractStateBackend.createStreamFactory(new JobID(), "testOperator");

RunnableFuture<OperatorStateHandle> snapshot =
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpoint());
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
assertNull(stateHandle);
Expand Down Expand Up @@ -387,7 +387,7 @@ public void testSnapshotRestoreSync() throws Exception {

CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
RunnableFuture<OperatorStateHandle> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);

try {
Expand Down Expand Up @@ -470,7 +470,7 @@ public void testSnapshotRestoreAsync() throws Exception {
streamFactory.setBlockerLatch(blockerLatch);

RunnableFuture<OperatorStateHandle> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

ExecutorService executorService = Executors.newFixedThreadPool(1);

Expand Down Expand Up @@ -572,7 +572,7 @@ public void testSnapshotAsyncClose() throws Exception {
streamFactory.setBlockerLatch(blockerLatch);

RunnableFuture<OperatorStateHandle> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

ExecutorService executorService = Executors.newFixedThreadPool(1);

Expand Down Expand Up @@ -616,7 +616,7 @@ public void testSnapshotAsyncCancel() throws Exception {
streamFactory.setBlockerLatch(blockerLatch);

RunnableFuture<OperatorStateHandle> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());

ExecutorService executorService = Executors.newFixedThreadPool(1);

Expand Down Expand Up @@ -667,7 +667,7 @@ public void testRestoreFailsIfSerializerDeserializationFails() throws Exception

CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
RunnableFuture<OperatorStateHandle> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);

try {
Expand Down
Loading

0 comments on commit 5cc5093

Please sign in to comment.