Skip to content

Commit

Permalink
Checkpoints are now written asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Feb 27, 2012
1 parent ca08872 commit 078e87f
Show file tree
Hide file tree
Showing 8 changed files with 409 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,22 @@

package eu.stratosphere.nephele.checkpointing;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.nephele.taskmanager.bytebuffered.OutputChannelForwarder;
import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask;
import eu.stratosphere.nephele.taskmanager.transferenvelope.CheckpointSerializer;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.EventList;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.execution.RuntimeEnvironment;
import eu.stratosphere.nephele.executiongraph.CheckpointState;
import eu.stratosphere.nephele.fs.FileChannelWrapper;
import eu.stratosphere.nephele.fs.FileSystem;
import eu.stratosphere.nephele.fs.Path;
import eu.stratosphere.nephele.io.channels.Buffer;
import eu.stratosphere.nephele.io.channels.BufferFactory;
import eu.stratosphere.nephele.io.channels.FileBufferManager;
import eu.stratosphere.nephele.io.channels.bytebuffered.ByteBufferedChannelCloseEvent;

/**
* An ephemeral checkpoint is a checkpoint that can be used to recover from
Expand All @@ -51,7 +40,7 @@
* For network channels the ephemeral checkpoint is held in main memory until a checkpoint
* decision is made. Based on this decision the checkpoint is either made permanent or discarded.
* <p>
* This class is thread-safe.
* This class is not thread-safe.
*
* @author warneke
*/
Expand All @@ -62,21 +51,11 @@ public class EphemeralCheckpoint implements OutputChannelForwarder {
*/
private static final Log LOG = LogFactory.getLog(EphemeralCheckpoint.class);

/**
* The buffer size in bytes to use for the meta data file channel.
*/
private static final int BUFFER_SIZE = 4096;

/**
* The envelopes which are currently queued until the state of the checkpoint is decided.
*/
private final Queue<TransferEnvelope> queuedEnvelopes = new ArrayDeque<TransferEnvelope>();

/**
* The serializer to convert a transfer envelope into a byte stream.
*/
private final CheckpointSerializer transferEnvelopeSerializer = new CheckpointSerializer();

/**
* The task this checkpoint is created for.
*/
Expand All @@ -87,49 +66,10 @@ public class EphemeralCheckpoint implements OutputChannelForwarder {
*/
private final int numberOfConnectedChannels;

private final boolean distributed;

/**
* The number of channels which are confirmed not to send any further data.
*/
private int numberOfClosedChannels = 0;

/**
* The current suffix for the name of the file containing the meta data.
*/
private int metaDataSuffix = 0;

/**
* The file buffer manager used to allocate file buffers.
* Reference to a write thread that may be spawned to write the checkpoint data asynchronously
*/
private final FileBufferManager fileBufferManager;

/**
* The path to which the checkpoint meta data shall be written to.
*/
private final Path checkpointPath;

/**
* The file system to write the checkpoints to.
*/
private FileSystem fileSystem;

/**
* The default block size of the file system to write the checkpoints to
*/
private long defaultBlockSize = -1L;

/**
* The file channel to write the checkpoint's meta data.
*/
private FileChannel metaDataFileChannel = null;

/**
* A counter for the number of bytes in the checkpoint per meta data file.
*/
private long numberOfBytesPerMetaDataFile = 0;

private Buffer firstSerializedFileBuffer = null;
private WriteThread writeThread = null;

/**
* This enumeration reflects the possible states an ephemeral
Expand All @@ -156,39 +96,28 @@ public EphemeralCheckpoint(final RuntimeTask task, final boolean ephemeral) {
for (int i = 0; i < environment.getNumberOfOutputGates(); ++i) {
nooc += environment.getOutputGate(i).getNumberOfOutputChannels();
}
this.numberOfConnectedChannels = nooc;

final boolean dist = CheckpointUtils.createDistributedCheckpoint();
this.numberOfConnectedChannels = nooc;

this.checkpointingDecision = (ephemeral ? CheckpointingDecisionState.UNDECIDED
: CheckpointingDecisionState.CHECKPOINTING);

this.fileBufferManager = FileBufferManager.getInstance();

if (LOG.isDebugEnabled())
if (LOG.isDebugEnabled()) {
LOG.debug("Created checkpoint for vertex " + task.getVertexID() + ", state " + this.checkpointingDecision);
}

if (this.checkpointingDecision == CheckpointingDecisionState.CHECKPOINTING) {
this.task.checkpointStateChanged(CheckpointState.PARTIAL);
}

if (dist) {
final Path p = CheckpointUtils.getDistributedCheckpointPath();
if (p == null) {
LOG.error("No distributed checkpoint path configured, writing local checkpoints instead");
this.checkpointPath = CheckpointUtils.getLocalCheckpointPath();
this.distributed = false;
} else {
this.checkpointPath = p;
this.distributed = true;
}
} else {
this.checkpointPath = CheckpointUtils.getLocalCheckpointPath();
this.distributed = false;
this.writeThread = new WriteThread(FileBufferManager.getInstance(), this.task.getVertexID(),
this.numberOfConnectedChannels);
}
}

private void destroy() {
/**
* {@inheritDoc}
*/
@Override
public void destroy() {

while (!this.queuedEnvelopes.isEmpty()) {

Expand All @@ -198,152 +127,24 @@ private void destroy() {
buffer.recycleBuffer();
}
}
}

private void write() throws IOException, InterruptedException {

while (!this.queuedEnvelopes.isEmpty()) {
writeTransferEnvelope(this.queuedEnvelopes.poll());
if (this.writeThread != null) {
this.writeThread.cancelAndDestroy();
this.writeThread = null;
}
}

private boolean renameCheckpointPart() throws IOException {

final Path oldFile = this.checkpointPath.suffix(Path.SEPARATOR + CheckpointUtils.METADATA_PREFIX + "_"
+ this.task.getVertexID() + "_part");

final Path newFile = this.checkpointPath.suffix(Path.SEPARATOR + CheckpointUtils.METADATA_PREFIX + "_"
+ this.task.getVertexID() + "_" + this.metaDataSuffix);

if (!this.fileSystem.rename(oldFile, newFile)) {
LOG.error("Unable to rename " + oldFile + " to " + newFile);
return false;
}

return true;
}

private void writeTransferEnvelope(final TransferEnvelope transferEnvelope) throws IOException,
InterruptedException {

Buffer buffer = transferEnvelope.getBuffer();
if (buffer != null) {
if (buffer.isBackedByMemory()) {

// Make sure we transfer the encapsulated buffer to a file and release the memory buffer again
final Buffer fileBuffer = BufferFactory.createFromFile(buffer.size(), this.task.getVertexID(),
this.fileBufferManager, this.distributed, false);
buffer.copyToBuffer(fileBuffer);
transferEnvelope.setBuffer(fileBuffer);
buffer.recycleBuffer();
}
}

if (this.fileSystem == null) {
this.fileSystem = this.checkpointPath.getFileSystem();
}

if (this.defaultBlockSize < 0L) {
this.defaultBlockSize = this.fileSystem.getDefaultBlockSize();
}

// Finish meta data file when the corresponding checkpoint fraction is 10 times the file system's block size
if (this.numberOfBytesPerMetaDataFile > 10L * this.defaultBlockSize && !this.distributed) {

if (this.metaDataFileChannel != null) {
this.metaDataFileChannel.close();
this.metaDataFileChannel = null;

// Rename file
renameCheckpointPart();

// Increase the meta data suffix
++this.metaDataSuffix;
}

// Reset counter
this.numberOfBytesPerMetaDataFile = 0L;
}

if (this.metaDataFileChannel == null) {
this.metaDataFileChannel = getMetaDataFileChannel("_part");
}

this.transferEnvelopeSerializer.setTransferEnvelope(transferEnvelope);
while (this.transferEnvelopeSerializer.write(this.metaDataFileChannel)) {
}

// The following code will prevent the underlying file from being closed
buffer = transferEnvelope.getBuffer();
if (buffer != null) {
if (this.firstSerializedFileBuffer == null) {
this.firstSerializedFileBuffer = buffer;
} else {
buffer.recycleBuffer();
}

// Increase the number of serialized transfer envelopes
this.numberOfBytesPerMetaDataFile += buffer.size();
}

// Look for close event
final EventList eventList = transferEnvelope.getEventList();
if (eventList != null) {
final Iterator<AbstractEvent> it = eventList.iterator();
while (it.hasNext()) {
if (it.next() instanceof ByteBufferedChannelCloseEvent) {
++this.numberOfClosedChannels;
}
}
}

if (this.numberOfClosedChannels == this.numberOfConnectedChannels) {

// Finally, close the underlying file
if (this.firstSerializedFileBuffer != null) {
this.firstSerializedFileBuffer.recycleBuffer();
}

// Finish meta data file
if (this.metaDataFileChannel != null) {
this.metaDataFileChannel.close();

// Rename file
renameCheckpointPart();
}

// Write the meta data file to indicate the checkpoint is complete
this.metaDataFileChannel = getMetaDataFileChannel(CheckpointUtils.COMPLETED_CHECKPOINT_SUFFIX);
this.metaDataFileChannel.write(ByteBuffer.allocate(0));
this.metaDataFileChannel.close();

LOG.info("Finished persistent checkpoint for vertex " + this.task.getVertexID());

// Send notification that checkpoint is completed
this.task.checkpointStateChanged(CheckpointState.COMPLETE);
}
}

private FileChannel getMetaDataFileChannel(final String suffix) throws IOException {
private void write() throws IOException, InterruptedException {

if (LOG.isDebugEnabled()) {
LOG.debug("Writing checkpointing meta data to directory " + this.checkpointPath);
if (this.writeThread == null) {
this.writeThread = new WriteThread(FileBufferManager.getInstance(), task.getVertexID(),
this.numberOfConnectedChannels);
this.writeThread.start();
}

// Bypass FileSystem API for local checkpoints
if (!this.distributed) {

final FileOutputStream fos = new FileOutputStream(this.checkpointPath.toUri().getPath()
+ Path.SEPARATOR + CheckpointUtils.METADATA_PREFIX + "_" + this.task.getVertexID() + suffix);

return fos.getChannel();
while (!this.queuedEnvelopes.isEmpty()) {
this.writeThread.write(this.queuedEnvelopes.poll());
}

return new FileChannelWrapper(this.fileSystem, this.checkpointPath.suffix(Path.SEPARATOR
+ CheckpointUtils.METADATA_PREFIX + "_" + this.task.getVertexID() + suffix), BUFFER_SIZE, (short) 2); // TODO:
// Make
// replication
// configurable
}

public void setCheckpointDecisionSynchronously(final boolean checkpointDecision) throws IOException,
Expand Down Expand Up @@ -380,7 +181,7 @@ public boolean forward(final TransferEnvelope transferEnvelope) throws IOExcepti
if (this.checkpointingDecision == CheckpointingDecisionState.UNDECIDED) {
this.queuedEnvelopes.add(dup);
} else {
writeTransferEnvelope(dup);
this.writeThread.write(dup);
}

return true;
Expand All @@ -397,7 +198,7 @@ public boolean isUndecided() {
@Override
public boolean hasDataLeft() {

return false;
return this.writeThread.hasDataLeft();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,13 @@ public void reportAsynchronousEvent() {

this.bufferProvider.reportAsynchronousEvent();
}

/**
* {@inheritDoc}
* <p>
* This implementation is intentionally empty.
*/
@Override
public void destroy() {

// Nothing to do here
}
}
Loading

0 comments on commit 078e87f

Please sign in to comment.