Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
making ClusterStatusEventReceiver singleton and fixing references in …
Browse files Browse the repository at this point in the history
…components
  • Loading branch information
Isuru Haththotuwa committed Dec 24, 2015
1 parent c627ff1 commit 27ae4ba
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ private void executeCoordinatorTasks() {
}

clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
clusterStatusTopicReceiver.setExecutorService(executorService);
clusterStatusTopicReceiver.execute();
// clusterStatusTopicReceiver.setExecutorService(executorService);
// clusterStatusTopicReceiver.execute();

if (log.isInfoEnabled()) {
log.info("Cluster status event receiver thread started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,27 @@
public class ClusterStatusTopicReceiver {
private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class);

private ClusterStatusEventReceiver statusEventReceiver;
private boolean terminated;
private ExecutorService executorService;
private ClusterStatusEventReceiver clusterStatusEventReceiver;
//private boolean terminated;
//private ExecutorService executorService;

public ClusterStatusTopicReceiver() {
this.statusEventReceiver = new ClusterStatusEventReceiver();

this.clusterStatusEventReceiver = ClusterStatusEventReceiver.getInstance();
addEventListeners();
}

public void execute() {
statusEventReceiver.setExecutorService(executorService);
statusEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Cloud controller Cluster status thread started");
}

}
// public void execute() {
// clusterStatusEventReceiver.setExecutorService(executorService);
// clusterStatusEventReceiver.execute();
// if (log.isInfoEnabled()) {
// log.info("Cloud controller Cluster status thread started");
// }
//
// }

private void addEventListeners() {
// Listen to topology events that affect clusters
statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
@Override
protected void onEvent(Event event) {
try {
Expand All @@ -64,14 +63,14 @@ protected void onEvent(Event event) {
}
});

statusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
@Override
protected void onEvent(Event event) {
//TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent) event);
}
});

statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
@Override
protected void onEvent(Event event) {
try {
Expand All @@ -82,7 +81,7 @@ protected void onEvent(Event event) {
}
});

statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
try {
Expand All @@ -93,7 +92,7 @@ protected void onEvent(Event event) {
}
});

statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
@Override
protected void onEvent(Event event) {
try {
Expand All @@ -104,7 +103,7 @@ protected void onEvent(Event event) {
}
});

statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
@Override
protected void onEvent(Event event) {
try {
Expand All @@ -116,15 +115,15 @@ protected void onEvent(Event event) {
});
}

public void setTerminated(boolean terminated) {
this.terminated = terminated;
}

public ExecutorService getExecutorService() {
return executorService;
}

public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
// public void setTerminated(boolean terminated) {
// this.terminated = terminated;
// }
//
// public ExecutorService getExecutorService() {
// return executorService;
// }
//
// public void setExecutorService(ExecutorService executorService) {
// this.executorService = executorService;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,50 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;

import java.util.concurrent.ExecutorService;

/**
* A thread for receiving instance notifier information from message broker.
*/
public class ClusterStatusEventReceiver {
public class ClusterStatusEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(ClusterStatusEventReceiver.class);
private final ClusterStatusEventMessageDelegator messageDelegator;
private final ClusterStatusEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
private boolean terminated;
private ExecutorService executorService;
private static volatile ClusterStatusEventReceiver instance;

public ClusterStatusEventReceiver() {
private ClusterStatusEventReceiver() {
// TODO: make pool size configurable
this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100);
ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue);
this.messageListener = new ClusterStatusEventMessageListener(messageQueue);
execute();
}

public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}

public static ClusterStatusEventReceiver getInstance () {
if (instance == null) {
synchronized (ClusterStatusEventReceiver.class) {
if (instance == null) {
instance = new ClusterStatusEventReceiver();
}
}
}

public void execute() {
return instance;
}

private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.CLUSTER_STATUS_TOPIC.getTopicName(), messageListener);
Expand Down Expand Up @@ -77,17 +92,17 @@ public boolean isSubscribed() {
return ((eventSubscriber != null) && (eventSubscriber.isSubscribed()));
}

public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
terminated = true;
}

public ExecutorService getExecutorService() {
return executorService;
}

public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
// public void terminate() {
// eventSubscriber.terminate();
// messageDelegator.terminate();
// terminated = true;
// }
//
// public ExecutorService getExecutorService() {
// return executorService;
// }
//
// public void setExecutorService(ExecutorService executorService) {
// this.executorService = executorService;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ private DomainMappingEventReceiver() {
DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue();
this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue);
this.messageListener = new DomainMappingEventMessageListener(messageQueue);
execute();
}

public void addEventListener(EventListener eventListener) {
Expand All @@ -65,7 +66,7 @@ public static DomainMappingEventReceiver getInstance () {
return instance;
}

public void execute() {
private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.DOMAIN_MAPPING_TOPIC.getTopicName(), messageListener);
Expand All @@ -91,16 +92,16 @@ public void execute() {
}
}

public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
}

public ExecutorService getExecutorService() {
return executorService;
}

public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
// public void terminate() {
// eventSubscriber.terminate();
// messageDelegator.terminate();
// }
//
// public ExecutorService getExecutorService() {
// return executorService;
// }
//
// public void setExecutorService(ExecutorService executorService) {
// this.executorService = executorService;
// }
}

0 comments on commit 27ae4ba

Please sign in to comment.