Skip to content

Commit

Permalink
Default mode for output formats is not to overwrite existing files.
Browse files Browse the repository at this point in the history
Globally configurable default modes for overwrite / output directory behavior.
Configurable behavior for FileOutputFormat through regular parameters, rather than configuration.
  • Loading branch information
StephanEwen committed Mar 3, 2014
1 parent 6c261d3 commit 3a63443
Show file tree
Hide file tree
Showing 15 changed files with 337 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public synchronized void start(int numTaskMgr) throws InterruptedException {
}

Configuration conf = NepheleMiniCluster.getMiniclusterDefaultConfig(
JOB_MANAGER_RPC_PORT, 6500, 7501, null, true);
JOB_MANAGER_RPC_PORT, 6500, 7501, null, true, true, false);
GlobalConfiguration.includeConfiguration(conf);

// start job manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public class NepheleMiniCluster {
private String hdfsConfigFile;

private boolean visualizerEnabled = DEFAULT_VISUALIZER_ENABLED;

private boolean defaultOverwriteFiles = false;

private boolean defaultAlwaysCreateDirectory = false;


private Thread runner;
Expand Down Expand Up @@ -109,6 +113,22 @@ public void setVisualizerEnabled(boolean visualizerEnabled) {
this.visualizerEnabled = visualizerEnabled;
}

public boolean isDefaultOverwriteFiles() {
return defaultOverwriteFiles;
}

public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) {
this.defaultOverwriteFiles = defaultOverwriteFiles;
}

public boolean isDefaultAlwaysCreateDirectory() {
return defaultAlwaysCreateDirectory;
}

public void setDefaultAlwaysCreateDirectory(boolean defaultAlwaysCreateDirectory) {
this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory;
}


// ------------------------------------------------------------------------
// Life cycle and Job Submission
Expand All @@ -128,7 +148,7 @@ public void start() throws Exception {
GlobalConfiguration.loadConfiguration(configDir);
} else {
Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort,
taskManagerDataPort, hdfsConfigFile, visualizerEnabled);
taskManagerDataPort, hdfsConfigFile, visualizerEnabled, defaultOverwriteFiles, defaultAlwaysCreateDirectory);
GlobalConfiguration.includeConfiguration(conf);
}

Expand Down Expand Up @@ -188,7 +208,8 @@ private void waitForJobManagerToBecomeReady() throws InterruptedException {
}

public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, int taskManagerRpcPort,
int taskManagerDataPort, String hdfsConfigFile, boolean visualization)
int taskManagerDataPort, String hdfsConfigFile, boolean visualization,
boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory)
{
final Configuration config = new Configuration();

Expand All @@ -209,8 +230,13 @@ public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, i

// hdfs
if (hdfsConfigFile != null) {
config.setString("fs.hdfs.hdfsdefault", hdfsConfigFile);
config.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, hdfsConfigFile);
}

// file system behavior
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, defaultOverwriteFiles);
config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY, defaultAlwaysCreateDirectory);

return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,73 @@

package eu.stratosphere.api.common.io;


import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.fs.FSDataOutputStream;
import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.core.fs.FileSystem.WriteMode;
import eu.stratosphere.core.fs.Path;


/**
* The abstract base class for all output formats that are file based. Contains the logic to open/close the target
* file streams.
*/
public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
private static final long serialVersionUID = 1L;

// --------------------------------------------------------------------------------------------

/**
* The LOG for logging messages in this class.
*/
private static final Log LOG = LogFactory.getLog(FileOutputFormat.class);

/**
* The key under which the name of the target path is stored in the configuration.
* Defines the behavior for creating output directories.
*
*/
public static final String FILE_PARAMETER_KEY = "stratosphere.output.file";
public static enum OutputDirectoryMode {

/** A directory is always created, regardless of number of write tasks. */
ALWAYS,

/** A directory is only created for parallel output tasks, i.e., number of output tasks > 1.
* If number of output tasks = 1, the output is written to a single file. */
PARONLY
}

/**
* The key under which the write mode is stored in the configuration
*/
public static final String WRITEMODE_PARAMETER_KEY = "stratosphere.output.writemode";
// --------------------------------------------------------------------------------------------

