Skip to content

Commit

Permalink
Allow send__pub{ack,rec,rel,comp} to send properties.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Jul 10, 2020
1 parent 318dead commit e3e8dc4
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 36 deletions.
4 changes: 2 additions & 2 deletions lib/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ int handle__publish(struct mosquitto *mosq)
return MOSQ_ERR_SUCCESS;
case 1:
util__decrement_receive_quota(mosq);
rc = send__puback(mosq, message->msg.mid, 0);
rc = send__puback(mosq, message->msg.mid, 0, NULL);
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_message){
mosq->in_callback = true;
Expand All @@ -154,7 +154,7 @@ int handle__publish(struct mosquitto *mosq)
case 2:
message->properties = properties;
util__decrement_receive_quota(mosq);
rc = send__pubrec(mosq, message->msg.mid, 0);
rc = send__pubrec(mosq, message->msg.mid, 0, NULL);
pthread_mutex_lock(&mosq->msgs_in.mutex);
message->state = mosq_ms_wait_for_pubrel;
message__queue(mosq, message, mosq_md_in);
Expand Down
2 changes: 1 addition & 1 deletion lib/handle_pubrec.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq)
}else if(rc != MOSQ_ERR_SUCCESS){
return rc;
}
rc = send__pubrel(mosq, mid);
rc = send__pubrel(mosq, mid, NULL);
if(rc) return rc;

return MOSQ_ERR_SUCCESS;
Expand Down
4 changes: 2 additions & 2 deletions lib/handle_pubrel.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
return rc;
}

rc = send__pubcomp(mosq, mid);
rc = send__pubcomp(mosq, mid, NULL);
if(rc) return rc;
#else
UNUSED(db);

log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREL (Mid: %d)", mosq->id, mid);

rc = send__pubcomp(mosq, mid);
rc = send__pubcomp(mosq, mid, NULL);
if(rc){
message__remove(mosq, mid, mosq_md_in, &message, 2);
return rc;
Expand Down
4 changes: 2 additions & 2 deletions lib/messages_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,13 @@ void message__retry_check(struct mosquitto *mosq)
case mosq_ms_wait_for_pubrel:
msg->timestamp = now;
msg->dup = true;
send__pubrec(mosq, msg->msg.mid, 0);
send__pubrec(mosq, msg->msg.mid, 0, NULL);
break;
case mosq_ms_resend_pubrel:
case mosq_ms_wait_for_pubcomp:
msg->timestamp = now;
msg->dup = true;
send__pubrel(mosq, msg->msg.mid);
send__pubrel(mosq, msg->msg.mid, NULL);
break;
default:
break;
Expand Down
16 changes: 8 additions & 8 deletions lib/send_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ int send__pingresp(struct mosquitto *mosq)
return send__simple_command(mosq, CMD_PINGRESP);
}

int send__puback(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code)
int send__puback(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code, const mosquitto_property *properties)
{
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBACK to %s (m%d, rc%d)", mosq->id, mid, reason_code);
Expand All @@ -74,10 +74,10 @@ int send__puback(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code)
#endif
util__increment_receive_quota(mosq);
/* We don't use Reason String or User Property yet. */
return send__command_with_mid(mosq, CMD_PUBACK, mid, false, reason_code, NULL);
return send__command_with_mid(mosq, CMD_PUBACK, mid, false, reason_code, properties);
}

int send__pubcomp(struct mosquitto *mosq, uint16_t mid)
int send__pubcomp(struct mosquitto *mosq, uint16_t mid, const mosquitto_property *properties)
{
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBCOMP to %s (m%d)", mosq->id, mid);
Expand All @@ -86,11 +86,11 @@ int send__pubcomp(struct mosquitto *mosq, uint16_t mid)
#endif
util__increment_receive_quota(mosq);
/* We don't use Reason String or User Property yet. */
return send__command_with_mid(mosq, CMD_PUBCOMP, mid, false, 0, NULL);
return send__command_with_mid(mosq, CMD_PUBCOMP, mid, false, 0, properties);
}


int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code)
int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code, const mosquitto_property *properties)
{
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREC to %s (m%d, rc%d)", mosq->id, mid, reason_code);
Expand All @@ -101,18 +101,18 @@ int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code)
util__increment_receive_quota(mosq);
}
/* We don't use Reason String or User Property yet. */
return send__command_with_mid(mosq, CMD_PUBREC, mid, false, reason_code, NULL);
return send__command_with_mid(mosq, CMD_PUBREC, mid, false, reason_code, properties);
}

int send__pubrel(struct mosquitto *mosq, uint16_t mid)
int send__pubrel(struct mosquitto *mosq, uint16_t mid, const mosquitto_property *properties)
{
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREL to %s (m%d)", mosq->id, mid);
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREL (m%d)", mosq->id, mid);
#endif
/* We don't use Reason String or User Property yet. */
return send__command_with_mid(mosq, CMD_PUBREL|2, mid, false, 0, NULL);
return send__command_with_mid(mosq, CMD_PUBREL|2, mid, false, 0, properties);
}

