Skip to content

Commit

Permalink
Don't use uhpa for topics, incompatible with uthash.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Mar 8, 2019
1 parent 2ea97a6 commit a57bba0
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 62 deletions.
2 changes: 1 addition & 1 deletion src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ static void subhier_clean(struct mosquitto_db *db, struct mosquitto__subhier **s
db__msg_store_deref(db, &peer->retained);
}
subhier_clean(db, &peer->children);
UHPA_FREE_TOPIC(peer);
mosquitto__free(peer->topic);

HASH_DELETE(hh, *subhier, peer);
mosquitto__free(peer);
Expand Down
12 changes: 1 addition & 11 deletions src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,6 @@ typedef union {
#define UHPA_FREE_PAYLOAD(A) UHPA_FREE((A)->payload, (A)->payloadlen)
#define UHPA_MOVE_PAYLOAD(DEST, SRC) UHPA_MOVE((DEST)->payload, (SRC)->payload, (SRC)->payloadlen)

#define MOSQ_TOPIC_ELEMENT_UNION_SIZE 8
typedef union {
char *ptr;
char array[MOSQ_TOPIC_ELEMENT_UNION_SIZE];
} mosquitto__topic_element_uhpa;
#define UHPA_ALLOC_TOPIC(A) UHPA_ALLOC((A)->topic, (A)->topic_len+1)
#define UHPA_ACCESS_TOPIC(A) UHPA_ACCESS((A)->topic, (A)->topic_len+1)
#define UHPA_FREE_TOPIC(A) UHPA_FREE((A)->topic, (A)->topic_len+1)
#define UHPA_MOVE_TOPIC(DEST, SRC) UHPA_MOVE((DEST)->topic, (SRC)->topic, (SRC)->topic_len+1)

/* ========================================
* End UHPA data types
* ======================================== */
Expand Down Expand Up @@ -312,7 +302,7 @@ struct mosquitto__subhier {
struct mosquitto__subhier *children;
struct mosquitto__subleaf *subs;
struct mosquitto_msg_store *retained;
mosquitto__topic_element_uhpa topic;
char *topic;
uint16_t topic_len;
};

Expand Down
4 changes: 2 additions & 2 deletions src/persist.c
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,9 @@ static int persist__subs_retain_write(struct mosquitto_db *db, FILE *db_fptr, st
thistopic = mosquitto__malloc(sizeof(char)*slen);
if(!thistopic) return MOSQ_ERR_NOMEM;
if(level > 1 || strlen(topic)){
snprintf(thistopic, slen, "%s/%s", topic, UHPA_ACCESS_TOPIC(node));
snprintf(thistopic, slen, "%s/%s", topic, node->topic);
}else{
snprintf(thistopic, slen, "%s", UHPA_ACCESS_TOPIC(node));
snprintf(thistopic, slen, "%s", node->topic);
}

sub = node->subs;
Expand Down
87 changes: 39 additions & 48 deletions src/subs.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ and the Eclipse Distribution License is available at

struct sub__token {
struct sub__token *next;
mosquitto__topic_element_uhpa topic;
char *topic;
uint16_t topic_len;
};

Expand All @@ -74,9 +74,6 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie

leaf = hier->subs;

if(topic[0] != '$'){
log__printf(NULL, MOSQ_LOG_INFO, "LEAF:%p", leaf);
}
if(retain && set_retain){
#ifdef WITH_PERSISTENCE
if(strncmp(topic, "$SYS", 4)){
Expand Down Expand Up @@ -162,11 +159,12 @@ static struct sub__token *sub__topic_append(struct sub__token **tail, struct sub
}
new_topic->next = NULL;
new_topic->topic_len = strlen(topic);
if(UHPA_ALLOC_TOPIC(new_topic) == 0){
new_topic->topic = mosquitto__malloc(new_topic->topic_len+1);
if(!new_topic->topic){
mosquitto__free(new_topic);
return NULL;
}
strncpy(UHPA_ACCESS_TOPIC(new_topic), topic, new_topic->topic_len+1);
strncpy(new_topic->topic, topic, new_topic->topic_len+1);

if(*tail){
(*tail)->next = new_topic;
Expand All @@ -184,7 +182,7 @@ static int sub__topic_tokenise(const char *subtopic, struct sub__token **topics)
int len;
int start, stop, tlen;
int i;
mosquitto__topic_element_uhpa topic;
char *topic;

assert(subtopic);
assert(topics);
Expand Down Expand Up @@ -213,11 +211,12 @@ static int sub__topic_tokenise(const char *subtopic, struct sub__token **topics)
if(start != stop){
tlen = stop-start;

if(UHPA_ALLOC(topic, tlen+1) == 0) goto cleanup;
memcpy(UHPA_ACCESS(topic, tlen+1), &subtopic[start], tlen);
UHPA_ACCESS(topic, tlen+1)[tlen] = '\0';
new_topic = sub__topic_append(&tail, topics, UHPA_ACCESS(topic, tlen+1));
UHPA_FREE(topic, tlen+1);
topic = mosquitto__malloc(tlen+1);
if(!topic) goto cleanup;
memcpy(topic, &subtopic[start], tlen);
topic[tlen] = '\0';
new_topic = sub__topic_append(&tail, topics, topic);
mosquitto__free(topic);
}else{
new_topic = sub__topic_append(&tail, topics, "");
}
Expand All @@ -232,7 +231,7 @@ static int sub__topic_tokenise(const char *subtopic, struct sub__token **topics)
tail = *topics;
*topics = NULL;
while(tail){
UHPA_FREE_TOPIC(tail);
mosquitto__free(tail->topic);
new_topic = tail->next;
mosquitto__free(tail);
tail = new_topic;
Expand All @@ -246,7 +245,7 @@ static void sub__topic_tokens_free(struct sub__token *tokens)

while(tokens){
tail = tokens->next;
UHPA_FREE_TOPIC(tokens);
mosquitto__free(tokens->topic);
mosquitto__free(tokens);
tokens = tail;
}
Expand Down Expand Up @@ -320,12 +319,12 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context,
return MOSQ_ERR_SUCCESS;
}

HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch);
HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch);
if(branch){
return sub__add_recurse(db, context, qos, identifier, options, branch, tokens->next);
}else{
/* Not found */
branch = sub__add_hier_entry(subhier, &subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len);
branch = sub__add_hier_entry(subhier, &subhier->children, tokens->topic, tokens->topic_len);
if(!branch) return MOSQ_ERR_NOMEM;

return sub__add_recurse(db, context, qos, identifier, options, branch, tokens->next);
Expand Down Expand Up @@ -373,12 +372,12 @@ static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *contex
return MOSQ_ERR_SUCCESS;
}

HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch);
HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch);
if(branch){
sub__remove_recurse(db, context, branch, tokens->next, reason);
if(!branch->children && !branch->subs && !branch->retained){
HASH_DELETE(hh, subhier->children, branch);
UHPA_FREE_TOPIC(branch);
mosquitto__free(branch->topic);
mosquitto__free(branch);
}
}
Expand All @@ -396,12 +395,12 @@ static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhi
HASH_ITER(hh, subhier->children, branch, branch_tmp){
sr = set_retain;

if(tokens && UHPA_ACCESS_TOPIC(tokens)
&& (!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens))
|| !strcmp(UHPA_ACCESS_TOPIC(branch), "+"))){
if(tokens && tokens->topic
&& (!strcmp(branch->topic, tokens->topic)
|| !strcmp(branch->topic, "+"))){
/* The topic matches this subscription.
* Doesn't include # wildcards */
if(!strcmp(UHPA_ACCESS_TOPIC(branch), "+")){
if(!strcmp(branch->topic, "+")){
/* Don't set a retained message where + is in the hierarchy. */
sr = false;
}
Expand All @@ -419,7 +418,7 @@ static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhi
return rc;
}
}
}else if(!strcmp(UHPA_ACCESS_TOPIC(branch), "#") && !branch->children){
}else if(!strcmp(branch->topic, "#") && !branch->children){
/* The topic matches due to a # wildcard - process the
* subscriptions but *don't* return. Although this branch has ended
* there may still be other subscriptions to deal with.
Expand Down Expand Up @@ -453,28 +452,20 @@ struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent
}
child->parent = parent;
child->topic_len = len;
if(UHPA_ALLOC_TOPIC(child) == 0){
child->topic = malloc(len+1);
if(!child->topic){
child->topic_len = 0;
mosquitto__free(child);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return NULL;
}else{
strncpy(UHPA_ACCESS_TOPIC(child), topic, child->topic_len+1);
strncpy(child->topic, topic, child->topic_len+1);
}
child->subs = NULL;
child->children = NULL;
child->retained = NULL;

if(child->topic_len+1 > sizeof(child->topic.array)){
if(child->topic.ptr){
HASH_ADD_KEYPTR(hh, *sibling, child->topic.ptr, child->topic_len, child);
}else{
mosquitto__free(child);
return NULL;
}
}else{
HASH_ADD(hh, *sibling, topic.array, child->topic_len, child);
}
HASH_ADD_KEYPTR(hh, *sibling, child->topic, child->topic_len, child);

return child;
}
Expand All @@ -492,9 +483,9 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub

if(sub__topic_tokenise(sub, &tokens)) return 1;

HASH_FIND(hh, *root, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier);
HASH_FIND(hh, *root, tokens->topic, tokens->topic_len, subhier);
if(!subhier){
subhier = sub__add_hier_entry(NULL, root, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len);
subhier = sub__add_hier_entry(NULL, root, tokens->topic, tokens->topic_len);
if(!subhier){
sub__topic_tokens_free(tokens);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
Expand All @@ -520,7 +511,7 @@ int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *

if(sub__topic_tokenise(sub, &tokens)) return 1;

HASH_FIND(hh, root, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier);
HASH_FIND(hh, root, tokens->topic, tokens->topic_len, subhier);
if(subhier){
*reason = MQTT_RC_NO_SUBSCRIPTION_EXISTED;
rc = sub__remove_recurse(db, context, subhier, tokens, reason);
Expand Down Expand Up @@ -548,7 +539,7 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch
*/
(*stored)->ref_count++;

HASH_FIND(hh, db->subs, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier);
HASH_FIND(hh, db->subs, tokens->topic, tokens->topic_len, subhier);
if(subhier){
if(retain){
/* We have a message that needs to be retained, so ensure that the subscription
Expand Down Expand Up @@ -582,7 +573,7 @@ static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub

parent = sub->parent;
HASH_DELETE(hh, parent->children, sub);
UHPA_FREE_TOPIC(sub);
mosquitto__free(sub->topic);
mosquitto__free(sub);

if(parent->subs == NULL
Expand Down Expand Up @@ -658,7 +649,7 @@ void sub__tree_print(struct mosquitto__subhier *root, int level)
for(i=0; i<(level+2)*2; i++){
printf(" ");
}
printf("%s", UHPA_ACCESS_TOPIC(branch));
printf("%s", branch->topic);
leaf = branch->subs;
while(leaf){
if(leaf->context){
Expand Down Expand Up @@ -752,7 +743,7 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su
/* Subscriptions with wildcards in aren't really valid topics to publish to
* so they can't have retained messages.
*/
if(!strcmp(UHPA_ACCESS_TOPIC(tokens), "#") && !tokens->next){
if(!strcmp(tokens->topic, "#") && !tokens->next){
/* Set flag to indicate that we should check for retained messages
* on "foo" when we are subscribing to e.g. "foo/#" and then exit
* this function and return to an earlier retain__search().
Expand All @@ -764,12 +755,12 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su
if(branch->children){
retain__search(db, branch, tokens, context, sub, sub_qos, subscription_identifier, now, level+1);
}
}else if(strcmp(UHPA_ACCESS_TOPIC(branch), "+")
&& (!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens))
|| !strcmp(UHPA_ACCESS_TOPIC(tokens), "+"))){
}else if(strcmp(branch->topic, "+")
&& (!strcmp(branch->topic, tokens->topic)
|| !strcmp(tokens->topic, "+"))){
if(tokens->next){
if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1
|| (!branch_tmp && tokens->next && !strcmp(UHPA_ACCESS_TOPIC(tokens->next), "#") && level>0)){
|| (!branch_tmp && tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){

if(branch->retained){
retain__process(db, branch, context, sub, sub_qos, subscription_identifier, now);
Expand Down Expand Up @@ -797,15 +788,15 @@ int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const

if(sub__topic_tokenise(sub, &tokens)) return 1;

HASH_FIND(hh, db->subs, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier);
HASH_FIND(hh, db->subs, tokens->topic, tokens->topic_len, subhier);

if(subhier){
now = time(NULL);
retain__search(db, subhier, tokens, context, sub, sub_qos, subscription_identifier, now, 0);
}
while(tokens){
tail = tokens->next;
UHPA_FREE_TOPIC(tokens);
mosquitto__free(tokens->topic);
mosquitto__free(tokens);
tokens = tail;
}
Expand Down

0 comments on commit a57bba0

Please sign in to comment.