Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
adding comments for StratosEventReceiver abstraction, starting the ev…
Browse files Browse the repository at this point in the history
…ent reseivers from messaging activator and adding shutdown for tenant, application and signup synchronizers

Conflicts:
	components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
  • Loading branch information
Isuru Haththotuwa committed Dec 24, 2015
1 parent 905e140 commit c90eb9a
Show file tree
Hide file tree
Showing 24 changed files with 186 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,5 +229,8 @@ protected void deactivate(ComponentContext ctx) {
} catch (Exception e) {
log.warn("An error occurred while closing cloud controller topology event publisher", e);
}

// shutdown TopologyEventSync task
StratosThreadPool.shutdown(THREAD_POOL_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import org.apache.commons.logging.LogFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.*;

/**
* Utility class for Stratos thread pool
Expand All @@ -37,7 +34,7 @@ public class StratosThreadPool {
private static final Log log = LogFactory.getLog(StratosThreadPool.class);

private static volatile Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<>();
private static volatile Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
private static volatile Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<>();
private static Object executorServiceMapLock = new Object();
private static Object scheduledServiceMapLock = new Object();

Expand Down Expand Up @@ -84,4 +81,32 @@ public static ScheduledExecutorService getScheduledExecutorService(String identi
}
return scheduledExecutorService;
}

public static void shutdown (String identifier) {

ExecutorService executorService = executorServiceMap.get(identifier);
if (executorService == null) {
log.warn("No executor service found for id " + identifier + ", unable to shut down");
return;
}

// try to shut down gracefully
executorService.shutdown();
// wait 10 secs till terminated
try {
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
log.info("Thread Pool [id] " + identifier + " did not finish all tasks before " +
"timeout, forcefully shutting down");
executorService.shutdownNow();
}
} catch (InterruptedException e) {
// interrupted, shutdown now
executorService.shutdownNow();
}

// remove from the map
executorServiceMap.remove(identifier);

log.info("Successfully shutdown thread pool associated with id: " + identifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,5 +328,8 @@ protected void deactivate(ComponentContext context) {
// Close event publisher connections to message broker
EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());

// shut down the scheduled thread pool
StratosThreadPool.shutdown(THREAD_POOL_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
Expand All @@ -40,8 +42,20 @@ public class MessagingServiceComponent {
private static final Log log = LogFactory.getLog(MessagingServiceComponent.class);

protected void activate(ComponentContext context) {
// activate all message receivers
try {
log.info("Messaging Service bundle activated");
ApplicationSignUpEventReceiver.getInstance();
ApplicationsEventReceiver.getInstance();
ClusterStatusEventReceiver.getInstance();
DomainMappingEventReceiver.getInstance();
HealthStatEventReceiver.getInstance();
InitializerEventReceiver.getInstance();
TenantEventReceiver.getInstance();
TopologyEventReceiver.getInstance();

if (log.isDebugEnabled()) {
log.debug("Messaging Service bundle activated");
}
} catch (Exception e) {
log.error("Could not activate Messaging Service component", e);
}
Expand All @@ -58,7 +72,10 @@ protected void deactivate(ComponentContext context) {
InitializerEventReceiver.getInstance().terminate();
TenantEventReceiver.getInstance().terminate();
TopologyEventReceiver.getInstance().terminate();
log.info("Messaging Service component is deactivated");
StratosThreadPool.shutdown(StratosEventReceiver.STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID);
if (log.isDebugEnabled()) {
log.debug("Messaging Service component is deactivated");
}
} catch (Exception e) {
log.error("Could not de-activate Messaging Service component", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,71 @@

package org.apache.stratos.messaging.message.receiver;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.listener.EventListener;

import java.util.concurrent.ExecutorService;

public class StratosEventReceiver {
/**
* Abstraction for Event Receivers used in Stratos
*/
public abstract class StratosEventReceiver {

private static final Log log = LogFactory.getLog(StratosEventReceiver.class);

/**
* Thread pool information for all StratosEventReceiver implementations
*/

public static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID = "stratos-event-receiver-pool";
private static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE = "stratos.event.receiver.pool.size";

// thread pool id
protected String threadPoolId;
// executor service used
protected ExecutorService executorService;
// pool size
protected static int threadPoolSize = 15;

static {
// check if the thread pool size is given as a system parameter
String poolSize = System.getProperty(STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE);
if (poolSize != null) {
try {
threadPoolSize = Integer.parseInt(poolSize);
} catch (NumberFormatException e) {
log.error("Invalid configuration found for StratosEventReceiver thread pool size", e);
threadPoolSize = 15;
}
}
if (log.isDebugEnabled()) {
log.debug("Number of threads used in pool " + STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID + " : " + threadPoolSize);
}
}

public StratosEventReceiver () {
this.threadPoolId = STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID;
this.executorService = StratosThreadPool.getExecutorService(threadPoolId, threadPoolSize);
}

/**
* Adds an EventListener to this StratosEventReceiver instance
*
* @param eventListener EventListener instance to add
*/
public abstract void addEventListener(EventListener eventListener);

/**
* Removed an EventListener from this StratosEventReceiver instance
*
* @param eventListener EventListener instance to remove
*/
public abstract void removeEventListener(EventListener eventListener);

/**
* Terminates this StratosEventReceiver instance
*/
public abstract void terminate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{
private static volatile ApplicationsEventReceiver instance;

private ApplicationsEventReceiver() {
// TODO: make pool size configurable
this.executorService = StratosThreadPool.getExecutorService("application-event-receiver", 100);
ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue();
this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue);
this.messageListener = new ApplicationsEventMessageListener(messageQueue);
Expand All @@ -66,7 +64,7 @@ public void removeEventListener(EventListener eventListener) {
messageDelegator.removeEventListener(eventListener);
}

public void execute() {
private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public void addEventListener(EventListener eventListener) {
processorChain.addEventListener(eventListener);
}

public void removeEventListener(EventListener eventListener) {
processorChain.removeEventListener(eventListener);
}

@Override
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
private static volatile ApplicationSignUpEventReceiver instance;

private ApplicationSignUpEventReceiver() {
// TODO: make pool size configurable
this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100);
ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue();
this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue);
this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue);
Expand All @@ -67,6 +65,10 @@ public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}

public void removeEventListener(EventListener eventListener) {
messageDelegator.removeEventListener(eventListener);
}

private void execute() {
try {
// Start topic subscriber thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public void addEventListener(EventListener eventListener) {
processorChain.addEventListener(eventListener);
}

public void removeEventListener(EventListener eventListener) {
processorChain.removeEventListener(eventListener);
}

@Override
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
private static volatile ClusterStatusEventReceiver instance;

private ClusterStatusEventReceiver() {
// TODO: make pool size configurable
this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100);
ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue);
this.messageListener = new ClusterStatusEventMessageListener(messageQueue);
Expand All @@ -51,6 +49,10 @@ public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}

