Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
making TopologyEventReceiver a singleton and fixing references
Browse files Browse the repository at this point in the history
  • Loading branch information
Isuru Haththotuwa committed Dec 24, 2015
1 parent 78db9f1 commit b3dc546
Show file tree
Hide file tree
Showing 14 changed files with 163 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -510,16 +510,16 @@ protected void onEvent(Event event) {
/**
* Terminate load balancer topology receiver thread.
*/
public void terminate() {
topologyEventReceiver.terminate();
terminated = true;
}

public ExecutorService getExecutorService() {
return executorService;
}

public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
// public void terminate() {
// topologyEventReceiver.terminate();
// terminated = true;
// }
//
// public ExecutorService getExecutorService() {
// return executorService;
// }
//
// public void setExecutorService(ExecutorService executorService) {
// this.executorService = executorService;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ private void executeCoordinatorTasks()

// Start topology receiver
asTopologyReceiver = new AutoscalerTopologyEventReceiver();
asTopologyReceiver.setExecutorService(executorService);
asTopologyReceiver.execute();
// asTopologyReceiver.setExecutorService(executorService);
//asTopologyReceiver.execute();
if (log.isDebugEnabled()) {
log.debug("Topology receiver executor service started");
}
Expand Down Expand Up @@ -245,13 +245,13 @@ private void syncInMemoryWithRegistry() throws AutoScalingPolicyAlreadyExistExce
}

protected void deactivate(ComponentContext context) {
if (asTopologyReceiver != null) {
try {
asTopologyReceiver.terminate();
} catch (Exception e) {
log.warn("An error occurred while terminating autoscaler topology event receiver", e);
}
}
// if (asTopologyReceiver != null) {
// try {
// asTopologyReceiver.terminate();
// } catch (Exception e) {
// log.warn("An error occurred while terminating autoscaler topology event receiver", e);
// }
// }

if (autoscalerHealthStatEventReceiver != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ public void run() {
}

// Start topology event receiver thread
registerTopologyEventListeners();
if (log.isInfoEnabled()) {
log.info("Cartridge agent registerTopologyEventListeners done");
}
// registerTopologyEventListeners();
// if (log.isInfoEnabled()) {
// log.info("Cartridge agent registerTopologyEventListeners done");
// }

if (log.isInfoEnabled()) {
log.info("Waiting for CompleteTopologyEvent..");
Expand Down Expand Up @@ -186,16 +186,16 @@ protected void registerInstanceNotifierEventListeners() {
}
}

protected void registerTopologyEventListeners() {
if (log.isDebugEnabled()) {
log.debug("registerTopologyEventListeners before");
}
eventListenerns.startTopologyEventReceiver();

if (log.isDebugEnabled()) {
log.debug("registerTopologyEventListeners after");
}
}
// protected void registerTopologyEventListeners() {
// if (log.isDebugEnabled()) {
// log.debug("registerTopologyEventListeners before");
// }
// eventListenerns.startTopologyEventReceiver();
//
// if (log.isDebugEnabled()) {
// log.debug("registerTopologyEventListeners after");
// }
// }

// protected void registerTenantEventListeners() {
// if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,24 @@ public CartridgeAgentEventListeners() {
}
}

public void startTopologyEventReceiver() {

if (log.isDebugEnabled()) {
log.debug("Starting cartridge agent topology event message receiver");
}

eventListenerExecutorService.submit(new Runnable() {
@Override
public void run() {
topologyEventReceiver.execute();
}
});

if (log.isInfoEnabled()) {
log.info("Cartridge agent topology receiver thread started, waiting for event messages ...");
}

}
// public void startTopologyEventReceiver() {
//
// if (log.isDebugEnabled()) {
// log.debug("Starting cartridge agent topology event message receiver");
// }
//
// eventListenerExecutorService.submit(new Runnable() {
// @Override
// public void run() {
// topologyEventReceiver.execute();
// }
// });
//
// if (log.isInfoEnabled()) {
// log.info("Cartridge agent topology receiver thread started, waiting for event messages ...");
// }
//
// }

public void startInstanceNotifierReceiver() {

Expand All @@ -131,24 +131,24 @@ public void run() {
}
}

public void startTenantEventReceiver() {

if (log.isDebugEnabled()) {
log.debug("Starting cartridge agent tenant event message receiver");
}

eventListenerExecutorService.submit(new Runnable() {
@Override
public void run() {
topologyEventReceiver.execute();
}
});

if (log.isInfoEnabled()) {
log.info("Cartridge agent tenant receiver thread started, waiting for event messages ...");
}

}
// public void startTenantEventReceiver() {
//
// if (log.isDebugEnabled()) {
// log.debug("Starting cartridge agent tenant event message receiver");
// }
//
// eventListenerExecutorService.submit(new Runnable() {
// @Override
// public void run() {
// topologyEventReceiver.execute();
// }
// });
//
// if (log.isInfoEnabled()) {
// log.info("Cartridge agent tenant receiver thread started, waiting for event messages ...");
// }
//
// }

// public void startApplicationsEventReceiver() {
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;

