diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b70a87e327..7e4a2fac5e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -40,6 +40,7 @@ set (MOSQ_SRCS ../lib/property_mosq.c ../lib/property_mosq.h read_handle.c ../lib/read_handle.h + retain.c security.c security_default.c ../lib/send_mosq.c ../lib/send_mosq.h send_auth.c @@ -57,6 +58,7 @@ set (MOSQ_SRCS sys_tree.c sys_tree.h ../lib/time_mosq.c ../lib/tls_mosq.c + topic_tok.c ../lib/util_mosq.c ../lib/util_topic.c ../lib/util_mosq.h ../lib/utf8_mosq.c websockets.c diff --git a/src/Makefile b/src/Makefile index f204102b79..17de07321b 100644 --- a/src/Makefile +++ b/src/Makefile @@ -46,6 +46,7 @@ OBJS= mosquitto.o \ persist_write_v5.o \ plugin.o \ read_handle.o \ + retain.o \ security.o \ security_default.o \ send_auth.o \ @@ -64,6 +65,7 @@ OBJS= mosquitto.o \ subs.o \ sys_tree.o \ time_mosq.o \ + topic_tok.o \ tls_mosq.o \ utf8_mosq.o \ util_mosq.o \ @@ -189,6 +191,9 @@ plugin.o : plugin.c mosquitto_plugin.h mosquitto_broker_internal.h read_handle.o : read_handle.c mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ +retain.o : retain.c mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ + security.o : security.c mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ @@ -246,6 +251,9 @@ time_mosq.o : ../lib/time_mosq.c ../lib/time_mosq.h tls_mosq.o : ../lib/tls_mosq.c ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ +topic_tok.o : topic_tok.c mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ + util_mosq.o : ../lib/util_mosq.c ../lib/util_mosq.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/bridge.c b/src/bridge.c index 932ee988e3..d8da3a51f6 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -174,7 +174,7 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context) &db->subs) > 0){ return 1; } - sub__retain_queue(db, context, + retain__queue(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, 0); } @@ -507,7 +507,7 @@ int bridge__on_connect(struct mosquitto_db *db, struct mosquitto *context) } for(i=0; ibridge->topic_count; i++){ if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ - sub__retain_queue(db, context, + retain__queue(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, 0); } diff --git a/src/database.c b/src/database.c index dbfc0b0fdc..57c4eac846 100644 --- a/src/database.c +++ b/src/database.c @@ -135,6 +135,8 @@ int db__open(struct mosquitto__config *config, struct mosquitto_db *db) subhier = sub__add_hier_entry(NULL, &db->subs, "$SYS", strlen("$SYS")); if(!subhier) return MOSQ_ERR_NOMEM; + retain__init(db); + db->unpwd = NULL; #ifdef WITH_PERSISTENCE @@ -156,9 +158,6 @@ static void subhier_clean(struct mosquitto_db *db, struct mosquitto__subhier **s mosquitto__free(leaf); leaf = nextleaf; } - if(peer->retained){ - db__msg_store_ref_dec(db, &peer->retained); - } subhier_clean(db, &peer->children); mosquitto__free(peer->topic); @@ -170,6 +169,7 @@ static void subhier_clean(struct mosquitto_db *db, struct mosquitto__subhier **s int db__close(struct mosquitto_db *db) { subhier_clean(db, &db->subs); + retain__clean(db, &db->retains); db__msg_store_clean(db); return MOSQ_ERR_SUCCESS; @@ -626,6 +626,10 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, } if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0, origin)) return 1; + if(retain){ + stored->ref_count++; + } + return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored); } @@ -911,7 +915,10 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont * denied/dropped and is being processed so the client doesn't * keep resending it. That means we don't send it to other * clients. */ - if(!topic){ + if(retain){ + tail->store->ref_count++; + } + if(topic == NULL){ db__message_remove(db, &context->msgs_in, tail); deleted = true; }else{ diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index 2f3eb30592..e9fdfa9ac7 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -171,13 +171,13 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) } if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){ if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){ - if(sub__retain_queue(db, context, sub, qos, 0)) rc = 1; + if(retain__queue(db, context, sub, qos, 0)) rc = 1; } }else{ if((retain_handling == MQTT_SUB_OPT_SEND_RETAIN_ALWAYS) || (rc2 == MOSQ_ERR_SUCCESS && retain_handling == MQTT_SUB_OPT_SEND_RETAIN_NEW)){ - if(sub__retain_queue(db, context, sub, qos, subscription_identifier)) rc = 1; + if(retain__queue(db, context, sub, qos, subscription_identifier)) rc = 1; } } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 1d03c6e82f..4838bb9b88 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -353,6 +353,20 @@ struct mosquitto__subhier { struct mosquitto__subhier *children; struct mosquitto__subleaf *subs; struct mosquitto__subshared *shared; + char *topic; + uint16_t topic_len; +}; + +struct sub__token { + struct sub__token *next; + char *topic; + uint16_t topic_len; +}; + +struct mosquitto__retainhier { + UT_hash_handle hh; + struct mosquitto__retainhier *parent; + struct mosquitto__retainhier *children; struct mosquitto_msg_store *retained; char *topic; uint16_t topic_len; @@ -428,6 +442,7 @@ struct mosquitto__acl_user{ struct mosquitto_db{ dbid_t last_db_id; struct mosquitto__subhier *subs; + struct mosquitto__retainhier *retains; struct mosquitto__unpwd *unpwd; struct mosquitto__unpwd *psk_id; struct mosquitto *contexts_by_id; @@ -650,6 +665,8 @@ void sub__tree_print(struct mosquitto__subhier *root, int level); int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context); int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier); int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored); +int sub__topic_tokenise(const char *subtopic, struct sub__token **topics); +void sub__topic_tokens_free(struct sub__token *tokens); /* ============================================================ * Context functions @@ -701,6 +718,14 @@ int property__process_connect(struct mosquitto *context, mosquitto_property **pr int property__process_will(struct mosquitto *context, struct mosquitto_message_all *msg, mosquitto_property **props); int property__process_disconnect(struct mosquitto *context, mosquitto_property **props); +/* ============================================================ + * Retain tree related functions + * ============================================================ */ +int retain__init(struct mosquitto_db *db); +void retain__clean(struct mosquitto_db *db, struct mosquitto__retainhier **retainhier); +int retain__queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier); +int retain__store(struct mosquitto_db *db, const char *topic, struct mosquitto_msg_store *stored, struct sub__token *tokens); + /* ============================================================ * Security related functions * ============================================================ */ diff --git a/src/persist_read.c b/src/persist_read.c index 2c73617185..00e677e40c 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -325,7 +325,7 @@ static int persist__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) HASH_FIND(hh, db->msg_store_load, &chunk.F.store_id, sizeof(dbid_t), load); if(load){ - sub__messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, &load->store); + retain__store(db, load->store->topic, load->store, NULL); }else{ log__printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message."); return MOSQ_ERR_INVAL; diff --git a/src/persist_write.c b/src/persist_write.c index c24a24d2aa..81a6ad6cde 100644 --- a/src/persist_write.c +++ b/src/persist_write.c @@ -190,17 +190,15 @@ static int persist__client_save(struct mosquitto_db *db, FILE *db_fptr) } -static int persist__subs_retain_save(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto__subhier *node, const char *topic, int level) +static int persist__subs_save(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto__subhier *node, const char *topic, int level) { struct mosquitto__subhier *subhier, *subhier_tmp; struct mosquitto__subleaf *sub; - struct P_retain retain_chunk; struct P_sub sub_chunk; char *thistopic; size_t slen; int rc; - memset(&retain_chunk, 0, sizeof(struct P_retain)); memset(&sub_chunk, 0, sizeof(struct P_sub)); slen = strlen(topic) + node->topic_len + 2; @@ -231,32 +229,57 @@ static int persist__subs_retain_save(struct mosquitto_db *db, FILE *db_fptr, str } sub = sub->next; } - if(node->retained){ - if(strncmp(node->retained->topic, "$SYS", 4)){ - /* Don't save $SYS messages. */ - retain_chunk.F.store_id = node->retained->db_id; - rc = persist__chunk_retain_write_v5(db_fptr, &retain_chunk); - if(rc){ - mosquitto__free(thistopic); - return rc; - } - } - } HASH_ITER(hh, node->children, subhier, subhier_tmp){ - persist__subs_retain_save(db, db_fptr, subhier, thistopic, level+1); + persist__subs_save(db, db_fptr, subhier, thistopic, level+1); } mosquitto__free(thistopic); return MOSQ_ERR_SUCCESS; } -static int persist__subs_retain_save_all(struct mosquitto_db *db, FILE *db_fptr) +static int persist__subs_save_all(struct mosquitto_db *db, FILE *db_fptr) { struct mosquitto__subhier *subhier, *subhier_tmp; HASH_ITER(hh, db->subs, subhier, subhier_tmp){ if(subhier->children){ - persist__subs_retain_save(db, db_fptr, subhier->children, "", 0); + persist__subs_save(db, db_fptr, subhier->children, "", 0); + } + } + + return MOSQ_ERR_SUCCESS; +} + +static int persist__retain_save(struct mosquitto_db *db, FILE *db_fptr, struct mosquitto__retainhier *node, int level) +{ + struct mosquitto__retainhier *retainhier, *retainhier_tmp; + struct P_retain retain_chunk; + int rc; + + memset(&retain_chunk, 0, sizeof(struct P_retain)); + + if(node->retained && strncmp(node->retained->topic, "$SYS", 4)){ + /* Don't save $SYS messages. */ + retain_chunk.F.store_id = node->retained->db_id; + rc = persist__chunk_retain_write_v5(db_fptr, &retain_chunk); + if(rc){ + return rc; + } + } + + HASH_ITER(hh, node->children, retainhier, retainhier_tmp){ + persist__retain_save(db, db_fptr, retainhier, level+1); + } + return MOSQ_ERR_SUCCESS; +} + +static int persist__retain_save_all(struct mosquitto_db *db, FILE *db_fptr) +{ + struct mosquitto__retainhier *retainhier, *retainhier_tmp; + + HASH_ITER(hh, db->retains, retainhier, retainhier_tmp){ + if(retainhier->children){ + persist__retain_save(db, db_fptr, retainhier->children, 0); } } @@ -342,7 +365,8 @@ int persist__backup(struct mosquitto_db *db, bool shutdown) } persist__client_save(db, db_fptr); - persist__subs_retain_save_all(db, db_fptr); + persist__subs_save_all(db, db_fptr); + persist__retain_save_all(db, db_fptr); #ifndef WIN32 /** diff --git a/src/retain.c b/src/retain.c new file mode 100644 index 0000000000..82a87b70fd --- /dev/null +++ b/src/retain.c @@ -0,0 +1,299 @@ +/* +Copyright (c) 2010-2019 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#include +#include +#include + +#include "mosquitto_broker_internal.h" +#include "memory_mosq.h" +#include "mqtt_protocol.h" +#include "util_mosq.h" + +#include "utlist.h" + +static struct mosquitto__retainhier *retain__add_hier_entry(struct mosquitto__retainhier *parent, struct mosquitto__retainhier **sibling, const char *topic, size_t len) +{ + struct mosquitto__retainhier *child; + + assert(sibling); + + child = mosquitto__calloc(1, sizeof(struct mosquitto__retainhier)); + if(!child){ + log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return NULL; + } + child->parent = parent; + child->topic_len = len; + child->topic = mosquitto__malloc(len+1); + if(!child->topic){ + child->topic_len = 0; + mosquitto__free(child); + log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return NULL; + }else{ + strncpy(child->topic, topic, child->topic_len+1); + } + + HASH_ADD_KEYPTR(hh, *sibling, child->topic, child->topic_len, child); + + return child; +} + + +int retain__init(struct mosquitto_db *db) +{ + struct mosquitto__retainhier *retainhier; + + retainhier = retain__add_hier_entry(NULL, &db->retains, "", strlen("")); + if(!retainhier) return MOSQ_ERR_NOMEM; + + retainhier = retain__add_hier_entry(NULL, &db->retains, "$SYS", strlen("$SYS")); + if(!retainhier) return MOSQ_ERR_NOMEM; + + return MOSQ_ERR_SUCCESS; +} + + +int retain__store(struct mosquitto_db *db, const char *topic, struct mosquitto_msg_store *stored, struct sub__token *tokens) +{ + struct mosquitto__retainhier *retainhier; + struct mosquitto__retainhier *branch; + struct sub__token *local_tokens = NULL, *token_current; + + assert(stored); + if(tokens == NULL){ + if(sub__topic_tokenise(topic, &local_tokens)) return 1; + }else{ + local_tokens = tokens; + } + token_current = local_tokens; + + HASH_FIND(hh, db->retains, local_tokens->topic, local_tokens->topic_len, retainhier); + + while(token_current){ + HASH_FIND(hh, retainhier->children, token_current->topic, token_current->topic_len, branch); + if(branch == NULL){ + branch = retain__add_hier_entry(retainhier, &retainhier->children, token_current->topic, token_current->topic_len); + if(branch == NULL){ + if(tokens == NULL){ + sub__topic_tokens_free(local_tokens); + } + return MOSQ_ERR_NOMEM; + } + } + retainhier = branch; + token_current = token_current->next; + } + +#ifdef WITH_PERSISTENCE + if(strncmp(topic, "$SYS", 4)){ + /* Retained messages count as a persistence change, but only if + * they aren't for $SYS. */ + db->persistence_changes++; + } +#endif + if(retainhier->retained){ + db__msg_store_ref_dec(db, &retainhier->retained); +#ifdef WITH_SYS_TREE + db->retained_count--; +#endif + } + if(stored->payloadlen){ + retainhier->retained = stored; + db__msg_store_ref_inc(retainhier->retained); +#ifdef WITH_SYS_TREE + db->retained_count++; +#endif + }else{ + retainhier->retained = NULL; + } + + if(tokens == NULL){ + sub__topic_tokens_free(local_tokens); + } + + return MOSQ_ERR_SUCCESS; +} + + +static int retain__process(struct mosquitto_db *db, struct mosquitto__retainhier *branch, struct mosquitto *context, int sub_qos, uint32_t subscription_identifier, time_t now) +{ + int rc = 0; + int qos; + uint16_t mid; + mosquitto_property *properties = NULL; + struct mosquitto_msg_store *retained; + + if(branch->retained->message_expiry_time > 0 && now > branch->retained->message_expiry_time){ + db__msg_store_ref_dec(db, &branch->retained); + branch->retained = NULL; +#ifdef WITH_SYS_TREE + db->retained_count--; +#endif + return MOSQ_ERR_SUCCESS; + } + + retained = branch->retained; + + rc = mosquitto_acl_check(db, context, retained->topic, retained->payloadlen, UHPA_ACCESS(retained->payload, retained->payloadlen), + retained->qos, retained->retain, MOSQ_ACL_READ); + if(rc == MOSQ_ERR_ACL_DENIED){ + return MOSQ_ERR_SUCCESS; + }else if(rc != MOSQ_ERR_SUCCESS){ + return rc; + } + + /* Check for original source access */ + if(db->config->check_retain_source && retained->origin != mosq_mo_broker && retained->source_id){ + struct mosquitto retain_ctxt; + memset(&retain_ctxt, 0, sizeof(struct mosquitto)); + + retain_ctxt.id = retained->source_id; + retain_ctxt.username = retained->source_username; + retain_ctxt.listener = retained->source_listener; + + rc = acl__find_acls(db, &retain_ctxt); + if(rc) return rc; + + rc = mosquitto_acl_check(db, &retain_ctxt, retained->topic, retained->payloadlen, UHPA_ACCESS(retained->payload, retained->payloadlen), + retained->qos, retained->retain, MOSQ_ACL_WRITE); + if(rc == MOSQ_ERR_ACL_DENIED){ + return MOSQ_ERR_SUCCESS; + }else if(rc != MOSQ_ERR_SUCCESS){ + return rc; + } + } + + if (db->config->upgrade_outgoing_qos){ + qos = sub_qos; + } else { + qos = retained->qos; + if(qos > sub_qos) qos = sub_qos; + } + if(qos > 0){ + mid = mosquitto__mid_generate(context); + }else{ + mid = 0; + } + if(subscription_identifier > 0){ + mosquitto_property_add_varint(&properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, subscription_identifier); + } + return db__message_insert(db, context, mid, mosq_md_out, qos, true, retained, properties); +} + + +static int retain__search(struct mosquitto_db *db, struct mosquitto__retainhier *retainhier, struct sub__token *tokens, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier, time_t now, int level) +{ + struct mosquitto__retainhier *branch, *branch_tmp; + int flag = 0; + + if(!strcmp(tokens->topic, "#") && !tokens->next){ + HASH_ITER(hh, retainhier->children, branch, branch_tmp){ + /* Set flag to indicate that we should check for retained messages + * on "foo" when we are subscribing to e.g. "foo/#" and then exit + * this function and return to an earlier retain__search(). + */ + flag = -1; + if(branch->retained){ + retain__process(db, branch, context, sub_qos, subscription_identifier, now); + } + if(branch->children){ + retain__search(db, branch, tokens, context, sub, sub_qos, subscription_identifier, now, level+1); + } + } + }else{ + if(!strcmp(tokens->topic, "+")){ + HASH_ITER(hh, retainhier->children, branch, branch_tmp){ + if(tokens->next){ + if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1 + || (tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){ + + if(branch->retained){ + retain__process(db, branch, context, sub_qos, subscription_identifier, now); + } + } + }else{ + if(branch->retained){ + retain__process(db, branch, context, sub_qos, subscription_identifier, now); + } + } + } + }else{ + HASH_FIND(hh, retainhier->children, tokens->topic, tokens->topic_len, branch); + if(branch){ + if(tokens->next){ + if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1 + || (tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){ + + if(branch->retained){ + retain__process(db, branch, context, sub_qos, subscription_identifier, now); + } + } + }else{ + if(branch->retained){ + retain__process(db, branch, context, sub_qos, subscription_identifier, now); + } + } + } + } + } + return flag; +} + + +int retain__queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier) +{ + struct mosquitto__retainhier *retainhier; + struct sub__token *tokens = NULL; + time_t now; + + assert(db); + assert(context); + assert(sub); + + if(sub__topic_tokenise(sub, &tokens)) return 1; + + HASH_FIND(hh, db->retains, tokens->topic, tokens->topic_len, retainhier); + + if(retainhier){ + now = time(NULL); + retain__search(db, retainhier, tokens, context, sub, sub_qos, subscription_identifier, now, 0); + } + sub__topic_tokens_free(tokens); + + return MOSQ_ERR_SUCCESS; +} + + +void retain__clean(struct mosquitto_db *db, struct mosquitto__retainhier **retainhier) +{ + struct mosquitto__retainhier *peer, *retainhier_tmp; + + HASH_ITER(hh, *retainhier, peer, retainhier_tmp){ + if(peer->retained){ + db__msg_store_ref_dec(db, &peer->retained); + } + retain__clean(db, &peer->children); + mosquitto__free(peer->topic); + + HASH_DELETE(hh, *retainhier, peer); + mosquitto__free(peer); + } +} + diff --git a/src/subs.c b/src/subs.c index c059874fa3..40613f9a49 100644 --- a/src/subs.c +++ b/src/subs.c @@ -58,13 +58,6 @@ and the Eclipse Distribution License is available at #include "utlist.h" -struct sub__token { - struct sub__token *next; - char *topic; - uint16_t topic_len; -}; - - static int subs__send(struct mosquitto_db *db, struct mosquitto__subleaf *leaf, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored) { bool client_retain; @@ -131,37 +124,12 @@ static int subs__shared_process(struct mosquitto_db *db, struct mosquitto__subhi return rc; } -static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain) +static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hier, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored) { int rc = 0; int rc2; struct mosquitto__subleaf *leaf; - if(retain && set_retain){ -#ifdef WITH_PERSISTENCE - if(strncmp(topic, "$SYS", 4)){ - /* Retained messages count as a persistence change, but only if - * they aren't for $SYS. */ - db->persistence_changes++; - } -#endif - if(hier->retained){ - db__msg_store_ref_dec(db, &hier->retained); -#ifdef WITH_SYS_TREE - db->retained_count--; -#endif - } - if(stored->payloadlen){ - hier->retained = stored; - db__msg_store_ref_inc(hier->retained); -#ifdef WITH_SYS_TREE - db->retained_count++; -#endif - }else{ - hier->retained = NULL; - } - } - rc = subs__shared_process(db, hier, topic, qos, retain, stored); leaf = hier->subs; @@ -183,119 +151,6 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie } } -static struct sub__token *sub__topic_append(struct sub__token **tail, struct sub__token **topics, char *topic) -{ - struct sub__token *new_topic; - - if(!topic){ - return NULL; - } - new_topic = mosquitto__malloc(sizeof(struct sub__token)); - if(!new_topic){ - return NULL; - } - new_topic->next = NULL; - new_topic->topic_len = strlen(topic); - new_topic->topic = mosquitto__malloc(new_topic->topic_len+1); - if(!new_topic->topic){ - mosquitto__free(new_topic); - return NULL; - } - strncpy(new_topic->topic, topic, new_topic->topic_len+1); - - if(*tail){ - (*tail)->next = new_topic; - *tail = (*tail)->next; - }else{ - *topics = new_topic; - *tail = new_topic; - } - return new_topic; -} - -static int sub__topic_tokenise(const char *subtopic, struct sub__token **topics) -{ - struct sub__token *new_topic, *tail = NULL; - int len; - int start, stop, tlen; - int i; - char *topic; - int count = 0; - - assert(subtopic); - assert(topics); - - if(subtopic[0] != '$'){ - new_topic = sub__topic_append(&tail, topics, ""); - if(!new_topic) goto cleanup; - } - - len = strlen(subtopic); - - if(subtopic[0] == '/'){ - new_topic = sub__topic_append(&tail, topics, ""); - if(!new_topic) goto cleanup; - - start = 1; - }else{ - start = 0; - } - - stop = 0; - for(i=start; i TOPIC_HIERARCHY_LIMIT){ - /* Set limit on hierarchy levels, to restrict stack usage. */ - goto cleanup; - } - - return MOSQ_ERR_SUCCESS; - -cleanup: - tail = *topics; - *topics = NULL; - while(tail){ - mosquitto__free(tail->topic); - new_topic = tail->next; - mosquitto__free(tail); - tail = new_topic; - } - return 1; -} - -static void sub__topic_tokens_free(struct sub__token *tokens) -{ - struct sub__token *tail; - - while(tokens){ - tail = tokens->next; - mosquitto__free(tokens->topic); - mosquitto__free(tokens); - tokens = tail; - } -} - - static int sub__add_leaf(struct mosquitto *context, int qos, uint32_t identifier, int options, struct mosquitto__subleaf **head, struct mosquitto__subleaf **newleaf) { struct mosquitto__subleaf *leaf; @@ -590,7 +445,7 @@ static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *contex HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch); if(branch){ sub__remove_recurse(db, context, branch, tokens->next, reason, sharename); - if(!branch->children && !branch->subs && !branch->retained && !branch->shared){ + if(!branch->children && !branch->subs && !branch->shared){ HASH_DELETE(hh, subhier->children, branch); mosquitto__free(branch->topic); mosquitto__free(branch); @@ -599,7 +454,7 @@ static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *contex return MOSQ_ERR_SUCCESS; } -static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain) +static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored) { /* FIXME - need to take into account source_id if the client is a bridge */ struct mosquitto__subhier *branch; @@ -611,14 +466,14 @@ static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhi HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch); if(branch){ - rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, set_retain); + rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored); if(rc == MOSQ_ERR_SUCCESS){ have_subscribers = true; }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ return rc; } if(!tokens->next){ - rc = subs__process(db, branch, source_id, topic, qos, retain, stored, set_retain); + rc = subs__process(db, branch, source_id, topic, qos, retain, stored); if(rc == MOSQ_ERR_SUCCESS){ have_subscribers = true; }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ @@ -631,14 +486,14 @@ static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhi HASH_FIND(hh, subhier->children, "+", 1, branch); if(branch){ - rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, false); + rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored); if(rc == MOSQ_ERR_SUCCESS){ have_subscribers = true; }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ return rc; } if(!tokens->next){ - rc = subs__process(db, branch, source_id, topic, qos, retain, stored, false); + rc = subs__process(db, branch, source_id, topic, qos, retain, stored); if(rc == MOSQ_ERR_SUCCESS){ have_subscribers = true; }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ @@ -655,7 +510,7 @@ static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhi * subscriptions but *don't* return. Although this branch has ended * there may still be other subscriptions to deal with. */ - rc = subs__process(db, branch, source_id, topic, qos, retain, stored, false); + rc = subs__process(db, branch, source_id, topic, qos, retain, stored); if(rc == MOSQ_ERR_SUCCESS){ have_subscribers = true; }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ @@ -797,7 +652,7 @@ int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char * int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored) { - int rc = 0; + int rc = 0, rc2; struct mosquitto__subhier *subhier; struct sub__token *tokens = NULL; @@ -814,16 +669,18 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch HASH_FIND(hh, db->subs, tokens->topic, tokens->topic_len, subhier); if(subhier){ - if(retain){ - /* We have a message that needs to be retained, so ensure that the subscription - * tree for its topic exists. - */ - sub__add_context(db, NULL, 0, 0, 0, subhier, tokens, NULL); + rc = sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored); + } + + if(retain){ + rc2 = retain__store(db, topic, *stored, tokens); + if(rc2){ + sub__topic_tokens_free(tokens); + db__msg_store_ref_dec(db, stored); + return rc2; } - rc = sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true); } sub__topic_tokens_free(tokens); - /* Remove our reference and free if needed. */ db__msg_store_ref_dec(db, stored); @@ -840,7 +697,7 @@ static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub return NULL; } - if(sub->children || sub->subs || sub->retained){ + if(sub->children || sub->subs){ return NULL; } @@ -851,7 +708,6 @@ static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub if(parent->subs == NULL && parent->children == NULL - && parent->retained == NULL && parent->shared == NULL && parent->parent){ @@ -885,7 +741,6 @@ static int sub__clean_session_shared(struct mosquitto_db *db, struct mosquitto * } if(context->shared_subs[i]->hier->subs == NULL && context->shared_subs[i]->hier->children == NULL - && context->shared_subs[i]->hier->retained == NULL && context->shared_subs[i]->hier->shared == NULL && context->shared_subs[i]->hier->parent){ @@ -930,7 +785,6 @@ int sub__clean_session(struct mosquitto_db *db, struct mosquitto *context) } if(context->subs[i]->subs == NULL && context->subs[i]->children == NULL - && context->subs[i]->retained == NULL && context->subs[i]->shared == NULL && context->subs[i]->parent){ @@ -969,9 +823,6 @@ void sub__tree_print(struct mosquitto__subhier *root, int level) } leaf = leaf->next; } - if(branch->retained){ - printf(" (r)"); - } printf("\n"); } @@ -979,154 +830,3 @@ void sub__tree_print(struct mosquitto__subhier *root, int level) } } -static int retain__process(struct mosquitto_db *db, struct mosquitto__subhier *branch, struct mosquitto *context, int sub_qos, uint32_t subscription_identifier, time_t now) -{ - int rc = 0; - int qos; - uint16_t mid; - mosquitto_property *properties = NULL; - struct mosquitto_msg_store *retained; - - if(branch->retained->message_expiry_time > 0 && now > branch->retained->message_expiry_time){ - db__msg_store_ref_dec(db, &branch->retained); - branch->retained = NULL; -#ifdef WITH_SYS_TREE - db->retained_count--; -#endif - return MOSQ_ERR_SUCCESS; - } - - retained = branch->retained; - - rc = mosquitto_acl_check(db, context, retained->topic, retained->payloadlen, UHPA_ACCESS(retained->payload, retained->payloadlen), - retained->qos, retained->retain, MOSQ_ACL_READ); - if(rc == MOSQ_ERR_ACL_DENIED){ - return MOSQ_ERR_SUCCESS; - }else if(rc != MOSQ_ERR_SUCCESS){ - return rc; - } - - /* Check for original source access */ - if(db->config->check_retain_source && retained->origin != mosq_mo_broker && retained->source_id){ - struct mosquitto retain_ctxt; - memset(&retain_ctxt, 0, sizeof(struct mosquitto)); - - retain_ctxt.id = retained->source_id; - retain_ctxt.username = retained->source_username; - retain_ctxt.listener = retained->source_listener; - - rc = acl__find_acls(db, &retain_ctxt); - if(rc) return rc; - - rc = mosquitto_acl_check(db, &retain_ctxt, retained->topic, retained->payloadlen, UHPA_ACCESS(retained->payload, retained->payloadlen), - retained->qos, retained->retain, MOSQ_ACL_WRITE); - if(rc == MOSQ_ERR_ACL_DENIED){ - return MOSQ_ERR_SUCCESS; - }else if(rc != MOSQ_ERR_SUCCESS){ - return rc; - } - } - - if (db->config->upgrade_outgoing_qos){ - qos = sub_qos; - } else { - qos = retained->qos; - if(qos > sub_qos) qos = sub_qos; - } - if(qos > 0){ - mid = mosquitto__mid_generate(context); - }else{ - mid = 0; - } - if(subscription_identifier > 0){ - mosquitto_property_add_varint(&properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, subscription_identifier); - } - return db__message_insert(db, context, mid, mosq_md_out, qos, true, retained, properties); -} - -static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier, time_t now, int level) -{ - struct mosquitto__subhier *branch, *branch_tmp; - int flag = 0; - - if(!strcmp(tokens->topic, "#") && !tokens->next){ - HASH_ITER(hh, subhier->children, branch, branch_tmp){ - /* Set flag to indicate that we should check for retained messages - * on "foo" when we are subscribing to e.g. "foo/#" and then exit - * this function and return to an earlier retain__search(). - */ - flag = -1; - if(branch->retained){ - retain__process(db, branch, context, sub_qos, subscription_identifier, now); - } - if(branch->children){ - retain__search(db, branch, tokens, context, sub, sub_qos, subscription_identifier, now, level+1); - } - } - }else{ - if(!strcmp(tokens->topic, "+")){ - HASH_ITER(hh, subhier->children, branch, branch_tmp){ - if(tokens->next){ - if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1 - || (tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){ - - if(branch->retained){ - retain__process(db, branch, context, sub_qos, subscription_identifier, now); - } - } - }else{ - if(branch->retained){ - retain__process(db, branch, context, sub_qos, subscription_identifier, now); - } - } - } - }else{ - HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch); - if(branch){ - if(tokens->next){ - if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1 - || (tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){ - - if(branch->retained){ - retain__process(db, branch, context, sub_qos, subscription_identifier, now); - } - } - }else{ - if(branch->retained){ - retain__process(db, branch, context, sub_qos, subscription_identifier, now); - } - } - } - } - } - return flag; -} - -int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos, uint32_t subscription_identifier) -{ - struct mosquitto__subhier *subhier; - struct sub__token *tokens = NULL, *tail; - time_t now; - - assert(db); - assert(context); - assert(sub); - - if(sub__topic_tokenise(sub, &tokens)) return 1; - - HASH_FIND(hh, db->subs, tokens->topic, tokens->topic_len, subhier); - - if(subhier){ - now = time(NULL); - retain__search(db, subhier, tokens, context, sub, sub_qos, subscription_identifier, now, 0); - } - while(tokens){ - tail = tokens->next; - mosquitto__free(tokens->topic); - mosquitto__free(tokens); - tokens = tail; - } - - return MOSQ_ERR_SUCCESS; -} - diff --git a/src/topic_tok.c b/src/topic_tok.c new file mode 100644 index 0000000000..65aadd2a2c --- /dev/null +++ b/src/topic_tok.c @@ -0,0 +1,137 @@ +/* +Copyright (c) 2010-2019 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#include +#include +#include + +#include "mosquitto_broker_internal.h" +#include "memory_mosq.h" +#include "mqtt_protocol.h" +#include "util_mosq.h" + +#include "utlist.h" + + +static struct sub__token *sub__topic_append(struct sub__token **tail, struct sub__token **topics, char *topic) +{ + struct sub__token *new_topic; + + if(!topic){ + return NULL; + } + new_topic = mosquitto__malloc(sizeof(struct sub__token)); + if(!new_topic){ + return NULL; + } + new_topic->next = NULL; + new_topic->topic_len = strlen(topic); + new_topic->topic = mosquitto__malloc(new_topic->topic_len+1); + if(!new_topic->topic){ + mosquitto__free(new_topic); + return NULL; + } + strncpy(new_topic->topic, topic, new_topic->topic_len+1); + + if(*tail){ + (*tail)->next = new_topic; + *tail = (*tail)->next; + }else{ + *topics = new_topic; + *tail = new_topic; + } + return new_topic; +} + + +int sub__topic_tokenise(const char *subtopic, struct sub__token **topics) +{ + struct sub__token *new_topic, *tail = NULL; + int len; + int start, stop, tlen; + int i; + char *topic; + + assert(subtopic); + assert(topics); + + if(subtopic[0] != '$'){ + new_topic = sub__topic_append(&tail, topics, ""); + if(!new_topic) goto cleanup; + } + + len = strlen(subtopic); + + if(subtopic[0] == '/'){ + new_topic = sub__topic_append(&tail, topics, ""); + if(!new_topic) goto cleanup; + + start = 1; + }else{ + start = 0; + } + + stop = 0; + for(i=start; itopic); + new_topic = tail->next; + mosquitto__free(tail); + tail = new_topic; + } + return 1; +} + + +void sub__topic_tokens_free(struct sub__token *tokens) +{ + struct sub__token *tail; + + while(tokens){ + tail = tokens->next; + mosquitto__free(tokens->topic); + mosquitto__free(tokens); + tokens = tail; + } +} + diff --git a/test/unit/Makefile b/test/unit/Makefile index 98af7eb1d5..85ee1d03e1 100644 --- a/test/unit/Makefile +++ b/test/unit/Makefile @@ -44,6 +44,8 @@ PERSIST_READ_OBJS = \ persist_read_v234.o \ persist_read_v5.o \ property_mosq.o \ + retain.o \ + topic_tok.o \ utf8_mosq.o \ util_mosq.o @@ -61,7 +63,9 @@ PERSIST_WRITE_OBJS = \ persist_write.o \ persist_write_v5.o \ property_mosq.o \ + retain.o \ subs.o \ + topic_tok.o \ utf8_mosq.o \ util_mosq.o @@ -112,9 +116,15 @@ persist_write_v5.o : ../../src/persist_write_v5.c property_mosq.o : ../../lib/property_mosq.c $(CROSS_COMPILE)$(CC) $(CPPFLAGS) $(CFLAGS) -c -o $@ $^ +retain.o : ../../src/retain.c + $(CROSS_COMPILE)$(CC) $(CPPFLAGS) $(CFLAGS) -DWITH_BROKER -DWITH_PERSISTENCE -c -o $@ $^ + subs.o : ../../src/subs.c $(CROSS_COMPILE)$(CC) $(CPPFLAGS) $(CFLAGS) -DWITH_BROKER -DWITH_PERSISTENCE -c -o $@ $^ +topic_tok.o : ../../src/topic_tok.c + $(CROSS_COMPILE)$(CC) $(CPPFLAGS) $(CFLAGS) -DWITH_BROKER -DWITH_PERSISTENCE -c -o $@ $^ + util_mosq.o : ../../lib/util_mosq.c $(CROSS_COMPILE)$(CC) $(CPPFLAGS) $(CFLAGS) -c -o $@ $^ diff --git a/test/unit/persist_read_stubs.c b/test/unit/persist_read_stubs.c index 568016ec7f..5471f034fb 100644 --- a/test/unit/persist_read_stubs.c +++ b/test/unit/persist_read_stubs.c @@ -9,7 +9,6 @@ #include #include -extern uint64_t last_retained; extern char *last_sub; extern int last_qos; extern uint32_t last_identifier; @@ -125,6 +124,17 @@ int send__pingreq(struct mosquitto *mosq) return MOSQ_ERR_SUCCESS; } +int mosquitto_acl_check(struct mosquitto_db *db, struct mosquitto *context, const char *topic, long payloadlen, void* payload, int qos, bool retain, int access) +{ + return MOSQ_ERR_SUCCESS; +} + +int acl__find_acls(struct mosquitto_db *db, struct mosquitto *context) +{ + return MOSQ_ERR_SUCCESS; +} + + int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, uint32_t identifier, int options, struct mosquitto__subhier **root) { last_sub = strdup(sub); @@ -134,14 +144,14 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub return MOSQ_ERR_SUCCESS; } -int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored) +int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties) { - if(retain){ - last_retained = (*stored)->db_id; - } return MOSQ_ERR_SUCCESS; } +void db__msg_store_ref_dec(struct mosquitto_db *db, struct mosquitto_msg_store **store) +{ +} void db__msg_store_ref_inc(struct mosquitto_msg_store *store) { diff --git a/test/unit/persist_read_test.c b/test/unit/persist_read_test.c index d1c7e78647..ed4e93752e 100644 --- a/test/unit/persist_read_test.c +++ b/test/unit/persist_read_test.c @@ -13,7 +13,6 @@ #include "persist.h" #include "property_mosq.h" -uint64_t last_retained; char *last_sub = NULL; int last_qos; uint32_t last_identifier; @@ -307,12 +306,11 @@ static void TEST_v3_retain(void) struct mosquitto__config config; int rc; - last_retained = 0; - memset(&db, 0, sizeof(struct mosquitto_db)); memset(&config, 0, sizeof(struct mosquitto__config)); db.config = &config; + retain__init(&db); config.persistence = true; config.persistence_filepath = "files/persist_read/v3-retain.test-db"; @@ -337,7 +335,18 @@ static void TEST_v3_retain(void) CU_ASSERT_NSTRING_EQUAL(UHPA_ACCESS_PAYLOAD(db.msg_store), "payload", 7); } } - CU_ASSERT_EQUAL(last_retained, 0x54); + CU_ASSERT_PTR_NOT_NULL(db.retains); + if(db.retains){ + CU_ASSERT_STRING_EQUAL(db.retains->topic, ""); + CU_ASSERT_PTR_NOT_NULL(db.retains->children); + if(db.retains->children){ + CU_ASSERT_STRING_EQUAL(db.retains->children->topic, ""); + CU_ASSERT_PTR_NOT_NULL(db.retains->children->children); + if(db.retains->children->children){ + CU_ASSERT_STRING_EQUAL(db.retains->children->children->topic, "topic"); + } + } + } } static void TEST_v3_sub(void) @@ -681,8 +690,6 @@ static void TEST_v5_retain(void) struct mosquitto__config config; int rc; - last_retained = 0; - memset(&db, 0, sizeof(struct mosquitto_db)); memset(&config, 0, sizeof(struct mosquitto__config)); db.config = &config; @@ -690,6 +697,7 @@ static void TEST_v5_retain(void) config.persistence = true; config.persistence_filepath = "files/persist_read/v5-retain.test-db"; + retain__init(&db); rc = persist__restore(&db); CU_ASSERT_EQUAL(rc, MOSQ_ERR_SUCCESS); CU_ASSERT_EQUAL(db.msg_store_count, 1); @@ -708,7 +716,18 @@ static void TEST_v5_retain(void) CU_ASSERT_NSTRING_EQUAL(UHPA_ACCESS_PAYLOAD(db.msg_store), "payload", 7); } } - CU_ASSERT_EQUAL(last_retained, 0x54); + CU_ASSERT_PTR_NOT_NULL(db.retains); + if(db.retains){ + CU_ASSERT_STRING_EQUAL(db.retains->topic, ""); + CU_ASSERT_PTR_NOT_NULL(db.retains->children); + if(db.retains->children){ + CU_ASSERT_STRING_EQUAL(db.retains->children->topic, ""); + CU_ASSERT_PTR_NOT_NULL(db.retains->children->children); + if(db.retains->children->children){ + CU_ASSERT_STRING_EQUAL(db.retains->children->children->topic, "topic"); + } + } + } } static void TEST_v5_sub(void) diff --git a/test/unit/persist_write_stubs.c b/test/unit/persist_write_stubs.c index 97bb8e05f5..995dffca6f 100644 --- a/test/unit/persist_write_stubs.c +++ b/test/unit/persist_write_stubs.c @@ -68,4 +68,3 @@ int send__pubrel(struct mosquitto *mosq, uint16_t mid) { return MOSQ_ERR_SUCCESS; } -