Skip to content

Commit

Permalink
Fix expired messages causing queued messages not to be delivered.
Browse files Browse the repository at this point in the history
Closes #2609. Thanks to JSchy65.
  • Loading branch information
ralight committed Aug 16, 2022
1 parent cd88906 commit 7917553
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 0 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.txt
Expand Up @@ -31,6 +31,8 @@ Broker:
only a single plugin can interact with a unique $CONTROL topic. Using
multiple instances of the plugin would produce duplicate entries in the
config file. Closes #2601. Closes #2470.
- Fix case where expired messages were causing queued messages not to be
delivered. Closes #2609.

Client library:
- Fix threads library detection on Windows under cmake. Bumps the minimum
Expand Down
50 changes: 50 additions & 0 deletions src/database.c
Expand Up @@ -356,6 +356,22 @@ static void db__message_remove_from_inflight(struct mosquitto_msg_data *msg_data
}


static void db__message_remove_from_queued(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item)
{
if(!msg_data || !item){
return;
}

DL_DELETE(msg_data->queued, item);
if(item->store){
db__msg_store_ref_dec(&item->store);
}

mosquitto_property_free_all(&item->properties);
mosquitto__free(item);
}


void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data)
{
struct mosquitto_client_msg *msg;
Expand Down Expand Up @@ -1021,6 +1037,40 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid)
}
}


void db__expire_all_messages(struct mosquitto *context)
{
struct mosquitto_client_msg *msg, *tmp;

DL_FOREACH_SAFE(context->msgs_out.inflight, msg, tmp){
if(msg->store->message_expiry_time && db.now_real_s > msg->store->message_expiry_time){
if(msg->qos > 0){
util__increment_send_quota(context);
}
db__message_remove_from_inflight(&context->msgs_out, msg);
}
}
DL_FOREACH_SAFE(context->msgs_out.queued, msg, tmp){
if(msg->store->message_expiry_time && db.now_real_s > msg->store->message_expiry_time){
db__message_remove_from_queued(&context->msgs_out, msg);
}
}
DL_FOREACH_SAFE(context->msgs_in.inflight, msg, tmp){
if(msg->store->message_expiry_time && db.now_real_s > msg->store->message_expiry_time){
if(msg->qos > 0){
util__increment_receive_quota(context);
}
db__message_remove_from_inflight(&context->msgs_in, msg);
}
}
DL_FOREACH_SAFE(context->msgs_in.queued, msg, tmp){
if(msg->store->message_expiry_time && db.now_real_s > msg->store->message_expiry_time){
db__message_remove_from_queued(&context->msgs_in, msg);
}
}
}


static int db__message_write_inflight_out_single(struct mosquitto *context, struct mosquitto_client_msg *msg)
{
mosquitto_property *cmsg_props = NULL, *store_props = NULL;
Expand Down
1 change: 1 addition & 0 deletions src/handle_connect.c
Expand Up @@ -318,6 +318,7 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1
rc = send__connack(context, connect_ack, CONNACK_ACCEPTED, connack_props);
mosquitto_property_free_all(&connack_props);
if(rc) return rc;
db__expire_all_messages(context);
rc = db__message_write_queued_out(context);
if(rc) return rc;
rc = db__message_write_inflight_out_all(context);
Expand Down
1 change: 1 addition & 0 deletions src/mosquitto_broker_internal.h
Expand Up @@ -670,6 +670,7 @@ int db__message_write_queued_out(struct mosquitto *context);
int db__message_write_queued_in(struct mosquitto *context);
void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg);
void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg);
void db__expire_all_messages(struct mosquitto *context);

/* ============================================================
* Subscription functions
Expand Down

0 comments on commit 7917553

Please sign in to comment.