import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -123,8 +124,8 @@ private void startTopologyEventReceiver(ExecutorService executorService, Topolog
addTopologyEventListeners(topologyEventReceiver);
// Add default topology provider event listeners
topologyEventReceiver.addEventListeners();
topologyEventReceiver.setExecutorService(executorService);
topologyEventReceiver.execute();
// topologyEventReceiver.setExecutorService(executorService);
// topologyEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Topology receiver thread started");
}
Expand Down Expand Up @@ -212,12 +213,12 @@ private void startApplicationSignUpEventReceiver(ExecutorService executorService
* @param topologyEventReceiver topology event receiver instance
*/
private void addTopologyEventListeners(final LoadBalancerCommonTopologyEventReceiver topologyEventReceiver) {
topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
TopologyEventReceiver.getInstance().addEventListener(new CompleteTopologyEventListener() {

@Override
protected void onEvent(Event event) {
try {
if (!loadBalancerStarted) {
if (!loadBalancerStarted) {
configureAndStart();
}
} catch (Exception e) {
Expand All @@ -228,37 +229,37 @@ protected void onEvent(Event event) {
}
}
});
topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
TopologyEventReceiver.getInstance().addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
TopologyEventReceiver.getInstance().addEventListener(new MemberSuspendedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
TopologyEventReceiver.getInstance().addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
TopologyEventReceiver.getInstance().addEventListener(new ClusterRemovedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
TopologyEventReceiver.getInstance().addEventListener(new ServiceRemovedEventListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
}
});
topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
TopologyEventReceiver.getInstance().addEventListener(new MemberMaintenanceListener() {
@Override
protected void onEvent(Event event) {
reloadConfiguration();
Expand Down Expand Up @@ -338,12 +339,12 @@ private void reloadConfiguration() {
* Stop load balancer instance.
*/
public void stop() {
try {
if (topologyEventReceiver != null) {
topologyEventReceiver.terminate();
}
} catch (Exception ignore) {
}
// try {
// if (topologyEventReceiver != null) {
// topologyEventReceiver.terminate();
// }
// } catch (Exception ignore) {
// }

try {
if (statisticsNotifier != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ private void startTopologyEventReceiver(ExecutorService executorService, Topolog
}

topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
topologyEventReceiver.setExecutorService(executorService);
topologyEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Topology receiver thread started");
}
// topologyEventReceiver.setExecutorService(executorService);
// topologyEventReceiver.execute();
// if (log.isInfoEnabled()) {
// log.info("Topology receiver thread started");
// }

if (log.isInfoEnabled()) {
if (TopologyServiceFilter.getInstance().isActive()) {
Expand Down Expand Up @@ -257,13 +257,13 @@ protected void deactivate(ComponentContext context) {
}

// Terminate topology receiver
if (topologyEventReceiver != null) {
try {
topologyEventReceiver.terminate();
} catch (Exception e) {
log.warn("An error occurred while terminating topology event receiver", e);
}
}
// if (topologyEventReceiver != null) {
// try {
// topologyEventReceiver.terminate();
// } catch (Exception e) {
// log.warn("An error occurred while terminating topology event receiver", e);
// }
// }

// Terminate application signup event receiver
// if (applicationSignUpEventReceiver != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ public class MetadataTopologyEventReceiver {
private ExecutorService executorService;

public MetadataTopologyEventReceiver() {
this.topologyEventReceiver = new TopologyEventReceiver();
executorService = StratosThreadPool.getExecutorService(Constants.METADATA_SERVICE_THREAD_POOL_ID, 20);
this.topologyEventReceiver = TopologyEventReceiver.getInstance();
// //executorService = StratosThreadPool.getExecutorService(Constants
// .METADATA_SERVICE_THREAD_POOL_ID, 20);
addEventListeners();
}

Expand All @@ -67,21 +68,21 @@ protected void onEvent(Event event) {
});
}

public void execute() {
topologyEventReceiver.setExecutorService(getExecutorService());
topologyEventReceiver.execute();

if (log.isInfoEnabled()) {
log.info("Metadata service topology receiver started.");
}
}

public void terminate() {
topologyEventReceiver.terminate();
if (log.isInfoEnabled()) {
log.info("Metadata service topology receiver stopped.");
}
}
// public void execute() {
// topologyEventReceiver.setExecutorService(getExecutorService());
// topologyEventReceiver.execute();
//
// if (log.isInfoEnabled()) {
// log.info("Metadata service topology receiver started.");
// }
// }
//
// public void terminate() {
// topologyEventReceiver.terminate();
// if (log.isInfoEnabled()) {
// log.info("Metadata service topology receiver stopped.");
// }
// }

public ExecutorService getExecutorService() {
return executorService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class MetadataApiRegistry implements DataStore {

public MetadataApiRegistry() {
metadataTopologyEventReceiver = new MetadataTopologyEventReceiver();
metadataTopologyEventReceiver.execute();
// metadataTopologyEventReceiver.execute();

metadataApplicationEventReceiver = new MetadataApplicationEventReceiver();
metadataApplicationEventReceiver.execute();
Expand Down Expand Up @@ -417,9 +417,9 @@ public static ConcurrentHashMap<String, ReadWriteLock> getApplicationIdToReadWri
return applicationIdToReadWriteLockMap;
}

public void stopTopologyReceiver() {
metadataTopologyEventReceiver.terminate();
}
// public void stopTopologyReceiver() {
// metadataTopologyEventReceiver.terminate();
// }

public void stopApplicationReceiver() {
metadataApplicationEventReceiver.terminate();
Expand Down
Loading

0 comments on commit b3dc546

Please sign in to comment.