Skip to content

Commit

Permalink
Merge branch 'version011' into version02
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Jul 23, 2011
2 parents 074c503 + ac05a71 commit bdfea47
Show file tree
Hide file tree
Showing 25 changed files with 712 additions and 477 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,9 @@ public void run() {
}

if (instanceListener != null) {
instanceListener.allocatedResourceDied(
removedSlice.getJobID(),
new AllocatedResource(
removedSlice.getHostingInstance(), removedSlice.getType(), removedSlice
.getAllocationID()));
instanceListener.allocatedResourceDied(removedSlice.getJobID(),
new AllocatedResource(removedSlice.getHostingInstance(), removedSlice.getType(),
removedSlice.getAllocationID()));
}
}

Expand Down Expand Up @@ -265,24 +263,22 @@ public ClusterManager() {

this.instanceTypeDescriptionMap = new SerializableHashMap<InstanceType, InstanceTypeDescription>();

long tmpCleanUpInterval = (long) GlobalConfiguration.getInteger(
CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL) * 1000;
long tmpCleanUpInterval = (long) GlobalConfiguration.getInteger(CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL) * 1000;

if (tmpCleanUpInterval < 10) { // Clean up interval must be at least ten seconds
LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of " +
DEFAULT_CLEANUP_INTERVAL + " secs.");
LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of " + DEFAULT_CLEANUP_INTERVAL
+ " secs.");
tmpCleanUpInterval = DEFAULT_CLEANUP_INTERVAL;
}

this.cleanUpInterval = tmpCleanUpInterval;

