diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index a716029c99e6f..be0d134b54e1c 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -86,20 +86,7 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 		this.targetFile = checkNotNull(recoverable.targetFile());
 		this.tempFile = checkNotNull(recoverable.tempFile());
 
-		waitUntilLeaseIsRevoked(tempFile);
-
-		// truncate back and append
-		boolean truncated;
-		try {
-			truncated = truncate(fs, tempFile, recoverable.offset());
-		} catch (Exception e) {
-			throw new IOException("Missing data in tmp file: " + tempFile, e);
-		}
-
-		if (!truncated) {
-			// Truncate did not complete immediately, we must wait for the operation to complete and release the lease
-			waitUntilLeaseIsRevoked(tempFile);
-		}
+		safelyTruncateFile(fs, tempFile, recoverable);
 
 		out = fs.append(tempFile);
 
@@ -162,6 +149,30 @@ public void close() throws IOException {
 	//  Hadoop 2.7, which have no truncation calls for HDFS.
 	// ------------------------------------------------------------------------
 
+	private static void safelyTruncateFile(
+			final FileSystem fileSystem,
+			final Path path,
+			final HadoopFsRecoverable recoverable) throws IOException {
+
+		ensureTruncateInitialized();
+
+		waitUntilLeaseIsRevoked(fileSystem, path);
+
+		// truncate back and append
+		boolean truncated;
+		try {
+			truncated = truncate(fileSystem, path, recoverable.offset());
+		} catch (Exception e) {
+			throw new IOException("Problem while truncating file: " + path, e);
+		}
+
+		if (!truncated) {
+			// Truncate did not complete immediately, we must wait for
+			// the operation to complete and release the lease.
+			waitUntilLeaseIsRevoked(fileSystem, path);
+		}
+	}
+
 	private static void ensureTruncateInitialized() throws FlinkRuntimeException {
 		if (truncateHandle == null) {
 			Method truncateMethod;
@@ -180,7 +191,7 @@ private static void ensureTruncateInitialized() throws FlinkRuntimeException {
 		}
 	}
 
-	static boolean truncate(FileSystem hadoopFs, Path file, long length) throws IOException {
+	private static boolean truncate(final FileSystem hadoopFs, final Path file, final long length) throws IOException {
 		if (truncateHandle != null) {
 			try {
 				return (Boolean) truncateHandle.invoke(hadoopFs, file, length);
@@ -197,7 +208,7 @@ static boolean truncate(FileSystem hadoopFs, Path file, long length) throws IOEx
 		else {
 			throw new IllegalStateException("Truncation handle has not been initialized");
 		}
-		return true;
+		return false;
 	}
 
 	// ------------------------------------------------------------------------
@@ -268,12 +279,7 @@ public void commitAfterRecovery() throws IOException {
 			if (srcStatus.getLen() > expectedLength) {
 				// can happen if we go from persist to recovering for commit directly
 				// truncate the trailing junk away
-				try {
-					truncate(fs, src, expectedLength);
-				} catch (Exception e) {
-					// this can happen if the file is smaller than expected
-					throw new IOException("Problem while truncating file: " + src, e);
-				}
+				safelyTruncateFile(fs, src, recoverable);
 			}
 
 			// rename to final location (if it exists, overwrite it)
@@ -312,7 +318,7 @@ public CommitRecoverable getRecoverable() {
 	 *
	 * @param path The path to the file we want to resume writing to.
	 */
-	private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
+	private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path path) throws IOException {
 		Preconditions.checkState(fs instanceof DistributedFileSystem);
 
 		final DistributedFileSystem dfs = (DistributedFileSystem) fs;