Skip to content

Commit

Permalink
Retain handling support.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Dec 6, 2018
1 parent 4fe75b1 commit f90ba23
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 13 deletions.
1 change: 1 addition & 0 deletions lib/mosquitto.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ extern "C" {

/* Error values */
enum mosq_err_t {
MOSQ_ERR_SUB_EXISTS = -2,
MOSQ_ERR_CONN_PENDING = -1,
MOSQ_ERR_SUCCESS = 0,
MOSQ_ERR_NOMEM = 1,
Expand Down
8 changes: 6 additions & 2 deletions src/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context)
for(i=0; i<context->bridge->topic_count; i++){
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){
log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic);
if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs)) return 1;
if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs) > 0){
return 1;
}
sub__retain_queue(db, context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos);
Expand Down Expand Up @@ -310,7 +312,9 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
for(i=0; i<context->bridge->topic_count; i++){
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){
log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic);
if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs)) return 1;
if(sub__add(db, context, context->bridge->topics[i].local_topic, context->bridge->topics[i].qos, &db->subs) > 0){
return 1;
}
sub__retain_queue(db, context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos);
Expand Down
36 changes: 31 additions & 5 deletions src/handle_subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
int rc2;
uint16_t mid;
char *sub;
uint8_t subscription_options;
uint8_t qos;
uint8_t retain_handling = 0;
uint8_t retain_as_published = 0;
uint8_t *payload = NULL, *tmp_payload;
uint32_t payloadlen = 0;
int len;
Expand Down Expand Up @@ -84,11 +87,25 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
return 1;
}

if(packet__read_byte(&context->in_packet, &qos)){
if(packet__read_byte(&context->in_packet, &subscription_options)){
mosquitto__free(sub);
mosquitto__free(payload);
return 1;
}
if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt311){
qos = subscription_options;
}else{
qos = subscription_options & 0x03;

retain_as_published = subscription_options & 0x04;

retain_handling = (subscription_options & 0x30) >> 4;
if(retain_handling == 3){
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_PROTOCOL;
}
}
if(qos > 2){
log__printf(NULL, MOSQ_LOG_INFO,
"Invalid QoS in subscription command from %s, disconnecting.",
Expand All @@ -97,6 +114,8 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
mosquitto__free(payload);
return 1;
}


if(context->listener && context->listener->mount_point){
len = strlen(context->listener->mount_point) + slen + 1;
sub_mount = mosquitto__malloc(len+1);
Expand Down Expand Up @@ -130,11 +149,18 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)

if(qos != 0x80){
rc2 = sub__add(db, context, sub, qos, &db->subs);
if(rc2 == MOSQ_ERR_SUCCESS){
if(sub__retain_queue(db, context, sub, qos)) rc = 1;
}else if(rc2 != -1){
rc = rc2;
if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){
if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){
if(sub__retain_queue(db, context, sub, qos)) rc = 1;
}
}else{
if((rc2 == MOSQ_ERR_SUCCESS && retain_handling == 0)
|| (rc2 == MOSQ_ERR_SUB_EXISTS && retain_handling == 1)){

if(sub__retain_queue(db, context, sub, qos)) rc = 1;
}
}

log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub);
}
mosquitto__free(sub);
Expand Down
2 changes: 1 addition & 1 deletion src/persist.c
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ static int persist__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
topic[slen] = '\0';

read_e(db_fptr, &qos, sizeof(uint8_t));
if(persist__restore_sub(db, client_id, topic, qos)){
if(persist__restore_sub(db, client_id, topic, qos) > 0){
rc = 1;
}
mosquitto__free(client_id);
Expand Down
8 changes: 3 additions & 5 deletions src/subs.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,11 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context,
while(leaf){
if(leaf->context && leaf->context->id && !strcmp(leaf->context->id, context->id)){
/* Client making a second subscription to same topic. Only
* need to update QoS. Return -1 to indicate this to the
* calling function. */
* need to update QoS. Return MOSQ_ERR_SUB_EXISTS to
* indicate this to the calling function. */
leaf->qos = qos;
if(context->protocol == mosq_p_mqtt31){
return -1;
return MOSQ_ERR_SUB_EXISTS;
}else{
/* mqttv311/mqttv5 requires retained messages are resent on
* resubscribe. */
Expand Down Expand Up @@ -472,8 +472,6 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub

sub__topic_tokens_free(tokens);

/* We aren't worried about -1 (already subscribed) return codes. */
if(rc == -1) rc = MOSQ_ERR_SUCCESS;
return rc;
}

Expand Down

0 comments on commit f90ba23

Please sign in to comment.