Skip to content

Commit

Permalink
[FLINK-6427] Ensure file length is flushed in StreamWriterBase
Browse files Browse the repository at this point in the history
  • Loading branch information
juergenthomann authored and aljoscha committed May 3, 2017
1 parent 6181302 commit 6d0c4c3
Showing 1 changed file with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.EnumSet;

/**
* Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
Expand Down Expand Up @@ -70,6 +72,10 @@ protected void hflushOrSync(FSDataOutputStream os) throws IOException {
// At this point the refHflushOrSync cannot be null,
// since register method would have thrown if it was.
this.refHflushOrSync.invoke(os);

if (os instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
}
} catch (InvocationTargetException e) {
String msg = "Error while trying to hflushOrSync!";
LOG.error(msg + " " + e.getCause());
Expand Down

0 comments on commit 6d0c4c3

Please sign in to comment.