Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
making TenantEventReceiver a singleton and fixing the references in t…
Browse files Browse the repository at this point in the history
…he components
  • Loading branch information
Isuru Haththotuwa committed Dec 24, 2015
1 parent b3dc546 commit 170c27c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public CartridgeAgentEventListeners() {

this.instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();

this.tenantEventReceiver = new TenantEventReceiver();
this.tenantEventReceiver.setExecutorService(eventListenerExecutorService);
this.tenantEventReceiver = TenantEventReceiver.getInstance();
// this.tenantEventReceiver.setExecutorService(eventListenerExecutorService);

extensionHandler = new DefaultExtensionHandler();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ private void startTopologyEventReceiver(ExecutorService executorService, Topolog

private void startTenantEventReceiver(ExecutorService executorService) {

tenantEventReceiver = new TenantEventReceiver();
tenantEventReceiver.setExecutorService(executorService);
tenantEventReceiver.execute();
tenantEventReceiver = TenantEventReceiver.getInstance();
// tenantEventReceiver.setExecutorService(executorService);
// tenantEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Tenant event receiver thread started");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
import org.apache.stratos.messaging.event.initializer.CompleteTenantRequestEvent;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;

import java.util.concurrent.ExecutorService;
Expand All @@ -34,28 +36,43 @@
* A thread for receiving tenant information from message broker and
* build tenant information in tenant manager.
*/
public class TenantEventReceiver {
public class TenantEventReceiver extends StratosEventReceiver {
private static final Log log = LogFactory.getLog(TenantEventReceiver.class);
private TenantEventMessageDelegator messageDelegator;
private TenantEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
private ExecutorService executorService;
private static volatile TenantEventReceiver instance;

public TenantEventReceiver() {
private TenantEventReceiver() {
// TODO: make pool size configurable
this.executorService = StratosThreadPool.getExecutorService("tenant-event-receiver", 100);
TenantEventMessageQueue messageQueue = new TenantEventMessageQueue();
this.messageDelegator = new TenantEventMessageDelegator(messageQueue);
this.messageListener = new TenantEventMessageListener(messageQueue);
execute();
}

public static TenantEventReceiver getInstance () {
if (instance == null) {
synchronized (TenantEventReceiver.class) {
if (instance == null) {
instance = new TenantEventReceiver();
}
}
}

return instance;
}

public void addEventListener(EventListener eventListener) {
messageDelegator.addEventListener(eventListener);
}

public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
// public void setExecutorService(ExecutorService executorService) {
// this.executorService = executorService;
// }

public void execute() {
private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.TENANT_TOPIC.getTopicName(), messageListener);
Expand Down

0 comments on commit 170c27c

Please sign in to comment.