Skip to content

Commit

Permalink
Fix inflight quota inc/dec for both broker and library.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Apr 17, 2019
1 parent 9372f42 commit 689989c
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 45 deletions.
19 changes: 4 additions & 15 deletions lib/actions.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in
{
struct mosquitto_message_all *message;
uint16_t local_mid;
int queue_status;
const mosquitto_property *p;
const mosquitto_property *outgoing_properties = NULL;
mosquitto_property local_property;
Expand Down Expand Up @@ -140,20 +139,10 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in
message->dup = false;

pthread_mutex_lock(&mosq->msgs_out.mutex);
queue_status = message__queue(mosq, message, mosq_md_out);
if(queue_status == 0){
if(qos == 1){
message->state = mosq_ms_wait_for_puback;
}else if(qos == 2){
message->state = mosq_ms_wait_for_pubrec;
}
pthread_mutex_unlock(&mosq->msgs_out.mutex);
return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, outgoing_properties, NULL, 0);
}else{
message->state = mosq_ms_invalid;
pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_SUCCESS;
}
message->state = mosq_ms_invalid;
message__queue(mosq, message, mosq_md_out);
pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_SUCCESS;
}
}

Expand Down
4 changes: 4 additions & 0 deletions lib/handle_pubackcomp.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
return MOSQ_ERR_PROTOCOL;
}

pthread_mutex_lock(&mosq->msgs_out.mutex);
util__increment_send_quota(mosq);
pthread_mutex_unlock(&mosq->msgs_out.mutex);

rc = packet__read_uint16(&mosq->in_packet, &mid);
if(rc) return rc;
Expand Down Expand Up @@ -106,7 +108,9 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
pthread_mutex_unlock(&mosq->callback_mutex);
mosquitto_property_free_all(&properties);
}
pthread_mutex_lock(&mosq->msgs_out.mutex);
message__release_to_inflight(mosq, mosq_md_out);
pthread_mutex_unlock(&mosq->msgs_out.mutex);

return MOSQ_ERR_SUCCESS;
#endif
Expand Down
3 changes: 2 additions & 1 deletion lib/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ int handle__publish(struct mosquitto *mosq)
return MOSQ_ERR_PROTOCOL;
}
}
util__decrement_receive_quota(mosq);

rc = packet__read_uint16(&mosq->in_packet, &mid);
if(rc){
Expand Down Expand Up @@ -133,6 +132,7 @@ int handle__publish(struct mosquitto *mosq)
mosquitto_property_free_all(&properties);
return MOSQ_ERR_SUCCESS;
case 1:
util__decrement_receive_quota(mosq);
rc = send__puback(mosq, message->msg.mid, 0);
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_message){
Expand All @@ -150,6 +150,7 @@ int handle__publish(struct mosquitto *mosq)
mosquitto_property_free_all(&properties);
return rc;
case 2:
util__decrement_receive_quota(mosq);
rc = send__pubrec(mosq, message->msg.mid, 0);
pthread_mutex_lock(&mosq->msgs_in.mutex);
message->state = mosq_ms_wait_for_pubrel;
Expand Down
4 changes: 3 additions & 1 deletion lib/handle_pubrec.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq)

log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREC (Mid: %d)", mosq->id, mid);

if(reason_code < 0x80){
if(reason_code < 0x80 || mosq->protocol != mosq_p_mqtt5){
rc = message__out_update(mosq, mid, mosq_ms_wait_for_pubcomp, 2);
}else{
if(!message__delete(mosq, mid, mosq_md_out, 2)){
Expand All @@ -91,7 +91,9 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq)
pthread_mutex_unlock(&mosq->callback_mutex);
}
util__increment_send_quota(mosq);
pthread_mutex_lock(&mosq->msgs_out.mutex);
message__release_to_inflight(mosq, mosq_md_out);
pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_SUCCESS;
}
#endif
Expand Down
26 changes: 8 additions & 18 deletions lib/messages_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,29 +118,20 @@ void mosquitto_message_free_contents(struct mosquitto_message *message)

int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir)
{
struct mosquitto_msg_data *msg_data;
int rc = 0;

/* mosq->*_message_mutex should be locked before entering this function */
assert(mosq);
assert(message);
assert(message->msg.qos != 0);

if(dir == mosq_md_out){
msg_data = &mosq->msgs_out;
if(mosq->msgs_out.inflight_quota == 0){
rc = 1;
}
util__decrement_send_quota(mosq);
DL_APPEND(mosq->msgs_out.inflight, message);
mosq->msgs_out.queue_len++;
}else{
msg_data = &mosq->msgs_in;
util__decrement_receive_quota(mosq);
DL_APPEND(mosq->msgs_in.inflight, message);
mosq->msgs_in.queue_len++;
}

msg_data->queue_len++;
DL_APPEND(msg_data->inflight, message);

return rc;
return message__release_to_inflight(mosq, dir);
}

