Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
making HealthStatEventReceiver singleton and fixing references in com…
Browse files Browse the repository at this point in the history
…ponents
  • Loading branch information
Isuru Haththotuwa committed Dec 24, 2015
1 parent a1ba54c commit 6ac3b87
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,18 @@ public class AutoscalerHealthStatEventReceiver {
private ExecutorService executorService;

public AutoscalerHealthStatEventReceiver() {
this.healthStatEventReceiver = new HealthStatEventReceiver();
this.healthStatEventReceiver = HealthStatEventReceiver.getInstance();
addEventListeners();
}

public void execute() {
healthStatEventReceiver.setExecutorService(executorService);
healthStatEventReceiver.execute();

if (log.isInfoEnabled()) {
log.info("Autoscaler health stat event receiver thread started");
}
}
// public void execute() {
// healthStatEventReceiver.setExecutorService(executorService);
// healthStatEventReceiver.execute();
//
// if (log.isInfoEnabled()) {
// log.info("Autoscaler health stat event receiver thread started");
// }
// }

private void addEventListeners() {
// Listen to health stat events that affect clusters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ private void executeCoordinatorTasks()

// Start health stat receiver
autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
autoscalerHealthStatEventReceiver.setExecutorService(executorService);
autoscalerHealthStatEventReceiver.execute();
// autoscalerHealthStatEventReceiver.setExecutorService(executorService);
// autoscalerHealthStatEventReceiver.execute();
if (log.isDebugEnabled()) {
log.debug("Health statistics receiver thread started");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,52 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;

import java.util.concurrent.ExecutorService;

/**
* A thread for receiving health stat information from message broker
*/
public class HealthStatEventReceiver {
public class HealthStatEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(HealthStatEventReceiver.class);

private final HealthStatEventMessageDelegator messageDelegator;
private final HealthStatEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
private boolean terminated;
private ExecutorService executorService;
private static volatile HealthStatEventReceiver instance;

public HealthStatEventReceiver() {
private HealthStatEventReceiver() {
// TODO: make pool size configurable
this.executorService = StratosThreadPool.getExecutorService("healthstat-event-receiver", 100);
HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue();
this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue);
this.messageListener = new HealthStatEventMessageListener(messageQueue);
execute();
}

public static HealthStatEventReceiver getInstance () {
if (instance == null) {
synchronized (HealthStatEventReceiver.class) {
if (instance == null) {
instance = new HealthStatEventReceiver();
}
}
}

return instance;
}

public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}


public void execute() {
private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName(), messageListener);
Expand All @@ -69,17 +85,17 @@ public void execute() {
}
}

public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
terminated = true;
}

public ExecutorService getExecutorService() {
return executorService;
}

public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
// public void terminate() {
// eventSubscriber.terminate();
// messageDelegator.terminate();
// terminated = true;
// }
//
// public ExecutorService getExecutorService() {
// return executorService;
// }
//
// public void setExecutorService(ExecutorService executorService) {
// this.executorService = executorService;
// }
}

0 comments on commit 6ac3b87

Please sign in to comment.