Skip to content

Commit

Permalink
[FLINK-11419][filesystem] Wait for lease to be revoked when truncatin…
Browse files Browse the repository at this point in the history
…g file in Hadoop.

This closes apache#7588.
  • Loading branch information
kl0u committed Feb 4, 2019
1 parent 81e361c commit 892ff1d
Showing 1 changed file with 29 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,7 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
this.targetFile = checkNotNull(recoverable.targetFile());
this.tempFile = checkNotNull(recoverable.tempFile());

waitUntilLeaseIsRevoked(tempFile);

// truncate back and append
boolean truncated;
try {
truncated = truncate(fs, tempFile, recoverable.offset());
} catch (Exception e) {
throw new IOException("Missing data in tmp file: " + tempFile, e);
}

if (!truncated) {
// Truncate did not complete immediately, we must wait for the operation to complete and release the lease
waitUntilLeaseIsRevoked(tempFile);
}
safelyTruncateFile(fs, tempFile, recoverable);

out = fs.append(tempFile);

Expand Down Expand Up @@ -162,6 +149,30 @@ public void close() throws IOException {
// Hadoop 2.7, which have no truncation calls for HDFS.
// ------------------------------------------------------------------------

private static void safelyTruncateFile(
final FileSystem fileSystem,
final Path path,
final HadoopFsRecoverable recoverable) throws IOException {

ensureTruncateInitialized();

waitUntilLeaseIsRevoked(fileSystem, path);

// truncate back and append
boolean truncated;
try {
truncated = truncate(fileSystem, path, recoverable.offset());
} catch (Exception e) {
throw new IOException("Problem while truncating file: " + path, e);
}

if (!truncated) {
// Truncate did not complete immediately, we must wait for
// the operation to complete and release the lease.
waitUntilLeaseIsRevoked(fileSystem, path);
}
}

private static void ensureTruncateInitialized() throws FlinkRuntimeException {
if (truncateHandle == null) {
Method truncateMethod;
Expand All @@ -180,7 +191,7 @@ private static void ensureTruncateInitialized() throws FlinkRuntimeException {
}
}

static boolean truncate(FileSystem hadoopFs, Path file, long length) throws IOException {
private static boolean truncate(final FileSystem hadoopFs, final Path file, final long length) throws IOException {
if (truncateHandle != null) {
try {
return (Boolean) truncateHandle.invoke(hadoopFs, file, length);
Expand All @@ -197,7 +208,7 @@ static boolean truncate(FileSystem hadoopFs, Path file, long length) throws IOEx
else {
throw new IllegalStateException("Truncation handle has not been initialized");
}
return true;
return false;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -268,12 +279,7 @@ public void commitAfterRecovery() throws IOException {
if (srcStatus.getLen() > expectedLength) {
// can happen if we go from persist to recovering for commit directly
// truncate the trailing junk away
try {
truncate(fs, src, expectedLength);
} catch (Exception e) {
// this can happen if the file is smaller than expected
throw new IOException("Problem while truncating file: " + src, e);
}
safelyTruncateFile(fs, src, recoverable);
}

// rename to final location (if it exists, overwrite it)
Expand Down Expand Up @@ -312,7 +318,7 @@ public CommitRecoverable getRecoverable() {
*
* @param path The path to the file we want to resume writing to.
*/
private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path path) throws IOException {
Preconditions.checkState(fs instanceof DistributedFileSystem);

final DistributedFileSystem dfs = (DistributedFileSystem) fs;
Expand Down

0 comments on commit 892ff1d

Please sign in to comment.