Skip to content

Commit

Permalink
Merge pull request #119 from thingsboard/sessions-limit-update
Browse files Browse the repository at this point in the history
[1.3.1] Sessions limit improvement
  • Loading branch information
dmytro-landiak committed May 29, 2024
2 parents 852adc5 + b316eb8 commit 5dc3ffc
Show file tree
Hide file tree
Showing 16 changed files with 451 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.thingsboard.mqtt.broker.common.data.id.ActorType;
import org.thingsboard.mqtt.broker.common.data.subscription.TopicSubscription;
import org.thingsboard.mqtt.broker.exception.QueuePersistenceException;
import org.thingsboard.mqtt.broker.service.limits.RateLimitCacheService;
import org.thingsboard.mqtt.broker.service.mqtt.client.disconnect.DisconnectClientCommandConsumer;
import org.thingsboard.mqtt.broker.service.mqtt.client.event.ClientSessionEventConsumer;
import org.thingsboard.mqtt.broker.service.mqtt.client.event.ClientSessionEventService;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class BrokerInitializer {

private final ClientSessionEventService clientSessionEventService;
private final ServiceInfoProvider serviceInfoProvider;
private final RateLimitCacheService rateLimitCacheService;

private final ClientSessionEventConsumer clientSessionEventConsumer;
private final PublishMsgConsumerService publishMsgConsumerService;
Expand Down Expand Up @@ -104,6 +106,7 @@ public void onApplicationEvent(ApplicationReadyEvent event) {
Map<String, ClientSessionInfo> initClientSessions() throws QueuePersistenceException {
Map<String, ClientSessionInfo> allClientSessions = clientSessionConsumer.initLoad();
log.info("Loaded {} stored client sessions from Kafka.", allClientSessions.size());
rateLimitCacheService.initSessionCount(allClientSessions.size());

Map<String, ClientSessionInfo> currentNodeSessions = filterAndDisconnectCurrentNodeSessions(allClientSessions);
allClientSessions.putAll(currentNodeSessions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.thingsboard.mqtt.broker.common.data.ClientInfo;
import org.thingsboard.mqtt.broker.common.util.BrokerConstants;
import org.thingsboard.mqtt.broker.service.auth.AuthorizationRuleService;
import org.thingsboard.mqtt.broker.service.limits.RateLimitCacheService;
import org.thingsboard.mqtt.broker.service.limits.RateLimitService;
import org.thingsboard.mqtt.broker.service.mqtt.MqttMessageGenerator;
import org.thingsboard.mqtt.broker.service.mqtt.client.event.ClientSessionEventService;
Expand Down Expand Up @@ -57,6 +58,7 @@ public class DisconnectServiceImpl implements DisconnectService {
private final MqttMessageGenerator mqttMessageGenerator;
private final AuthorizationRuleService authorizationRuleService;
private final FlowControlService flowControlService;
private final RateLimitCacheService rateLimitCacheService;

@Override
public void disconnect(ClientActorStateInfo actorState, MqttDisconnectMsg disconnectMsg) {
Expand Down Expand Up @@ -159,6 +161,10 @@ void clearClientSession(ClientActorStateInfo actorState, MqttDisconnectMsg disco

if (sessionCtx.getSessionInfo().isPersistent()) {
processPersistenceDisconnect(sessionCtx, clientInfo, sessionId);
} else {
if (!DisconnectReasonType.ON_CONFLICTING_SESSIONS.equals(disconnectReasonType)) {
rateLimitCacheService.decrementSessionCount();
}
}

clientSessionCtxService.unregisterSession(clientInfo.getClientId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.thingsboard.mqtt.broker.queue.common.DefaultTbQueueMsgHeaders;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;
import org.thingsboard.mqtt.broker.queue.provider.ClientSessionEventQueueFactory;
import org.thingsboard.mqtt.broker.service.limits.RateLimitCacheService;
import org.thingsboard.mqtt.broker.service.mqtt.client.disconnect.DisconnectClientCommandService;
import org.thingsboard.mqtt.broker.service.mqtt.client.event.ClientSessionEventType;
import org.thingsboard.mqtt.broker.service.mqtt.persistence.MsgPersistenceManager;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class SessionClusterManagerImpl implements SessionClusterManager {
private final MsgPersistenceManager msgPersistenceManager;
private final ApplicationRemovedEventService applicationRemovedEventService;
private final ApplicationTopicService applicationTopicService;
private final RateLimitCacheService rateLimitCacheService;

private TbQueueProducer<TbProtoQueueMsg<QueueProtos.ClientSessionEventResponseProto>> eventResponseProducer;
private ExecutorService eventResponseSenderExecutor;
Expand All @@ -103,16 +105,13 @@ public void init() {
public void processConnectionRequest(SessionInfo sessionInfo, ConnectionRequestInfo requestInfo) {
// It is possible that for some time sessions can be connected with the same clientId to different Nodes
String clientId = sessionInfo.getClientInfo().getClientId();
log.trace("[{}] Processing connection request, sessionId - {}", clientId, sessionInfo.getSessionId());

if (isRequestTimedOut(requestInfo.getRequestTime())) {
log.warn("[{}][{}] Connection request timed out.", clientId, requestInfo.getRequestId());
return;
}

if (log.isTraceEnabled()) {
log.trace("[{}] Processing connection request, sessionId - {}", clientId, sessionInfo.getSessionId());
}

ClientSession currentClientSession = getClientSessionForClient(clientId);
UUID currentClientSessionId = getCurrentClientSessionIdIfPresent(currentClientSession);

Expand Down Expand Up @@ -223,6 +222,7 @@ public void processClearSession(String clientId, UUID sessionId) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Clearing client session.", clientId, currentSessionId);
}
rateLimitCacheService.decrementSessionCount();
clearSessionAndSubscriptions(clientId);
clearPersistedMessages(clientSession.getSessionInfo().getClientInfo());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.service.limits;

public interface RateLimitCacheService {

void initSessionCount(int count);

long incrementSessionCount();

void decrementSessionCount();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.service.limits;

import com.github.benmanes.caffeine.cache.Cache;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import org.thingsboard.mqtt.broker.cache.CacheConstants;

import javax.annotation.PostConstruct;

@Service
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine")
@RequiredArgsConstructor
@Slf4j
public class RateLimitCaffeineCacheServiceImpl implements RateLimitCacheService {

private final CacheManager cacheManager;

private Cache<String, Long> clientSessionsLimitCache;

@Value("${mqtt.sessions-limit:0}")
@Setter
private int sessionsLimit;

@PostConstruct
public void init() {
if (sessionsLimit <= 0) {
return;
}
clientSessionsLimitCache = (Cache<String, Long>) cacheManager.getCache(CacheConstants.CLIENT_SESSIONS_LIMIT_CACHE).getNativeCache();
}

@Override
public void initSessionCount(int count) {
if (sessionsLimit <= 0) {
return;
}
log.debug("Initializing session count");
clientSessionsLimitCache.asMap().putIfAbsent(CacheConstants.CLIENT_SESSIONS_LIMIT_CACHE_KEY, (long) count);
}

@Override
public long incrementSessionCount() {
log.debug("Incrementing session count");
return clientSessionsLimitCache.asMap().compute(CacheConstants.CLIENT_SESSIONS_LIMIT_CACHE_KEY, (k, v) -> (v == null ? 1L : v + 1));
}

@Override
public void decrementSessionCount() {
if (sessionsLimit <= 0) {
return;
}
log.debug("Decrementing session count");
clientSessionsLimitCache.asMap().computeIfPresent(CacheConstants.CLIENT_SESSIONS_LIMIT_CACHE_KEY, (k, v) -> v > 0 ? v - 1 : 0);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.service.limits;

import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;
import org.thingsboard.mqtt.broker.cache.CacheConstants;

/**
* should not be null since not used in pipeline / transaction.
*/
@Service
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
@RequiredArgsConstructor
@Slf4j
public class RateLimitRedisCacheServiceImpl implements RateLimitCacheService {

private final RedisTemplate<String, Object> redisTemplate;

@Value("${mqtt.sessions-limit:0}")
@Setter
private int sessionsLimit;

@Override
public void initSessionCount(int count) {
if (sessionsLimit <= 0) {
return;
}
ValueOperations<String, Object> valueOps = redisTemplate.opsForValue();
boolean initialized = valueOps.setIfAbsent(CacheConstants.CLIENT_SESSIONS_LIMIT_CACHE_KEY, Integer.toString(count));
if (initialized) {
log.info("Session count initialized to {}", count);
}
}

@Override
public long incrementSessionCount() {
log.debug("Incrementing session count");
ValueOperations<String, Object> valueOps = redisTemplate.opsForValue();
return valueOps.increment(CacheConstants.CLIENT_SESSIONS_LIMIT_CACHE_KEY);
}

@Override
public void decrementSessionCount() {
if (sessionsLimit <= 0) {
return;
}
log.debug("Decrementing session count");
ValueOperations<String, Object> valueOps = redisTemplate.opsForValue();
valueOps.decrement(CacheConstants.CLIENT_SESSIONS_LIMIT_CACHE_KEY);
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class RateLimitServiceImpl implements RateLimitService {
private final IncomingRateLimitsConfiguration incomingRateLimitsConfiguration;
private final OutgoingRateLimitsConfiguration outgoingRateLimitsConfiguration;
private final ClientSessionService clientSessionService;
private final RateLimitCacheService rateLimitCacheService;

@Getter
private ConcurrentMap<String, TbRateLimits> incomingPublishClientLimits;
Expand Down Expand Up @@ -112,14 +113,20 @@ public boolean checkSessionsLimit(String clientId) {
if (sessionsLimit <= 0) {
return true;
}
int clientSessionsCount = clientSessionService.getClientSessionsCount();
if (clientSessionsCount >= sessionsLimit) {
if (log.isTraceEnabled()) {
log.trace("Client sessions count limit detected! Allowed: [{}], current count: [{}]", sessionsLimit, clientSessionsCount);
}
ClientSessionInfo clientSessionInfo = clientSessionService.getClientSessionInfo(clientId);
return clientSessionInfo != null;
long newSessionCount = rateLimitCacheService.incrementSessionCount();

ClientSessionInfo clientSessionInfo = clientSessionService.getClientSessionInfo(clientId);
if (clientSessionInfo != null) {
rateLimitCacheService.decrementSessionCount();
return true;
}

if (newSessionCount > sessionsLimit) {
log.trace("Client sessions count limit detected! Allowed: {} sessions", sessionsLimit);
rateLimitCacheService.decrementSessionCount();
return false;
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.thingsboard.mqtt.broker.cluster.ServiceInfoProvider;
import org.thingsboard.mqtt.broker.common.data.ClientSessionInfo;
import org.thingsboard.mqtt.broker.exception.QueuePersistenceException;
import org.thingsboard.mqtt.broker.service.limits.RateLimitCacheService;
import org.thingsboard.mqtt.broker.service.mqtt.client.disconnect.DisconnectClientCommandConsumer;
import org.thingsboard.mqtt.broker.service.mqtt.client.event.ClientSessionEventConsumer;
import org.thingsboard.mqtt.broker.service.mqtt.client.event.ClientSessionEventService;
Expand All @@ -53,17 +54,17 @@
@ContextConfiguration(classes = BrokerInitializer.class)
public class BrokerInitializerTest {

@MockBean
ClientSubscriptionConsumer clientSubscriptionConsumer;
@MockBean
ClientSessionConsumer clientSessionConsumer;
@MockBean
RetainedMsgConsumer retainedMsgConsumer;
ClientSubscriptionConsumer clientSubscriptionConsumer;
@MockBean
ClientSubscriptionService clientSubscriptionService;
RetainedMsgConsumer retainedMsgConsumer;
@MockBean
ClientSessionService clientSessionService;
@MockBean
ClientSubscriptionService clientSubscriptionService;
@MockBean
RetainedMsgListenerService retainedMsgListenerService;
@MockBean
ActorSystemContext actorSystemContext;
Expand All @@ -74,14 +75,16 @@ public class BrokerInitializerTest {
@MockBean
ServiceInfoProvider serviceInfoProvider;
@MockBean
DisconnectClientCommandConsumer disconnectClientCommandConsumer;
RateLimitCacheService rateLimitCacheService;
@MockBean
ClientSessionEventConsumer clientSessionEventConsumer;
@MockBean
DeviceMsgQueueConsumer deviceMsgQueueConsumer;
@MockBean
PublishMsgConsumerService publishMsgConsumerService;
@MockBean
DisconnectClientCommandConsumer disconnectClientCommandConsumer;
@MockBean
DeviceMsgQueueConsumer deviceMsgQueueConsumer;
@MockBean
BasicDownLinkConsumer basicDownLinkConsumer;
@MockBean
PersistentDownLinkConsumer persistentDownLinkConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.thingsboard.mqtt.broker.common.data.ClientInfo;
import org.thingsboard.mqtt.broker.common.data.SessionInfo;
import org.thingsboard.mqtt.broker.service.auth.AuthorizationRuleService;
import org.thingsboard.mqtt.broker.service.limits.RateLimitCacheService;
import org.thingsboard.mqtt.broker.service.limits.RateLimitService;
import org.thingsboard.mqtt.broker.service.mqtt.MqttMessageGenerator;
import org.thingsboard.mqtt.broker.service.mqtt.client.event.ClientSessionEventService;
Expand Down Expand Up @@ -80,6 +81,8 @@ public class DisconnectServiceImplTest {
AuthorizationRuleService authorizationRuleService;
@MockBean
FlowControlService flowControlService;
@MockBean
RateLimitCacheService rateLimitCacheService;

@SpyBean
DisconnectServiceImpl disconnectService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;
import org.thingsboard.mqtt.broker.queue.provider.ClientSessionEventQueueFactory;
import org.thingsboard.mqtt.broker.service.limits.RateLimitCacheService;
import org.thingsboard.mqtt.broker.service.mqtt.client.disconnect.DisconnectClientCommandService;
import org.thingsboard.mqtt.broker.service.mqtt.persistence.MsgPersistenceManager;
import org.thingsboard.mqtt.broker.service.mqtt.persistence.application.topic.ApplicationRemovedEventService;
Expand Down Expand Up @@ -77,6 +78,8 @@ public class SessionClusterManagerImplTest {
ApplicationRemovedEventService applicationRemovedEventService;
@MockBean
ApplicationTopicService applicationTopicService;
@MockBean
RateLimitCacheService rateLimitCacheService;

@SpyBean
SessionClusterManagerImpl sessionClusterManager;
Expand Down
Loading

0 comments on commit 5dc3ffc

Please sign in to comment.