Skip to content

Commit

Permalink
cloud manager working
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Richter committed Jul 20, 2011
1 parent e5414d0 commit 490d89a
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ public class CloudInstance extends AbstractInstance {
/** The instance ID. */
private final String instanceID;

/** The owner of the instance. */
private final String instanceOwner;

/** The time the instance was allocated. */
private final long allocationTime;
Expand All @@ -68,15 +66,15 @@ public class CloudInstance extends AbstractInstance {
* @param hardwareDescription
* the hardware description reported by the instance itself
*/
public CloudInstance(String instanceID, InstanceType type, String instanceOwner,
public CloudInstance(String instanceID, InstanceType type,
InstanceConnectionInfo instanceConnectionInfo, long allocationTime, NetworkNode parentNode,
NetworkTopology networkTopology, HardwareDescription hardwareDescription) {
super(type, instanceConnectionInfo, parentNode, networkTopology, hardwareDescription);

this.allocatedResource = new AllocatedResource(this, type, new AllocationID());

this.instanceID = instanceID;
this.instanceOwner = instanceOwner;

this.allocationTime = allocationTime;
}

Expand Down Expand Up @@ -114,14 +112,7 @@ public long getAllocationTime() {
return this.allocationTime;
}

/**
* Returns the instance's owner.
*
* @return the instance's owner
*/
public String getOwner() {
return this.instanceOwner;
}


public AllocatedResource asAllocatedResource() {
return this.allocatedResource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.amazonaws.services.ec2.model.BlockDeviceMapping;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
import com.amazonaws.services.ec2.model.DescribeInstancesResult;
import com.amazonaws.services.ec2.model.Filter;
import com.amazonaws.services.ec2.model.Instance;
import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
Expand Down Expand Up @@ -81,15 +80,6 @@ public class CloudManager extends TimerTask implements InstanceManager {
/** The log for the cloud manager. */
private static final Log LOG = LogFactory.getLog(CloudManager.class);

/** Decides whether the data should be encrypted on the wire on the way from or to EC2 Web Services. */
private static final String EC2WSSECUREKEY = "cloud.ec2ws.secure";

/** The host name of EC2 Web Services. */
private static final String EC2WSSERVERKEY = "cloud.ec2ws.server";

/** The port number of EC2 Web Services. */
private static final String EC2WSPORTKEY = "cloud.ec2ws.port";

/**
* The cloud manager checks the floating instances every base interval to terminate the floating instances which
* expire.
Expand All @@ -105,7 +95,7 @@ public class CloudManager extends TimerTask implements InstanceManager {
/** The user pays fee for his instances every time unit. */
private static final long TIMEUNIT = 60 * 60 * 1000; // 1 hour in ms.

/** time to full next hour when instance is kicked. */
/** timelimit to full next hour when instance is kicked. */
private static final long TIMETHRESHOLD = 2 * 60 * 1000; // 2 mins in ms.

/** The array of all available instance types in the cloud. */
Expand Down Expand Up @@ -238,7 +228,7 @@ public InstanceType getSuitableInstanceType(int minNumComputeUnits, int minNumCP
}

/**
* A cloud instance is released and changed into a floating instance, waiting for a new job from its owner.
* A cloud instance is released and changed into a floating instance, waiting for a new job.
*
* @param jobID
* the ID of the finished job
Expand Down Expand Up @@ -269,8 +259,6 @@ public synchronized void releaseAllocatedResource(JobID jobID, Configuration con
jobToInstanceMapping.unassignedInstanceFromJob((CloudInstance) instance);
this.cloudInstances.remove(instance);

final long currentTime = System.currentTimeMillis();

this.floatingInstances.put(instance.getInstanceConnectionInfo(), new FloatingInstance(
((CloudInstance) instance).getInstanceID(), instance.getInstanceConnectionInfo(),
((CloudInstance) instance).getAllocationTime()));
Expand Down Expand Up @@ -468,7 +456,7 @@ private CloudInstance isReservedInstance(InstanceConnectionInfo instanceConnecti
}

if (instanceConnectionInfo.getAddress().equals(candidateAddress)) {
return convertIntoCloudInstance(t, instanceConnectionInfo, mapping.getOwner());
return convertIntoCloudInstance(t, instanceConnectionInfo);
}
}
}
Expand All @@ -487,12 +475,10 @@ private CloudInstance isReservedInstance(InstanceConnectionInfo instanceConnecti
* the instance to be converted into the cloud instance
* @param instanceConnectionInfo
* the information required to connect to the instance's task manager later on
* @param owner
* the owner of the instance
* @return a cloud instance
*/
private CloudInstance convertIntoCloudInstance(com.amazonaws.services.ec2.model.Instance instance,
InstanceConnectionInfo instanceConnectionInfo, String owner) {
InstanceConnectionInfo instanceConnectionInfo) {

InstanceType type = null;

Expand All @@ -510,9 +496,8 @@ private CloudInstance convertIntoCloudInstance(com.amazonaws.services.ec2.model.
return null;
}

final CloudInstance cloudInstance = new CloudInstance(instance.getInstanceId(), type, owner,
instanceConnectionInfo, instance.getLaunchTime().getTime(), this.networkTopology.getRootNode(),
this.networkTopology, null);
final CloudInstance cloudInstance = new CloudInstance(instance.getInstanceId(), type, instanceConnectionInfo,
instance.getLaunchTime().getTime(), this.networkTopology.getRootNode(), this.networkTopology, null);

// TODO: Define hardware descriptions for cloud instance types
this.cloudInstances.add(cloudInstance);
Expand All @@ -532,24 +517,16 @@ public void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, I
// TODO: maybe load default credentials from config?

// First check, if all required configuration entries are available
/*
* final String owner = conf.getString("job.cloud.username", null);
* if (owner == null) {
* throw new InstanceException("Unable to allocate cloud instance: Cannot find username");
* }
* final String awsAccessId = conf.getString("job.cloud.awsaccessid", null);
* if (awsAccessId == null) {
* throw new InstanceException("Unable to allocate cloud instance: Cannot find AWS access ID");
* }
* final String awsSecretKey = conf.getString("job.cloud.awssecretkey", null);
* if (awsSecretKey == null) {
* throw new InstanceException("Unable to allocate cloud instance: Cannot find AWS secret key");
* }
*/

final String awsAccessId = "AKIAJYQJNI7QH227NDQA";
final String awsSecretKey = "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";
final String owner = "nobody";

final String awsAccessId = conf.getString("job.cloud.awsaccessid", null);
LOG.info("found AWS access ID from Job Conf: " + awsAccessId);
if (awsAccessId == null) {
throw new InstanceException("Unable to allocate cloud instance: Cannot find AWS access ID");
}
final String awsSecretKey = conf.getString("job.cloud.awssecretkey", null);
if (awsSecretKey == null) {
throw new InstanceException("Unable to allocate cloud instance: Cannot find AWS secret key");
}

final String sshKeyPair = conf.getString("job.cloud.sshkeypair", null);

Expand All @@ -562,13 +539,13 @@ public void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, I
// Create new mapping if it does not yet exist
if (jobToInstanceMapping == null) {
LOG.debug("Creating new mapping for job " + jobID);
jobToInstanceMapping = new JobToInstancesMapping(owner, awsAccessId, awsSecretKey);
jobToInstanceMapping = new JobToInstancesMapping(awsAccessId, awsSecretKey);
this.jobToInstancesMap.put(jobID, jobToInstanceMapping);
}
}

// Map containing the instances that will actually be requested via the EC2 interface..
Map<InstanceType, Integer> instancesToBeRequested = new HashMap<InstanceType, Integer>();
final Map<InstanceType, Integer> instancesToBeRequested = new HashMap<InstanceType, Integer>();

// First we check, if there are any floating instances available that we can use

Expand All @@ -582,8 +559,8 @@ public void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, I
LOG.info("Request for " + neededinstancecount + " instances of type " + actualInstanceType.getIdentifier());

// Now check, if floating instances of specific type are available...
final LinkedList<CloudInstance> floatinginstances = anyFloatingInstanceAvailable(owner, awsAccessId,
awsSecretKey, actualInstanceType, neededinstancecount);
final LinkedList<CloudInstance> floatinginstances = anyFloatingInstanceAvailable(awsAccessId, awsSecretKey,
actualInstanceType, neededinstancecount);

// now we assign all found floating instances...
final JobToInstancesMapping mapping = this.jobToInstancesMap.get(jobID);
Expand Down Expand Up @@ -612,8 +589,8 @@ public void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, I

// Now, we need to request the EC2 instances..

LinkedList<String> instanceIDs = allocateCloudInstance(awsAccessId, awsSecretKey, instancesToBeRequested,
sshKeyPair);
final LinkedList<String> instanceIDs = allocateCloudInstance(conf, awsAccessId, awsSecretKey,
instancesToBeRequested, sshKeyPair);

for (String i : instanceIDs) {
this.reservedInstances.put(i, jobID);
Expand All @@ -633,18 +610,22 @@ public void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, I
* @param sshKeyPair
* Optional parameter to insert an EC2 SSH key/value pair
* @return
* List containing the instance IDs of the allocated instances.
*/
private LinkedList<String> allocateCloudInstance(String awsAccessId, String awsSecretKey,
private LinkedList<String> allocateCloudInstance(Configuration conf, String awsAccessId, String awsSecretKey,
Map<InstanceType, Integer> instancesToBeRequested, String sshKeyPair) {

/*
* final String imageID = GlobalConfiguration.getString("ec2.image.id", null);
* if (imageID == null) {
* LOG.error("Unable to allocate instance: Image ID is unknown");
* return null;
* }
*/
final String imageID = "ami-a24272d6";
String imageID = conf.getString("job.ec2.image.id", null);
LOG.info("EC2 Image ID from job conf: " + imageID);
if (imageID == null) {

imageID = GlobalConfiguration.getString("ec2.image.id", null);
if (imageID == null) {
LOG.error("Unable to allocate instance: Image ID is unknown");
return null;
}
}

final String jobManagerIPAddress = GlobalConfiguration.getString("jobmanager.rpc.address", null);
if (jobManagerIPAddress == null) {
LOG.error("JobManager IP address is not set (jobmanager.rpc.address)");
Expand Down Expand Up @@ -695,12 +676,10 @@ private LinkedList<String> allocateCloudInstance(String awsAccessId, String awsS
}

/**
* Checks whether there is a floating instance with the specific type belonging to the owner. If there is a floating
* Checks whether there is a floating instance with the specific type. If there is a floating
* instance, it will be
* changed into a cloud instance and returned.
*
* @param owner
* the owner of the floating instances
* @param awsAccessId
* the access ID into AWS
* @param awsSecretKey
Expand All @@ -711,8 +690,8 @@ private LinkedList<String> allocateCloudInstance(String awsAccessId, String awsS
* @throws InstanceException
* something wrong happens to the global configuration
*/
private LinkedList<CloudInstance> anyFloatingInstanceAvailable(String owner, String awsAccessId,
String awsSecretKey, InstanceType type, int count) throws InstanceException {
private LinkedList<CloudInstance> anyFloatingInstanceAvailable(String awsAccessId, String awsSecretKey,
InstanceType type, int count) throws InstanceException {

LOG.info("Check for floating instance of type" + type.getIdentifier() + " requested count: " + count + ".");

Expand Down Expand Up @@ -768,7 +747,7 @@ private LinkedList<CloudInstance> anyFloatingInstanceAvailable(String owner, Str
this.floatingInstanceIDs.remove(floatingInstance.getInstanceID());

floatinginstances.add(convertIntoCloudInstance(t,
floatingInstance.getInstanceConnectionInfo(), owner));
floatingInstance.getInstanceConnectionInfo()));

// If we already have enough floating instances found: return!
if (floatinginstances.size() >= count) {
Expand All @@ -786,7 +765,7 @@ private LinkedList<CloudInstance> anyFloatingInstanceAvailable(String owner, Str
this.floatingInstanceIDs.remove(floatingInstance.getInstanceID());

floatinginstances.add(convertIntoCloudInstance(t,
floatingInstance.getInstanceConnectionInfo(), owner));
floatingInstance.getInstanceConnectionInfo()));

// If we already have enough floating instances found: return!
if (floatinginstances.size() >= count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2Client;

import eu.stratosphere.nephele.configuration.GlobalConfiguration;

/**
* This class is managing the EC2 clients.
* @author casp
*
*/
public class EC2ClientFactory {

//Already created EC2 clients are stored in this hashtable.
//This makes it possible to use multiple EC2 credentials.
// Already created EC2 clients are stored in this hashtable.
// This makes it possible to use multiple EC2 credentials.
private static Hashtable<String, AmazonEC2Client> ec2clients = new Hashtable<String, AmazonEC2Client>();

/**
* This factory method returns a the corresponding EC2Client object for the given credentials.
* @param awsAccessId
* @param awsSecretKey
* @return
* @return the desired AmazonEC2Client.
*/
static synchronized AmazonEC2Client getEC2Client(String awsAccessId, String awsSecretKey){

Expand All @@ -50,8 +52,10 @@ static synchronized AmazonEC2Client getEC2Client(String awsAccessId, String awsS
BasicAWSCredentials credentials = new BasicAWSCredentials(awsAccessId, awsSecretKey);
AmazonEC2Client client = new AmazonEC2Client(credentials);

//TODO: Make endpoints configurable (US, EU, Asia etc).
client.setEndpoint("ec2.eu-west-1.amazonaws.com");
final String endpoint = GlobalConfiguration.getString("ec2.webservice.endpoint", "ec2.eu-west-1.amazonaws.com");

client.setEndpoint(endpoint);

ec2clients.put(awsAccessId, client);
return client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ public class JobToInstancesMapping {
/** The list of assigned cloud instances for the job. */
private final List<CloudInstance> assignedInstances = new ArrayList<CloudInstance>();

/** The owner of the job. */
private final String owner;

/** The access ID into Amazon Web Services. */
private final String awsAccessId;

Expand All @@ -42,15 +39,13 @@ public class JobToInstancesMapping {
/**
* Creates a new mapping for job to instances.
*
* @param owner
* the owner of the job
* @param awsAccessId
* the access ID into AWS
* @param awsSecretKey
* the secret key used to generate signatures for authentication
*/
public JobToInstancesMapping(String owner, String awsAccessId, String awsSecretKey) {
this.owner = owner;
public JobToInstancesMapping(String awsAccessId, String awsSecretKey) {

this.awsAccessId = awsAccessId;
this.awsSecretKey = awsSecretKey;
}
Expand Down Expand Up @@ -132,14 +127,6 @@ public CloudInstance getInstanceByConnectionInfo(InstanceConnectionInfo instance
return null;
}

/**
* Returns the owner of the job.
*
* @return the owner of the job
*/
public String getOwner() {
return this.owner;
}

/**
* Returns the access ID into AWS.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private CloudInstance constructSmallCloudInstance() {

final CloudInstance cloudInstance = new CloudInstance("i-1234ABCD",
InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"),
"wenjun", new InstanceConnectionInfo(new InetSocketAddress("localhost", 6122).getAddress(), 6122, 6121),
new InstanceConnectionInfo(new InetSocketAddress("localhost", 6122).getAddress(), 6122, 6121),
1234567890, networkTopology.getRootNode(), networkTopology, hardwareDescription);

return cloudInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public void testAssignedInstances() {
final NetworkTopology networkTopology = NetworkTopology.createEmptyTopology();
final HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(1, 2048L*1024L*1024L, 2048L*1024L*1024L);

JobToInstancesMapping map = new JobToInstancesMapping("wenjun", "1234567", "abcdefg");
CloudInstance ci = new CloudInstance("i-1234ABCD", InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"), "wenjun",
JobToInstancesMapping map = new JobToInstancesMapping("1234567", "abcdefg");
CloudInstance ci = new CloudInstance("i-1234ABCD", InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"),
new InstanceConnectionInfo(new InetSocketAddress("localhost", 6122).getAddress(), 6122, 6121), 1234567890,
networkTopology.getRootNode(), networkTopology, hardwareDescription);

Expand Down

0 comments on commit 490d89a

Please sign in to comment.