void message__reconnect_reset(struct mosquitto *mosq)
Expand All @@ -149,6 +140,7 @@ void message__reconnect_reset(struct mosquitto *mosq)
assert(mosq);

pthread_mutex_lock(&mosq->msgs_in.mutex);
mosq->msgs_in.inflight_quota = mosq->msgs_in.inflight_maximum;
mosq->msgs_in.queue_len = 0;
DL_FOREACH_SAFE(mosq->msgs_in.inflight, message, tmp){
mosq->msgs_in.queue_len++;
Expand All @@ -159,6 +151,7 @@ void message__reconnect_reset(struct mosquitto *mosq)
}else{
/* Message state can be preserved here because it should match
* whatever the client has got. */
util__decrement_receive_quota(mosq);
}
}
pthread_mutex_unlock(&mosq->msgs_in.mutex);
Expand Down Expand Up @@ -193,11 +186,11 @@ void message__reconnect_reset(struct mosquitto *mosq)

int message__release_to_inflight(struct mosquitto *mosq, enum mosquitto_msg_direction dir)
{
/* mosq->*_message_mutex should be locked before entering this function */
struct mosquitto_message_all *cur, *tmp;
int rc = MOSQ_ERR_SUCCESS;

if(dir == mosq_md_out){
pthread_mutex_lock(&mosq->msgs_out.mutex);
DL_FOREACH_SAFE(mosq->msgs_out.inflight, cur, tmp){
if(mosq->msgs_out.inflight_quota > 0){
if(cur->msg.qos > 0 && cur->state == mosq_ms_invalid){
Expand All @@ -208,17 +201,14 @@ int message__release_to_inflight(struct mosquitto *mosq, enum mosquitto_msg_dire
}
rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup, NULL, NULL, 0);
if(rc){
pthread_mutex_unlock(&mosq->msgs_out.mutex);
return rc;
}
util__decrement_send_quota(mosq);
}
}else{
pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_SUCCESS;
}
}
pthread_mutex_unlock(&mosq->msgs_out.mutex);
}

return rc;
Expand Down
2 changes: 1 addition & 1 deletion lib/send_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code)
#else
if(mosq) log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREC (m%d, rc%d)", mosq->id, mid, reason_code);
#endif
if(reason_code >= 0x80){
if(reason_code >= 0x80 && mosq->protocol == mosq_p_mqtt5){
util__increment_receive_quota(mosq);
}
/* We don't use Reason String or User Property yet. */
Expand Down
3 changes: 0 additions & 3 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,6 @@ void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_d
msg = msg_data->queued;
DL_DELETE(msg_data->queued, msg);
DL_APPEND(msg_data->inflight, msg);
if(msg_data->inflight_quota > 0){
msg_data->inflight_quota--;
}
}


Expand Down
8 changes: 3 additions & 5 deletions src/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
if(rc2 > 0) rc = 1;
break;
case 1:
util__decrement_receive_quota(context);
rc2 = sub__messages_queue(db, context->id, topic, qos, retain, &stored);
if(rc2 == MOSQ_ERR_SUCCESS || context->protocol != mosq_p_mqtt5){
if(send__puback(context, mid, 0)) rc = 1;
Expand All @@ -329,7 +330,8 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
}
break;
case 2:
if(!dup){
if(dup == 0){
util__decrement_receive_quota(context);
res = db__message_insert(db, context, mid, mosq_md_in, qos, retain, stored, NULL);
}else{
res = 0;
Expand All @@ -344,10 +346,6 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
break;
}

if(rc == MOSQ_ERR_SUCCESS && qos > 0){
util__decrement_receive_quota(context);
}

return rc;
process_bad_message:
mosquitto__free(topic);
Expand Down
2 changes: 1 addition & 1 deletion test/lib/cpp/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: all test 01 02 03 04 08 09 clean reallyclean

CFLAGS=-I../../../lib -I../../../lib/cpp -DDEBUG -Werror
CFLAGS=-I../../../lib -I../../../lib/cpp -DDEBUG
LIBS=../../../lib/libmosquitto.so.1 ../../../lib/cpp/libmosquittopp.so.1

all : 01 02 03 04 08 09
Expand Down

0 comments on commit 689989c

Please sign in to comment.