int tmpDefaultInstanceTypeIndex = GlobalConfiguration.getInteger(
DEFAULT_INSTANCE_TYPE_INDEX_KEY,
int tmpDefaultInstanceTypeIndex = GlobalConfiguration.getInteger(DEFAULT_INSTANCE_TYPE_INDEX_KEY,
ConfigConstants.DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX);

if (tmpDefaultInstanceTypeIndex > this.availableInstanceTypes.length) {
LOG.warn("Incorrect index to for default instance type (" + tmpDefaultInstanceTypeIndex +
"), switching to default index " + ConfigConstants.DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX);
LOG.warn("Incorrect index to for default instance type (" + tmpDefaultInstanceTypeIndex
+ "), switching to default index " + ConfigConstants.DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX);

tmpDefaultInstanceTypeIndex = ConfigConstants.DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX;
}
Expand Down Expand Up @@ -447,8 +443,8 @@ private InstanceType[] populateInstanceTypeArray() {

if (descr == null) {
if (count == 1) {
LOG.error("Configuration does not contain at least one definition for an instance type, " +
"using default instance type: " + ConfigConstants.DEFAULT_INSTANCE_TYPE);
LOG.error("Configuration does not contain at least one definition for an instance type, "
+ "using default instance type: " + ConfigConstants.DEFAULT_INSTANCE_TYPE);

descr = ConfigConstants.DEFAULT_INSTANCE_TYPE;
} else {
Expand All @@ -463,8 +459,8 @@ private InstanceType[] populateInstanceTypeArray() {
LOG.info("Loaded instance type " + instanceType.getIdentifier() + " from the configuration");
instanceTypes.add(instanceType);
} catch (Throwable t) {
LOG.error("Error parsing " + key + ":" + descr + ". Using default using default instance type: " +
ConfigConstants.DEFAULT_INSTANCE_TYPE + " for instance type " + count + ".", t);
LOG.error("Error parsing " + key + ":" + descr + ". Using default using default instance type: "
+ ConfigConstants.DEFAULT_INSTANCE_TYPE + " for instance type " + count + ".", t);

break;
}
Expand Down Expand Up @@ -699,49 +695,66 @@ public synchronized void reportHeartBeat(InstanceConnectionInfo instanceConnecti
* {@inheritDoc}
*/
@Override
public synchronized void requestInstance(JobID jobID, Configuration conf, InstanceType instanceType)
throws InstanceException {
public synchronized void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, Integer> instanceMap,
List<String> splitAffinityList) throws InstanceException {

// TODO: Introduce topology awareness here
AllocatedSlice slice = null;
// Iterate over all instance types
final Iterator<Map.Entry<InstanceType, Integer>> it = instanceMap.entrySet().iterator();
while (it.hasNext()) {

// Try to match the instance type without slicing first
for (final ClusterInstance host : this.registeredHosts.values()) {
if (host.getType().equals(instanceType)) {
slice = host.createSlice(instanceType, jobID);
if (slice != null) {
break;
// Iterate over all requested instances of a specific type
final Map.Entry<InstanceType, Integer> entry = it.next();

for (int i = 0; i < entry.getValue().intValue(); i++) {

LOG.info("Trying to allocate instance of type " + entry.getKey().getIdentifier());

// TODO: Introduce topology awareness here
// TODO: Daniel: Code taken from AbstractScheduler..
AllocatedSlice slice = null;

// Try to match the instance type without slicing first
for (final ClusterInstance host : this.registeredHosts.values()) {
if (host.getType().equals(entry.getKey())) {
slice = host.createSlice(entry.getKey(), jobID);
if (slice != null) {
break;
}
}
}
}
}

// Use slicing now if necessary
if (slice == null) {
// Use slicing now if necessary
if (slice == null) {

for (final ClusterInstance host : this.registeredHosts.values()) {
slice = host.createSlice(entry.getKey(), jobID);
if (slice != null) {
break;
}
}

for (final ClusterInstance host : this.registeredHosts.values()) {
slice = host.createSlice(instanceType, jobID);
if (slice != null) {
break;
}
}
}

if (slice == null) {
throw new InstanceException("Could not find a suitable instance");
}
if (slice == null) {
throw new InstanceException("Could not find a suitable instance");
}

List<AllocatedSlice> allocatedSlices = this.slicesOfJobs.get(jobID);
if (allocatedSlices == null) {
allocatedSlices = new ArrayList<AllocatedSlice>();
this.slicesOfJobs.put(jobID, allocatedSlices);
}
allocatedSlices.add(slice);
List<AllocatedSlice> allocatedSlices = this.slicesOfJobs.get(jobID);
if (allocatedSlices == null) {
allocatedSlices = new ArrayList<AllocatedSlice>();
this.slicesOfJobs.put(jobID, allocatedSlices);
}
allocatedSlices.add(slice);

if (this.instanceListener != null) {
ClusterInstanceNotifier clusterInstanceNotifier = new ClusterInstanceNotifier(
this.instanceListener, slice);
clusterInstanceNotifier.start();
}

if (this.instanceListener != null) {
ClusterInstanceNotifier clusterInstanceNotifier = new ClusterInstanceNotifier(
this.instanceListener, slice);
clusterInstanceNotifier.start();
}
}

}
}

/**
Expand Down Expand Up @@ -829,8 +842,8 @@ private void updateInstaceTypeDescriptionMap() {
if (minNumberOfCPUCores < Integer.MAX_VALUE && minSizeOfPhysicalMemory < Long.MAX_VALUE
&& minSizeOfFreeMemory < Long.MAX_VALUE) {

pessimisticHardwareDescription = HardwareDescriptionFactory.construct(
minNumberOfCPUCores, minSizeOfPhysicalMemory, minSizeOfFreeMemory);
pessimisticHardwareDescription = HardwareDescriptionFactory.construct(minNumberOfCPUCores,
minSizeOfPhysicalMemory, minSizeOfFreeMemory);

} else {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -252,12 +253,13 @@ public void testAllocationDeallocation() {
// now we should be able to request two instances of type small and one of type medium
final JobID jobID = new JobID();
final Configuration conf = new Configuration();

Map<InstanceType, Integer> instanceMap = new HashMap<InstanceType, Integer>();
instanceMap.put(cm.getInstanceTypeByName(SMALL_INSTANCE_TYPE_NAME), 2);
instanceMap.put(cm.getInstanceTypeByName(MEDIUM_INSTANCE_TYPE_NAME), 1);

try {
cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(SMALL_INSTANCE_TYPE_NAME));
cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(SMALL_INSTANCE_TYPE_NAME));
cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(MEDIUM_INSTANCE_TYPE_NAME));

cm.requestInstance(jobID, conf, instanceMap, null);
} catch (InstanceException ie) {
fail(ie.getMessage());
}
Expand All @@ -284,8 +286,9 @@ public void testAllocationDeallocation() {

// Try to allocate more resources which must result in an error
try {

cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(MEDIUM_INSTANCE_TYPE_NAME));
Map<InstanceType, Integer> instancem = new HashMap<InstanceType, Integer>();
instancem.put(cm.getInstanceTypeByName(MEDIUM_INSTANCE_TYPE_NAME), 1);
cm.requestInstance(jobID, conf, instancem, null);

fail("ClusterManager allowed to request more instances than actually available");

Expand All @@ -306,7 +309,9 @@ public void testAllocationDeallocation() {

// Now further allocations should be possible
try {
cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(LARGE_INSTANCE_TYPE_NAME));
Map<InstanceType, Integer> instancem = new HashMap<InstanceType, Integer>();
instancem.put(cm.getInstanceTypeByName(LARGE_INSTANCE_TYPE_NAME), 1);
cm.requestInstance(jobID, conf, instancem, null);
} catch (InstanceException ie) {
fail(ie.getMessage());
}
Expand Down Expand Up @@ -345,7 +350,9 @@ public void testCleanUp() {

try {

cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(LARGE_INSTANCE_TYPE_NAME));
Map<InstanceType, Integer> instancem = new HashMap<InstanceType, Integer>();
instancem.put(cm.getInstanceTypeByName(LARGE_INSTANCE_TYPE_NAME), 1);
cm.requestInstance(jobID, conf, instancem, null);

} catch (InstanceException ie) {
fail(ie.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,30 @@ public JobClient(JobGraph jobGraph, Configuration configuration) throws IOExcept
this.configuration = configuration;
this.jobCleanUp = new JobCleanUp(this);
}


/**
* Constructs a new job client object and instantiates a local
* RPC proxy for the {@link JobSubmissionProtocol}.
*
* @param jobGraph
* the job graph to run
* @param configuration
* configuration object which can include special configuration settings for the job client
* @param jobManagerAddress
* IP/Port of the jobmanager (not taken from provided configuration object).
* @throws IOException
* thrown on error while initializing the RPC connection to the job manager
*/
public JobClient(JobGraph jobGraph, Configuration configuration, InetSocketAddress jobManagerAddress) throws IOException {


this.jobSubmitClient = (JobManagementProtocol) RPC.getProxy(JobManagementProtocol.class, jobManagerAddress, NetUtils
.getSocketFactory());
this.jobGraph = jobGraph;
this.configuration = configuration;
this.jobCleanUp = new JobCleanUp(this);
}

/**
* Close the <code>JobClient</code>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import eu.stratosphere.nephele.util.CommonTestUtils;


/**
* @author Mathias Peters <[email protected]>
* TODO: {@link StringRecord} has a lot of public methods that need to be tested.
Expand Down
11 changes: 6 additions & 5 deletions nephele/nephele-ec2cloudmanager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@
<version>3.0</version>
</dependency>

<dependency>
<groupId>com.google.code.typica</groupId>
<artifactId>typica</artifactId>
<version>1.6</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.2.1</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ public class CloudInstance extends AbstractInstance {
/** The instance ID. */
private final String instanceID;

/** The owner of the instance. */
private final String instanceOwner;

/** The time the instance was allocated. */
private final long allocationTime;
Expand All @@ -68,15 +66,15 @@ public class CloudInstance extends AbstractInstance {
* @param hardwareDescription
* the hardware description reported by the instance itself
*/
public CloudInstance(String instanceID, InstanceType type, String instanceOwner,
public CloudInstance(String instanceID, InstanceType type,
InstanceConnectionInfo instanceConnectionInfo, long allocationTime, NetworkNode parentNode,
NetworkTopology networkTopology, HardwareDescription hardwareDescription) {
super(type, instanceConnectionInfo, parentNode, networkTopology, hardwareDescription);

this.allocatedResource = new AllocatedResource(this, type, new AllocationID());

this.instanceID = instanceID;
this.instanceOwner = instanceOwner;

this.allocationTime = allocationTime;
}

Expand Down Expand Up @@ -114,14 +112,7 @@ public long getAllocationTime() {
return this.allocationTime;
}

/**
* Returns the instance's owner.
*
* @return the instance's owner
*/
public String getOwner() {
return this.instanceOwner;
}


public AllocatedResource asAllocatedResource() {
return this.allocatedResource;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package eu.stratosphere.nephele.instance.cloud;

import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.jobgraph.JobID;

/**
* This class is an auxiliary class to send the notification
* about the availability of an {@link AbstractInstance} to the given {@link InstanceListener} object. The notification
* must be sent from
* a separate thread, otherwise the atomic operation of requesting an instance
* for a vertex and switching to the state ASSINING could not be guaranteed.
* This class is thread-safe.
*
* @author warneke
*/
public class CloudInstanceNotifier extends Thread {

/**
* The {@link InstanceListener} object to send the notification to.
*/
private final InstanceListener instanceListener;

private final CloudInstance instance;

private final JobID id;

/**
* Constructs a new instance notifier object.
*
* @param instanceListener
* the listener to send the notification to
* @param allocatedSlice
* the slice with has been allocated for the job
*/
public CloudInstanceNotifier(InstanceListener instanceListener, JobID id, CloudInstance instance) {
this.instanceListener = instanceListener;
this.instance = instance;
this.id = id;
}

/**
* {@inheritDoc}
*/
@Override
public void run() {
this.instanceListener.resourceAllocated(id, instance.asAllocatedResource());
}
}
Loading

0 comments on commit bdfea47

Please sign in to comment.