private static final WriteMode DEFAULT_WRITE_MODE;

private static final OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;


static {
final boolean overwrite = GlobalConfiguration.getBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE);

DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : WriteMode.CREATE;

final boolean alwaysCreateDirectory = GlobalConfiguration.getBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY);

DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory ? OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY;
}

// --------------------------------------------------------------------------------------------

/**
* Value keys for the write modes
* The LOG for logging messages in this class.
*/
public static final String WRITEMODE_CREATE = "stratosphere.output.writemode.create";
public static final String WRITEMODE_OVERWRITE = "stratosphere.output.writemode.overwrite";
private static final Log LOG = LogFactory.getLog(FileOutputFormat.class);

/**
* The key under which the output directory mode parameter is stored in the configuration
*/
public static final String OUT_DIRECTORY_PARAMETER_KEY = "stratosphere.output.directory";

/**
* Value keys for the output directory modes
*/
public static final String OUT_DIRECTORY_ALWAYS = "stratosphere.output.directory.always";
public static final String OUT_DIRECTORY_PARONLY = "stratosphere.output.directory.paronly";

/**
* The config parameter for the opening timeout in milliseconds.
* The key under which the name of the target path is stored in the configuration.
*/
public static final String OUTPUT_STREAM_OPEN_TIMEOUT_KEY = "stratosphere.output.file.timeout";
public static final String FILE_PARAMETER_KEY = "stratosphere.output.file";

/**
* The path of the file to be written.
Expand All @@ -79,86 +89,71 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
/**
* The write mode of the output.
*/
protected WriteMode writeMode;
private WriteMode writeMode;

/**
* The output directory mode
*/
protected OutputDirectoryMode outDirMode;
private OutputDirectoryMode outputDirectoryMode;

/**
* The stream to which the data is written;
* Stream opening timeout.
*/
protected FSDataOutputStream stream;
private long openTimeout = -1;

// --------------------------------------------------------------------------------------------

/**
* Stream opening timeout.
* The stream to which the data is written;
*/
private long openTimeout;
protected transient FSDataOutputStream stream;

// --------------------------------------------------------------------------------------------

/**
* Defines the behavior for creating output directories.
*
*/
public static enum OutputDirectoryMode {
ALWAYS, // A directory is always created, regardless of number of write tasks
PARONLY // A directory is only created for parallel output tasks, i.e., number of output tasks > 1.
// If number of output tasks = 1, the output is written to a single file.
}

@Override
public void configure(Configuration parameters) {
// get the file parameter
String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
if (filePath == null) {
throw new IllegalArgumentException("Configuration file FileOutputFormat does not contain the file path.");
}

try {
this.outputFilePath = new Path(filePath);
}
catch (RuntimeException rex) {
throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage());
// get the output file path, if it was not yet set
if (this.outputFilePath == null) {
// get the file parameter
String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
if (filePath == null) {
throw new IllegalArgumentException("The output path has been specified neither via constructor/setters" +
", nor via the Configuration.");
}

try {
this.outputFilePath = new Path(filePath);
}
catch (RuntimeException rex) {
throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage());
}
}

