Skip to content

Commit

Permalink
Merge branch 'version02' into ssc-iterations
Browse files Browse the repository at this point in the history
  • Loading branch information
sewen committed Jul 23, 2012
2 parents 30c52d0 + 0727ee7 commit e49ba52
Show file tree
Hide file tree
Showing 123 changed files with 5,063 additions and 1,953 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
*.swp
logs.zip
.idea
.idea/*
*.jar
target
.classpath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.nephele.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotation to force a checkpoint-decision.
* Annotation to force a checkpoint decision.
*
* @author marrus
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ForceCheckpoint {

boolean checkpoint();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public abstract class AbstractEvent implements IOReadableWritable {
/**
* Static variable that points to the current global sequence number
*/
private static final AtomicLong globalSequenceNumber = new AtomicLong(0);
private static final AtomicLong GLOBAL_SEQUENCE_NUMBER = new AtomicLong(0);

/**
* Auxiliary object which helps to convert a {@link Date} object to the given string representation.
*/
Expand All @@ -51,7 +51,7 @@ public abstract class AbstractEvent implements IOReadableWritable {
* The sequence number of the event.
*/
private long sequenceNumber = -1;

/**
* Constructs a new abstract event object.
*
Expand All @@ -60,10 +60,10 @@ public abstract class AbstractEvent implements IOReadableWritable {
*/
public AbstractEvent(final long timestamp) {
this.timestamp = timestamp;
this.sequenceNumber = globalSequenceNumber.incrementAndGet();
this.sequenceNumber = GLOBAL_SEQUENCE_NUMBER.incrementAndGet();
}
public long getSequenceNumber(){

public long getSequenceNumber() {
return this.sequenceNumber;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import java.net.URLClassLoader;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import eu.stratosphere.nephele.fs.FSDataInputStream;
import eu.stratosphere.nephele.fs.FSDataOutputStream;
Expand All @@ -42,6 +43,8 @@
* a set of libraries (typically JAR files) which the job requires to run. The library cache manager
* caches library files in order to avoid unnecessary retransmission of data. It is based on a singleton
* programming pattern, so there exists at most on library manager at a time.
* <p>
* This class is thread-safe.
*
* @author warneke
*/
Expand All @@ -55,7 +58,7 @@ public final class LibraryCacheManager {
/**
* Map to translate client paths of libraries to the file name used by the cache manager.
*/
private final Map<LibraryTranslationKey, String> clientPathToCacheName = new HashMap<LibraryTranslationKey, String>();
private final ConcurrentMap<LibraryTranslationKey, String> clientPathToCacheName = new ConcurrentHashMap<LibraryTranslationKey, String>();

/**
* Name of the directory to put cached libraries in.
Expand All @@ -65,7 +68,12 @@ public final class LibraryCacheManager {
/**
* Algorithm to be used for calculating the checksum of the libraries.
*/
private static final String HASHINGALGORITHM = "SHA-1";
private static final String HASHING_ALGORITHM = "SHA-1";

/**
* Dummy object used in the lock map.
*/
private static final Object LOCK_OBJECT = new Object();

/**
* File system object used to access the local file system.
Expand All @@ -85,13 +93,17 @@ public final class LibraryCacheManager {
/**
* Map to translate a job ID to the responsible library cache manager entry.
*/
private final Map<JobID, LibraryManagerEntry> libraryManagerEntries = new HashMap<JobID, LibraryManagerEntry>();
private final ConcurrentMap<JobID, LibraryManagerEntry> libraryManagerEntries = new ConcurrentHashMap<JobID, LibraryManagerEntry>();

/**
* Stores whether Nephele is executed in local mode.
* Map to store the number of references to a specific library manager entry.
*/
@Deprecated
private boolean localMode = false;
private final ConcurrentMap<JobID, AtomicInteger> libraryReferenceCounter = new ConcurrentHashMap<JobID, AtomicInteger>();

/**
* Map to guarantee atomicity of of register/unregister operations.
*/
private final ConcurrentMap<JobID, Object> lockMap = new ConcurrentHashMap<JobID, Object>();

/**
* Returns the singleton instance of the library cache manager.
Expand Down Expand Up @@ -142,23 +154,63 @@ private LibraryCacheManager() throws IOException {

// Create an MD5 message digest object we can use
try {
this.md = MessageDigest.getInstance(HASHINGALGORITHM);
this.md = MessageDigest.getInstance(HASHING_ALGORITHM);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Cannot find algorithm " + HASHINGALGORITHM + ": "
throw new IOException("Cannot find algorithm " + HASHING_ALGORITHM + ": "
+ StringUtils.stringifyException(e));
}
}

/**
* Sets the library cache manager's local mode flag to <code>true</code>.
* Increments the reference counter for the library manager entry with the given job ID.
*
* @throws IOException
* thrown if the library cache manager could not be instantiated
* @param jobID
* the job ID identifying the library manager entry
* @return the increased reference counter
*/
@Deprecated
public static void setLocalMode() throws IOException {
private int incrementReferenceCounter(final JobID jobID) {

while (true) {

AtomicInteger ai = this.libraryReferenceCounter.get(jobID);
if (ai == null) {

ai = new AtomicInteger(1);
if (this.libraryReferenceCounter.putIfAbsent(jobID, ai) == null) {
return 1;
}

// We had a race, try again

get().localMode = true;
} else {

return ai.incrementAndGet();
}
}
}

/**
* Decrements the reference counter for the library manager entry with the given job ID.
*
* @param jobID
* the job ID identifying the library manager entry
* @return the decremented reference counter
*/
private int decrementReferenceCounter(final JobID jobID) {

final AtomicInteger ai = this.libraryReferenceCounter.get(jobID);

if (ai == null) {
throw new IllegalStateException("Cannot find reference counter entry for job " + jobID);
}

int retVal = ai.decrementAndGet();

if (retVal == 0) {
this.libraryReferenceCounter.remove(jobID);
}

return retVal;
}

/**
Expand Down Expand Up @@ -199,14 +251,12 @@ public static void register(final JobID id, final Path[] clientPaths) throws IOE
private void registerInternal(final JobID id, final Path[] clientPaths) throws IOException {

final String[] cacheNames = new String[clientPaths.length];
synchronized (this.clientPathToCacheName) {

for (int i = 0; i < clientPaths.length; i++) {
final LibraryTranslationKey key = new LibraryTranslationKey(id, clientPaths[i]);
cacheNames[i] = this.clientPathToCacheName.get(key);
if (cacheNames[i] == null) {
throw new IOException("Cannot map" + clientPaths[i].toString() + " to cache name");
}
for (int i = 0; i < clientPaths.length; ++i) {
final LibraryTranslationKey key = new LibraryTranslationKey(id, clientPaths[i]);
cacheNames[i] = this.clientPathToCacheName.get(key);
if (cacheNames[i] == null) {
throw new IOException("Cannot map" + clientPaths[i].toString() + " to cache name");
}
}

Expand Down Expand Up @@ -246,37 +296,47 @@ public static void register(final JobID id, final String[] requiredJarFiles) thr
*/
private void registerInternal(final JobID id, final String[] requiredJarFiles) throws IOException {

// Check if library manager entry for this id already exists
synchronized (this.libraryManagerEntries) {
if (this.libraryManagerEntries.containsKey(id)) {
// Use spin lock here
while (this.lockMap.putIfAbsent(id, LOCK_OBJECT) != null)
;

try {
if (incrementReferenceCounter(id) > 1) {
return;
}
}

// Check if all the required jar files exist in the cache
URL[] urls = null;
if (requiredJarFiles != null) {
// Check if library manager entry for this id already exists
if (this.libraryManagerEntries.containsKey(id)) {
throw new IllegalStateException("Library cache manager already contains entry for job ID " + id);
}

urls = new URL[requiredJarFiles.length];
// Check if all the required jar files exist in the cache
URL[] urls = null;
if (requiredJarFiles != null) {

for (int i = 0; i < requiredJarFiles.length; i++) {
final Path p = contains(requiredJarFiles[i]);
if (p == null) {
throw new IOException(requiredJarFiles[i] + " does not exist in the library cache");
}
urls = new URL[requiredJarFiles.length];

for (int i = 0; i < requiredJarFiles.length; i++) {
final Path p = contains(requiredJarFiles[i]);
if (p == null) {
throw new IOException(requiredJarFiles[i] + " does not exist in the library cache");
}

// Add file to the URL array
try {
urls[i] = p.toUri().toURL();
} catch (MalformedURLException e) {
throw new IOException(StringUtils.stringifyException(e));
// Add file to the URL array
try {
urls[i] = p.toUri().toURL();
} catch (MalformedURLException e) {
throw new IOException(StringUtils.stringifyException(e));
}
}
}
}

final LibraryManagerEntry entry = new LibraryManagerEntry(id, requiredJarFiles, urls);
synchronized (this.libraryManagerEntries) {
final LibraryManagerEntry entry = new LibraryManagerEntry(id, requiredJarFiles, urls);

this.libraryManagerEntries.put(id, entry);

} finally {
this.lockMap.remove(id);
}
}

Expand All @@ -302,18 +362,15 @@ public static void unregister(final JobID id) throws IOException {
*/
private void unregisterInternal(final JobID id) {

// TODO: the library cache manager (LCM) was designed to be a singleton object
// Running Nephele is local mode confuses the LCM and it deallocates libraries
// which might still be used for further tasks. As a quick remedy, we do not
// deregister libraries in local mode, but this needs to be fixed in future
// releases
if (this.localMode) {
return;
}
// Use spin lock here
while (this.lockMap.putIfAbsent(id, LOCK_OBJECT) != null)
;

synchronized (this.libraryManagerEntries) {
if (decrementReferenceCounter(id) == 0) {
this.libraryManagerEntries.remove(id);
}

this.lockMap.remove(id);
}

/**
Expand Down Expand Up @@ -344,7 +401,7 @@ public static Path contains(final String cacheName) throws IOException {
private Path containsInternal(final String cacheName) throws IOException {

// Create a path object from the external name string
final Path p = new Path(this.libraryCachePath + "/" + cacheName);
final Path p = new Path(this.libraryCachePath + File.separator + cacheName);

synchronized (this.fs) {
if (fs.exists(p)) {
Expand Down Expand Up @@ -386,14 +443,14 @@ public static ClassLoader getClassLoader(final JobID id) throws IOException {
* thrown if the library cache manager could not be instantiated
*/
private ClassLoader getClassLoaderInternal(final JobID id) {
synchronized (this.libraryManagerEntries) {

if (!this.libraryManagerEntries.containsKey(id)) {
return null;
}
final LibraryManagerEntry entry = this.libraryManagerEntries.get(id);

return this.libraryManagerEntries.get(id).getClassLoader();
if (entry == null) {
return null;
}

return entry.getClassLoader();
}

/**
Expand Down Expand Up @@ -425,11 +482,7 @@ public static String[] getRequiredJarFiles(final JobID id) throws IOException {
*/
private String[] getRequiredJarFilesInternal(final JobID id) {

LibraryManagerEntry entry = null;

synchronized (this.libraryManagerEntries) {
entry = this.libraryManagerEntries.get(id);
}
final LibraryManagerEntry entry = this.libraryManagerEntries.get(id);

if (entry == null) {
return null;
Expand Down Expand Up @@ -599,8 +652,8 @@ private void addLibraryInternal(final JobID jobID, final Path name, final long s
in.readFully(buf);

// Reset and calculate message digest from jar file
md.reset();
md.update(buf);
this.md.reset();
this.md.update(buf);

// Construct internal jar name from digest
final String cacheName = StringUtils.byteToHexString(md.digest()) + ".jar";
Expand All @@ -609,21 +662,16 @@ private void addLibraryInternal(final JobID jobID, final Path name, final long s
synchronized (this.fs) {

// Check if file already exists in our library cache, if not write it to the cache directory
if (!fs.exists(storePath)) {
final FSDataOutputStream fos = fs.create(storePath, false);
if (!this.fs.exists(storePath)) {
final FSDataOutputStream fos = this.fs.create(storePath, false);
fos.write(buf, 0, buf.length);
fos.close();
}
}

// Create mapping for client path and cache name
synchronized (this.clientPathToCacheName) {

final LibraryTranslationKey key = new LibraryTranslationKey(jobID, name);
if (!this.clientPathToCacheName.containsKey(key)) {
this.clientPathToCacheName.put(key, cacheName);
}
}
final LibraryTranslationKey key = new LibraryTranslationKey(jobID, name);
this.clientPathToCacheName.putIfAbsent(key, cacheName);
}

/**
Expand Down
Loading

0 comments on commit e49ba52

Please sign in to comment.