Skip to content

Commit

Permalink
Split sub and retain trees.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Nov 22, 2019
1 parent d003fed commit d92360d
Show file tree
Hide file tree
Showing 15 changed files with 599 additions and 359 deletions.
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ set (MOSQ_SRCS
../lib/property_mosq.c ../lib/property_mosq.h
read_handle.c
../lib/read_handle.h
retain.c
security.c security_default.c
../lib/send_mosq.c ../lib/send_mosq.h
send_auth.c
Expand All @@ -57,6 +58,7 @@ set (MOSQ_SRCS
sys_tree.c sys_tree.h
../lib/time_mosq.c
../lib/tls_mosq.c
topic_tok.c
../lib/util_mosq.c ../lib/util_topic.c ../lib/util_mosq.h
../lib/utf8_mosq.c
websockets.c
Expand Down
8 changes: 8 additions & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ OBJS= mosquitto.o \
persist_write_v5.o \
plugin.o \
read_handle.o \
retain.o \
security.o \
security_default.o \
send_auth.o \
Expand All @@ -64,6 +65,7 @@ OBJS= mosquitto.o \
subs.o \
sys_tree.o \
time_mosq.o \
topic_tok.o \
tls_mosq.o \
utf8_mosq.o \
util_mosq.o \
Expand Down Expand Up @@ -189,6 +191,9 @@ plugin.o : plugin.c mosquitto_plugin.h mosquitto_broker_internal.h
read_handle.o : read_handle.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@

retain.o : retain.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@

security.o : security.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@

Expand Down Expand Up @@ -246,6 +251,9 @@ time_mosq.o : ../lib/time_mosq.c ../lib/time_mosq.h
tls_mosq.o : ../lib/tls_mosq.c
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@

topic_tok.o : topic_tok.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@

util_mosq.o : ../lib/util_mosq.c ../lib/util_mosq.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@

Expand Down
4 changes: 2 additions & 2 deletions src/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context)
&db->subs) > 0){
return 1;
}
sub__retain_queue(db, context,
retain__queue(db, context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos, 0);
}
Expand Down Expand Up @@ -507,7 +507,7 @@ int bridge__on_connect(struct mosquitto_db *db, struct mosquitto *context)
}
for(i=0; i<context->bridge->topic_count; i++){
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){
sub__retain_queue(db, context,
retain__queue(db, context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos, 0);
}
Expand Down
15 changes: 11 additions & 4 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ int db__open(struct mosquitto__config *config, struct mosquitto_db *db)
subhier = sub__add_hier_entry(NULL, &db->subs, "$SYS", strlen("$SYS"));
if(!subhier) return MOSQ_ERR_NOMEM;

retain__init(db);

db->unpwd = NULL;

#ifdef WITH_PERSISTENCE
Expand All @@ -156,9 +158,6 @@ static void subhier_clean(struct mosquitto_db *db, struct mosquitto__subhier **s
mosquitto__free(leaf);
leaf = nextleaf;
}
if(peer->retained){
db__msg_store_ref_dec(db, &peer->retained);
}
subhier_clean(db, &peer->children);
mosquitto__free(peer->topic);

Expand All @@ -170,6 +169,7 @@ static void subhier_clean(struct mosquitto_db *db, struct mosquitto__subhier **s
int db__close(struct mosquitto_db *db)
{
subhier_clean(db, &db->subs);
retain__clean(db, &db->retains);
db__msg_store_clean(db);

return MOSQ_ERR_SUCCESS;
Expand Down Expand Up @@ -626,6 +626,10 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
}
if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0, origin)) return 1;

if(retain){
stored->ref_count++;
}

return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored);
}

Expand Down Expand Up @@ -911,7 +915,10 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont
* denied/dropped and is being processed so the client doesn't
* keep resending it. That means we don't send it to other
* clients. */
if(!topic){
if(retain){
tail->store->ref_count++;
}
if(topic == NULL){
db__message_remove(db, &context->msgs_in, tail);
deleted = true;
}else{
Expand Down
4 changes: 2 additions & 2 deletions src/handle_subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
}
if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){
if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){
if(sub__retain_queue(db, context, sub, qos, 0)) rc = 1;
if(retain__queue(db, context, sub, qos, 0)) rc = 1;
}
}else{
if((retain_handling == MQTT_SUB_OPT_SEND_RETAIN_ALWAYS)
|| (rc2 == MOSQ_ERR_SUCCESS && retain_handling == MQTT_SUB_OPT_SEND_RETAIN_NEW)){

if(sub__retain_queue(db, context, sub, qos, subscription_identifier)) rc = 1;
if(retain__queue(db, context, sub, qos, subscription_identifier)) rc = 1;
}
}

Expand Down
25 changes: 25 additions & 0 deletions src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,20 @@ struct mosquitto__subhier {
struct mosquitto__subhier *children;
struct mosquitto__subleaf *subs;
struct mosquitto__subshared *shared;
char *topic;
uint16_t topic_len;
};

struct sub__token {
struct sub__token *next;
char *topic;
uint16_t topic_len;
};

