Skip to content

Commit

Permalink
Added optional parameter to always write into directories even in cas…
Browse files Browse the repository at this point in the history
…e of DOP = 1.

Extended FileOutputFormatTest.
Some code beautifications.
  • Loading branch information
Fabian Hueske committed Jan 30, 2014
1 parent 601fdd2 commit 31bae02
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
public static final String WRITEMODE_CREATE = "stratosphere.output.writemode.create";
public static final String WRITEMODE_OVERWRITE = "stratosphere.output.writemode.overwrite";

/**
* The key under which the output directory mode parameter is stored in the configuration
*/
public static final String OUT_DIRECTORY_PARAMETER_KEY = "stratosphere.output.directory";

/**
* Value keys for the output directory modes
*/
public static final String OUT_DIRECTORY_ALWAYS = "stratosphere.output.directory.always";
public static final String OUT_DIRECTORY_PARONLY = "stratosphere.output.directory.paronly";

/**
* The config parameter for the opening timeout in milliseconds.
*/
Expand All @@ -70,6 +81,11 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
*/
protected WriteMode writeMode;

/**
* The output directory mode
*/
protected OutputDirectoryMode outDirMode;

/**
* The stream to which the data is written;
*/
Expand All @@ -82,7 +98,16 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {

// --------------------------------------------------------------------------------------------


/**
* Defines the behavior for creating output directories.
*
*/
public static enum OutputDirectoryMode {
ALWAYS, // A directory is always created, regardless of number of write tasks
PARONLY // A directory is only created for parallel output tasks, i.e., number of output tasks > 1.
// If number of output tasks = 1, the output is written to a single file.
}

@Override
public void configure(Configuration parameters) {
// get the file parameter
Expand All @@ -108,6 +133,16 @@ public void configure(Configuration parameters) {
throw new RuntimeException("Invalid write mode configuration: "+writeModeParam);
}

// get the output directory parameter
String outDirParam = parameters.getString(OUT_DIRECTORY_PARAMETER_KEY, OUT_DIRECTORY_PARONLY);
if(outDirParam.equals(OUT_DIRECTORY_ALWAYS)) {
this.outDirMode = OutputDirectoryMode.ALWAYS;
} else if(outDirParam.equals(OUT_DIRECTORY_PARONLY)) {
this.outDirMode = OutputDirectoryMode.PARONLY;
} else {
throw new RuntimeException("Invalid output directory mode configuration: "+outDirParam);
}

// get timeout for stream opening
this.openTimeout = parameters.getLong(OUTPUT_STREAM_OPEN_TIMEOUT_KEY, FileInputFormat.DEFAULT_OPENING_TIMEOUT);
if (this.openTimeout < 0) {
Expand All @@ -124,7 +159,7 @@ public void configure(Configuration parameters) {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
// obtain FSDataOutputStream asynchronously, since HDFS client can not handle InterruptedExceptions
OutputPathOpenThread opot = new OutputPathOpenThread(this.outputFilePath, this.writeMode, (taskNumber + 1), numTasks, this.openTimeout);
OutputPathOpenThread opot = new OutputPathOpenThread(this, (taskNumber + 1), numTasks);
opot.start();

try {
Expand All @@ -147,10 +182,22 @@ public void close() throws IOException {
}
}

public Path getOutputFilePath() {
return this.outputFilePath;
}

public WriteMode getWriteMode() {
return this.writeMode;
}

public OutputDirectoryMode getOutDirMode() {
return this.outDirMode;
}

public long getOpenTimeout() {
return this.openTimeout;
}

// ============================================================================================

private static final class OutputPathOpenThread extends Thread {
Expand All @@ -163,6 +210,8 @@ private static final class OutputPathOpenThread extends Thread {

private final WriteMode writeMode;

private final OutputDirectoryMode outDirMode;

private final long timeoutMillies;

private volatile FSDataOutputStream fdos;
Expand All @@ -172,10 +221,11 @@ private static final class OutputPathOpenThread extends Thread {
private volatile boolean aborted;


public OutputPathOpenThread(Path path, WriteMode writeMode, int taskIndex, int numTasks, long timeoutMillies) {
this.path = path;
this.writeMode = writeMode;
this.timeoutMillies = timeoutMillies;
public OutputPathOpenThread(FileOutputFormat<?> fof, int taskIndex, int numTasks) {
this.path = fof.getOutputFilePath();
this.writeMode = fof.getWriteMode();
this.outDirMode = fof.getOutDirMode();
this.timeoutMillies = fof.getOpenTimeout();
this.taskIndex = taskIndex;
this.numTasks = numTasks;
}
Expand All @@ -187,8 +237,9 @@ public void run() {
Path p = this.path;
final FileSystem fs = p.getFileSystem();

if(this.numTasks == 1) {
// output is not written in parallel
// initialize output path.
if(this.numTasks == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
// output is not written in parallel and should go to a single file

if(!fs.isDistributedFS()) {
// prepare local output path
Expand All @@ -199,11 +250,8 @@ public void run() {
}
}

// create output file
this.fdos = fs.create(p, false);

} else if(this.numTasks > 1) {
// output is written in parallel
} else if(this.numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS) {
// output is written in parallel into a directory or should always be written to a directory

if(!fs.isDistributedFS()) {
// File system is not distributed.
Expand All @@ -217,23 +265,22 @@ public void run() {
// Suffix the path with the parallel instance index
p = p.suffix("/" + this.taskIndex);

// create output file
switch(writeMode) {
case CREATE:
this.fdos = fs.create(p, false);
break;
case OVERWRITE:
this.fdos = fs.create(p, true);
break;
default:
throw new IllegalArgumentException("Invalid write mode: "+writeMode);
}
this.fdos = fs.create(p, true);

} else {
// invalid number of subtasks (<= 0)
throw new IllegalArgumentException("Invalid number of subtasks. Canceling task.");
}

// create output file
switch(writeMode) {
case CREATE:
this.fdos = fs.create(p, false);
break;
case OVERWRITE:
this.fdos = fs.create(p, true);
break;
default:
throw new IllegalArgumentException("Invalid write mode: "+writeMode);
}

// check for canceling and close the stream in that case, because no one will obtain it
if (this.aborted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,11 +427,11 @@ public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferS
*
* @param outPath Output path that should be prepared.
* @param writeMode Write mode to consider.
* @param parallelOutput True, if it will be written to the output path in parallel, false otherwise.
* @param createDirectory True, to initialize a directory at the given path, false otherwise.
* @return True, if the path was successfully prepared, false otherwise.
* @throws IOException
*/
public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean parallelOutput) throws IOException {
public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
if(this.isDistributedFS()) {
return false;
}
Expand All @@ -449,7 +449,7 @@ public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean par
}
case OVERWRITE:
if(this.getFileStatus(outPath).isDir()) {
if(parallelOutput) {
if(createDirectory) {
// directory exists and does not need to be created
return true;
} else {
Expand All @@ -476,7 +476,7 @@ public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean par
}
}

if(parallelOutput) {
if(createDirectory) {
// Output directory needs to be created
try {
if(!this.exists(outPath)) {
Expand Down Expand Up @@ -517,11 +517,11 @@ public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean par
*
* @param outPath Output path that should be prepared.
* @param writeMode Write mode to consider.
* @param parallelOutput True, if it will be written to the output path in parallel, false otherwise.
* @param createDirectory True, to initialize a directory at the given path, false otherwise.
* @return True, if the path was successfully prepared, false otherwise.
* @throws IOException
*/
public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean parallelOutput) throws IOException {
public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
if(!this.isDistributedFS()) {
return false;
}
Expand All @@ -548,7 +548,7 @@ public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean para
}
}

if(parallelOutput) {
if(createDirectory) {
// Output directory needs to be created
try {
if(!this.exists(outPath)) {
Expand Down
Loading

0 comments on commit 31bae02

Please sign in to comment.