Skip to content

Commit

Permalink
Merge branch 'version02' of https://dev.stratosphere.eu/git/stage1 in…
Browse files Browse the repository at this point in the history
…to version02
  • Loading branch information
sewen committed Jul 16, 2012
2 parents 074b354 + 021b828 commit ecf79bc
Show file tree
Hide file tree
Showing 56 changed files with 1,125 additions and 1,179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.io.compression.CompressionLevel;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.types.Record;

Expand Down Expand Up @@ -86,6 +87,13 @@ public interface Gate<T extends Record> {
*/
ChannelType getChannelType();

/**
* Returns the compression level that is applied by the input/output channels attached to this gate.
*
* @return the compression level that is applied by the input/output channels attached to this gate
*/
CompressionLevel getCompressionLevel();

/**
* Returns the ID of the gate.
*
Expand Down Expand Up @@ -124,4 +132,12 @@ public interface Gate<T extends Record> {
* the type of input/output channels which are connected to this gate
*/
void setChannelType(ChannelType channelType);

/**
* Sets the compression level to be applied by the input/output channels connected to this gate.
*
* @param compressionLevel
* the compression level to be applied by the input/output channels connected to this gate
*/
void setCompressionLevel(CompressionLevel compressionLevel);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import eu.stratosphere.nephele.io.channels.bytebuffered.FileInputChannel;
import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryInputChannel;
import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkInputChannel;
import eu.stratosphere.nephele.io.compression.CompressionException;
import eu.stratosphere.nephele.io.compression.CompressionLevel;
import eu.stratosphere.nephele.types.Record;

Expand Down Expand Up @@ -85,6 +86,14 @@ public interface InputGate<T extends Record> extends Gate<T> {
*/
void notifyDataUnitConsumed(int channelIndex);

/**
* Initializes the decompressor objects inside the input channels attached to this gate.
*
* @throws CompressionException
* thrown if an error occurs while loading the decompressor objects
*/
void initializeDecompressors() throws CompressionException;

/**
* Activates all of the task's input channels.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import eu.stratosphere.nephele.io.channels.bytebuffered.FileOutputChannel;
import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryOutputChannel;
import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkOutputChannel;
import eu.stratosphere.nephele.io.compression.CompressionException;
import eu.stratosphere.nephele.io.compression.CompressionLevel;
import eu.stratosphere.nephele.types.Record;

Expand Down Expand Up @@ -115,6 +116,14 @@ public interface OutputGate<T extends Record> extends Gate<T> {
*/
void requestClose() throws IOException, InterruptedException;

/**
* Initializes the compression objects inside the input channels attached to this gate.
*
* @throws CompressionException
* thrown if an error occurs while loading the compression objects
*/
void initializeCompressors() throws CompressionException;

/**
* Removes all output channels from the output gate.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@

import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.Constructor;

import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.io.InputGate;
import eu.stratosphere.nephele.io.compression.CompressionException;
import eu.stratosphere.nephele.io.compression.CompressionLevel;
import eu.stratosphere.nephele.io.compression.Decompressor;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.types.Record;
import eu.stratosphere.nephele.util.StringUtils;

/**
* InputChannel is an abstract base class to all different kinds of concrete
Expand Down Expand Up @@ -94,60 +90,13 @@ public InputGate<T> getInputGate() {
*/
public abstract void close() throws IOException, InterruptedException;

// TODO: See if type safety can be improved here
@SuppressWarnings("unchecked")
public Decompressor getDecompressor(int bufferSize) throws CompressionException {

if (getCompressionLevel() == CompressionLevel.NO_COMPRESSION)
throw new CompressionException("CompressionLevel is set to NO_COMPRESSION");

String configurationKey = null;

switch (this.getType()) {
case FILE:
configurationKey = "channel.file.decompressor";
break;
case NETWORK:
configurationKey = "channel.network.decompressor";
break;
}

if (configurationKey == null)
throw new CompressionException("Cannot determine configuration key for the channel type " + this.getType());

String className = GlobalConfiguration.getString(configurationKey, null);
if (className == null)
throw new CompressionException("Configuration does not contain an entry for key " + configurationKey);

Class<? extends Decompressor> decompressionClass = null;

try {
decompressionClass = (Class<? extends Decompressor>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new CompressionException("Cannot find decompressor class: " + StringUtils.stringifyException(e));
}

Constructor<? extends Decompressor> constructor = null;

try {
constructor = decompressionClass.getConstructor(int.class, CompressionLevel.class);
} catch (SecurityException e) {
throw new CompressionException(StringUtils.stringifyException(e));
} catch (NoSuchMethodException e) {
throw new CompressionException("Cannot find matching constructor for decompression class: "
+ StringUtils.stringifyException(e));
}

Decompressor decompressor = null;

try {
decompressor = constructor.newInstance(bufferSize, getCompressionLevel());
} catch (Exception e) {
throw new CompressionException(StringUtils.stringifyException(e));
}

return decompressor;
}
/**
* Initializes the decompressor object for this input channel.
*
* @throws CompressionException
* thrown if an error occurs during the initialization process
*/
public abstract void initializeDecompressor() throws CompressionException;

/**
* {@inheritDoc}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@
package eu.stratosphere.nephele.io.channels;

import java.io.IOException;
import java.lang.reflect.Constructor;

import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.io.OutputGate;
import eu.stratosphere.nephele.io.compression.Compressor;
import eu.stratosphere.nephele.io.compression.CompressionException;
import eu.stratosphere.nephele.io.compression.CompressionLevel;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.types.Record;
import eu.stratosphere.nephele.util.StringUtils;

/**
* OutputChannel is an abstract base class to all different kinds of concrete
Expand Down Expand Up @@ -92,60 +88,13 @@ public OutputGate<T> getOutputGate() {
*/
public abstract void requestClose() throws IOException, InterruptedException;

// TODO: See if type safety can be improved here
@SuppressWarnings("unchecked")
public Compressor getCompressor(int bufferSize) throws CompressionException {

if (getCompressionLevel() == CompressionLevel.NO_COMPRESSION)
throw new CompressionException("CompressionLevel is set to NO_COMPRESSION");

String configurationKey = null;

switch (this.getType()) {
case FILE:
configurationKey = "channel.file.compressor";
break;
case NETWORK:
configurationKey = "channel.network.compressor";
break;
}

if (configurationKey == null)
throw new CompressionException("Cannot determine configuration key for the channel type " + this.getType());

String className = GlobalConfiguration.getString(configurationKey, null);
if (className == null)
throw new CompressionException("Configuration does not contain an entry for key " + configurationKey);

Class<? extends Compressor> compressionClass = null;

try {
compressionClass = (Class<? extends Compressor>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new CompressionException("Cannot find compressor class: " + StringUtils.stringifyException(e));
}

Constructor<? extends Compressor> constructor = null;

try {
constructor = compressionClass.getConstructor(int.class, CompressionLevel.class);
} catch (SecurityException e) {
throw new CompressionException(StringUtils.stringifyException(e));
} catch (NoSuchMethodException e) {
throw new CompressionException("Cannot find matching constructor for compression class: "
+ StringUtils.stringifyException(e));
}

Compressor compressor = null;

try {
compressor = constructor.newInstance(bufferSize, getCompressionLevel());
} catch (Exception e) {
throw new CompressionException(StringUtils.stringifyException(e));
}

return compressor;
}
/**
* Initializes the compressor object for this output channel.
*
* @throws CompressionException
* thrown if an error occurs during the initialization process
*/
public abstract void initializeCompressor() throws CompressionException;

/**
* {@inheritDoc}
Expand Down
Loading

0 comments on commit ecf79bc

Please sign in to comment.