public void removeEventListener(EventListener eventListener) {
messageDelegator.removeEventListener(eventListener);
}

public static ClusterStatusEventReceiver getInstance () {
if (instance == null) {
synchronized (ClusterStatusEventReceiver.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public void addEventListener(EventListener eventListener) {
processorChain.addEventListener(eventListener);
}

public void removeEventListener(EventListener eventListener) {
processorChain.removeEventListener(eventListener);
}

@Override
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
private static volatile DomainMappingEventReceiver instance;

private DomainMappingEventReceiver() {
// TODO: make pool size configurable
this.executorService = StratosThreadPool.getExecutorService("domainmapping-event-receiver", 100);
DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue();
this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue);
this.messageListener = new DomainMappingEventMessageListener(messageQueue);
Expand All @@ -52,6 +50,10 @@ public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}

public void removeEventListener(EventListener eventListener) {
messageDelegator.removeEventListener(eventListener);
}

public static DomainMappingEventReceiver getInstance () {
if (instance == null) {
synchronized (DomainMappingEventReceiver.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public void addEventListener(EventListener eventListener) {
processorChain.addEventListener(eventListener);
}

public void removeEventListener(EventListener eventListener) {
processorChain.removeEventListener(eventListener);
}

@Override
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
private static volatile HealthStatEventReceiver instance;

private HealthStatEventReceiver() {
// TODO: make pool size configurable
this.executorService = StratosThreadPool.getExecutorService("healthstat-event-receiver", 100);
HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue();
this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue);
this.messageListener = new HealthStatEventMessageListener(messageQueue);
Expand All @@ -63,6 +61,9 @@ public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}

public void removeEventListener(EventListener eventListener) {
messageDelegator.removeEventListener(eventListener);
}

private void execute() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public void addEventListener(EventListener eventListener) {
processorChain.addEventListener(eventListener);
}

public void removeEventListener(EventListener eventListener) {
processorChain.removeEventListener(eventListener);
}

@Override
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;

import java.util.concurrent.ExecutorService;

public class InitializerEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(InitializerEventReceiver.class);
Expand All @@ -38,8 +37,6 @@ public class InitializerEventReceiver extends StratosEventReceiver {
//private ExecutorService executorService;

private InitializerEventReceiver() {
// TODO: make pool size configurable
this.executorService = StratosThreadPool.getExecutorService("initializer-event-receiver", 100);
InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue();
this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue);
this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue);
Expand All @@ -62,6 +59,10 @@ public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}

public void removeEventListener(EventListener eventListener) {
messageDelegator.removeEventListener(eventListener);
}

private void execute() {
try {
// Start topic subscriber thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public void addEventListener(EventListener eventListener) {
processorChain.addEventListener(eventListener);
}

public void removeEventListener(EventListener eventListener) {
processorChain.removeEventListener(eventListener);
}

@Override
public void run() {
try {
Expand Down

0 comments on commit c90eb9a

Please sign in to comment.