Skip to content

Commit

Permalink
[FLINK-8531] [checkpoints] (part 9) Introduce EXCLUSIVE and SHARED sc…
Browse files Browse the repository at this point in the history
…ope for states
  • Loading branch information
StephanEwen committed Feb 1, 2018
1 parent e0b0f45 commit 4e481a7
Show file tree
Hide file tree
Showing 16 changed files with 164 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand Down Expand Up @@ -536,7 +537,7 @@ public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) {
*/
public void openCheckpointStream() throws Exception {
Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set.");
outStream = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
outStream = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
snapshotCloseableRegistry.registerCloseable(outStream);
outputView = new DataOutputViewStreamWrapper(outStream);
}
Expand Down Expand Up @@ -811,7 +812,7 @@ private StreamStateHandle materializeStateData(Path filePath) throws Exception {
closeableRegistry.registerCloseable(inputStream);

outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
closeableRegistry.registerCloseable(outputStream);

while (true) {
Expand Down Expand Up @@ -847,7 +848,7 @@ private StreamStateHandle materializeMetaData() throws Exception {

try {
outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
closeableRegistry.registerCloseable(outputStream);

//no need for compression scheme support because sst-files are already compressed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StateBackend;
Expand Down Expand Up @@ -255,17 +256,14 @@ public void testCancelFullyAsyncCheckpoints() throws Exception {
int count = 1;

@Override
public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(
long checkpointID,
long timestamp) throws Exception {

public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception {
// we skip the first created stream, because it is used to checkpoint the timer service, which is
// currently not asynchronous.
if (count > 0) {
--count;
return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize);
} else {
return super.createCheckpointStateOutputStream(checkpointID, timestamp);
return super.createCheckpointStateOutputStream(scope);
}
}
};
Expand Down Expand Up @@ -462,7 +460,7 @@ private static class StaticForwardFactory implements StreamFactory {

@Override
public CheckpointStateOutputStream get() throws Exception {
return factory.createCheckpointStateOutputStream(1L, 1L);
return factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,12 @@ public interface CheckpointStreamFactory {
* Creates a new {@link CheckpointStateOutputStream}. When the stream
* is closed, it returns a state handle that can retrieve the state back.
*
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
*
* @param scope The state's scope, whether it is exclusive or shared.
* @return An output stream that writes state for the given checkpoint.
*
* @throws Exception Exceptions may occur while creating the stream and should be forwarded.
*/
CheckpointStateOutputStream createCheckpointStateOutputStream(
long checkpointID,
long timestamp) throws Exception;
CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception;

/**
* A dedicated output stream that produces a {@link StreamStateHandle} when closed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

/**
 * Describes who owns a piece of checkpointed state: a single checkpoint, or
 * potentially several checkpoints at once.
 *
 * <p>The scope is passed to the checkpoint stream factory when a state output stream
 * is created. Storage implementations are free to handle the scopes differently, for
 * example by writing them to separate directories or tables.
 */
public enum CheckpointedStateScope {

	/**
	 * The state is owned by exactly one checkpoint or savepoint and is not
	 * referenced by any other.
	 */
	EXCLUSIVE,

	/**
	 * The state may be referenced by more than one checkpoint.
	 *
	 * <p>This is the typical case for incremental or differential checkpointing,
	 * where a checkpoint writes only deltas and newer checkpoints keep referencing
	 * state that was written by earlier ones.
	 */
	SHARED
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ protected void stopOperation() throws Exception {
}

private void openOutStream() throws Exception {
out = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
out = streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
closeStreamOnCancelRegistry.registerCloseable(out);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public long getCheckpointTimestamp() {

private CheckpointStreamFactory.CheckpointStateOutputStream openAndRegisterNewStream() throws Exception {
CheckpointStreamFactory.CheckpointStateOutputStream cout =
streamFactory.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

closableRegistry.registerCloseable(cout);
return cout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public FsCheckpointStorage(
fileSystem.mkdirs(taskOwnedStateDirectory);
}

// ------------------------------------------------------------------------

/**
 * Gets the checkpoints directory that this storage holds.
 *
 * @return The path stored in the {@code checkpointsDirectory} field.
 */
public Path getCheckpointsDirectory() {
	return this.checkpointsDirectory;
}

// ------------------------------------------------------------------------
// CheckpointStorage implementation
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public FsCheckpointStorageLocation(
CheckpointStorageLocationReference reference,
int fileStateSizeThreshold) {

super(fileSystem, checkpointDir, fileStateSizeThreshold);
super(fileSystem, checkpointDir, sharedStateDir, fileStateSizeThreshold);

checkArgument(fileStateSizeThreshold >= 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.FileUtils;
Expand Down Expand Up @@ -70,9 +71,12 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
/** State below this size will be stored as part of the metadata, rather than in files. */
private final int fileStateThreshold;

/** The directory (job specific) into this initialized instance of the backend stores its data */
/** The directory for checkpoint exclusive state data. */
private final Path checkpointDirectory;

/** The directory for shared checkpoint data. */
private final Path sharedStateDirectory;

/** Cached handle to the file system for file operations. */
private final FileSystem filesystem;

Expand All @@ -84,14 +88,15 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
* JavaDocs for an explanation why this factory must not try and create the checkpoints.
*
* @param fileSystem The filesystem to write to.
* @param checkpointDirectory The URI describing the filesystem (scheme and optionally authority),
* and the path to the checkpoint data directory.
* @param checkpointDirectory The directory for checkpoint exclusive state data.
* @param sharedStateDirectory The directory for shared checkpoint data.
* @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
* rather than in files
*/
public FsCheckpointStreamFactory(
FileSystem fileSystem,
Path checkpointDirectory,
Path sharedStateDirectory,
int fileStateSizeThreshold) {

if (fileStateSizeThreshold < 0) {
Expand All @@ -104,15 +109,18 @@ public FsCheckpointStreamFactory(

this.filesystem = checkNotNull(fileSystem);
this.checkpointDirectory = checkNotNull(checkpointDirectory);
this.sharedStateDirectory = checkNotNull(sharedStateDirectory);
this.fileStateThreshold = fileStateSizeThreshold;
}

// ------------------------------------------------------------------------

@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception {
Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
return new FsCheckpointStateOutputStream(checkpointDirectory, filesystem, bufferSize, fileStateThreshold);

return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.HashMapSerializer;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand Down Expand Up @@ -332,7 +333,7 @@ public RunnableFuture<KeyedStateHandle> snapshot(

@Override
protected void acquireResources() throws Exception {
stream = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
stream = streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
cancelStreamRegistry.registerCloseable(stream);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;

import java.io.IOException;
Expand All @@ -46,8 +47,7 @@ public MemCheckpointStreamFactory(int maxStateSize) {

@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
long checkpointID, long timestamp) throws Exception
{
CheckpointedStateScope scope) throws Exception {
return new MemoryCheckpointOutputStream(maxStateSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,27 @@
package org.apache.flink.runtime.state.filesystem;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;

import org.junit.Test;

import java.io.File;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

/**
Expand Down Expand Up @@ -129,6 +136,74 @@ public void testTaskOwnedStateStream() throws Exception {
}
}

/**
 * Checks that streams created with {@link CheckpointedStateScope#EXCLUSIVE} write into
 * the location's checkpoint directory, while streams created with
 * {@link CheckpointedStateScope#SHARED} write into its shared state directory.
 */
@Test
public void testDirectoriesForExclusiveAndSharedState() throws Exception {
	final FileSystem localFs = LocalFileSystem.getSharedInstance();
	final Path exclusiveRoot = randomTempPath();
	final Path sharedRoot = randomTempPath();

	final FsCheckpointStorageLocation location = new FsCheckpointStorageLocation(
			localFs,
			exclusiveRoot,
			sharedRoot,
			randomTempPath(),
			CheckpointStorageLocationReference.getDefault(),
			FILE_SIZE_THRESHOLD);

	final Path exclusiveDir = location.getCheckpointDirectory();
	final Path sharedDir = location.getSharedStateDirectory();

	// the two scopes must map to distinct directories, and both start out empty
	assertNotEquals(exclusiveDir, sharedDir);
	assertEquals(0, localFs.listStatus(exclusiveDir).length);
	assertEquals(0, localFs.listStatus(sharedDir).length);

	// phase 1: write exclusive state, only the checkpoint directory gains a file
	final CheckpointStateOutputStream exclusiveOut =
			location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
	exclusiveOut.write(42);
	exclusiveOut.flush();
	final StreamStateHandle exclusiveState = exclusiveOut.closeAndGetHandle();

	assertEquals(1, localFs.listStatus(exclusiveDir).length);
	assertEquals(0, localFs.listStatus(sharedDir).length);

	// phase 2: write shared state, now the shared directory gains a file as well
	final CheckpointStateOutputStream sharedOut =
			location.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
	sharedOut.write(42);
	sharedOut.flush();
	final StreamStateHandle sharedState = sharedOut.closeAndGetHandle();

	assertEquals(1, localFs.listStatus(exclusiveDir).length);
	assertEquals(1, localFs.listStatus(sharedDir).length);

	// phase 3: clean up the state written by this test
	exclusiveState.discardState();
	sharedState.discardState();
}

/**
 * Verifies that resolving a checkpoint storage location does not create the
 * resolved checkpoint directory on disk, while the storage's checkpoints
 * directory itself is expected to exist already.
 */
@Test
public void testStorageLocationDoesNotMkdirs() throws Exception {
	final FsCheckpointStorage checkpointStorage = new FsCheckpointStorage(
			randomTempPath(), null, new JobID(), FILE_SIZE_THRESHOLD);

	// the storage's own checkpoints directory must be present after construction
	assertTrue(new File(checkpointStorage.getCheckpointsDirectory().getPath()).exists());

	final FsCheckpointStorageLocation resolvedLocation = (FsCheckpointStorageLocation)
			checkpointStorage.resolveCheckpointStorageLocation(
					177, CheckpointStorageLocationReference.getDefault());

	// resolving the location alone must not have created its directory
	final File resolvedDir = new File(resolvedLocation.getCheckpointDirectory().getPath());
	assertFalse(resolvedDir.exists());
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.function.SupplierWithException;
Expand Down Expand Up @@ -118,7 +119,7 @@ private static final class TestFactory implements CheckpointStreamFactory, java.
}

@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception {
return streamFactory.get();
}
}
Expand Down
Loading

0 comments on commit 4e481a7

Please sign in to comment.