/* For PUBACK, PUBCOMP, PUBREC, and PUBREL */
Expand Down
8 changes: 4 additions & 4 deletions lib/send_mosq.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session
int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitto_property *properties);
int send__pingreq(struct mosquitto *mosq);
int send__pingresp(struct mosquitto *mosq);
int send__puback(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code);
int send__pubcomp(struct mosquitto *mosq, uint16_t mid);
int send__puback(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code, const mosquitto_property *properties);
int send__pubcomp(struct mosquitto *mosq, uint16_t mid, const mosquitto_property *properties);
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval);
int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code);
int send__pubrel(struct mosquitto *mosq, uint16_t mid);
int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code, const mosquitto_property *properties);
int send__pubrel(struct mosquitto *mosq, uint16_t mid, const mosquitto_property *properties);
int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, char *const *const topic, int topic_qos, const mosquitto_property *properties);
int send__unsubscribe(struct mosquitto *mosq, int *mid, int topic_count, char *const *const topic, const mosquitto_property *properties);

Expand Down
10 changes: 5 additions & 5 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont
tail->timestamp = mosquitto_time();

if(tail->qos == 2){
send__pubrec(context, tail->mid, 0);
send__pubrec(context, tail->mid, 0, NULL);
tail->state = mosq_ms_wait_for_pubrel;
db__message_dequeue_first(context, &context->msgs_in);
}
Expand Down Expand Up @@ -993,7 +993,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)

switch(tail->state){
case mosq_ms_send_pubrec:
rc = send__pubrec(context, mid, 0);
rc = send__pubrec(context, mid, 0, NULL);
if(!rc){
tail->state = mosq_ms_wait_for_pubrel;
}else{
Expand All @@ -1002,7 +1002,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;

case mosq_ms_resend_pubcomp:
rc = send__pubcomp(context, mid);
rc = send__pubcomp(context, mid, NULL);
if(!rc){
tail->state = mosq_ms_wait_for_pubrel;
}else{
Expand Down Expand Up @@ -1087,7 +1087,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;

case mosq_ms_resend_pubrel:
rc = send__pubrel(context, mid);
rc = send__pubrel(context, mid, NULL);
if(!rc){
tail->state = mosq_ms_wait_for_pubcomp;
}else{
Expand Down Expand Up @@ -1117,7 +1117,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
if(tail->qos == 2){
tail->state = mosq_ms_send_pubrec;
db__message_dequeue_first(context, &context->msgs_in);
rc = send__pubrec(context, tail->mid, 0);
rc = send__pubrec(context, tail->mid, 0, NULL);
if(!rc){
tail->state = mosq_ms_wait_for_pubrel;
}else{
Expand Down
12 changes: 6 additions & 6 deletions src/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
util__decrement_receive_quota(context);
rc2 = sub__messages_queue(db, context->id, topic, qos, retain, &stored);
if(rc2 == MOSQ_ERR_SUCCESS || context->protocol != mosq_p_mqtt5){
if(send__puback(context, mid, 0)) rc = 1;
if(send__puback(context, mid, 0, NULL)) rc = 1;
}else if(rc2 == MOSQ_ERR_NO_SUBSCRIBERS){
if(send__puback(context, mid, MQTT_RC_NO_MATCHING_SUBSCRIBERS)) rc = 1;
if(send__puback(context, mid, MQTT_RC_NO_MATCHING_SUBSCRIBERS, NULL)) rc = 1;
}else{
rc = rc2;
}
Expand All @@ -283,7 +283,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
/* db__message_insert() returns 2 to indicate dropped message
* due to queue. This isn't an error so don't disconnect them. */
if(!res){
if(send__pubrec(context, mid, 0)) rc = 1;
if(send__pubrec(context, mid, 0, NULL)) rc = 1;
}else if(res == 1){
rc = 1;
}
Expand All @@ -298,12 +298,12 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
case 0:
return MOSQ_ERR_SUCCESS;
case 1:
return send__puback(context, mid, reason_code);
return send__puback(context, mid, reason_code, NULL);
case 2:
if(context->protocol == mosq_p_mqtt5){
return send__pubrec(context, mid, reason_code);
return send__pubrec(context, mid, reason_code, NULL);
}else{
return send__pubrec(context, mid, 0);
return send__pubrec(context, mid, 0, NULL);
}
}
return 1;
Expand Down
6 changes: 3 additions & 3 deletions test/unit/persist_write_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
return MOSQ_ERR_SUCCESS;
}

int send__pubcomp(struct mosquitto *mosq, uint16_t mid)
int send__pubcomp(struct mosquitto *mosq, uint16_t mid, const mosquitto_property *properties)
{
return MOSQ_ERR_SUCCESS;
}

int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code)
int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code, const mosquitto_property *properties)
{
return MOSQ_ERR_SUCCESS;
}

int send__pubrel(struct mosquitto *mosq, uint16_t mid)
int send__pubrel(struct mosquitto *mosq, uint16_t mid, const mosquitto_property *properties)
{
return MOSQ_ERR_SUCCESS;
}
6 changes: 3 additions & 3 deletions test/unit/subs_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
return MOSQ_ERR_SUCCESS;
}

int send__pubcomp(struct mosquitto *mosq, uint16_t mid)
int send__pubcomp(struct mosquitto *mosq, uint16_t mid, const mosquitto_property *properties)
{
return MOSQ_ERR_SUCCESS;
}

int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code)
int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code, const mosquitto_property *properties)
{
return MOSQ_ERR_SUCCESS;
}

int send__pubrel(struct mosquitto *mosq, uint16_t mid)
int send__pubrel(struct mosquitto *mosq, uint16_t mid, const mosquitto_property *properties)
{
return MOSQ_ERR_SUCCESS;
}
Expand Down

0 comments on commit e3e8dc4

Please sign in to comment.