From ac05a7117c54683606f19a0b21ca0148de15e03e Mon Sep 17 00:00:00 2001 From: Philipp Richter Date: Thu, 21 Jul 2011 15:40:46 +0200 Subject: [PATCH] cloudmgr blacklist/exception handling --- .../nephele/instance/cloud/CloudManager.java | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/cloud/CloudManager.java b/nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/cloud/CloudManager.java index 50fc318a0d724..1af0166bc780c 100644 --- a/nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/cloud/CloudManager.java +++ b/nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/cloud/CloudManager.java @@ -95,8 +95,11 @@ public class CloudManager extends TimerTask implements InstanceManager { /** The user pays fee for his instances every time unit. */ private static final long TIMEUNIT = 60 * 60 * 1000; // 1 hour in ms. - /** timelimit to full next hour when instance is kicked. */ + /** Timelimit to full next hour when instance is kicked. */ private static final long TIMETHRESHOLD = 2 * 60 * 1000; // 2 mins in ms. + + /** TMs that send HeartBeats but do not belong to any job will be blacklisted */ + private final HashSet blackListedTms = new HashSet(); /** The array of all available instance types in the cloud. */ private final InstanceType[] availableInstanceTypes; @@ -321,6 +324,13 @@ private String destroyCloudInstance(Configuration conf, String instanceID) throw public synchronized void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription) { + // Check if this TM is blacklisted + if(this.blackListedTms.contains(instanceConnectionInfo)){ + LOG.debug("Received HeartBeat from blacklisted TM " + instanceConnectionInfo); + return; + } + + // Check if heart beat belongs to a floating instance if (this.floatingInstances.containsKey(instanceConnectionInfo)) { final FloatingInstance floatingInstance = this.floatingInstances.get(instanceConnectionInfo); @@ -363,7 +373,11 @@ public synchronized void reportHeartBeat(InstanceConnectionInfo instanceConnecti this.instanceListener.resourceAllocated(jobID, instance.asAllocatedResource()); return; } - + + // This TM seems to be unknown to the JobManager.. blacklist + LOG.info("Received HeartBeat from unknown TM. Blacklisting. Address is: " + instanceConnectionInfo); + this.blackListedTms.add(instanceConnectionInfo); + } /** @@ -665,6 +679,14 @@ private LinkedList allocateCloudInstance(Configuration conf, String awsA // Request instances! RunInstancesResult result = ec2client.runInstances(request); + + // Check if reservation went well... + + if(result.getReservation().getInstances().size() != neededinstancecount){ + // Something went wront.. + LOG.error("Requested " + neededinstancecount + " instances of type " + actualInstanceType.getIdentifier() + " but only got " + result.getReservation().getInstances().size() + " instances reserved."); + } + for (Instance i : result.getReservation().getInstances()) { instanceIDs.add(i.getInstanceId());