Skip to content

Commit

Permalink
Introduced CREATE and OVERWRITE file output write modes. OVERWRITE is…
Browse files Browse the repository at this point in the history
… default (as before).

Rewrote code for file output path preparation.
Fixed distributed writing to local filesystem apache#286
  • Loading branch information
Fabian Hueske committed Jan 30, 2014
1 parent 930b064 commit e6911e7
Show file tree
Hide file tree
Showing 20 changed files with 357 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public void configure(Configuration parameters) {
}

@Override
public void open(int taskNumber) throws IOException {
super.open(taskNumber);
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
this.wrt = this.charsetName == null ? new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096)) :
new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096), this.charsetName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void configure(Configuration parameters) {
}

@Override
public void open(int taskNumber) throws IOException {
public void open(int taskNumber, int numTasks) throws IOException {
this.hadoopConfig = getHadoopConfig(this.config);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void configure(Configuration parameters) {
* I/O problem.
*/
@Override
public void open(int taskNumber) throws IOException {
public void open(int taskNumber, int numTasks) throws IOException {
try {
establishConnection();
upload = dbConn.prepareStatement(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void testJDBCOutputFormat() throws IOException {

jdbcOutputFormat = new JDBCOutputFormat();
jdbcOutputFormat.configure(cfg);
jdbcOutputFormat.open(1);
jdbcOutputFormat.open(0,1);

jdbcInputFormat = new JDBCInputFormat(
driverPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ public abstract class BinaryOutputFormat<T extends IOReadableWritable> extends F

private BlockBasedOutput blockBasedInput;

/*
* (non-Javadoc)
* @see eu.stratosphere.pact.common.io.FileOutputFormat#close()
*/
@Override
public void close() throws IOException {
this.dataOutputStream.close();
Expand All @@ -54,11 +50,6 @@ public void close() throws IOException {
/**
 * Extension point that receives a {@link BlockInfo}; this default implementation does nothing.
 * NOTE(review): presumably subclasses override this to add format-specific information to the
 * block info before it is written; the call site is not visible here, so confirm against it.
 *
 * @param blockInfo The block info that may be complemented.
 * @throws IOException Declared so that overriding implementations may report I/O problems.
 */
protected void complementBlockInfo(BlockInfo blockInfo) throws IOException {
}

/*
* (non-Javadoc)
* @see
* eu.stratosphere.pact.common.io.FileInputFormat#configure(eu.stratosphere.nephele.configuration.Configuration)
*/
@Override
public void configure(Configuration parameters) {
super.configure(parameters);
Expand All @@ -75,13 +66,9 @@ protected BlockInfo createBlockInfo() {
return new BlockInfo();
}

/*
* (non-Javadoc)
* @see eu.stratosphere.pact.common.io.FileOutputFormat#open(int)
*/
@Override
public void open(int taskNumber) throws IOException {
super.open(taskNumber);
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);

final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ?
this.outputFilePath.getFileSystem().getDefaultBlockSize() : this.blockSize;
Expand All @@ -92,10 +79,6 @@ public void open(int taskNumber) throws IOException {

protected abstract void serialize(T record, DataOutput dataOutput) throws IOException;

/*
* (non-Javadoc)
* @see eu.stratosphere.pact.common.io.OutputFormat#writeRecord(eu.stratosphere.pact.common.type.Record)
*/
@Override
public void writeRecord(T record) throws IOException {
this.blockBasedInput.startRecord();
Expand All @@ -109,9 +92,6 @@ public void writeRecord(T record) throws IOException {
*/
protected class BlockBasedOutput extends FilterOutputStream {

/**
*
*/
private static final int NO_RECORD = -1;

private final int maxPayloadSize;
Expand All @@ -132,10 +112,6 @@ public BlockBasedOutput(OutputStream out, int blockSize) {
this.maxPayloadSize = blockSize - this.blockInfo.getInfoSize();
}

/*
* (non-Javadoc)
* @see java.io.FilterOutputStream#close()
*/
@Override
public void close() throws IOException {
if (this.blockPos > 0)
Expand All @@ -151,19 +127,11 @@ public void startRecord() {
this.totalCount++;
}

/*
* (non-Javadoc)
* @see java.io.FilterOutputStream#write(byte[])
*/
/**
 * Writes the entire given byte array by delegating to {@link #write(byte[], int, int)}
 * with offset 0 and the full array length.
 *
 * @param b The bytes to write.
 * @throws IOException Propagated from the delegated write call.
 */
@Override
public void write(byte[] b) throws IOException {
this.write(b, 0, b.length);
}

/*
* (non-Javadoc)
* @see java.io.FilterOutputStream#write(byte[], int, int)
*/
@Override
public void write(byte[] b, int off, int len) throws IOException {

Expand All @@ -179,10 +147,6 @@ public void write(byte[] b, int off, int len) throws IOException {
}
}

/*
* (non-Javadoc)
* @see java.io.FilterOutputStream#write(int)
*/
@Override
public void write(int b) throws IOException {
super.write(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.FSDataOutputStream;
import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.core.fs.FileSystem.WriteMode;
import eu.stratosphere.core.fs.Path;


Expand All @@ -43,6 +44,17 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
*/
public static final String FILE_PARAMETER_KEY = "pact.output.file";

/**
* The key under which the write mode is stored in the configuration
*/
public static final String WRITEMODE_PARAMETER_KEY = "pact.output.writemode";

/**
* Value keys for the write modes
*/
public static final String WRITEMODE_CREATE = "stratosphere.output.writemode.create";
public static final String WRITEMODE_OVERWRITE = "stratosphere.output.writemode.overwrite";

/**
* The config parameter for the opening timeout in milliseconds.
*/
Expand All @@ -53,6 +65,11 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
*/
protected Path outputFilePath;

/**
* The write mode of the output.
*/
protected WriteMode writeMode;

/**
* The stream to which the data is written.
*/
Expand Down Expand Up @@ -81,6 +98,16 @@ public void configure(Configuration parameters) {
throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage());
}

// get the write mode parameter
String writeModeParam = parameters.getString(WRITEMODE_PARAMETER_KEY, WRITEMODE_OVERWRITE);
if(writeModeParam.equals(WRITEMODE_OVERWRITE)) {
this.writeMode = WriteMode.OVERWRITE;
} else if(writeModeParam.equals(WRITEMODE_CREATE)) {
this.writeMode = WriteMode.CREATE;
} else {
throw new RuntimeException("Invalid write mode configuration: "+writeModeParam);
}

// get timeout for stream opening
this.openTimeout = parameters.getLong(OUTPUT_STREAM_OPEN_TIMEOUT_KEY, FileInputFormat.DEFAULT_OPENING_TIMEOUT);
if (this.openTimeout < 0) {
Expand All @@ -95,9 +122,9 @@ public void configure(Configuration parameters) {


@Override
public void open(int taskNumber) throws IOException {
public void open(int taskNumber, int numTasks) throws IOException {
// obtain FSDataOutputStream asynchronously, since the HDFS client cannot handle InterruptedExceptions
OutputPathOpenThread opot = new OutputPathOpenThread(this.outputFilePath, taskNumber, this.openTimeout);
OutputPathOpenThread opot = new OutputPathOpenThread(this.outputFilePath, this.writeMode, (taskNumber + 1), numTasks, this.openTimeout);
opot.start();

try {
Expand All @@ -120,6 +147,10 @@ public void close() throws IOException {
}
}

/**
 * Returns the write mode of this output format.
 * The value is assigned in {@code configure(Configuration)} from the
 * {@code WRITEMODE_PARAMETER_KEY} setting, which defaults to OVERWRITE.
 *
 * @return The write mode currently held in the {@code writeMode} field.
 */
public WriteMode getWriteMode() {
return this.writeMode;
}

// ============================================================================================

private static final class OutputPathOpenThread extends Thread {
Expand All @@ -128,6 +159,10 @@ private static final class OutputPathOpenThread extends Thread {

private final int taskIndex;

private final int numTasks;

private final WriteMode writeMode;

private final long timeoutMillies;

private volatile FSDataOutputStream fdos;
Expand All @@ -137,30 +172,69 @@ private static final class OutputPathOpenThread extends Thread {
private volatile boolean aborted;


public OutputPathOpenThread(Path path, int taskIndex, long timeoutMillies) {
public OutputPathOpenThread(Path path, WriteMode writeMode, int taskIndex, int numTasks, long timeoutMillies) {
this.path = path;
this.writeMode = writeMode;
this.timeoutMillies = timeoutMillies;
this.taskIndex = taskIndex;
this.numTasks = numTasks;
}

@Override
public void run() {

try {
Path p = this.path;
final FileSystem fs = p.getFileSystem();

// if the output is a directory, suffix the path with the parallel instance index
if (fs.exists(p) && fs.getFileStatus(p).isDir()) {

if(this.numTasks == 1) {
// output is not written in parallel

if(!fs.isDistributedFS()) {
// prepare local output path
// checks for write mode and removes existing files in case of OVERWRITE mode
if(!fs.initOutPathLocalFS(p, writeMode, false)) {
// output preparation failed! Cancel task.
throw new IOException("Output path could not be initialized. Canceling task.");
}
}

// create output file
this.fdos = fs.create(p, false);

} else if(this.numTasks > 1) {
// output is written in parallel

if(!fs.isDistributedFS()) {
// File system is not distributed.
// We need to prepare the output path on each executing node.
if(!fs.initOutPathLocalFS(p, writeMode, true)) {
// output preparation failed! Cancel task.
throw new IOException("Output directory could not be created. Canceling task.");
}
}

// Suffix the path with the parallel instance index
p = p.suffix("/" + this.taskIndex);

// create output file
switch(writeMode) {
case CREATE:
this.fdos = fs.create(p, false);
break;
case OVERWRITE:
this.fdos = fs.create(p, true);
break;
default:
throw new IllegalArgumentException("Invalid write mode: "+writeMode);
}
this.fdos = fs.create(p, true);

} else {
// invalid number of subtasks (<= 0)
throw new IllegalArgumentException("Invalid number of subtasks. Canceling task.");
}

// remove the existing file before creating the output stream
if (fs.exists(p)) {
fs.delete(p, false);
}

this.fdos = fs.create(p, true);

// check for canceling and close the stream in that case, because no one will obtain it
if (this.aborted) {
final FSDataOutputStream f = this.fdos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public static <T, F extends FileOutputFormat<? extends T>> F openOutput(
configuration.setString(FileOutputFormat.FILE_PARAMETER_KEY, path);
configuration.setLong(FileOutputFormat.OUTPUT_STREAM_OPEN_TIMEOUT_KEY, 0);
outputFormat.configure(configuration);
outputFormat.open(1);
outputFormat.open(0, 1);
return outputFormat;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ public interface OutputFormat<IT> extends Serializable {
* When this method is called, the output format is guaranteed to be configured.
*
* @param taskNumber The number of the parallel instance.
* @param numTasks The number of parallel tasks.
* @throws IOException Thrown, if the output could not be opened due to an I/O problem.
*/
void open(int taskNumber) throws IOException;
void open(int taskNumber, int numTasks) throws IOException;


/**
Expand Down
Loading

0 comments on commit e6911e7

Please sign in to comment.