Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
adding activator class for messaging and calling terminate of event r…
Browse files Browse the repository at this point in the history
…ecievers in de-activation of te bundle
  • Loading branch information
Isuru Haththotuwa committed Dec 24, 2015
1 parent 63f931f commit 905e140
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,6 @@ public LoadBalancerCommonTopologyEventReceiver(TopologyProvider topologyProvider
}
}

// public void execute() {
// super.execute();
// if (log.isInfoEnabled()) {
// log.info("Load balancer topology receiver thread started");
// }
// }

public void initializeTopology() {
if (initialized) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.services.DistributedObjectProvider;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.load.balancer.common.event.receivers.LoadBalancerCommonApplicationSignUpEventReceiver;
import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier;
import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
Expand All @@ -38,7 +37,6 @@
import org.apache.stratos.load.balancer.event.receivers.LoadBalancerTopologyEventReceiver;
import org.apache.stratos.load.balancer.exception.TenantAwareLoadBalanceEndpointException;
import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector;
import org.apache.stratos.load.balancer.util.LoadBalancerConstants;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
Expand All @@ -63,7 +61,6 @@

import java.io.File;
import java.util.Collection;
import java.util.concurrent.ExecutorService;

/**
* @scr.component name="org.apache.stratos.load.balancer.internal.LoadBalancerServiceComponent" immediate="true"
Expand All @@ -90,7 +87,6 @@ public class LoadBalancerServiceComponent {
private static final Log log = LogFactory.getLog(LoadBalancerServiceComponent.class);

private boolean activated = false;
private ExecutorService executorService;
private LoadBalancerTopologyEventReceiver topologyEventReceiver;
private TenantEventReceiver tenantEventReceiver;
private LoadBalancerDomainMappingEventReceiver domainMappingEventReceiver;
Expand Down Expand Up @@ -124,11 +120,6 @@ protected void activate(ComponentContext ctxt) {
// Configure topology filters
TopologyFilterConfigurator.configure(configuration);

int threadPoolSize = Integer.getInteger(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_SIZE_KEY,
LoadBalancerConstants.LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE);
executorService = StratosThreadPool.getExecutorService(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_ID,
threadPoolSize);

TopologyProvider topologyProvider = LoadBalancerConfiguration.getInstance().getTopologyProvider();
if (topologyProvider == null) {
topologyProvider = new TopologyProvider();
Expand All @@ -137,18 +128,18 @@ protected void activate(ComponentContext ctxt) {

if (configuration.isMultiTenancyEnabled() || configuration.isDomainMappingEnabled()) {
// Start tenant & application signup event receivers
startTenantEventReceiver(executorService);
startApplicationSignUpEventReceiver(executorService, topologyProvider);
startTenantEventReceiver();
startApplicationSignUpEventReceiver(topologyProvider);
}

if (configuration.isDomainMappingEnabled()) {
// Start domain mapping event receiver
startDomainMappingEventReceiver(executorService, topologyProvider);
startDomainMappingEventReceiver(topologyProvider);
}

if (configuration.isTopologyEventListenerEnabled()) {
// Start topology receiver
startTopologyEventReceiver(executorService, topologyProvider);
startTopologyEventReceiver(topologyProvider);
}

if (configuration.isCepStatsPublisherEnabled()) {
Expand All @@ -167,43 +158,28 @@ protected void activate(ComponentContext ctxt) {
}
}

private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
private void startDomainMappingEventReceiver(TopologyProvider topologyProvider) {
if (domainMappingEventReceiver != null) {
return;
}

domainMappingEventReceiver = new LoadBalancerDomainMappingEventReceiver(topologyProvider);
// domainMappingEventReceiver.setExecutorService(executorService);
// domainMappingEventReceiver.execute();
// if (log.isInfoEnabled()) {
// log.info("Domain mapping event receiver thread started");
// }
}

private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
private void startApplicationSignUpEventReceiver(TopologyProvider topologyProvider) {
if (applicationSignUpEventReceiver != null) {
return;
}

applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
// applicationSignUpEventReceiver.setExecutorService(executorService);
// applicationSignUpEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Application signup event receiver thread started");
}
}

private void startTopologyEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
private void startTopologyEventReceiver(TopologyProvider topologyProvider) {
if (topologyEventReceiver != null) {
return;
}

topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
// topologyEventReceiver.setExecutorService(executorService);
// topologyEventReceiver.execute();
// if (log.isInfoEnabled()) {
// log.info("Topology receiver thread started");
// }

if (log.isInfoEnabled()) {
if (TopologyServiceFilter.getInstance().isActive()) {
Expand All @@ -223,14 +199,8 @@ private void startTopologyEventReceiver(ExecutorService executorService, Topolog
}
}

private void startTenantEventReceiver(ExecutorService executorService) {

private void startTenantEventReceiver() {
tenantEventReceiver = TenantEventReceiver.getInstance();
// tenantEventReceiver.setExecutorService(executorService);
// tenantEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Tenant event receiver thread started");
}
}

private void startStatisticsNotifier(TopologyProvider topologyProvider) {
Expand All @@ -256,33 +226,6 @@ protected void deactivate(ComponentContext context) {
log.warn("An error occurred while removing endpoint deployer", e);
}

// Terminate topology receiver
// if (topologyEventReceiver != null) {
// try {
// topologyEventReceiver.terminate();
// } catch (Exception e) {
// log.warn("An error occurred while terminating topology event receiver", e);
// }
// }

// Terminate application signup event receiver
// if (applicationSignUpEventReceiver != null) {
// try {
// applicationSignUpEventReceiver.terminate();
// } catch (Exception e) {
// log.warn("An error occurred while terminating application signup event receiver", e);
// }
// }

// Terminate domain mapping event receiver
// if (domainMappingEventReceiver != null) {
// try {
// domainMappingEventReceiver.terminate();
// } catch (Exception e) {
// log.warn("An error occurred while terminating domain mapping event receiver", e);
// }
// }

// Terminate statistics notifier
if (statisticsNotifier != null) {
try {
Expand All @@ -291,15 +234,6 @@ protected void deactivate(ComponentContext context) {
log.warn("An error occurred while terminating health statistics notifier", e);
}
}

// Shutdown executor service
if (executorService != null) {
try {
executorService.shutdownNow();
} catch (Exception e) {
log.warn("An error occurred while shutting down load balancer executor service", e);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.stratos.messaging.internal;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
import org.apache.stratos.messaging.message.receiver.domain.mapping.DomainMappingEventReceiver;
import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import org.osgi.service.component.ComponentContext;

/**
* @scr.component name="org.apache.stratos.messaging.internal.MessagingServiceComponent"
* immediate="true"
*/
public class MessagingServiceComponent {

private static final Log log = LogFactory.getLog(MessagingServiceComponent.class);

protected void activate(ComponentContext context) {
try {
log.info("Messaging Service bundle activated");
} catch (Exception e) {
log.error("Could not activate Messaging Service component", e);
}
}

protected void deactivate(ComponentContext context) {
// deactivate all message receivers
try {
ApplicationSignUpEventReceiver.getInstance().terminate();
ApplicationsEventReceiver.getInstance().terminate();
ClusterStatusEventReceiver.getInstance().terminate();
DomainMappingEventReceiver.getInstance().terminate();
HealthStatEventReceiver.getInstance().terminate();
InitializerEventReceiver.getInstance().terminate();
TenantEventReceiver.getInstance().terminate();
TopologyEventReceiver.getInstance().terminate();
log.info("Messaging Service component is deactivated");
} catch (Exception e) {
log.error("Could not de-activate Messaging Service component", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ private void execute() {
}
}

public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
}

public boolean isSubscribed() {
return ((eventSubscriber != null) && (eventSubscriber.isSubscribed()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public static DomainMappingEventReceiver getInstance () {
return instance;
}

public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
}

private void execute() {
try {
// Start topic subscriber thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,9 @@ private void execute() {
}
}
}

public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
}
}

0 comments on commit 905e140

Please sign in to comment.