struct mosquitto__retainhier {
UT_hash_handle hh;
struct mosquitto__retainhier *parent;
struct mosquitto__retainhier *children;
struct mosquitto_msg_store *retained;
char *topic;
uint16_t topic_len;
Expand Down Expand Up @@ -428,6 +442,7 @@ struct mosquitto__acl_user{
struct mosquitto_db{
dbid_t last_db_id;
struct mosquitto__subhier *subs;
struct mosquitto__retainhier *retains;
struct mosquitto__unpwd *unpwd;
struct mosquitto__unpwd *psk_id;
struct mosquitto *contexts_by_id;
Expand Down Expand Up @@ -650,6 +665,8 @@ void sub__tree_print(struct mosquitto__subhier *root, int level);
int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context);
int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier);
int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored);
int sub__topic_tokenise(const char *subtopic, struct sub__token **topics);
void sub__topic_tokens_free(struct sub__token *tokens);

/* ============================================================
* Context functions
Expand Down Expand Up @@ -701,6 +718,14 @@ int property__process_connect(struct mosquitto *context, mosquitto_property **pr
int property__process_will(struct mosquitto *context, struct mosquitto_message_all *msg, mosquitto_property **props);
int property__process_disconnect(struct mosquitto *context, mosquitto_property **props);

/* ============================================================
* Retain tree related functions
* ============================================================ */
int retain__init(struct mosquitto_db *db);
void retain__clean(struct mosquitto_db *db, struct mosquitto__retainhier **retainhier);
int retain__queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier);
int retain__store(struct mosquitto_db *db, const char *topic, struct mosquitto_msg_store *stored, struct sub__token *tokens);

/* ============================================================
* Security related functions
* ============================================================ */
Expand Down
2 changes: 1 addition & 1 deletion src/persist_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ static int persist__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)

HASH_FIND(hh, db->msg_store_load, &chunk.F.store_id, sizeof(dbid_t), load);
if(load){
sub__messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, &load->store);
retain__store(db, load->store->topic, load->store, NULL);
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message.");
return MOSQ_ERR_INVAL;
Expand Down
60 changes: 42 additions & 18 deletions src/persist_write.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,15 @@ static int persist__client_save(struct mosquitto_db *db, FILE *db_fptr)
}


static int persist__subs_retain_save(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto__subhier *node, const char *topic, int level)
static int persist__subs_save(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto__subhier *node, const char *topic, int level)
{
struct mosquitto__subhier *subhier, *subhier_tmp;
struct mosquitto__subleaf *sub;
struct P_retain retain_chunk;
struct P_sub sub_chunk;
char *thistopic;
size_t slen;
int rc;

memset(&retain_chunk, 0, sizeof(struct P_retain));
memset(&sub_chunk, 0, sizeof(struct P_sub));

slen = strlen(topic) + node->topic_len + 2;
Expand Down Expand Up @@ -231,32 +229,57 @@ static int persist__subs_retain_save(struct mosquitto_db *db, FILE *db_fptr, str
}
sub = sub->next;
}
if(node->retained){
if(strncmp(node->retained->topic, "$SYS", 4)){
/* Don't save $SYS messages. */
retain_chunk.F.store_id = node->retained->db_id;
rc = persist__chunk_retain_write_v5(db_fptr, &retain_chunk);
if(rc){
mosquitto__free(thistopic);
return rc;
}
}
}

HASH_ITER(hh, node->children, subhier, subhier_tmp){
persist__subs_retain_save(db, db_fptr, subhier, thistopic, level+1);
persist__subs_save(db, db_fptr, subhier, thistopic, level+1);
}
mosquitto__free(thistopic);
return MOSQ_ERR_SUCCESS;
}

static int persist__subs_retain_save_all(struct mosquitto_db *db, FILE *db_fptr)
static int persist__subs_save_all(struct mosquitto_db *db, FILE *db_fptr)
{
struct mosquitto__subhier *subhier, *subhier_tmp;

HASH_ITER(hh, db->subs, subhier, subhier_tmp){
if(subhier->children){
persist__subs_retain_save(db, db_fptr, subhier->children, "", 0);
persist__subs_save(db, db_fptr, subhier->children, "", 0);
}
}

return MOSQ_ERR_SUCCESS;
}

static int persist__retain_save(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto__retainhier *node, int level)
{
struct mosquitto__retainhier *retainhier, *retainhier_tmp;
struct P_retain retain_chunk;
int rc;

memset(&retain_chunk, 0, sizeof(struct P_retain));

if(node->retained && strncmp(node->retained->topic, "$SYS", 4)){
/* Don't save $SYS messages. */
retain_chunk.F.store_id = node->retained->db_id;
rc = persist__chunk_retain_write_v5(db_fptr, &retain_chunk);
if(rc){
return rc;
}
}

HASH_ITER(hh, node->children, retainhier, retainhier_tmp){
persist__retain_save(db, db_fptr, retainhier, level+1);
}
return MOSQ_ERR_SUCCESS;
}

static int persist__retain_save_all(struct mosquitto_db *db, FILE *db_fptr)
{
struct mosquitto__retainhier *retainhier, *retainhier_tmp;

HASH_ITER(hh, db->retains, retainhier, retainhier_tmp){
if(retainhier->children){
persist__retain_save(db, db_fptr, retainhier->children, 0);
}
}

Expand Down Expand Up @@ -342,7 +365,8 @@ int persist__backup(struct mosquitto_db *db, bool shutdown)
}

persist__client_save(db, db_fptr);
persist__subs_retain_save_all(db, db_fptr);
persist__subs_save_all(db, db_fptr);
persist__retain_save_all(db, db_fptr);

#ifndef WIN32
/**
Expand Down

0 comments on commit d92360d

Please sign in to comment.