Skip to content

Commit

Permalink
[FLINK-8531] [checkpoints] (part 5) Introduce CheckpointStorageLocati…
Browse files Browse the repository at this point in the history
…onReference instead of String to communicate the location
  • Loading branch information
StephanEwen committed Feb 1, 2018
1 parent 5cc5093 commit bb19e7f
Show file tree
Hide file tree
Showing 15 changed files with 485 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public class CheckpointCoordinator {

/** The executor used for asynchronous calls, like potentially blocking I/O */
private final Executor executor;

/** Tasks who need to be sent a message when a checkpoint is started */
private final ExecutionVertex[] tasksToTrigger;

Expand Down Expand Up @@ -602,12 +602,9 @@ else if (!props.forceCheckpoint()) {
}
// end of lock scope

CheckpointOptions checkpointOptions;
if (!props.isSavepoint()) {
checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
} else {
checkpointOptions = CheckpointOptions.forSavepoint(checkpointStorageLocation.getLocationAsPointer());
}
final CheckpointOptions checkpointOptions = new CheckpointOptions(
props.getCheckpointType(),
checkpointStorageLocation.getLocationReference());

// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;

import java.io.Serializable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Options for performing the checkpoint.
Expand All @@ -38,59 +37,70 @@ public class CheckpointOptions implements Serializable {
private static final long serialVersionUID = 5010126558083292915L;

/** Type of the checkpoint. */
@Nonnull
private final CheckpointType checkpointType;

/** Target location for the checkpoint. */
@Nullable
private final String targetLocation;
private final CheckpointStorageLocationReference targetLocation;

public CheckpointOptions(
CheckpointType checkpointType,
CheckpointStorageLocationReference targetLocation) {

private CheckpointOptions(
@Nonnull CheckpointType checkpointType,
@Nullable String targetLocation) {
this.checkpointType = checkNotNull(checkpointType);
this.targetLocation = targetLocation;
this.targetLocation = checkNotNull(targetLocation);
}

// ------------------------------------------------------------------------

/**
* Returns the type of checkpoint to perform.
*
* @return Type of checkpoint to perform.
*/
@Nonnull
public CheckpointType getCheckpointType() {
return checkpointType;
}

/**
* Returns a custom target location or <code>null</code> if none
* was specified.
*
* @return A custom target location or <code>null</code>.
* Returns the target location for the checkpoint.
*/
@Nullable
public String getTargetLocation() {
public CheckpointStorageLocationReference getTargetLocation() {
return targetLocation;
}

@Override
public String toString() {
return "CheckpointOptions(" + checkpointType + ")";
}

// ------------------------------------------------------------------------

private static final CheckpointOptions CHECKPOINT = new CheckpointOptions(CheckpointType.CHECKPOINT, null);
@Override
public int hashCode() {
return 31 * targetLocation.hashCode() + checkpointType.hashCode();
}

public static CheckpointOptions forCheckpointWithDefaultLocation() {
return CHECKPOINT;
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
else if (obj != null && obj.getClass() == CheckpointOptions.class) {
final CheckpointOptions that = (CheckpointOptions) obj;
return this.checkpointType == that.checkpointType &&
this.targetLocation.equals(that.targetLocation);
}
else {
return false;
}
}

public static CheckpointOptions forSavepoint(String targetDirectory) {
checkNotNull(targetDirectory, "targetDirectory");
return new CheckpointOptions(CheckpointType.SAVEPOINT, targetDirectory);
@Override
public String toString() {
return "CheckpointOptions: " + checkpointType + " @ " + targetLocation;
}

// ------------------------------------------------------------------------
// Factory methods
// ------------------------------------------------------------------------

private static final CheckpointOptions CHECKPOINT_AT_DEFAULT_LOCATION =
new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());

public static CheckpointOptions forCheckpointWithDefaultLocation() {
return CHECKPOINT_AT_DEFAULT_LOCATION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

package org.apache.flink.runtime.io.network.api.serialization;

import java.nio.charset.Charset;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand All @@ -31,21 +32,21 @@
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.InstantiationUtil;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.flink.util.Preconditions;

/**
* Utility class to serialize and deserialize task events.
*/
public class EventSerializer {

private static final Charset STRING_CODING_CHARSET = Charset.forName("UTF-8");
// ------------------------------------------------------------------------
// Constants
// ------------------------------------------------------------------------

private static final int END_OF_PARTITION_EVENT = 0;

Expand All @@ -57,6 +58,12 @@ public class EventSerializer {

private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;

private static final int CHECKPOINT_TYPE_CHECKPOINT = 0;

private static final int CHECKPOINT_TYPE_SAVEPOINT = 1;

// ------------------------------------------------------------------------
// Serialization Logic
// ------------------------------------------------------------------------

public static ByteBuffer toSerializedEvent(AbstractEvent event) throws IOException {
Expand All @@ -65,37 +72,7 @@ public static ByteBuffer toSerializedEvent(AbstractEvent event) throws IOExcepti
return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT });
}
else if (eventClass == CheckpointBarrier.class) {
CheckpointBarrier barrier = (CheckpointBarrier) event;

CheckpointOptions checkpointOptions = barrier.getCheckpointOptions();
CheckpointType checkpointType = checkpointOptions.getCheckpointType();

ByteBuffer buf;
if (checkpointType == CheckpointType.CHECKPOINT) {
buf = ByteBuffer.allocate(24);
buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
buf.putLong(4, barrier.getId());
buf.putLong(12, barrier.getTimestamp());
buf.putInt(20, checkpointType.ordinal());
} else if (checkpointType == CheckpointType.SAVEPOINT) {
String targetLocation = checkpointOptions.getTargetLocation();
assert(targetLocation != null);
byte[] locationBytes = targetLocation.getBytes(STRING_CODING_CHARSET);

buf = ByteBuffer.allocate(24 + 4 + locationBytes.length);
buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
buf.putLong(4, barrier.getId());
buf.putLong(12, barrier.getTimestamp());
buf.putInt(20, checkpointType.ordinal());
buf.putInt(24, locationBytes.length);
for (int i = 0; i < locationBytes.length; i++) {
buf.put(28 + i, locationBytes[i]);
}
} else {
throw new IOException("Unknown checkpoint type: " + checkpointType);
}

return buf;
return serializeCheckpointBarrier((CheckpointBarrier) event);
}
else if (eventClass == EndOfSuperstepEvent.class) {
return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT });
Expand Down Expand Up @@ -131,7 +108,6 @@ else if (eventClass == CancelCheckpointMarker.class) {
* @param eventClass the expected class of the event type
* @param classLoader the class loader to use for custom event classes
* @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
* @throws IOException
*/
private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoader classLoader) throws IOException {
if (buffer.remaining() < 4) {
Expand Down Expand Up @@ -195,35 +171,13 @@ public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader c
buffer.order(ByteOrder.BIG_ENDIAN);

try {
int type = buffer.getInt();
final int type = buffer.getInt();

if (type == END_OF_PARTITION_EVENT) {
return EndOfPartitionEvent.INSTANCE;
}
else if (type == CHECKPOINT_BARRIER_EVENT) {
long id = buffer.getLong();
long timestamp = buffer.getLong();

CheckpointOptions checkpointOptions;

int checkpointTypeOrdinal = buffer.getInt();
Preconditions.checkElementIndex(type, CheckpointType.values().length, "Illegal CheckpointType ordinal");
CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];

if (checkpointType == CheckpointType.CHECKPOINT) {
checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
} else if (checkpointType == CheckpointType.SAVEPOINT) {
int len = buffer.getInt();
byte[] bytes = new byte[len];
buffer.get(bytes);
String targetLocation = new String(bytes, STRING_CODING_CHARSET);

checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
} else {
throw new IOException("Unknown checkpoint type: " + checkpointType);
}

return new CheckpointBarrier(id, timestamp, checkpointOptions);
return deserializeCheckpointBarrier(buffer);
}
else if (type == END_OF_SUPERSTEP_EVENT) {
return EndOfSuperstepEvent.INSTANCE;
Expand Down Expand Up @@ -257,7 +211,7 @@ else if (type == OTHER_EVENT) {
catch (Exception e) {
throw new IOException("Error while deserializing or instantiating event.", e);
}
}
}
else {
throw new IOException("Corrupt byte stream for event");
}
Expand All @@ -267,6 +221,70 @@ else if (type == OTHER_EVENT) {
}
}

private static ByteBuffer serializeCheckpointBarrier(CheckpointBarrier barrier) throws IOException {
final CheckpointOptions checkpointOptions = barrier.getCheckpointOptions();
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();

final byte[] locationBytes = checkpointOptions.getTargetLocation().isDefaultReference() ?
null : checkpointOptions.getTargetLocation().getReferenceBytes();

final ByteBuffer buf = ByteBuffer.allocate(28 + (locationBytes == null ? 0 : locationBytes.length));

// we do not use checkpointType.ordinal() here to make the serialization robust
// against changes in the enum (such as changes in the order of the values)
final int typeInt;
if (checkpointType == CheckpointType.CHECKPOINT) {
typeInt = CHECKPOINT_TYPE_CHECKPOINT;
} else if (checkpointType == CheckpointType.SAVEPOINT) {
typeInt = CHECKPOINT_TYPE_SAVEPOINT;
} else {
throw new IOException("Unknown checkpoint type: " + checkpointType);
}

buf.putInt(CHECKPOINT_BARRIER_EVENT);
buf.putLong(barrier.getId());
buf.putLong(barrier.getTimestamp());
buf.putInt(typeInt);

if (locationBytes == null) {
buf.putInt(-1);
} else {
buf.putInt(locationBytes.length);
buf.put(locationBytes);
}

buf.flip();
return buf;
}

private static CheckpointBarrier deserializeCheckpointBarrier(ByteBuffer buffer) throws IOException {
final long id = buffer.getLong();
final long timestamp = buffer.getLong();

final int checkpointTypeCode = buffer.getInt();
final int locationRefLen = buffer.getInt();

final CheckpointType checkpointType;
if (checkpointTypeCode == CHECKPOINT_TYPE_CHECKPOINT) {
checkpointType = CheckpointType.CHECKPOINT;
} else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT) {
checkpointType = CheckpointType.SAVEPOINT;
} else {
throw new IOException("Unknown checkpoint type code: " + checkpointTypeCode);
}

final CheckpointStorageLocationReference locationRef;
if (locationRefLen == -1) {
locationRef = CheckpointStorageLocationReference.getDefault();
} else {
byte[] bytes = new byte[locationRefLen];
buffer.get(bytes);
locationRef = new CheckpointStorageLocationReference(bytes);
}

return new CheckpointBarrier(id, timestamp, new CheckpointOptions(checkpointType, locationRef));
}

// ------------------------------------------------------------------------
// Buffer helpers
// ------------------------------------------------------------------------
Expand All @@ -293,7 +311,6 @@ public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) t
* @param eventClass the expected class of the event type
* @param classLoader the class loader to use for custom event classes
* @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
* @throws IOException
*/
public static boolean isEvent(final Buffer buffer,
final Class<?> eventClass,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,12 @@ public interface CheckpointStorageLocation {
void disposeOnFailure() throws IOException;

/**
* Gets the location encoded as a string pointer.
* Gets a reference to the storage location. This reference is sent to the
* target storage location via checkpoint RPC messages and checkpoint barriers,
* in a format avoiding backend-specific classes.
*
* <p>This pointer is used to send the target storage location via checkpoint RPC messages
* and checkpoint barriers, in a format avoiding backend-specific classes.
*
* <p>That string encodes the location typically in a backend-specific way.
* For example, file-based backends can encode paths here.
* <p>If there is no custom location information that needs to be communicated,
* this method can simply return {@link CheckpointStorageLocationReference#getDefault()}.
*/
String getLocationAsPointer();
CheckpointStorageLocationReference getLocationReference();
}
Loading

0 comments on commit bb19e7f

Please sign in to comment.