Commit 1828375

added additional logs to kafka compaction topics and partitions assignment instead of subscribe
dmytro-landiak committed May 22, 2024
1 parent 4e43b92 commit 1828375
Showing 6 changed files with 67 additions and 28 deletions.
File 1 of 6 — client session consumer:
@@ -72,26 +72,27 @@ public void init() {

     @Override
     public Map<String, ClientSessionInfo> initLoad() throws QueuePersistenceException {
-        log.info("Loading client sessions.");
+        log.debug("Starting client sessions initLoad");
+        long startTime = System.nanoTime();
+        long totalMessageCount = 0L;

         String dummySessionClientId = persistDummySession();

-        clientSessionConsumer.subscribe();
+        clientSessionConsumer.assignOrSubscribe();

         List<TbProtoQueueMsg<QueueProtos.ClientSessionInfoProto>> messages;
         boolean encounteredDummySession = false;
         Map<String, ClientSessionInfo> allClientSessions = new HashMap<>();
         do {
             try {
-                // TODO: think how to migrate data inside of the Kafka (in case of any changes to the protocol)
                 messages = clientSessionConsumer.poll(pollDuration);
+                int packSize = messages.size();
+                log.debug("Read {} client session messages from single poll", packSize);
+                totalMessageCount += packSize;
                 for (TbProtoQueueMsg<QueueProtos.ClientSessionInfoProto> msg : messages) {
                     String clientId = msg.getKey();
                     if (isClientSessionInfoProtoEmpty(msg.getValue())) {
                         // this means the Kafka log compaction service hasn't cleared the empty message yet
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Encountered empty ClientSessionInfo.", clientId);
-                        }
+                        log.trace("[{}] Encountered empty ClientSessionInfo.", clientId);
                         allClientSessions.remove(clientId);
                     } else {
                         ClientSessionInfo clientSession = ProtoConverter.convertToClientSessionInfo(msg.getValue());
@@ -113,21 +114,22 @@ public Map<String, ClientSessionInfo> initLoad() throws QueuePersistenceException {

         initializing = false;

+        if (log.isDebugEnabled()) {
+            long endTime = System.nanoTime();
+            log.debug("Finished client session messages initLoad for {} messages within time: {} nanos", totalMessageCount, endTime - startTime);
+        }
+
         return allClientSessions;
     }

     @Override
     public void listen(ClientSessionChangesCallback callback) {
-        // TODO: if 'serviceId' of session == 'currentServiceId' -> it's OK, else we need to ensure that all events from other services are consumed (we can publish blank msg for that client)
-        //  need to have 'versionId' to check if ClientSession is updated based on the correct value
         if (initializing) {
             throw new RuntimeException("Cannot start listening before initialization is finished.");
         }
-        // TODO: add concurrent consumers for multiple partitions
         consumerExecutor.execute(() -> {
             while (!stopped) {
                 try {
-                    // TODO: test what happens if we got disconnected and connected again (will we read all msgs from beginning?)
                     List<TbProtoQueueMsg<QueueProtos.ClientSessionInfoProto>> messages = clientSessionConsumer.poll(pollDuration);
                     if (messages.isEmpty()) {
                         continue;
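Note: the init-load pattern above hinges on a dummy marker record. A compacted topic has no reliable "end" signal, so the consumer persists a known dummy key first and reads from the beginning until it sees it, treating empty payloads as not-yet-cleaned tombstones. A minimal standalone sketch of that idea against the raw Kafka client API (class and method names here are illustrative, not the broker's actual types):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class CompactedStateLoader {

    // Read a compacted topic from the beginning into an in-memory map,
    // stopping once a freshly produced dummy marker is reached.
    static Map<String, byte[]> loadCompactedState(KafkaProducer<String, byte[]> producer,
                                                  KafkaConsumer<String, byte[]> consumer,
                                                  String topic, Duration pollDuration) throws Exception {
        String dummyKey = "dummy-" + UUID.randomUUID();
        // The marker is newer than any state written before it, so seeing it
        // means the whole pre-existing state has been consumed.
        producer.send(new ProducerRecord<>(topic, dummyKey, new byte[0])).get();

        consumer.assign(List.of(new TopicPartition(topic, 0))); // single-partition case
        consumer.seekToBeginning(consumer.assignment());

        Map<String, byte[]> state = new HashMap<>();
        boolean sawDummy = false;
        while (!sawDummy) {
            for (ConsumerRecord<String, byte[]> rec : consumer.poll(pollDuration)) {
                if (dummyKey.equals(rec.key())) {
                    sawDummy = true;
                } else if (rec.value() == null || rec.value().length == 0) {
                    // Tombstone the log cleaner has not removed yet.
                    state.remove(rec.key());
                } else {
                    state.put(rec.key(), rec.value());
                }
            }
        }
        return state;
    }
}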
File 2 of 6 — RetainedMsgConsumerImpl:
@@ -75,23 +75,27 @@ public RetainedMsgConsumerImpl(RetainedMsgQueueFactory retainedMsgQueueFactory,

     @Override
     public Map<String, RetainedMsg> initLoad() throws QueuePersistenceException {
-        String dummyTopic = persistDummyRetainedMsg();
+        log.debug("Starting retained messages initLoad");
+        long startTime = System.nanoTime();
+        long totalMessageCount = 0L;

-        retainedMsgConsumer.subscribe();
+        String dummyTopic = persistDummyRetainedMsg();
+        retainedMsgConsumer.assignOrSubscribe();

         List<TbProtoQueueMsg<QueueProtos.RetainedMsgProto>> messages;
         boolean encounteredDummyTopic = false;
         Map<String, RetainedMsg> allRetainedMsgs = new HashMap<>();
         do {
             try {
                 messages = retainedMsgConsumer.poll(pollDuration);
+                int packSize = messages.size();
+                log.debug("Read {} retained messages from single poll", packSize);
+                totalMessageCount += packSize;
                 for (TbProtoQueueMsg<QueueProtos.RetainedMsgProto> msg : messages) {
                     String topic = msg.getKey();
                     if (isRetainedMsgProtoEmpty(msg.getValue())) {
                         // this means the Kafka log compaction service hasn't cleared the empty message yet
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Encountered empty RetainedMsg.", topic);
-                        }
+                        log.trace("[{}] Encountered empty RetainedMsg.", topic);
                         allRetainedMsgs.remove(topic);
                     } else {
                         RetainedMsg retainedMsg = convertToRetainedMsg(msg);
@@ -113,6 +117,11 @@ public Map<String, RetainedMsg> initLoad() throws QueuePersistenceException {

     initializing = false;

+    if (log.isDebugEnabled()) {
+        long endTime = System.nanoTime();
+        log.debug("Finished retained messages initLoad for {} messages within time: {} nanos", totalMessageCount, endTime - startTime);
+    }
+
     return allRetainedMsgs;
 }

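A side pattern worth noting in this hunk: the per-record "empty message" logs drop from a guarded log.debug to a bare log.trace, while the new one-off timing summary keeps its isDebugEnabled() guard. With SLF4J's parameterized messages the guard only pays off when computing an argument is itself expensive; a sketch of the distinction (expensiveStatsSnapshot() is a hypothetical costly call, not from this codebase):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingLevels {
    private static final Logger log = LoggerFactory.getLogger(LoggingLevels.class);

    static void onEmptyRetainedMsg(String topic) {
        // Cheap argument: no guard needed; SLF4J formats the message
        // only if TRACE is enabled for this logger.
        log.trace("[{}] Encountered empty RetainedMsg.", topic);
    }

    static void onLoadFinished(long totalMessageCount, long startNanos) {
        // Guard is still useful when the arguments are computed eagerly.
        if (log.isDebugEnabled()) {
            long elapsed = System.nanoTime() - startNanos;
            log.debug("Finished initLoad for {} messages within time: {} nanos", totalMessageCount, elapsed);
        }
    }
}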
File 3 of 6 — ClientSubscriptionConsumerImpl:
@@ -73,29 +73,30 @@ public ClientSubscriptionConsumerImpl(ClientSubscriptionsQueueFactory clientSubs

     @Override
     public Map<String, Set<TopicSubscription>> initLoad() throws QueuePersistenceException {
-        String dummyClientId = persistDummyClientSubscriptions();
+        log.debug("Starting subscriptions initLoad");
+        long startTime = System.nanoTime();
+        long totalMessageCount = 0L;

-        clientSubscriptionsConsumer.subscribe();
+        String dummyClientId = persistDummyClientSubscriptions();
+        clientSubscriptionsConsumer.assignOrSubscribe();

         List<TbProtoQueueMsg<QueueProtos.ClientSubscriptionsProto>> messages;
         boolean encounteredDummyClient = false;
         Map<String, Set<TopicSubscription>> allSubscriptions = new HashMap<>();
         do {
             try {
-                // TODO: think how to migrate data inside of the Kafka (in case of any changes to the protocol)
                 messages = clientSubscriptionsConsumer.poll(pollDuration);
+                int packSize = messages.size();
+                log.debug("Read {} subscription messages from single poll", packSize);
+                totalMessageCount += packSize;
                 for (TbProtoQueueMsg<QueueProtos.ClientSubscriptionsProto> msg : messages) {
                     String clientId = msg.getKey();
-                    // TODO: replace with events (instead of storing the whole state) - but need to think about the logic when and how to make snapshots (so that we don't need to store all event log)
-                    //  also think about order of messages (sub A -> unsub A is different than unsub A -> sub A)
                     Set<TopicSubscription> clientSubscriptions = ProtoConverter.convertProtoToClientSubscriptions(msg.getValue());
                     if (dummyClientId.equals(clientId)) {
                         encounteredDummyClient = true;
                     } else if (clientSubscriptions.isEmpty()) {
                         // this means the Kafka log compaction service hasn't cleared the empty message yet
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Encountered empty ClientSubscriptions.", clientId);
-                        }
+                        log.trace("[{}] Encountered empty ClientSubscriptions.", clientId);
                         allSubscriptions.remove(clientId);
                     } else {
                         allSubscriptions.put(clientId, clientSubscriptions);
@@ -112,6 +113,11 @@ public Map<String, Set<TopicSubscription>> initLoad() throws QueuePersistenceException {

     initializing = false;

+    if (log.isDebugEnabled()) {
+        long endTime = System.nanoTime();
+        log.debug("Finished subscriptions initLoad for {} messages within time: {} nanos", totalMessageCount, endTime - startTime);
+    }
+
     return allSubscriptions;
 }

@@ -120,7 +126,6 @@ public void listen(ClientSubscriptionChangesCallback callback) {
         if (initializing) {
             throw new RuntimeException("Cannot start listening before initialization is finished.");
         }
-        // TODO: add concurrent consumers for multiple partitions (in the same consumer-group as InitLoader)
         consumerExecutor.execute(() -> {
             while (!stopped) {
                 try {
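For context on the clientSubscriptions.isEmpty() branch: on a topic with cleanup.policy=compact, a key is deleted by producing a tombstone (a record with a null value), and until Kafka's log cleaner runs, a consumer reading from the beginning still sees it — hence the remove(clientId) calls above. A hedged sketch of how such a tombstone might be produced (the topic name is a placeholder, not the broker's real topic):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class TombstoneExample {
    // A null value marks the key as deleted on a compacted topic; the log
    // cleaner eventually drops both the tombstone and any older values
    // recorded for that key.
    static void deleteClientSubscriptions(KafkaProducer<String, byte[]> producer, String clientId) {
        producer.send(new ProducerRecord<>("client-subscriptions-topic", clientId, null));
    }
}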
File 4 of 6 — TbQueueControlledOffsetConsumer interface:
@@ -15,18 +15,24 @@
  */
 package org.thingsboard.mqtt.broker.queue;

+import java.util.Map;
 import java.util.Optional;

 public interface TbQueueControlledOffsetConsumer<T extends TbQueueMsg> extends TbQueueConsumer<T> {

     void commit(int partition, long offset);

     void assignPartition(int partition);

     void assignAllPartitions();

+    void assignOrSubscribe();
+
     void seekToTheBeginning();

     long getEndOffset(String topic, int partition);

     Optional<Long> getCommittedOffset(String topic, int partition);
+
+    Map<String, String> getTopicConfigs();
 }
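A hedged sketch of how a caller might drive the extended interface during an init load, mirroring the consumer implementations above (the poll(long) signature is assumed from TbQueueConsumer, and isEndMarker stands in for the dummy-record checks):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class InitLoadDriver {
    static <T extends TbQueueMsg> List<T> loadAll(TbQueueControlledOffsetConsumer<T> consumer,
                                                  long pollDurationMs,
                                                  Predicate<T> isEndMarker) {
        // Single-partition topics get a direct assignment; multi-partition
        // topics fall back to a group subscription.
        consumer.assignOrSubscribe();

        List<T> loaded = new ArrayList<>();
        boolean done = false;
        while (!done) {
            for (T msg : consumer.poll(pollDurationMs)) {
                if (isEndMarker.test(msg)) {
                    done = true;
                } else {
                    loaded.add(msg);
                }
            }
        }
        return loaded;
    }
}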
File 5 of 6 — abstract consumer template (assignOrSubscribe implementation):
@@ -19,6 +19,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
 import org.thingsboard.mqtt.broker.queue.TbQueueMsg;
+import org.thingsboard.mqtt.broker.queue.constants.QueueConstants;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -76,6 +77,21 @@ public void assignAllPartitions() {
         }
     }

+    @Override
+    public void assignOrSubscribe() {
+        int partitions = 1;
+        String configuredPartitions = getTopicConfigs().get(QueueConstants.PARTITIONS);
+        if (configuredPartitions != null) {
+            partitions = Integer.parseInt(configuredPartitions);
+        }
+        log.debug("Found {} partitions for {} topic", partitions, getTopic());
+        if (partitions == 1) {
+            assignPartition(0);
+        } else {
+            subscribe();
+        }
+    }
+
     @Override
     public void unsubscribeAndClose() {
         stopped = true;
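The new assignOrSubscribe() chooses between Kafka's two consumption modes. A minimal sketch of what each branch presumably translates to on the underlying KafkaConsumer (an assumption about the wrapped calls, not confirmed by this diff): manual assignment skips consumer-group coordination entirely, so a single-partition compacted topic can be read without waiting for a group rebalance, while multi-partition topics still rely on the group protocol to spread partitions across consumers.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.List;

class AssignVsSubscribe {
    static void start(KafkaConsumer<String, byte[]> consumer, String topic, int partitions) {
        if (partitions == 1) {
            // Manual assignment: no group membership, no rebalance latency.
            consumer.assign(List.of(new TopicPartition(topic, 0)));
        } else {
            // Group subscription: the coordinator distributes partitions
            // across all consumers sharing the group id.
            consumer.subscribe(List.of(topic));
        }
        consumer.poll(Duration.ofMillis(100)); // fetching begins on poll()
    }
}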
File 6 of 6 — TbKafkaConsumerTemplate:
@@ -16,6 +16,7 @@
 package org.thingsboard.mqtt.broker.queue.kafka;

 import lombok.Builder;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -50,6 +51,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
     private final TbQueueAdmin admin;
     private final KafkaConsumer<String, byte[]> consumer;
     private final TbKafkaDecoder<T> decoder;
+    @Getter
     private final Map<String, String> topicConfigs;

     private final TbKafkaConsumerStatsService statsService;
@@ -59,9 +61,8 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
     private final long closeTimeoutMs;
     private final boolean createTopicIfNotExists;

-
-    /*
-        Not thread-safe
+    /**
+     * Not thread-safe
      */

     @Builder
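The @Getter added here is what supplies the getTopicConfigs() that assignOrSubscribe() reads the partition count from; Lombok generates the equivalent of the following hand-written accessor:

// Equivalent of Lombok's @Getter on the topicConfigs field.
public Map<String, String> getTopicConfigs() {
    return topicConfigs;
}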