// get the write mode parameter
String writeModeParam = parameters.getString(WRITEMODE_PARAMETER_KEY, WRITEMODE_OVERWRITE);
if(writeModeParam.equals(WRITEMODE_OVERWRITE)) {
this.writeMode = WriteMode.OVERWRITE;
} else if(writeModeParam.equals(WRITEMODE_CREATE)) {
this.writeMode = WriteMode.CREATE;
} else {
throw new RuntimeException("Invalid write mode configuration: "+writeModeParam);
// check if have not been set and use the defaults in that case
if (this.writeMode == null) {
this.writeMode = DEFAULT_WRITE_MODE;
}

// get the output directory parameter
String outDirParam = parameters.getString(OUT_DIRECTORY_PARAMETER_KEY, OUT_DIRECTORY_PARONLY);
if(outDirParam.equals(OUT_DIRECTORY_ALWAYS)) {
this.outDirMode = OutputDirectoryMode.ALWAYS;
} else if(outDirParam.equals(OUT_DIRECTORY_PARONLY)) {
this.outDirMode = OutputDirectoryMode.PARONLY;
} else {
throw new RuntimeException("Invalid output directory mode configuration: "+outDirParam);
if (this.outputDirectoryMode == null) {
this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE;
}

// get timeout for stream opening
this.openTimeout = parameters.getLong(OUTPUT_STREAM_OPEN_TIMEOUT_KEY, FileInputFormat.DEFAULT_OPENING_TIMEOUT);
if (this.openTimeout < 0) {
if (this.openTimeout == -1) {
this.openTimeout = FileInputFormat.DEFAULT_OPENING_TIMEOUT;
if (LOG.isWarnEnabled())
LOG.warn("Ignoring invalid parameter for stream opening timeout (requires a positive value or zero=infinite): " + this.openTimeout);
} else if (this.openTimeout == 0) {
this.openTimeout = Long.MAX_VALUE;
}
}



@Override
public void open(int taskNumber, int numTasks) throws IOException {
// obtain FSDataOutputStream asynchronously, since HDFS client can not handle InterruptedExceptions

if (LOG.isDebugEnabled())
LOG.debug("Openint stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode +
", OutputDirectoryMode=" + outputDirectoryMode + ", timeout=" + openTimeout);

// obtain FSDataOutputStream asynchronously, since HDFS client is vulnerable to InterruptedExceptions
OutputPathOpenThread opot = new OutputPathOpenThread(this, (taskNumber + 1), numTasks);
opot.start();

Expand All @@ -182,16 +177,50 @@ public void close() throws IOException {
}
}

public void setOutputFilePath(Path path) {
if (path == null)
throw new IllegalArgumentException("Output file path may not be null.");

this.outputFilePath = path;
}

public Path getOutputFilePath() {
return this.outputFilePath;
}


public void setWriteMode(WriteMode mode) {
if (mode == null) {
throw new NullPointerException();
}

this.writeMode = mode;
}

public WriteMode getWriteMode() {
return this.writeMode;
}


public OutputDirectoryMode getOutDirMode() {
return this.outDirMode;
public void setOutputDirectoryMode(OutputDirectoryMode mode) {
if (mode == null) {
throw new NullPointerException();
}

this.outputDirectoryMode = mode;
}

public OutputDirectoryMode getOutputDirectoryMode() {
return this.outputDirectoryMode;
}


public void setOpenTimeout(long timeout) {
if (timeout < 0) {
throw new IllegalArgumentException("The timeout must be a nonnegative numer of milliseconds (zero for infinite).");
}

this.openTimeout = (timeout == 0) ? Long.MAX_VALUE : timeout;
}

public long getOpenTimeout() {
Expand Down Expand Up @@ -224,15 +253,15 @@ private static final class OutputPathOpenThread extends Thread {
public OutputPathOpenThread(FileOutputFormat<?> fof, int taskIndex, int numTasks) {
this.path = fof.getOutputFilePath();
this.writeMode = fof.getWriteMode();
this.outDirMode = fof.getOutDirMode();
this.outDirMode = fof.getOutputDirectoryMode();
this.timeoutMillies = fof.getOpenTimeout();
this.taskIndex = taskIndex;
this.numTasks = numTasks;
}

@Override
public void run() {

try {
Path p = this.path;
final FileSystem fs = p.getFileSystem();
Expand Down Expand Up @@ -381,22 +410,6 @@ public static abstract class AbstractConfigBuilder<T> {
protected AbstractConfigBuilder(Configuration targetConfig) {
this.config = targetConfig;
}

// --------------------------------------------------------------------

/**
* Sets the timeout after which the output format will abort the opening of the output stream,
* if the stream has not responded until then.
*
* @param timeoutInMillies The timeout, in milliseconds, or <code>0</code> for infinite.
* @return The builder itself.
*/
public T openingTimeout(int timeoutInMillies) {
this.config.setLong(OUTPUT_STREAM_OPEN_TIMEOUT_KEY, timeoutInMillies);
@SuppressWarnings("unchecked")
T ret = (T) this;
return ret;
}
}

/**
Expand Down
Loading

0 comments on commit 3a63443

Please sign in to comment.