Skip to content

Commit

Permalink
Various fixes around inflight quota management.
Browse files Browse the repository at this point in the history
Closes #2306. Thanks to canique.
  • Loading branch information
ralight committed Sep 22, 2021
1 parent 7551a29 commit 330bf6e
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 72 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Broker:
- Fix `max_keepalive` option not being able to be set to 0.
- Fix LWT messages not being delivered if `per_listener_settings` was set to
true. Closes #2314.
- Various fixes around inflight quota management. Closes #2306.


2.0.12 - 2021-08-31
Expand Down
12 changes: 8 additions & 4 deletions lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,14 @@ struct mosquitto_msg_data{
#ifdef WITH_BROKER
struct mosquitto_client_msg *inflight;
struct mosquitto_client_msg *queued;
long msg_bytes;
long msg_bytes12;
int msg_count;
int msg_count12;
long inflight_bytes;
long inflight_bytes12;
int inflight_count;
int inflight_count12;
long queued_bytes;
long queued_bytes12;
int queued_count;
int queued_count12;
#else
struct mosquitto_message_all *inflight;
int queue_len;
Expand Down
152 changes: 94 additions & 58 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_directio
if(db.config->max_queued_messages == 0 && db.config->max_inflight_bytes == 0){
return true;
}
valid_bytes = ((msgs->msg_bytes - (ssize_t)db.config->max_inflight_bytes) < (ssize_t)db.config->max_queued_bytes);
valid_bytes = ((msgs->inflight_bytes - (ssize_t)db.config->max_inflight_bytes) < (ssize_t)db.config->max_queued_bytes);
if(dir == mosq_md_out){
valid_count = context->out_packet_count < db.config->max_queued_messages;
}else{
valid_count = msgs->msg_count - msgs->inflight_maximum < db.config->max_queued_messages;
valid_count = msgs->inflight_count - msgs->inflight_maximum < db.config->max_queued_messages;
}

if(db.config->max_queued_messages == 0){
Expand All @@ -74,7 +74,7 @@ bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_directio
return valid_count;
}
}else{
valid_bytes = (ssize_t)msgs->msg_bytes12 < (ssize_t)db.config->max_inflight_bytes;
valid_bytes = (ssize_t)msgs->inflight_bytes12 < (ssize_t)db.config->max_inflight_bytes;
valid_count = msgs->inflight_quota > 0;

if(msgs->inflight_maximum == 0){
Expand Down Expand Up @@ -113,8 +113,8 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms
if(qos == 0 && db.config->queue_qos0_messages == false){
return false; /* This case is handled in db__ready_for_flight() */
}else{
source_bytes = (ssize_t)msg_data->msg_bytes12;
source_count = msg_data->msg_count12;
source_bytes = (ssize_t)msg_data->queued_bytes12;
source_count = msg_data->queued_count12;
}
adjust_count = msg_data->inflight_maximum;

Expand All @@ -138,6 +138,48 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms
}


void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
{
msg_data->inflight_count++;
msg_data->inflight_bytes += msg->store->payloadlen;
if(msg->qos != 0){
msg_data->inflight_count12++;
msg_data->inflight_bytes12 += msg->store->payloadlen;
}
}

static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
{
msg_data->inflight_count--;
msg_data->inflight_bytes -= msg->store->payloadlen;
if(msg->qos != 0){
msg_data->inflight_count12--;
msg_data->inflight_bytes12 -= msg->store->payloadlen;
}
}


void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
{
msg_data->queued_count++;
msg_data->queued_bytes += msg->store->payloadlen;
if(msg->qos != 0){
msg_data->queued_count12++;
msg_data->queued_bytes12 += msg->store->payloadlen;
}
}

static void db__msg_remove_from_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
{
msg_data->queued_count--;
msg_data->queued_bytes -= msg->store->payloadlen;
if(msg->qos != 0){
msg_data->queued_count12--;
msg_data->queued_bytes12 -= msg->store->payloadlen;
}
}


int db__open(struct mosquitto__config *config)
{
struct mosquitto__subhier *subhier;
Expand Down Expand Up @@ -305,12 +347,7 @@ static void db__message_remove(struct mosquitto_msg_data *msg_data, struct mosqu

DL_DELETE(msg_data->inflight, item);
if(item->store){
msg_data->msg_count--;
msg_data->msg_bytes -= item->store->payloadlen;
if(item->qos > 0){
msg_data->msg_count12--;
msg_data->msg_bytes12 -= item->store->payloadlen;
}
db__msg_remove_from_inflight_stats(msg_data, item);
db__msg_store_ref_dec(&item->store);
}

Expand All @@ -331,6 +368,9 @@ void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_d
if(msg_data->inflight_quota > 0){
msg_data->inflight_quota--;
}

db__msg_remove_from_queued_stats(msg_data, msg);
db__msg_add_to_inflight_stats(msg_data, msg);
}


Expand All @@ -356,7 +396,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo
}

DL_FOREACH_SAFE(context->msgs_out.queued, tail, tmp){
if(context->msgs_out.inflight_maximum != 0 && msg_index >= context->msgs_out.inflight_maximum){
if(!db__ready_for_flight(context, mosq_md_out, tail->qos)){
break;
}

Expand Down Expand Up @@ -520,14 +560,10 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m

if(state == mosq_ms_queued){
DL_APPEND(msg_data->queued, msg);
db__msg_add_to_queued_stats(msg_data, msg);
}else{
DL_APPEND(msg_data->inflight, msg);
}
msg_data->msg_count++;
msg_data->msg_bytes+= msg->store->payloadlen;
if(qos > 0){
msg_data->msg_count12++;
msg_data->msg_bytes12 += msg->store->payloadlen;
db__msg_add_to_inflight_stats(msg_data, msg);
}

if(db.config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){
Expand All @@ -553,13 +589,13 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m
#ifdef WITH_BRIDGE
if(context->bridge && context->bridge->start_type == bst_lazy
&& context->sock == INVALID_SOCKET
&& context->msgs_out.msg_count >= context->bridge->threshold){
&& context->msgs_out.inflight_count + context->msgs_out.queued_count >= context->bridge->threshold){

context->bridge->lazy_reconnect = true;
}
#endif

if(dir == mosq_md_out && msg->qos > 0){
if(dir == mosq_md_out && msg->qos > 0 && state != mosq_ms_queued){
util__decrement_send_quota(context);
}

Expand Down Expand Up @@ -612,21 +648,29 @@ int db__messages_delete(struct mosquitto *context, bool force_free)
if(force_free || context->clean_start || (context->bridge && context->bridge->clean_start)){
db__messages_delete_list(&context->msgs_in.inflight);
db__messages_delete_list(&context->msgs_in.queued);
context->msgs_in.msg_bytes = 0;
context->msgs_in.msg_bytes12 = 0;
context->msgs_in.msg_count = 0;
context->msgs_in.msg_count12 = 0;
context->msgs_in.inflight_bytes = 0;
context->msgs_in.inflight_bytes12 = 0;
context->msgs_in.inflight_count = 0;
context->msgs_in.inflight_count12 = 0;
context->msgs_in.queued_bytes = 0;
context->msgs_in.queued_bytes12 = 0;
context->msgs_in.queued_count = 0;
context->msgs_in.queued_count12 = 0;
}

if(force_free || (context->bridge && context->bridge->clean_start_local)
|| (context->bridge == NULL && context->clean_start)){

db__messages_delete_list(&context->msgs_out.inflight);
db__messages_delete_list(&context->msgs_out.queued);
context->msgs_out.msg_bytes = 0;
context->msgs_out.msg_bytes12 = 0;
context->msgs_out.msg_count = 0;
context->msgs_out.msg_count12 = 0;
context->msgs_out.inflight_bytes = 0;
context->msgs_out.inflight_bytes12 = 0;
context->msgs_out.inflight_count = 0;
context->msgs_out.inflight_count12 = 0;
context->msgs_out.queued_bytes = 0;
context->msgs_out.queued_bytes12 = 0;
context->msgs_out.queued_count = 0;
context->msgs_out.queued_count12 = 0;
}

return MOSQ_ERR_SUCCESS;
Expand Down Expand Up @@ -766,18 +810,19 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context)
{
struct mosquitto_client_msg *msg, *tmp;

context->msgs_out.msg_bytes = 0;
context->msgs_out.msg_bytes12 = 0;
context->msgs_out.msg_count = 0;
context->msgs_out.msg_count12 = 0;
context->msgs_out.inflight_bytes = 0;
context->msgs_out.inflight_bytes12 = 0;
context->msgs_out.inflight_count = 0;
context->msgs_out.inflight_count12 = 0;
context->msgs_out.queued_bytes = 0;
context->msgs_out.queued_bytes12 = 0;
context->msgs_out.queued_count = 0;
context->msgs_out.queued_count12 = 0;
context->msgs_out.inflight_quota = context->msgs_out.inflight_maximum;

DL_FOREACH_SAFE(context->msgs_out.inflight, msg, tmp){
context->msgs_out.msg_count++;
context->msgs_out.msg_bytes += msg->store->payloadlen;
db__msg_add_to_inflight_stats(&context->msgs_out, msg);
if(msg->qos > 0){
context->msgs_out.msg_count12++;
context->msgs_out.msg_bytes12 += msg->store->payloadlen;
util__decrement_send_quota(context);
}

Expand All @@ -804,12 +849,7 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context)
* will be sent out of order.
*/
DL_FOREACH_SAFE(context->msgs_out.queued, msg, tmp){
context->msgs_out.msg_count++;
context->msgs_out.msg_bytes += msg->store->payloadlen;
if(msg->qos > 0){
context->msgs_out.msg_count12++;
context->msgs_out.msg_bytes12 += msg->store->payloadlen;
}
db__msg_add_to_queued_stats(&context->msgs_out, msg);
if(db__ready_for_flight(context, mosq_md_out, msg->qos)){
switch(msg->qos){
case 0:
Expand All @@ -835,18 +875,19 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context)
{
struct mosquitto_client_msg *msg, *tmp;

context->msgs_in.msg_bytes = 0;
context->msgs_in.msg_bytes12 = 0;
context->msgs_in.msg_count = 0;
context->msgs_in.msg_count12 = 0;
context->msgs_in.inflight_bytes = 0;
context->msgs_in.inflight_bytes12 = 0;
context->msgs_in.inflight_count = 0;
context->msgs_in.inflight_count12 = 0;
context->msgs_in.queued_bytes = 0;
context->msgs_in.queued_bytes12 = 0;
context->msgs_in.queued_count = 0;
context->msgs_in.queued_count12 = 0;
context->msgs_in.inflight_quota = context->msgs_in.inflight_maximum;

DL_FOREACH_SAFE(context->msgs_in.inflight, msg, tmp){
context->msgs_in.msg_count++;
context->msgs_in.msg_bytes += msg->store->payloadlen;
db__msg_add_to_inflight_stats(&context->msgs_in, msg);
if(msg->qos > 0){
context->msgs_in.msg_count12++;
context->msgs_in.msg_bytes12 += msg->store->payloadlen;
util__decrement_receive_quota(context);
}

Expand All @@ -867,12 +908,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context)
* will be sent out of order.
*/
DL_FOREACH_SAFE(context->msgs_in.queued, msg, tmp){
context->msgs_in.msg_count++;
context->msgs_in.msg_bytes += msg->store->payloadlen;
if(msg->qos > 0){
context->msgs_in.msg_count12++;
context->msgs_in.msg_bytes12 += msg->store->payloadlen;
}
db__msg_add_to_queued_stats(&context->msgs_in, msg);
if(db__ready_for_flight(context, mosq_md_in, msg->qos)){
switch(msg->qos){
case 0:
Expand Down Expand Up @@ -965,7 +1001,7 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid)
}

DL_FOREACH_SAFE(context->msgs_in.queued, tail, tmp){
if(context->msgs_in.inflight_maximum != 0 && msg_index >= context->msgs_in.inflight_maximum){
if(db__ready_for_flight(context, mosq_md_in, tail->qos)){
break;
}

Expand Down Expand Up @@ -1180,7 +1216,7 @@ int db__message_write_queued_out(struct mosquitto *context)
}

DL_FOREACH_SAFE(context->msgs_out.queued, tail, tmp){
if(context->msgs_out.inflight_maximum != 0 && context->msgs_out.inflight_quota == 0){
if(!db__ready_for_flight(context, mosq_md_out, tail->qos)){
break;
}

Expand Down
2 changes: 2 additions & 0 deletions src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,8 @@ int db__message_write_inflight_out_all(struct mosquitto *context);
int db__message_write_inflight_out_latest(struct mosquitto *context);
int db__message_write_queued_out(struct mosquitto *context);
int db__message_write_queued_in(struct mosquitto *context);
void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg);
void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg);

/* ============================================================
* Subscription functions
Expand Down
8 changes: 2 additions & 6 deletions src/persist_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,13 @@ static int persist__client_msg_restore(struct P_client_msg *chunk)

if(chunk->F.state == mosq_ms_queued || (chunk->F.qos > 0 && msg_data->inflight_quota == 0)){
DL_APPEND(msg_data->queued, cmsg);
db__msg_add_to_queued_stats(msg_data, cmsg);
}else{
DL_APPEND(msg_data->inflight, cmsg);
if(chunk->F.qos > 0 && msg_data->inflight_quota > 0){
msg_data->inflight_quota--;
}
}
msg_data->msg_count++;
msg_data->msg_bytes += cmsg->store->payloadlen;
if(chunk->F.qos > 0){
msg_data->msg_count12++;
msg_data->msg_bytes12 += cmsg->store->payloadlen;
db__msg_add_to_inflight_stats(msg_data, cmsg);
}

return MOSQ_ERR_SUCCESS;
Expand Down
8 changes: 4 additions & 4 deletions src/xtreport.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ static void client_cost(FILE *fptr, struct mosquitto *context, int fn_index)
pkt_tmp = pkt_tmp->next;
}

cmsg_count = context->msgs_in.msg_count;
cmsg_bytes = context->msgs_in.msg_bytes;
cmsg_count += context->msgs_out.msg_count;
cmsg_bytes += context->msgs_out.msg_bytes;
cmsg_count = context->msgs_in.inflight_count + context->msgs_in.queued_count;
cmsg_bytes = context->msgs_in.inflight_bytes + context->msgs_in.queued_bytes;
cmsg_count += context->msgs_out.inflight_count + context->msgs_out.queued_count;
cmsg_bytes += context->msgs_out.inflight_bytes + context->msgs_out.queued_bytes;

tBytes = pkt_bytes + cmsg_bytes;
if(context->id){
Expand Down
Loading

0 comments on commit 330bf6e

Please sign in to comment.