Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Add tenant based metering info to metering dashboard - STRATOS-1654
Browse files Browse the repository at this point in the history
This closes #503
  • Loading branch information
Thanuja authored and Sajith committed Feb 9, 2016
1 parent b9d18ca commit c795440
Show file tree
Hide file tree
Showing 43 changed files with 60,084 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.stratos.manager.exception.ApplicationSignUpException;
import org.apache.stratos.manager.messaging.publisher.ApplicationSignUpEventPublisher;
import org.apache.stratos.manager.registry.RegistryManager;
import org.apache.stratos.manager.statistics.publisher.DASApplicationSignUpDataPublisher;
import org.apache.stratos.manager.user.management.StratosUserManagerUtils;
import org.apache.stratos.messaging.domain.application.signup.ApplicationSignUp;
import org.apache.stratos.messaging.domain.application.signup.ArtifactRepository;

Expand All @@ -37,8 +39,11 @@
public class ApplicationSignUpHandler {

private static final Log log = LogFactory.getLog(ApplicationSignUpHandler.class);

private static final String APPLICATION_SIGNUP_RESOURCE_PATH = "/stratos.manager/application.signups/";
private static final long DEFAULT_APPLICATION_SIGNUP_REMOVED_TIMESTAMP = -1;
private static final long DEFAULT_APPLICATION_SIGNUP_DURATION = 0;
private static DASApplicationSignUpDataPublisher applicationSignUpDataPublisher = DASApplicationSignUpDataPublisher
.getInstance();

private String prepareApplicationSignupResourcePath(String applicationId, int tenantId) {
return APPLICATION_SIGNUP_RESOURCE_PATH + applicationId + "-tenant-" + tenantId;
Expand Down Expand Up @@ -67,21 +72,34 @@ public void addApplicationSignUp(ApplicationSignUp applicationSignUp) throws App

try {
if (log.isInfoEnabled()) {
log.info(String.format("Adding application signup: [application-id] %s [tenant-id] %d",
applicationId, tenantId));
log.info(String.format("Adding application signup: [application-id] %s [tenant-id] %d", applicationId,
tenantId));
}

if (applicationSignUpExist(applicationId, tenantId)) {
throw new RuntimeException(String.format("Tenant has already signed up for application: " +
"[application-id] %s [tenant-id] %d", applicationId, tenantId));
throw new RuntimeException(String.format(
"Tenant has already signed up for application: " + "[application-id] %s [tenant-id] %d",
applicationId, tenantId));
}

// Persist application signup
String resourcePath = prepareApplicationSignupResourcePath(applicationId, tenantId);
long signUpAddedTimestamp = System.currentTimeMillis();
applicationSignUp.setSignupAddedTimestamp(signUpAddedTimestamp);

RegistryManager.getInstance().persist(resourcePath, applicationSignUp);

ApplicationSignUpEventPublisher.publishApplicationSignUpAddedEvent(applicationId, tenantId, clusterIdList);

if (applicationSignUpDataPublisher.isEnabled()) {
if (log.isInfoEnabled()) {
log.info("Publishing application signup added data to DAS");
}
String tenantDomain = StratosUserManagerUtils.getTenantDomain(tenantId);
applicationSignUpDataPublisher.publish(applicationId, tenantId, tenantDomain, signUpAddedTimestamp,
DEFAULT_APPLICATION_SIGNUP_REMOVED_TIMESTAMP, DEFAULT_APPLICATION_SIGNUP_DURATION);
}

if (log.isInfoEnabled()) {
log.info(String.format("Application signup added successfully: [application-id] %s [tenant-id] %d",
applicationId, tenantId));
Expand Down Expand Up @@ -115,6 +133,7 @@ public boolean applicationSignUpExist(String applicationId, int tenantId) throws

/**
* Check application signup availability.
*
* @param applicationId
* @return
* @throws ApplicationSignUpException
Expand Down Expand Up @@ -151,20 +170,33 @@ public boolean applicationSignUpsExist(String applicationId) throws ApplicationS
public void removeApplicationSignUp(String applicationId, int tenantId) throws ApplicationSignUpException {
try {
if (log.isInfoEnabled()) {
log.info(String.format("Removing application signup: [application-id] %s [tenant-id] %d",
applicationId, tenantId));
log.info(String.format("Removing application signup: [application-id] %s [tenant-id] %d", applicationId,
tenantId));
}

if (!applicationSignUpExist(applicationId, tenantId)) {
throw new RuntimeException(String.format("Application signup not found: [application-id] %s " +
"[tenant-id] %d", applicationId, tenantId));
throw new RuntimeException(
String.format("Application signup not found: [application-id] %s " + "[tenant-id] %d",
applicationId, tenantId));
}

String resourcePath = prepareApplicationSignupResourcePath(applicationId, tenantId);
long signUpAddedTimestamp = ((ApplicationSignUp) RegistryManager.getInstance().read(resourcePath))
.getSignupAddedTimestamp();
RegistryManager.getInstance().remove(resourcePath);

ApplicationSignUpEventPublisher.publishApplicationSignUpRemovedEvent(applicationId, tenantId);

if (applicationSignUpDataPublisher.isEnabled()) {
if (log.isInfoEnabled()) {
log.info("Publishing application signup removed data to DAS");
}
long signUpRemovedTimestamp = System.currentTimeMillis();
String tenantDomain = StratosUserManagerUtils.getTenantDomain(tenantId);
applicationSignUpDataPublisher.publish(applicationId, tenantId, tenantDomain, signUpAddedTimestamp,
signUpRemovedTimestamp, signUpRemovedTimestamp - signUpAddedTimestamp);
}

if (log.isInfoEnabled()) {
log.info(String.format("Application signup removed successfully: [application-id] %s [tenant-id] %d",
applicationId, tenantId));
Expand All @@ -184,19 +216,21 @@ public void removeApplicationSignUp(String applicationId, int tenantId) throws A
* @return
* @throws ApplicationSignUpException
*/
public ApplicationSignUp getApplicationSignUp(String applicationId, int tenantId) throws ApplicationSignUpException {
public ApplicationSignUp getApplicationSignUp(String applicationId, int tenantId)
throws ApplicationSignUpException {
try {
if (log.isDebugEnabled()) {
log.debug(String.format("Get application signup: [application-id] %s [tenant-id] %d",
applicationId, tenantId));
log.debug(String.format("Get application signup: [application-id] %s [tenant-id] %d", applicationId,
tenantId));
}

String resourcePath = prepareApplicationSignupResourcePath(applicationId, tenantId);
ApplicationSignUp applicationSignUp = (ApplicationSignUp) RegistryManager.getInstance().read(resourcePath);
return applicationSignUp;
} catch (Exception e) {
String message = String.format("Could not get application signup: [application-id] %s [tenant-id] %d",
applicationId, tenantId);
String message = String
.format("Could not get application signup: [application-id] %s [tenant-id] %d", applicationId,
tenantId);
log.error(message, e);
throw new ApplicationSignUpException(message, e);
}
Expand Down Expand Up @@ -224,8 +258,8 @@ public ApplicationSignUp[] getApplicationSignUps(String applicationId) throws Ap
if (resourcePaths != null) {
for (String resourcePath : resourcePaths) {
if (resourcePath != null) {
ApplicationSignUp applicationSignUp = (ApplicationSignUp)
RegistryManager.getInstance().read(resourcePath);
ApplicationSignUp applicationSignUp = (ApplicationSignUp) RegistryManager.getInstance()
.read(resourcePath);
if (applicationId.equals(applicationSignUp.getApplicationId())) {
applicationSignUps.add(applicationSignUp);
}
Expand Down Expand Up @@ -259,8 +293,8 @@ public List<ApplicationSignUp> getApplicationSignUps() throws ApplicationSignUpE
if (resourcePaths != null) {
for (String resourcePath : resourcePaths) {
if (resourcePath != null) {
ApplicationSignUp applicationSignUp = (ApplicationSignUp)
RegistryManager.getInstance().read(resourcePath);
ApplicationSignUp applicationSignUp = (ApplicationSignUp) RegistryManager.getInstance()
.read(resourcePath);
applicationSignUps.add(applicationSignUp);
}
}
Expand All @@ -281,7 +315,8 @@ public List<ApplicationSignUp> getApplicationSignUps() throws ApplicationSignUpE
* @return
* @throws ApplicationSignUpException
*/
public List<ApplicationSignUp> getApplicationSignUpsForRepository(String repoUrl) throws ApplicationSignUpException {
public List<ApplicationSignUp> getApplicationSignUpsForRepository(String repoUrl)
throws ApplicationSignUpException {
try {
List<ApplicationSignUp> filteredResult = new ArrayList<ApplicationSignUp>();

Expand Down Expand Up @@ -322,8 +357,9 @@ public void updateApplicationSignUp(ApplicationSignUp applicationSignUp) throws
applicationId, tenantId));
}
} catch (Exception e) {
String message = String.format("Could not get application signup: [application-id] %s [tenant-id] %d",
applicationId, tenantId);
String message = String
.format("Could not get application signup: [application-id] %s [tenant-id] %d", applicationId,
tenantId);
log.error(message, e);
throw new ApplicationSignUpException(message, e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.stratos.manager.statistics.publisher;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.statistics.publisher.ThriftClientConfig;
import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.manager.utils.StratosManagerConstants;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
import org.wso2.carbon.databridge.commons.StreamDefinition;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

/**
* Application signup data publisher.
*/
public class DASApplicationSignUpDataPublisher extends ThriftStatisticsPublisher {

private static final Log log = LogFactory.getLog(DASApplicationSignUpDataPublisher.class);

private static final String DATASTREAM_NAME = "application_signups";
private static final String DATASTREAM_NICKNAME = "Application Signup Statistics";
private static final String DATASTREAM_DESC = "Application signup statistics for generating metering information.";
private static final String VERSION = "1.0.0";
private static final String APPLICATION_ID = "application_id";
private static final String TENANT_ID = "tenant_id";
private static final String TENANT_DOMAIN = "tenant_domain";
private static final String START_TIME = "start_time";
private static final String END_TIME = "end_time";
private static final String DURATION = "duration";

private ExecutorService executorService;

private static DASApplicationSignUpDataPublisher instance;

public enum SignUpAction {Added, Removed}

/**
* Constructor for initializing the data publisher.
*/
private DASApplicationSignUpDataPublisher() {
super(getStreamDefinition(), ThriftClientConfig.DAS_THRIFT_CLIENT_NAME);
int threadPoolSize = Integer.getInteger(StratosManagerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
StratosManagerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE);
executorService = StratosThreadPool
.getExecutorService(StratosManagerConstants.STATS_PUBLISHER_THREAD_POOL_ID, threadPoolSize);
}

public static DASApplicationSignUpDataPublisher getInstance() {
if (instance == null) {
synchronized (DASApplicationSignUpDataPublisher.class) {
if (instance == null) {
instance = new DASApplicationSignUpDataPublisher();
}
}
}
return instance;
}

private static StreamDefinition getStreamDefinition() {
try {
// Create stream definition
StreamDefinition streamDefinition = new StreamDefinition(DATASTREAM_NAME, VERSION);
streamDefinition.setNickName(DATASTREAM_NICKNAME);
streamDefinition.setDescription(DATASTREAM_DESC);
List<Attribute> payloadData = new ArrayList<Attribute>();

// Set payload definition
payloadData.add(new Attribute(APPLICATION_ID, AttributeType.STRING));
payloadData.add(new Attribute(TENANT_ID, AttributeType.INT));
payloadData.add(new Attribute(TENANT_DOMAIN, AttributeType.STRING));
payloadData.add(new Attribute(START_TIME, AttributeType.LONG));
payloadData.add(new Attribute(END_TIME, AttributeType.LONG));
payloadData.add(new Attribute(DURATION, AttributeType.LONG));

streamDefinition.setPayloadData(payloadData);
return streamDefinition;
} catch (Exception e) {
throw new RuntimeException("Could not create stream definition", e);
}
}

/**
* Publish application signup statistics
*
* @param applicationId
* @param tenantId
* @param tenantDomain
* @param startTime
* @param endTime
* @param duration
*/
public void publish(final String applicationId, final int tenantId, final String tenantDomain,
final long startTime, final long endTime, final long duration) {
Runnable publisher = new Runnable() {
@Override public void run() {
if (log.isDebugEnabled()) {
log.debug(String.format("Publishing application signup statistics: [application_id] %s "
+ "[tenant_id] %d [tenant_domain] %s [start_time] %d [end_time] %d "
+ "[duration] %d ", applicationId, tenantId, tenantDomain, startTime, endTime,
duration));
}
//adding payload data
List<Object> payload = new ArrayList<Object>();
payload.add(applicationId);
payload.add(tenantId);
payload.add(tenantDomain);
payload.add(startTime);
payload.add(endTime);
payload.add(duration);
publish(payload.toArray());
}
};
executorService.execute(publisher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.beans.UserInfoBean;
import org.apache.stratos.manager.user.management.exception.UserManagerException;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.user.api.UserStoreException;
import org.wso2.carbon.user.api.UserStoreManager;
import org.wso2.carbon.user.core.UserCoreConstants;
import org.wso2.carbon.user.core.service.RealmService;
import org.wso2.carbon.user.core.tenant.Tenant;
import org.wso2.carbon.user.core.tenant.TenantManager;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -205,4 +209,35 @@ private static String[] getRefinedListOfRolesOfUser(UserStoreManager userStoreMa
throw new UserManagerException(msg, e);
}
}

public static String getTenantDomain(int tenantId) {
if(tenantId == -1234) {
return "carbon.super";
}

TenantManager tenantManager = getTenantManager();
Tenant[] tenants = null;
try {
tenants = (Tenant[]) tenantManager.getAllTenants();
} catch (Exception e) {
String msg = "Error in retrieving the tenant information";
log.error(msg, e);
}

if(tenants != null) {
for(Tenant tenant : tenants) {
if(tenant.getId() == tenantId) {
return tenant.getDomain();
}
}
}
log.warn(String.format("Could not find tenant domain: [tenant-id] %d", tenantId));
return null;
}

public static TenantManager getTenantManager() {
PrivilegedCarbonContext carbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext();
RealmService realmService = (RealmService) carbonContext.getOSGiService(RealmService.class);
return realmService.getTenantManager();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@
* Stratos manager constants.
*/
public class StratosManagerConstants {

public static final String DEFAULT_CRON = "1 * * * * ? *";
public static final String TENANT_SYNC_TASK_TYPE = "TENANT_SYNC_TASK_TYPE";
public static final String TENANT_SYNC_TASK_NAME = "TENANT_SYNC_TASK";
public static final String APPLICATION_SIGNUP_SYNC_TASK_TYPE = "APPLICATION_SIGNUP_SYNC_TASK_TYPE";
public static final String APPLICATION_SIGNUP_SYNC_TASK_NAME = "APPLICATION_SIGNUP_SYNC_TASK";
public static final String STATS_PUBLISHER_THREAD_POOL_ID = "stratos.manager.stats.publisher.thread.pool";
public static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
}
Loading

0 comments on commit c795440

Please sign in to comment.