[FLINK-5443] Migrate from Rolling to Bucketing sink.
To migrate from a RollingSink to a BucketingSink, a
user can now take a savepoint, change their code to
use a BucketingSink with the same properties as the
previous RollingSink, and resume the program from
that savepoint.
kl0u authored and aljoscha committed Jan 13, 2017
1 parent bbca329 commit 2bda5e4
Showing 4 changed files with 348 additions and 118 deletions.
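A minimal sketch of the migration path described in the commit message (the base path and batch size are placeholder values; the two snippets are the Flink 1.1 and 1.2 versions of the same job, not one program, and only the sink construction changes, so the savepoint stays compatible):

// Before (Flink 1.1): the job was started with a RollingSink.
RollingSink<String> sink = new RollingSink<String>("/base/path")
	.setBatchSize(1024L * 1024L * 400L);

// After: take a savepoint, swap in a BucketingSink with the same
// properties, and resume the job from that savepoint.
BucketingSink<String> sink = new BucketingSink<String>("/base/path")
	.setBatchSize(1024L * 1024L * 400L);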
RollingSink.java
@@ -905,30 +905,30 @@ public RollingSink<T> setAsyncTimeout(long timeout) {
* This is used for keeping track of the current in-progress files and files that we mark
* for moving from pending to final location after we get a checkpoint-complete notification.
*/
static final class BucketState implements Serializable {
public static final class BucketState implements Serializable {
private static final long serialVersionUID = 1L;

/**
* The file that was in-progress when the last checkpoint occurred.
*/
String currentFile;
public String currentFile;

/**
* The valid length of the in-progress file at the time of the last checkpoint.
*/
long currentFileValidLength = -1;
public long currentFileValidLength = -1;

/**
* Pending files that accumulated since the last checkpoint.
*/
List<String> pendingFiles = new ArrayList<>();
public List<String> pendingFiles = new ArrayList<>();

/**
* When doing a checkpoint we move the pending files since the last checkpoint to this map
* with the id of the checkpoint. When we get the checkpoint-complete notification we move
* pending files of completed checkpoints to their final location.
*/
final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
public final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();

@Override
public String toString() {
BucketingSink.java
@@ -29,9 +29,11 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.Writer;
@@ -149,7 +151,8 @@
*/
public class BucketingSink<T>
extends RichSinkFunction<T>
implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener,
CheckpointedRestoring<RollingSink.BucketState>, ProcessingTimeCallback {

private static final long serialVersionUID = 1L;
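The CheckpointedRestoring hook added above is the bridge that carries legacy RollingSink state into the new sink; it is also why the first file makes RollingSink.BucketState and its fields public. A minimal sketch of the contract, assuming Flink 1.2's CheckpointedRestoring interface (LegacyAwareSink is a hypothetical stand-in, not part of this commit):

public class LegacyAwareSink<T> extends RichSinkFunction<T>
	implements CheckpointedRestoring<RollingSink.BucketState> {

	@Override
	public void restoreState(RollingSink.BucketState legacy) throws Exception {
		// Called once on restore when the savepoint contains old Checkpointed
		// state of this type; the fields are readable because they are now public.
		String inProgressFile = legacy.currentFile;
		long validLength = legacy.currentFileValidLength;
		// ... reconcile the on-disk files, as handleRestoredRollingSinkState() does below.
	}

	@Override
	public void invoke(T value) throws Exception {
		// write the value ...
	}
}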

@@ -344,7 +347,12 @@ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfi
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");

initFileSystem();
try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
}

if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
@@ -706,139 +714,183 @@ private void handleRestoredBucketState(State<T> restoredState) {
// (we re-start from the last **successful** checkpoint)
bucketState.pendingFiles.clear();

if (bucketState.currentFile != null) {
handlePendingInProgressFile(bucketState.currentFile, bucketState.currentFileValidLength);

// We were writing to a file when the last checkpoint occurred. This file can either
// be still in-progress or became a pending file at some point after the checkpoint.
// Either way, we have to truncate it back to a valid state (or write a .valid-length
// file that specifies up to which length it is valid) and rename it to the final name
// before starting a new bucket file.
// Now that we've restored the bucket to a valid state, reset the current file info
bucketState.currentFile = null;
bucketState.currentFileValidLength = -1;
bucketState.isWriterOpen = false;

Path partPath = new Path(bucketState.currentFile);
try {
Path partPendingPath = getPendingPathFor(partPath);
Path partInProgressPath = getInProgressPathFor(partPath);

if (fs.exists(partPendingPath)) {
LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
// has been moved to pending in the mean time, rename to final location
fs.rename(partPendingPath, partPath);
} else if (fs.exists(partInProgressPath)) {
LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
// it was still in progress, rename to final path
fs.rename(partInProgressPath, partPath);
} else if (fs.exists(partPath)) {
LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath);
} else {
LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
"it was moved to final location by a previous snapshot restore", bucketState.currentFile);
}
handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);

// We use reflection to get the .truncate() method, this
// is only available starting with Hadoop 2.7
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
synchronized (bucketState.pendingFilesPerCheckpoint) {
bucketState.pendingFilesPerCheckpoint.clear();
}
}
}

private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) {
restoredState.pendingFiles.clear();

// truncate it or write a ".valid-length" file to specify up to which point it is valid
if (refTruncate != null) {
LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
// some-one else might still hold the lease from a previous try, we are
// recovering, after all ...
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem) fs;
LOG.debug("Trying to recover file lease {}", partPath);
dfs.recoverLease(partPath);
boolean isclosed = dfs.isFileClosed(partPath);
StopWatch sw = new StopWatch();
sw.start();
while (!isclosed) {
if (sw.getTime() > asyncTimeout) {
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
// ignore it
}
isclosed = dfs.isFileClosed(partPath);
handlePendingInProgressFile(restoredState.currentFile, restoredState.currentFileValidLength);

// Now that we've restored the bucket to a valid state, reset the current file info
restoredState.currentFile = null;
restoredState.currentFileValidLength = -1;

handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);

synchronized (restoredState.pendingFilesPerCheckpoint) {
restoredState.pendingFilesPerCheckpoint.clear();
}
}

private void handlePendingInProgressFile(String file, long validLength) {
if (file != null) {

// We were writing to a file when the last checkpoint occurred. This file can either
// be still in-progress or became a pending file at some point after the checkpoint.
// Either way, we have to truncate it back to a valid state (or write a .valid-length
// file that specifies up to which length it is valid) and rename it to the final name
// before starting a new bucket file.

Path partPath = new Path(file);
try {
Path partPendingPath = getPendingPathFor(partPath);
Path partInProgressPath = getInProgressPathFor(partPath);

if (fs.exists(partPendingPath)) {
LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
// has been moved to pending in the mean time, rename to final location
fs.rename(partPendingPath, partPath);
} else if (fs.exists(partInProgressPath)) {
LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
// it was still in progress, rename to final path
fs.rename(partInProgressPath, partPath);
} else if (fs.exists(partPath)) {
LOG.debug("In-Progress file {} was already moved to final location {}.", file, partPath);
} else {
LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
"it was moved to final location by a previous snapshot restore", file);
}

// We use reflection to get the .truncate() method, this
// is only available starting with Hadoop 2.7
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
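// (For reference, a hypothetical sketch of what the reflective lookup resolves to:
// fs.getClass().getMethod("truncate", Path.class, long.class). The lookup is
// reflective because FileSystem.truncate(Path, long) only exists on Hadoop 2.7+.)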

// truncate it or write a ".valid-length" file to specify up to which point it is valid
if (refTruncate != null) {
LOG.debug("Truncating {} to valid length {}", partPath, validLength);
// some-one else might still hold the lease from a previous try, we are
// recovering, after all ...
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem) fs;
LOG.debug("Trying to recover file lease {}", partPath);
dfs.recoverLease(partPath);
boolean isclosed = dfs.isFileClosed(partPath);
StopWatch sw = new StopWatch();
sw.start();
while (!isclosed) {
if (sw.getTime() > asyncTimeout) {
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
// ignore it
}
isclosed = dfs.isFileClosed(partPath);
}
Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
if (!truncated) {
LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);

// we must wait for the asynchronous truncate operation to complete
StopWatch sw = new StopWatch();
sw.start();
long newLen = fs.getFileStatus(partPath).getLen();
while (newLen != bucketState.currentFileValidLength) {
if (sw.getTime() > asyncTimeout) {
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
// ignore it
}
newLen = fs.getFileStatus(partPath).getLen();
}
Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, validLength);
if (!truncated) {
LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);

// we must wait for the asynchronous truncate operation to complete
StopWatch sw = new StopWatch();
sw.start();
long newLen = fs.getFileStatus(partPath).getLen();
while (newLen != validLength) {
if (sw.getTime() > asyncTimeout) {
break;
}
if (newLen != bucketState.currentFileValidLength) {
throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + ".");
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
// ignore it
}
newLen = fs.getFileStatus(partPath).getLen();
}
} else {
LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
Path validLengthFilePath = getValidLengthPathFor(partPath);
if (!fs.exists(validLengthFilePath)) {
FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
lengthFileOut.close();
if (newLen != validLength) {
throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + newLen + ".");
}
}

// Now that we've restored the bucket to a valid state, reset the current file info
bucketState.currentFile = null;
bucketState.currentFileValidLength = -1;
bucketState.isWriterOpen = false;
} catch (IOException e) {
LOG.error("Error while restoring BucketingSink state.", e);
throw new RuntimeException("Error while restoring BucketingSink state.", e);
} catch (InvocationTargetException | IllegalAccessException e) {
LOG.error("Could not invoke truncate.", e);
throw new RuntimeException("Could not invoke truncate.", e);
} else {
LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
Path validLengthFilePath = getValidLengthPathFor(partPath);
if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
lengthFileOut.writeUTF(Long.toString(validLength));
lengthFileOut.close();
}
}

} catch (IOException e) {
LOG.error("Error while restoring BucketingSink state.", e);
throw new RuntimeException("Error while restoring BucketingSink state.", e);
} catch (InvocationTargetException | IllegalAccessException e) {
LOG.error("Could not invoke truncate.", e);
throw new RuntimeException("Could not invoke truncate.", e);
}
}
}
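// Reader-side note (a hedged sketch, not part of this commit): when truncate() is
// unavailable, the code above leaves the part file over-long and records the usable
// length via writeUTF(Long.toString(validLength)). A consumer can honor the marker
// like this, assuming the default "_<partfile>.valid-length" naming produced by
// getValidLengthPathFor():
//
//   long validLength = fs.getFileStatus(partFile).getLen();
//   Path marker = new Path(partFile.getParent(), "_" + partFile.getName() + ".valid-length");
//   if (fs.exists(marker)) {
//       try (FSDataInputStream in = fs.open(marker)) {
//           validLength = Long.parseLong(in.readUTF());
//       }
//   }
//   // ...then read only the first validLength bytes of partFile.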

// Move files that are confirmed by a checkpoint but did not get moved to final location
// because the checkpoint notification did not happen before a failure
private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pendingFilesPerCheckpoint) {
// Move files that are confirmed by a checkpoint but did not get moved to final location
// because the checkpoint notification did not happen before a failure

LOG.debug("Moving pending files to final location on restore.");
LOG.debug("Moving pending files to final location on restore.");

Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
for (Long pastCheckpointId : pastCheckpointIds) {
// All the pending files are buckets that have been completed but are waiting to be renamed
// to their final name
for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
Path finalPath = new Path(filename);
Path pendingPath = getPendingPathFor(finalPath);
Set<Long> pastCheckpointIds = pendingFilesPerCheckpoint.keySet();
for (Long pastCheckpointId : pastCheckpointIds) {
// All the pending files are buckets that have been completed but are waiting to be renamed
// to their final name
for (String filename : pendingFilesPerCheckpoint.get(pastCheckpointId)) {
Path finalPath = new Path(filename);
Path pendingPath = getPendingPathFor(finalPath);

try {
if (fs.exists(pendingPath)) {
LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
fs.rename(pendingPath, finalPath);
}
} catch (IOException e) {
LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
try {
if (fs.exists(pendingPath)) {
LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
fs.rename(pendingPath, finalPath);
}
} catch (IOException e) {
LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
}
}
}
}

synchronized (bucketState.pendingFilesPerCheckpoint) {
bucketState.pendingFilesPerCheckpoint.clear();
}
// --------------------------------------------------------------------------------------------
// Backwards compatibility with Flink 1.1
// --------------------------------------------------------------------------------------------

@Override
public void restoreState(RollingSink.BucketState state) throws Exception {
LOG.info("{} (taskIdx={}) restored bucket state from the RollingSink of an older Flink version: {}",
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);

try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
throw new RuntimeException("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
}

handleRestoredRollingSinkState(state);
}

// --------------------------------------------------------------------------------------------
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.fs.bucketing;

import org.apache.commons.io.FileUtils;
