Skip to content

Commit

Permalink
Merge branch 'version02' of https://dev.stratosphere.eu/git/stage1 in…
Browse files Browse the repository at this point in the history
…to version02
  • Loading branch information
sewen committed Jul 18, 2012
2 parents 7e10e2b + 254d790 commit d04c89f
Show file tree
Hide file tree
Showing 21 changed files with 620 additions and 325 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.nephele.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotation to force a checkpoint decision.
* <p>
* The annotation is retained at runtime ({@link RetentionPolicy#RUNTIME}), so it can be read via reflection, and it
* may only be placed on type declarations ({@link ElementType#TYPE}).
*
* @author marrus
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ForceCheckpoint {

/**
* The forced checkpoint decision. The element declares no default value, so it has to be specified wherever the
* annotation is used.
* <p>
* NOTE(review): presumably <code>true</code> forces a checkpoint and <code>false</code> forces that none is
* written - confirm against the code that evaluates this annotation.
*
* @return the forced checkpoint decision
*/
boolean checkpoint();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import java.net.URLClassLoader;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import eu.stratosphere.nephele.fs.FSDataInputStream;
import eu.stratosphere.nephele.fs.FSDataOutputStream;
Expand All @@ -42,6 +43,8 @@
* a set of libraries (typically JAR files) which the job requires to run. The library cache manager
* caches library files in order to avoid unnecessary retransmission of data. It is based on a singleton
* programming pattern, so there exists at most one library manager at a time.
* <p>
* This class is thread-safe.
*
* @author warneke
*/
Expand All @@ -55,7 +58,7 @@ public final class LibraryCacheManager {
/**
* Map to translate client paths of libraries to the file name used by the cache manager.
*/
private final Map<LibraryTranslationKey, String> clientPathToCacheName = new HashMap<LibraryTranslationKey, String>();
private final ConcurrentMap<LibraryTranslationKey, String> clientPathToCacheName = new ConcurrentHashMap<LibraryTranslationKey, String>();

/**
* Name of the directory to put cached libraries in.
Expand All @@ -65,7 +68,12 @@ public final class LibraryCacheManager {
/**
* Algorithm to be used for calculating the checksum of the libraries.
*/
private static final String HASHINGALGORITHM = "SHA-1";
private static final String HASHING_ALGORITHM = "SHA-1";

/**
* Dummy object used in the lock map.
*/
private static final Object LOCK_OBJECT = new Object();

/**
* File system object used to access the local file system.
Expand All @@ -85,13 +93,17 @@ public final class LibraryCacheManager {
/**
* Map to translate a job ID to the responsible library cache manager entry.
*/
private final Map<JobID, LibraryManagerEntry> libraryManagerEntries = new HashMap<JobID, LibraryManagerEntry>();
private final ConcurrentMap<JobID, LibraryManagerEntry> libraryManagerEntries = new ConcurrentHashMap<JobID, LibraryManagerEntry>();

/**
* Stores whether Nephele is executed in local mode.
* Map to store the number of references to a specific library manager entry.
*/
@Deprecated
private boolean localMode = false;
private final ConcurrentMap<JobID, AtomicInteger> libraryReferenceCounter = new ConcurrentHashMap<JobID, AtomicInteger>();

/**
* Map to guarantee atomicity of register/unregister operations.
*/
private final ConcurrentMap<JobID, Object> lockMap = new ConcurrentHashMap<JobID, Object>();

/**
* Returns the singleton instance of the library cache manager.
Expand Down Expand Up @@ -142,23 +154,63 @@ private LibraryCacheManager() throws IOException {

// Create a message digest object (SHA-1, see HASHING_ALGORITHM) we can use
try {
this.md = MessageDigest.getInstance(HASHINGALGORITHM);
this.md = MessageDigest.getInstance(HASHING_ALGORITHM);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Cannot find algorithm " + HASHINGALGORITHM + ": "
throw new IOException("Cannot find algorithm " + HASHING_ALGORITHM + ": "
+ StringUtils.stringifyException(e));
}
}

/**
* Sets the library cache manager's local mode flag to <code>true</code>.
* Increments the reference counter for the library manager entry with the given job ID.
*
* @throws IOException
* thrown if the library cache manager could not be instantiated
* @param jobID
* the job ID identifying the library manager entry
* @return the increased reference counter
*/
@Deprecated
public static void setLocalMode() throws IOException {
private int incrementReferenceCounter(final JobID jobID) {

	// Adds one reference for the given job ID and returns the resulting count. The counter entry is created
	// lazily when the first reference for a job is taken.
	while (true) {

		final AtomicInteger counter = this.libraryReferenceCounter.get(jobID);
		if (counter != null) {
			// A counter already exists, so this is an additional reference
			return counter.incrementAndGet();
		}

		// No counter yet: try to install one that already accounts for this first reference
		if (this.libraryReferenceCounter.putIfAbsent(jobID, new AtomicInteger(1)) == null) {
			return 1;
		}

		// Another thread installed a counter between get() and putIfAbsent(); loop and increment that one.
		// Nothing else must happen here - in particular no unrelated state may be modified on a lost race.
	}
}

/**
 * Releases one reference to the library manager entry of the given job and reports how many references are left.
 * Once the last reference is released, the counter entry itself is discarded from the reference counter map.
 *
 * @param jobID
 *        the job ID identifying the library manager entry
 * @return the number of references remaining after the decrement
 * @throws IllegalStateException
 *         thrown if no reference counter is stored for the given job ID
 */
private int decrementReferenceCounter(final JobID jobID) {

	final AtomicInteger counter = this.libraryReferenceCounter.get(jobID);
	if (counter == null) {
		throw new IllegalStateException("Cannot find reference counter entry for job " + jobID);
	}

	final int remaining = counter.decrementAndGet();
	if (remaining != 0) {
		return remaining;
	}

	// That was the last reference, the counter is no longer needed
	this.libraryReferenceCounter.remove(jobID);
	return 0;
}

/**
Expand Down Expand Up @@ -199,14 +251,12 @@ public static void register(final JobID id, final Path[] clientPaths) throws IOE
private void registerInternal(final JobID id, final Path[] clientPaths) throws IOException {

final String[] cacheNames = new String[clientPaths.length];
synchronized (this.clientPathToCacheName) {

for (int i = 0; i < clientPaths.length; i++) {
final LibraryTranslationKey key = new LibraryTranslationKey(id, clientPaths[i]);
cacheNames[i] = this.clientPathToCacheName.get(key);
if (cacheNames[i] == null) {
throw new IOException("Cannot map" + clientPaths[i].toString() + " to cache name");
}
for (int i = 0; i < clientPaths.length; ++i) {
final LibraryTranslationKey key = new LibraryTranslationKey(id, clientPaths[i]);
cacheNames[i] = this.clientPathToCacheName.get(key);
if (cacheNames[i] == null) {
throw new IOException("Cannot map" + clientPaths[i].toString() + " to cache name");
}
}

Expand Down Expand Up @@ -246,37 +296,47 @@ public static void register(final JobID id, final String[] requiredJarFiles) thr
*/
private void registerInternal(final JobID id, final String[] requiredJarFiles) throws IOException {

// Check if library manager entry for this id already exists
synchronized (this.libraryManagerEntries) {
if (this.libraryManagerEntries.containsKey(id)) {
// Use spin lock here
while (this.lockMap.putIfAbsent(id, LOCK_OBJECT) != null)
;

try {
if (incrementReferenceCounter(id) > 1) {
return;
}
}

// Check if all the required jar files exist in the cache
URL[] urls = null;
if (requiredJarFiles != null) {
// Check if library manager entry for this id already exists
if (this.libraryManagerEntries.containsKey(id)) {
throw new IllegalStateException("Library cache manager already contains entry for job ID " + id);
}

urls = new URL[requiredJarFiles.length];
// Check if all the required jar files exist in the cache
URL[] urls = null;
if (requiredJarFiles != null) {

for (int i = 0; i < requiredJarFiles.length; i++) {
final Path p = contains(requiredJarFiles[i]);
if (p == null) {
throw new IOException(requiredJarFiles[i] + " does not exist in the library cache");
}
urls = new URL[requiredJarFiles.length];

for (int i = 0; i < requiredJarFiles.length; i++) {
final Path p = contains(requiredJarFiles[i]);
if (p == null) {
throw new IOException(requiredJarFiles[i] + " does not exist in the library cache");
}

// Add file to the URL array
try {
urls[i] = p.toUri().toURL();
} catch (MalformedURLException e) {
throw new IOException(StringUtils.stringifyException(e));
// Add file to the URL array
try {
urls[i] = p.toUri().toURL();
} catch (MalformedURLException e) {
throw new IOException(StringUtils.stringifyException(e));
}
}
}
}

final LibraryManagerEntry entry = new LibraryManagerEntry(id, requiredJarFiles, urls);
synchronized (this.libraryManagerEntries) {
final LibraryManagerEntry entry = new LibraryManagerEntry(id, requiredJarFiles, urls);

this.libraryManagerEntries.put(id, entry);

} finally {
this.lockMap.remove(id);
}
}

Expand All @@ -302,18 +362,15 @@ public static void unregister(final JobID id) throws IOException {
*/
private void unregisterInternal(final JobID id) {

// TODO: the library cache manager (LCM) was designed to be a singleton object
// Running Nephele is local mode confuses the LCM and it deallocates libraries
// which might still be used for further tasks. As a quick remedy, we do not
// deregister libraries in local mode, but this needs to be fixed in future
// releases
if (this.localMode) {
return;
}
// Use spin lock here
while (this.lockMap.putIfAbsent(id, LOCK_OBJECT) != null)
;

synchronized (this.libraryManagerEntries) {
if (decrementReferenceCounter(id) == 0) {
this.libraryManagerEntries.remove(id);
}

this.lockMap.remove(id);
}

/**
Expand Down Expand Up @@ -344,7 +401,7 @@ public static Path contains(final String cacheName) throws IOException {
private Path containsInternal(final String cacheName) throws IOException {

// Create a path object from the external name string
final Path p = new Path(this.libraryCachePath + "/" + cacheName);
final Path p = new Path(this.libraryCachePath + File.separator + cacheName);

synchronized (this.fs) {
if (fs.exists(p)) {
Expand Down Expand Up @@ -386,14 +443,14 @@ public static ClassLoader getClassLoader(final JobID id) throws IOException {
* thrown if the library cache manager could not be instantiated
*/
private ClassLoader getClassLoaderInternal(final JobID id) {
synchronized (this.libraryManagerEntries) {

if (!this.libraryManagerEntries.containsKey(id)) {
return null;
}
final LibraryManagerEntry entry = this.libraryManagerEntries.get(id);

return this.libraryManagerEntries.get(id).getClassLoader();
if (entry == null) {
return null;
}

return entry.getClassLoader();
}

/**
Expand Down Expand Up @@ -425,11 +482,7 @@ public static String[] getRequiredJarFiles(final JobID id) throws IOException {
*/
private String[] getRequiredJarFilesInternal(final JobID id) {

LibraryManagerEntry entry = null;

synchronized (this.libraryManagerEntries) {
entry = this.libraryManagerEntries.get(id);
}
final LibraryManagerEntry entry = this.libraryManagerEntries.get(id);

if (entry == null) {
return null;
Expand Down Expand Up @@ -599,8 +652,8 @@ private void addLibraryInternal(final JobID jobID, final Path name, final long s
in.readFully(buf);

// Reset and calculate message digest from jar file
md.reset();
md.update(buf);
this.md.reset();
this.md.update(buf);

// Construct internal jar name from digest
final String cacheName = StringUtils.byteToHexString(md.digest()) + ".jar";
Expand All @@ -609,21 +662,16 @@ private void addLibraryInternal(final JobID jobID, final Path name, final long s
synchronized (this.fs) {

// Check if file already exists in our library cache, if not write it to the cache directory
if (!fs.exists(storePath)) {
final FSDataOutputStream fos = fs.create(storePath, false);
if (!this.fs.exists(storePath)) {
final FSDataOutputStream fos = this.fs.create(storePath, false);
fos.write(buf, 0, buf.length);
fos.close();
}
}

// Create mapping for client path and cache name
synchronized (this.clientPathToCacheName) {

final LibraryTranslationKey key = new LibraryTranslationKey(jobID, name);
if (!this.clientPathToCacheName.containsKey(key)) {
this.clientPathToCacheName.put(key, cacheName);
}
}
final LibraryTranslationKey key = new LibraryTranslationKey(jobID, name);
this.clientPathToCacheName.putIfAbsent(key, cacheName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package eu.stratosphere.nephele.io.compression.library.lzma;

import eu.stratosphere.nephele.io.channels.Buffer;
import eu.stratosphere.nephele.io.channels.MemoryBuffer;
import eu.stratosphere.nephele.io.compression.AbstractDecompressor;
import eu.stratosphere.nephele.io.compression.CompressionBufferProvider;

Expand Down Expand Up @@ -43,14 +43,14 @@ public class LzmaDecompressor extends AbstractDecompressor {
* {@inheritDoc}
*/
@Override
public void setCompressedDataBuffer(Buffer buffer) {
public void setCompressedDataBuffer(final MemoryBuffer buffer) {

if (buffer == null) {
this.compressedBuffer = null;
this.compressedDataBuffer = null;
this.compressedDataBufferLength = 0;
} else {
this.compressedDataBuffer = getInternalByteBuffer(buffer);
this.compressedDataBuffer = buffer.getByteBuffer();
// this.compressedDataBufferLength = this.compressedDataBuffer.limit();
this.compressedBuffer = buffer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ void removeJobFromSchedule(final ExecutionGraph executionGraphToRemove) {
LOG.error("Cannot find job " + executionGraphToRemove.getJobName() + " ("
+ executionGraphToRemove.getJobID() + ") to remove");
}

// TODO: Remove vertices from restart map
}

/**
Expand Down
Loading

0 comments on commit d04c89f

Please sign in to comment.