Skip to content

Commit

Permalink
Merge branch 'stratosphere'
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabian Hueske committed Apr 4, 2011
2 parents 2ed1db2 + 544950f commit 67f0445
Show file tree
Hide file tree
Showing 136 changed files with 693 additions and 27,506 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public void run() {
LOG.error(StringUtils.stringifyException(e2));
}
}

// Release all resources that may currently be allocated by the individual channels
releaseAllChannelResources();

Expand Down Expand Up @@ -397,6 +397,9 @@ public void run() {
waitForOutputChannelsToBeClosed();
} catch (Exception e) {

// Release all resources that may currently be allocated by the individual channels
releaseAllChannelResources();

if (this.isCanceled) {
changeExecutionState(ExecutionState.CANCELED, null);
} else {
Expand All @@ -406,6 +409,9 @@ public void run() {
return;
}

// Release all resources that may currently be allocated by the individual channels
releaseAllChannelResources();

// Finally, switch execution state to FINISHED and report to job manager
changeExecutionState(ExecutionState.FINISHED, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,5 +295,9 @@ public void releaseResources() {
this.uncompressedDataBuffer.recycleBuffer();
this.uncompressedDataBuffer = null;
}

if(this.decompressor != null) {
this.decompressor.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -395,5 +395,9 @@ public void releaseResources() {
this.uncompressedDataBuffer.recycleBuffer();
this.uncompressedDataBuffer = null;
}

if(this.compressor != null) {
this.compressor.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public static synchronized void init() {
continue;
}

LOG.debug("Trying to load compression library " + libraryClass);
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to load compression library " + libraryClass);
}
final CompressionLibrary compressionLibrary = initCompressionLibrary(libraryClass);
if (compressionLibrary == null) {
LOG.error("Cannot load " + libraryClass);
Expand All @@ -82,21 +84,22 @@ public static synchronized void init() {
/**
* Returns the path to the native libraries or <code>null</code> if an error occurred.
*
* @param libraryClass
* the name of this compression library's wrapper class including full package name
* @return the path to the native libraries or <code>null</code> if an error occurred
*/
private static String getNativeLibraryPath() {
private static String getNativeLibraryPath(final String libraryClass) {

final ClassLoader cl = ClassLoader.getSystemClassLoader();
if (cl == null) {
LOG.error("Cannot find system class loader");
return null;
}

final String classLocation = "eu/stratosphere/nephele/io/compression/library/zlib/ZlibLibrary.class"; // TODO:
// Use
// other
// class
// here
final String classLocation = libraryClass.replace('.', '/') + ".class";
if (LOG.isDebugEnabled()) {
LOG.debug("Class location is " + classLocation);
}

final URL location = cl.getResource(classLocation);
if (location == null) {
Expand All @@ -105,7 +108,6 @@ private static String getNativeLibraryPath() {
}

final String locationString = location.toString();
// System.out.println("LOCATION: " + locationString);
if (locationString.contains(".jar!")) { // Class if inside of a deployed jar file

// Create and return path to native library cache
Expand Down Expand Up @@ -144,109 +146,6 @@ private static String getNativeLibraryPath() {
}
}

/**
* Initialize the CompressionLoader, load all native compression libraries and prepare Compression-Decision-Model.
*/
/*
* public static void init2() {
* init();
* String compressionTrainingFile = GlobalConfiguration.getString("taskmanager.compression.trainingfile.default",
* "/home/akli/uni/diplom/nephele-new/nephele/nephele-common/src/native/default_compression_training_data.dat");
* String compressionDecisionModel =
* GlobalConfiguration.getString("taskmanager.compression.decisionmodel.classname",
* "eu.stratosphere.nephele.io.compression.dynamic.TestModel");
* if(GlobalConfiguration.getBoolean(ENABLE_THROUGHPUT_ANALYZER_KEY, false)) {
* final String address = GlobalConfiguration.getString(THROUGHPUT_ANALYZER_IPC_ADDRESS_KEY,
* DEFAULT_THROUGHPUT_ANALYZER_IPC_ADDRESS);
* final int port = GlobalConfiguration.getInteger(THROUGHPUT_ANALYZER_IPC_PORT_KEY,
* DEFAULT_THROUGHPUT_ANALYZER_IPC_PORT);
* InetSocketAddress throughputAnalyzerAddress = new InetSocketAddress(address, port);
* //Try to create local stub for the ThroughputAnalyzer
* ThroughputAnalyzerProtocol throughput = null; //TODO: Fix me
* try {
* throughput = (ThroughputAnalyzerProtocol) RPC.getProxy(ThroughputAnalyzerProtocol.class,
* throughputAnalyzerAddress, NetUtils.getSocketFactory());
* } catch (IOException e) {
* LOG.error("Error during load of ThroughputAnalyzer");
* LOG.error(StringUtils.stringifyException(e));
* //System.exit(FAILURERETURNCODE);
* }
* throughputAnalyzer = throughput;
* LOG.info("ThroughputAnalyzer loaded!");
* }
* //initCompressionDecisionModel(compressionDecisionModel, compressionTrainingFile); //TODO: Fix me
* }
*/

/*
* public static void initCompressionDecisionModel(String modelName, String pathToTrainingFile){
* if (modelName.compareTo(IOWaitModel.class.getName()) == 0){
* if (instance != null && initWithProfiler(instance)){
* decisionModel = DecisionModel.IOWAIT_MODEL;
* LOG.info("CompressionLoader: Using IOWaitModel for Dynamic Compression.");
* }else{
* decisionModel = DecisionModel.TEST_MODEL;
* LOG.warn(
* "CompressionLoader: Could not load IOWaitModel for Dynamic Compression - Using TestModel with Lzo Compression.");
* }
* }else if (modelName.compareTo(ThroughputModel.class.getName()) == 0){
* if (throughputAnalyzer != null){
* decisionModel = DecisionModel.THROUGHPUT_MODEL;
* LOG.info("CompressionLoader: Using ThroughputModel for Dynamic Compression.");
* }else{
* decisionModel = DecisionModel.TEST_MODEL;
* LOG.warn(
* "CompressionLoader: Could not load ThroughputModel for Dynamic Compression - Using TestModel with Lzo Compression."
* );
* }
* }else if (modelName.compareTo(ExtendedThroughputModel.class.getName()) == 0){
* if (throughputAnalyzer != null){
* decisionModel = DecisionModel.EXTENDED_THROUGHPUT_MODEL;
* LOG.info("CompressionLoader: Using ExtendedThroughputModel for Dynamic Compression.");
* }else{
* decisionModel = DecisionModel.TEST_MODEL;
* LOG.warn(
* "CompressionLoader: Could not load ExtendedThroughputModel for Dynamic Compression - Using TestModel with Lzo Compression."
* );
* }
* }else if (modelName.compareTo(NumericModel.class.getName()) == 0){
* if (instance != null && initWithProfiler(instance)){
* decisionModel = DecisionModel.NUMERIC_MODEL;
* LOG.info("CompressionLoader: Using NumericModel for Dynamic Compression.");
* }else{
* decisionModel = DecisionModel.TEST_MODEL;
* LOG.warn(
* "CompressionLoader: Could not load NumericModel for Dynamic Compression - Using TestModel with Lzo Compression."
* );
* }
* }else if (modelName.compareTo(TrainedModel.class.getName()) == 0){
* if (pathToTrainingFile != null && initWithTrainingset(pathToTrainingFile)){
* decisionModel = DecisionModel.TRAINED_MODEL;
* LOG.info("CompressionLoader: Using TrainedModel for Dynamic Compression.");
* }else{
* decisionModel = DecisionModel.TEST_MODEL;
* LOG.warn(
* "CompressionLoader: Could not load TrainedModel for Dynamic Compression - Using TestModel with Lzo Compression."
* );
* }
* }else if (modelName.compareTo(ExtendedTrainedModel.class.getName()) == 0){
* if (pathToTrainingFile != null && initWithTrainingset(pathToTrainingFile) && throughputAnalyzer != null){
* decisionModel = DecisionModel.EXTENDED_TRAINED_MODEL;
* LOG.info("CompressionLoader: Using ExtendedTrainedModel for Dynamic Compression.");
* }else{
* decisionModel = DecisionModel.TEST_MODEL;
* LOG.warn(
* "CompressionLoader: Could not load ExtendedTrainedModel for Dynamic Compression - Using TestModel with Lzo Compression."
* );
* }
* }else{
* LOG.info("DecisionModel for Dynamic Compression not set or unknown ( " + modelName +
* " ) - Using TestModel with Lzo Compression.");
* decisionModel = DecisionModel.TEST_MODEL;
* }
* }
*/

@SuppressWarnings("unchecked")
private static CompressionLibrary initCompressionLibrary(String libraryClass) {

Expand Down Expand Up @@ -281,51 +180,24 @@ private static CompressionLibrary initCompressionLibrary(String libraryClass) {
CompressionLibrary compressionLibrary;

try {
compressionLibrary = constructor.newInstance(getNativeLibraryPath());
compressionLibrary = constructor.newInstance(getNativeLibraryPath(libraryClass));
} catch (IllegalArgumentException e) {
LOG.error(e);
LOG.error(StringUtils.stringifyException(e));
return null;
} catch (InstantiationException e) {
LOG.error(e);
LOG.error(StringUtils.stringifyException(e));
return null;
} catch (IllegalAccessException e) {
LOG.error(e);
LOG.error(StringUtils.stringifyException(e));
return null;
} catch (InvocationTargetException e) {
LOG.error(e);
LOG.error(StringUtils.stringifyException(e));
return null;
}

return compressionLibrary;
}

/*
* private static boolean initWithTrainingset(String fileName){
* File f = new File(fileName);
* if (f.exists() && f.isFile()){
* FileInputStream fis;
* ObjectInputStream ois;
* try {
* fis = new FileInputStream(f);
* ois = new ObjectInputStream(fis);
* trainingSet = (TrainingResults) ois.readObject();
* ois.close();
* return true;
* } catch (FileNotFoundException e1) {
* // TODO Auto-generated catch block
* e1.printStackTrace();
* } catch (IOException e2) {
* // TODO Auto-generated catch block
* e2.printStackTrace();
* } catch (ClassNotFoundException e3) {
* // TODO Auto-generated catch block
* e3.printStackTrace();
* }
* }
* return false;
* }
*/

public static synchronized CompressionLibrary getCompressionLibraryByCompressionLevel(CompressionLevel level) {

if (level == CompressionLevel.NO_COMPRESSION) {
Expand Down Expand Up @@ -405,12 +277,6 @@ public static synchronized Decompressor getDecompressorByCompressionLevel(Compre
}
}

/*
* public static boolean isThroughputAnalyzerLoaded(){
* return throughputAnalyzer != null;
* }
*/

public static synchronized int getUncompressedBufferSize(int compressedBufferSize, CompressionLevel cl) {

final CompressionLibrary c = compressionLibraries.get(cl);
Expand Down Expand Up @@ -449,48 +315,4 @@ public static synchronized int getCompressedBufferSize(int uncompressedBufferSiz
return uncompressedBufferSize;
}
}

/*
* public static void reportDatapackageSend(int compressorID, int dataID, int bytes, int uncompressedBytes, int
* uncompressedBufferSize, int compressionLevel){
* if (throughputAnalyzer != null)
* throughputAnalyzer.reportDatapackageSend(new IntegerRecord(compressorID), new IntegerRecord(dataID), new
* IntegerRecord(bytes), new IntegerRecord(uncompressedBytes), new IntegerRecord(uncompressedBufferSize), new
* IntegerRecord(compressionLevel));
* }
* public static void reportDatapackageReceive(int dataID){
* if (throughputAnalyzer != null)
* throughputAnalyzer.reportDatapackageReceive(new IntegerRecord(dataID));
* }
* public static ThroughputAnalyzerResult getAverageCommunicationTimeForCompressor(int compressorID){
* if (throughputAnalyzer != null)
* return throughputAnalyzer.getAverageCommunicationTimeForCompressor(new IntegerRecord(compressorID));
* else{
* return new ThroughputAnalyzerResult();
* }
* }
* public static double getCurrentBandwidthInBytesPerNS(ChannelType type){
* switch(type){
* case FILE:
* return 0.042; //0.084Byte/ns = 83886.08Byte/ms = 83886080Byte/s = 80MByte/s (assumed average read and write
* performance is 80MByte/s - result divided by 2 because we have to write and read)
* case NETWORK:
* InternalInstanceProfilingDataCompression pc =
* profiler.generateProfilingDataCompression(System.currentTimeMillis());
* if (pc.getGoodput() == 0){
* LOG.warn("Could not determine speed of network connection. Using 100MBit/s" );
* return 0.0131*0.8; //0.0131Byte/ns = 13107.2Byte/ms = 13107200Byte/s = 100MBit/s (more than 80% are not
* realistic)
* }else
* return pc.getGoodput()/1000000000; //getGoodput gives speed per second in bytes, we need per nanosecond
* //return 0.131*0.8; //0.131Byte/ns = 131072Byte/ms = 131072000Byte/s = 1000MBit/s (more than 80% are not
* realistic)
* case INMEMORY:
* return 1;
* }
* LOG.error("Unknown Channel Type! " + type);
* return 1;
* }
*/

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ public interface Compressor {
* or <code>0</code> for any other compression level.
*/
int getCurrentInternalCompressionLibraryIndex();

/**
* Stops the compressor and releases all allocated internal resources.
*/
void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,9 @@ public interface Decompressor {
* the current internal decompression library index
*/
void setCurrentInternalDecompressionLibraryIndex(int index);

/**
* Stops the decompressor and releases all allocated internal resources.
*/
void shutdown();
}
51 changes: 51 additions & 0 deletions nephele/nephele-compression-bzip2/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<project
xsi:schemaLocation="http:https://maven.apache.org/POM/4.0.0 http:https://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http:https://maven.apache.org/POM/4.0.0" xmlns:xsi="http:https://www.w3.org/2001/XMLSchema-instance">

<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>nephele</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.1</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>nephele-compression-bzip2</artifactId>
<version>0.1</version>
<name>nephele-compression-bzip2</name>

<dependencies>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>nephele-server</artifactId>
<version>${version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>build-native</id>
<phase>process-classes</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<!--
<property name="native.classpath" refid="maven.compile.classpath" />
<echo file="${project.build.directory}/compile-classpath" message="${native.classpath}" />-->
<exec dir="src/main/native" executable="make" failonerror="true"/>
</tasks>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Loading

0 comments on commit 67f0445

Please sign in to comment.