Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
making InitializerEventReceiver singleton and fixing references in co…
Browse files Browse the repository at this point in the history
…mponents
  • Loading branch information
Isuru Haththotuwa committed Dec 24, 2015
1 parent 170c27c commit a1ba54c
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ public class AutoscalerInitializerTopicReceiver {
private ExecutorService executorService;

public AutoscalerInitializerTopicReceiver() {
this.initializerEventReceiver = new InitializerEventReceiver();
this.initializerEventReceiver = InitializerEventReceiver.getInstance();
addEventListeners();
}

public void execute() {
initializerEventReceiver.setExecutorService(executorService);
initializerEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Cloud controller initializer topic receiver started");
}
}
// public void execute() {
// initializerEventReceiver.setExecutorService(executorService);
// initializerEventReceiver.execute();
// if (log.isInfoEnabled()) {
// log.info("Cloud controller initializer topic receiver started");
// }
// }

private void addEventListeners() {
initializerEventReceiver.addEventListener(new CompleteApplicationsRequestEventListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@ private void executeCoordinatorTasks()

// Start initializer receiver
autoscalerInitializerTopicReceiver = new AutoscalerInitializerTopicReceiver();
autoscalerInitializerTopicReceiver.setExecutorService(executorService);
autoscalerInitializerTopicReceiver.execute();
if (log.isDebugEnabled()) {
log.debug("Initializer receiver thread started");
}
// autoscalerInitializerTopicReceiver.setExecutorService(executorService);
// autoscalerInitializerTopicReceiver.execute();
// if (log.isDebugEnabled()) {
// log.debug("Initializer receiver thread started");
// }

if (log.isInfoEnabled()) {
log.info("Scheduling tasks to publish applications");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ private void executeCoordinatorTasks() {
}

initializerTopicReceiver = new InitializerTopicReceiver();
initializerTopicReceiver.setExecutorService(executorService);
initializerTopicReceiver.execute();
// initializerTopicReceiver.setExecutorService(executorService);
// initializerTopicReceiver.execute();

if (log.isInfoEnabled()) {
log.info("Initializer event receiver thread started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ public class InitializerTopicReceiver {
private ExecutorService executorService;

public InitializerTopicReceiver() {
this.initializerEventReceiver = new InitializerEventReceiver();
this.initializerEventReceiver = InitializerEventReceiver.getInstance();
addEventListeners();
}

public void execute() {
initializerEventReceiver.setExecutorService(executorService);
initializerEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Autoscaler initializer topic receiver started");
}
}
// public void execute() {
// initializerEventReceiver.setExecutorService(executorService);
// initializerEventReceiver.execute();
// if (log.isInfoEnabled()) {
// log.info("Autoscaler initializer topic receiver started");
// }
// }

private void addEventListeners() {
initializerEventReceiver.addEventListener(new CompleteTopologyRequestEventListener() {
Expand All @@ -62,11 +62,11 @@ protected void onEvent(Event event) {
});
}

public ExecutorService getExecutorService() {
return executorService;
}

public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
// public ExecutorService getExecutorService() {
// return executorService;
// }
//
// public void setExecutorService(ExecutorService executorService) {
// this.executorService = executorService;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ private void executeCoordinatorTasks(ComponentContext componentContext)

private void initializeInitializerEventReceiver() {
initializerTopicReceiver = new StratosManagerInitializerTopicReceiver();
initializerTopicReceiver.setExecutorService(executorService);
initializerTopicReceiver.execute();
// initializerTopicReceiver.setExecutorService(executorService);
// initializerTopicReceiver.execute();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@
public class StratosManagerInitializerTopicReceiver {
private static final Log log = LogFactory.getLog(StratosManagerInitializerTopicReceiver.class);
private InitializerEventReceiver initializerEventReceiver;
private ExecutorService executorService;
//private ExecutorService executorService;
private ApplicationSignUpHandler applicationSignUpHandler;

public StratosManagerInitializerTopicReceiver() {
this.initializerEventReceiver = new InitializerEventReceiver();
this.initializerEventReceiver = InitializerEventReceiver.getInstance();
applicationSignUpHandler = new ApplicationSignUpHandler();
addEventListeners();
}

public void execute() {
initializerEventReceiver.setExecutorService(executorService);
initializerEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Stratos manager initializer topic receiver started");
}
}
// public void execute() {
// initializerEventReceiver.setExecutorService(executorService);
// initializerEventReceiver.execute();
// if (log.isInfoEnabled()) {
// log.info("Stratos manager initializer topic receiver started");
// }
// }

private void addEventListeners() {
initializerEventReceiver.addEventListener(new CompleteTenantRequestEventListener() {
Expand Down Expand Up @@ -81,11 +81,11 @@ protected void onEvent(Event event) {
});
}

public ExecutorService getExecutorService() {
return executorService;
}

public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
// public ExecutorService getExecutorService() {
// return executorService;
// }
//
// public void setExecutorService(ExecutorService executorService) {
// this.executorService = executorService;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,49 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;

import java.util.concurrent.ExecutorService;

public class InitializerEventReceiver {
public class InitializerEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(InitializerEventReceiver.class);

private InitializerEventMessageDelegator messageDelegator;
private InitializerEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
private ExecutorService executorService;
private static volatile InitializerEventReceiver instance;
//private ExecutorService executorService;

public InitializerEventReceiver() {
private InitializerEventReceiver() {
// TODO: make pool size configurable
this.executorService = StratosThreadPool.getExecutorService("initializer-event-receiver", 100);
InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue();
this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue);
this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue);
execute();
}

public static InitializerEventReceiver getInstance () {
if (instance == null) {
synchronized (InitializerEventReceiver.class) {
if (instance == null) {
instance = new InitializerEventReceiver();
}
}
}

return instance;
}

public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}

public void execute() {
private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INITIALIZER_TOPIC.getTopicName(),
Expand All @@ -68,11 +86,11 @@ public void terminate() {
messageDelegator.terminate();
}

public ExecutorService getExecutorService() {
return executorService;
}

public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
// public ExecutorService getExecutorService() {
// return executorService;
// }
//
// public void setExecutorService(ExecutorService executorService) {
// this.executorService = executorService;
// }
}

0 comments on commit a1ba54c

Please sign in to comment.