Skip to content

Commit

Permalink
[FLINK-1202] Remove incomplete file outputs on failure
Browse files Browse the repository at this point in the history
This closes apache#175
  • Loading branch information
StephanEwen committed Nov 3, 2014
1 parent f42dcc3 commit a747b61
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.io;

/**
* {@link OutputFormat}s may implement this interface to run a cleanup hook when the execution is not successful.
*/
public interface CleanupWhenUnsuccessful {

/**
* Hook that is called upon an unsuccessful execution.
*
* @throws Exception The method may forward exceptions when the cleanup fails.
*/
void tryCleanupOnError() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.common.io;

import java.io.FileNotFoundException;
import java.io.IOException;

import org.slf4j.Logger;
Expand All @@ -35,7 +36,7 @@
* The abstract base class for all output formats that are file based. Contains the logic to open/close the target
* file streams.
*/
public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster {
public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster, CleanupWhenUnsuccessful {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -106,10 +107,14 @@ private static final void initDefaultsFromConfiguration() {

// --------------------------------------------------------------------------------------------

/**
* The stream to which the data is written;
*/
/** The stream to which the data is written; */
protected transient FSDataOutputStream stream;

/** The path that is actually written to (may a a file in a the directory defined by {@code outputFilePath} ) */
private transient Path actualFilePath;

/** Flag indicating whether this format actually created a file, which should be removed on cleanup. */
private transient boolean fileCreated;

// --------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -231,12 +236,13 @@ public void open(int taskNumber, int numTasks) throws IOException {


// Suffix the path with the parallel instance index, if needed
if (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) {
p = p.suffix("/" + (taskNumber+1));
}
this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + (taskNumber+1)) : p;

// create output file
this.stream = fs.create(p, writeMode == WriteMode.OVERWRITE);
this.stream = fs.create(this.actualFilePath, writeMode == WriteMode.OVERWRITE);

// at this point, the file creation must have succeeded, or an exception has been thrown
this.fileCreated = true;
}

@Override
Expand Down Expand Up @@ -283,6 +289,21 @@ public void initializeGlobal(int parallelism) throws IOException {
}
}

@Override
public void tryCleanupOnError() {
if (this.fileCreated) {
this.fileCreated = false;

try {
FileSystem.get(this.actualFilePath.toUri()).delete(actualFilePath, false);
} catch (FileNotFoundException e) {
// ignore, may not be visible yet or may be already removed
} catch (Throwable t) {
LOG.error("Could not remove the incomplete file " + actualFilePath);
}
}
}

// ============================================================================================

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
Expand Down Expand Up @@ -78,6 +79,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
// cancel flag
private volatile boolean taskCanceled;

private volatile boolean cleanupCalled;


@Override
public void registerInputOutput() {
Expand Down Expand Up @@ -180,6 +183,18 @@ public void invoke() throws Exception
}
}
catch (Exception ex) {

// make a best effort to clean up
try {
if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
cleanupCalled = true;
((CleanupWhenUnsuccessful) format).tryCleanupOnError();
}
}
catch (Throwable t) {
LOG.error("Cleanup on error failed.", t);
}

ex = ExceptionInChainedStubException.exceptionUnwrap(ex);

if (ex instanceof CancelTaskException) {
Expand Down Expand Up @@ -237,6 +252,17 @@ public void cancel() throws Exception {
try {
this.format.close();
} catch (Throwable t) {}

// make a best effort to clean up
try {
if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
cleanupCalled = true;
((CleanupWhenUnsuccessful) format).tryCleanupOnError();
}
}
catch (Throwable t) {
LOG.error("Cleanup on error failed.", t);
}
}

if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public void testFailingDataSinkTask() {

// assert that temp file was created
File tempTestFile = new File(this.tempTestPath);
Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());

}

Expand Down Expand Up @@ -347,7 +347,7 @@ public void testFailingSortingDataSinkTask() {

// assert that temp file was created
File tempTestFile = new File(this.tempTestPath);
Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());

}

Expand Down Expand Up @@ -388,8 +388,7 @@ public void run() {

// assert that temp file was created
File tempTestFile = new File(this.tempTestPath);
Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());

Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
}

@Test
Expand Down

0 comments on commit a747b61

Please sign in to comment.