diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index 7cf62855d8..aac6ad6d21 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -36,7 +36,6 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) uint8_t subscription_options; uint8_t qos; uint8_t retain_handling = 0; - uint8_t retain_as_published = 0; uint8_t *payload = NULL, *tmp_payload; uint32_t payloadlen = 0; int len; @@ -95,17 +94,14 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt311){ qos = subscription_options; if(context->is_bridge){ - retain_as_published = 1; + subscription_options = MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | MQTT_SUB_OPT_NO_LOCAL; } }else{ qos = subscription_options & 0x03; - - retain_as_published = subscription_options & 0x08; + subscription_options &= 0xFC; retain_handling = (subscription_options & 0x30) >> 4; - if(retain_handling == 3){ - mosquitto__free(sub); - mosquitto__free(payload); + if(retain_handling == 3 || (subscription_options & 0xC0) != 0){ return MOSQ_ERR_PROTOCOL; } } @@ -151,7 +147,7 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) } if(qos != 0x80){ - rc2 = sub__add(db, context, sub, qos, retain_as_published, &db->subs); + rc2 = sub__add(db, context, sub, qos, subscription_options, &db->subs); if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){ if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){ if(sub__retain_queue(db, context, sub, qos)) rc = 1; diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 2156cc4ef4..d80accceeb 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -290,6 +290,7 @@ struct mosquitto__subleaf { struct mosquitto__subleaf *next; struct mosquitto *context; int qos; + bool no_local; bool retain_as_published; }; @@ -573,7 +574,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time); /* ============================================================ * Subscription functions * ============================================================ */ -int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, bool retain_as_published, struct mosquitto__subhier **root); +int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, int options, struct mosquitto__subhier **root); struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, size_t len); int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root); void sub__tree_print(struct mosquitto__subhier *root, int level); diff --git a/src/subs.c b/src/subs.c index d608f36e9d..0899383c83 100644 --- a/src/subs.c +++ b/src/subs.c @@ -97,7 +97,7 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie } } while(source_id && leaf){ - if(!leaf->context->id || (leaf->context->is_bridge && !strcmp(leaf->context->id, source_id))){ + if(!leaf->context->id || leaf->no_local){ leaf = leaf->next; continue; } @@ -240,7 +240,7 @@ static void sub__topic_tokens_free(struct sub__token *tokens) } } -static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, bool retain_as_published, struct mosquitto__subhier *subhier, struct sub__token *tokens) +static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, int options, struct mosquitto__subhier *subhier, struct sub__token *tokens) /* FIXME - this function has the potential to leak subhier, audit calling functions. */ { struct mosquitto__subhier *branch; @@ -274,7 +274,8 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, leaf->next = NULL; leaf->context = context; leaf->qos = qos; - leaf->retain_as_published = retain_as_published; + leaf->no_local = ((options & 0x04) != 0); + leaf->retain_as_published = ((options & 0x08) != 0); for(i=0; isub_count; i++){ if(!context->subs[i]){ context->subs[i] = subhier; @@ -307,13 +308,13 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch); if(branch){ - return sub__add_recurse(db, context, qos, retain_as_published, branch, tokens->next); + return sub__add_recurse(db, context, qos, options, branch, tokens->next); }else{ /* Not found */ branch = sub__add_hier_entry(subhier, &subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len); if(!branch) return MOSQ_ERR_NOMEM; - return sub__add_recurse(db, context, qos, retain_as_published, branch, tokens->next); + return sub__add_recurse(db, context, qos, options, branch, tokens->next); } } @@ -442,7 +443,7 @@ struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent } -int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, bool retain_as_published, struct mosquitto__subhier **root) +int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, int options, struct mosquitto__subhier **root) { int rc = 0; struct mosquitto__subhier *subhier; @@ -464,7 +465,7 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub } } - rc = sub__add_recurse(db, context, qos, retain_as_published, subhier, tokens); + rc = sub__add_recurse(db, context, qos, options, subhier, tokens); sub__topic_tokens_free(tokens);