diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java index c30fc63672..5b01330788 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java @@ -229,5 +229,8 @@ protected void deactivate(ComponentContext ctx) { } catch (Exception e) { log.warn("An error occurred while closing cloud controller topology event publisher", e); } + + // shutdown TopologyEventSync task + StratosThreadPool.shutdown(THREAD_POOL_ID); } } diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java index 687cec2182..8037ce3ba6 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java @@ -24,10 +24,7 @@ import org.apache.commons.logging.LogFactory; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.*; /** * Utility class for Stratos thread pool @@ -37,7 +34,7 @@ public class StratosThreadPool { private static final Log log = LogFactory.getLog(StratosThreadPool.class); private static volatile Map executorServiceMap = new ConcurrentHashMap<>(); - private static volatile Map scheduledServiceMap = new ConcurrentHashMap(); + private static volatile Map scheduledServiceMap = new ConcurrentHashMap<>(); private static Object executorServiceMapLock = new Object(); private static Object scheduledServiceMapLock = new Object(); @@ -84,4 +81,32 @@ public static ScheduledExecutorService getScheduledExecutorService(String identi } return scheduledExecutorService; } + + public static void shutdown (String identifier) { + + ExecutorService executorService = executorServiceMap.get(identifier); + if (executorService == null) { + log.warn("No executor service found for id " + identifier + ", unable to shut down"); + return; + } + + // try to shut down gracefully + executorService.shutdown(); + // wait 10 secs till terminated + try { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + log.info("Thread Pool [id] " + identifier + " did not finish all tasks before " + + "timeout, forcefully shutting down"); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + // interrupted, shutdown now + executorService.shutdownNow(); + } + + // remove from the map + executorServiceMap.remove(identifier); + + log.info("Successfully shutdown thread pool associated with id: " + identifier); + } } diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java index aa7cc028d5..5bd3f76795 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java @@ -328,5 +328,8 @@ protected void deactivate(ComponentContext context) { // Close event publisher connections to message broker EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName()); EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName()); + + // shut down the scheduled thread pool + StratosThreadPool.shutdown(THREAD_POOL_ID); } } diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java index c97125bf4b..b582d56249 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java @@ -21,6 +21,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver; import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver; import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver; @@ -40,8 +42,20 @@ public class MessagingServiceComponent { private static final Log log = LogFactory.getLog(MessagingServiceComponent.class); protected void activate(ComponentContext context) { + // activate all message receivers try { - log.info("Messaging Service bundle activated"); + ApplicationSignUpEventReceiver.getInstance(); + ApplicationsEventReceiver.getInstance(); + ClusterStatusEventReceiver.getInstance(); + DomainMappingEventReceiver.getInstance(); + HealthStatEventReceiver.getInstance(); + InitializerEventReceiver.getInstance(); + TenantEventReceiver.getInstance(); + TopologyEventReceiver.getInstance(); + + if (log.isDebugEnabled()) { + log.debug("Messaging Service bundle activated"); + } } catch (Exception e) { log.error("Could not activate Messaging Service component", e); } @@ -58,7 +72,10 @@ protected void deactivate(ComponentContext context) { InitializerEventReceiver.getInstance().terminate(); TenantEventReceiver.getInstance().terminate(); TopologyEventReceiver.getInstance().terminate(); - log.info("Messaging Service component is deactivated"); + StratosThreadPool.shutdown(StratosEventReceiver.STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID); + if (log.isDebugEnabled()) { + log.debug("Messaging Service component is deactivated"); + } } catch (Exception e) { log.error("Could not de-activate Messaging Service component", e); } diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java index 5ac89e6ad1..8c29816fc2 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java @@ -19,12 +19,71 @@ package org.apache.stratos.messaging.message.receiver; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.messaging.listener.EventListener; + import java.util.concurrent.ExecutorService; -public class StratosEventReceiver { +/** + * Abstraction for Event Receivers used in Stratos + */ +public abstract class StratosEventReceiver { + + private static final Log log = LogFactory.getLog(StratosEventReceiver.class); + + /** + * Thread pool information for all StratosEventReceiver implementations + */ + + public static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID = "stratos-event-receiver-pool"; + private static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE = "stratos.event.receiver.pool.size"; + // thread pool id + protected String threadPoolId; + // executor service used protected ExecutorService executorService; + // pool size + protected static int threadPoolSize = 15; + + static { + // check if the thread pool size is given as a system parameter + String poolSize = System.getProperty(STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE); + if (poolSize != null) { + try { + threadPoolSize = Integer.parseInt(poolSize); + } catch (NumberFormatException e) { + log.error("Invalid configuration found for StratosEventReceiver thread pool size", e); + threadPoolSize = 15; + } + } + if (log.isDebugEnabled()) { + log.debug("Number of threads used in pool " + STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID + " : " + threadPoolSize); + } + } public StratosEventReceiver () { + this.threadPoolId = STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, threadPoolSize); } + + /** + * Adds an EventListener to this StratosEventReceiver instance + * + * @param eventListener EventListener instance to add + */ + public abstract void addEventListener(EventListener eventListener); + + /** + * Removed an EventListener from this StratosEventReceiver instance + * + * @param eventListener EventListener instance to remove + */ + public abstract void removeEventListener(EventListener eventListener); + + /** + * Terminates this StratosEventReceiver instance + */ + public abstract void terminate(); } diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java index 69dba01798..89dd73ecce 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java @@ -38,8 +38,6 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{ private static volatile ApplicationsEventReceiver instance; private ApplicationsEventReceiver() { - // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("application-event-receiver", 100); ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue(); this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue); this.messageListener = new ApplicationsEventMessageListener(messageQueue); @@ -66,7 +64,7 @@ public void removeEventListener(EventListener eventListener) { messageDelegator.removeEventListener(eventListener); } - public void execute() { + private void execute() { try { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(), diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java index adf805dbcf..59374bb4b2 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java @@ -46,6 +46,10 @@ public void addEventListener(EventListener eventListener) { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java index df90cf988d..5ad6070722 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java @@ -43,8 +43,6 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver { private static volatile ApplicationSignUpEventReceiver instance; private ApplicationSignUpEventReceiver() { - // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100); ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue(); this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue); this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue); @@ -67,6 +65,10 @@ public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + private void execute() { try { // Start topic subscriber thread diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java index 5c9c50259c..954d9bebb0 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java @@ -46,6 +46,10 @@ public void addEventListener(EventListener eventListener) { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java index be42b435ab..9de351bcd2 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java @@ -39,8 +39,6 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver { private static volatile ClusterStatusEventReceiver instance; private ClusterStatusEventReceiver() { - // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100); ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue(); this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue); this.messageListener = new ClusterStatusEventMessageListener(messageQueue); @@ -51,6 +49,10 @@ public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + public static ClusterStatusEventReceiver getInstance () { if (instance == null) { synchronized (ClusterStatusEventReceiver.class) { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java index fa783a9218..03154f2bd6 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java @@ -46,6 +46,10 @@ public void addEventListener(EventListener eventListener) { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java index 6de99c0412..6c88f73229 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java @@ -40,8 +40,6 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { private static volatile DomainMappingEventReceiver instance; private DomainMappingEventReceiver() { - // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("domainmapping-event-receiver", 100); DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue(); this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue); this.messageListener = new DomainMappingEventMessageListener(messageQueue); @@ -52,6 +50,10 @@ public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + public static DomainMappingEventReceiver getInstance () { if (instance == null) { synchronized (DomainMappingEventReceiver.class) { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java index 2cde2a9c3a..29fb47b8e7 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java @@ -48,6 +48,10 @@ public void addEventListener(EventListener eventListener) { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java index a9d260203d..442bdb6a27 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java @@ -39,8 +39,6 @@ public class HealthStatEventReceiver extends StratosEventReceiver { private static volatile HealthStatEventReceiver instance; private HealthStatEventReceiver() { - // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("healthstat-event-receiver", 100); HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue(); this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue); this.messageListener = new HealthStatEventMessageListener(messageQueue); @@ -63,6 +61,9 @@ public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } private void execute() { try { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java index ffd2ae4a24..baca3509ef 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java @@ -41,6 +41,10 @@ public void addEventListener(EventListener eventListener) { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java index 805a8bfe45..c7e5daf55e 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java @@ -26,7 +26,6 @@ import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; -import java.util.concurrent.ExecutorService; public class InitializerEventReceiver extends StratosEventReceiver { private static final Log log = LogFactory.getLog(InitializerEventReceiver.class); @@ -38,8 +37,6 @@ public class InitializerEventReceiver extends StratosEventReceiver { //private ExecutorService executorService; private InitializerEventReceiver() { - // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("initializer-event-receiver", 100); InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue(); this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue); this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue); @@ -62,6 +59,10 @@ public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + private void execute() { try { // Start topic subscriber thread diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java index 73ef9fe268..b695db7fbe 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java @@ -46,6 +46,10 @@ public void addEventListener(EventListener eventListener) { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java index e0b8e9f502..5bcd75ac42 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java @@ -39,8 +39,6 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver { //private boolean terminated; private InstanceNotifierEventReceiver() { - // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue(); this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue); this.messageListener = new InstanceNotifierEventMessageListener(messageQueue); @@ -63,6 +61,10 @@ public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + private void execute() { try { // Start topic subscriber thread @@ -94,7 +96,5 @@ public boolean isSubscribed() { public synchronized void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - //terminated = true; - log.info("InstanceNotifierEventReceiver terminated"); } } diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java index 9f754b0d18..e5df65e761 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java @@ -46,6 +46,10 @@ public void addEventListener(EventListener eventListener) { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java index a565ea9905..3d9f793bdc 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java @@ -38,8 +38,6 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver { private static volatile InstanceStatusEventReceiver instance; private InstanceStatusEventReceiver() { - // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue(); this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue); this.messageListener = new InstanceStatusEventMessageListener(messageQueue); @@ -62,6 +60,9 @@ public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } private void execute() { try { @@ -91,6 +92,5 @@ public boolean isSubscribed() { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - // terminated = true; } } diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java index c735d9bb2d..cd8724ca56 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java @@ -48,6 +48,10 @@ public void addEventListener(EventListener eventListener) { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java index a52cb2000c..e30d3abbe9 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java @@ -42,8 +42,6 @@ public class TenantEventReceiver extends StratosEventReceiver { private static volatile TenantEventReceiver instance; private TenantEventReceiver() { - // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("tenant-event-receiver", 100); TenantEventMessageQueue messageQueue = new TenantEventMessageQueue(); this.messageDelegator = new TenantEventMessageDelegator(messageQueue); this.messageListener = new TenantEventMessageListener(messageQueue); @@ -66,6 +64,10 @@ public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + private void execute() { try { // Start topic subscriber thread diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java index 8508d91695..d2664f4442 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java @@ -47,6 +47,10 @@ public void addEventListener(EventListener eventListener) { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java index bfa395038b..4f1f254996 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java @@ -44,8 +44,6 @@ public class TopologyEventReceiver extends StratosEventReceiver { private static volatile TopologyEventReceiver instance; private TopologyEventReceiver() { - // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue(); this.messageDelegator = new TopologyEventMessageDelegator(messageQueue); this.messageListener = new TopologyEventMessageListener(messageQueue); @@ -68,6 +66,10 @@ public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + private void execute() { try { // Start topic subscriber thread