diff --git a/ChangeLog.txt b/ChangeLog.txt index d2e83a7f1c..f5fccfe8f7 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -31,6 +31,8 @@ Broker: only a single plugin can interact with a unique $CONTROL topic. Using multiple instances of the plugin would produce duplicate entries in the config file. Closes #2601. Closes #2470. +- Fix case where expired messages were causing queued messages not to be + delivered. Closes #2609. Client library: - Fix threads library detection on Windows under cmake. Bumps the minimum diff --git a/src/database.c b/src/database.c index 21f9357d26..5e77a281d1 100644 --- a/src/database.c +++ b/src/database.c @@ -356,6 +356,22 @@ static void db__message_remove_from_inflight(struct mosquitto_msg_data *msg_data } +static void db__message_remove_from_queued(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item) +{ + if(!msg_data || !item){ + return; + } + + DL_DELETE(msg_data->queued, item); + if(item->store){ + db__msg_store_ref_dec(&item->store); + } + + mosquitto_property_free_all(&item->properties); + mosquitto__free(item); +} + + void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data) { struct mosquitto_client_msg *msg; @@ -1021,6 +1037,40 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid) } } + +void db__expire_all_messages(struct mosquitto *context) +{ + struct mosquitto_client_msg *msg, *tmp; + + DL_FOREACH_SAFE(context->msgs_out.inflight, msg, tmp){ + if(msg->store->message_expiry_time && db.now_real_s > msg->store->message_expiry_time){ + if(msg->qos > 0){ + util__increment_send_quota(context); + } + db__message_remove_from_inflight(&context->msgs_out, msg); + } + } + DL_FOREACH_SAFE(context->msgs_out.queued, msg, tmp){ + if(msg->store->message_expiry_time && db.now_real_s > msg->store->message_expiry_time){ + db__message_remove_from_queued(&context->msgs_out, msg); + } + } + DL_FOREACH_SAFE(context->msgs_in.inflight, msg, tmp){ + if(msg->store->message_expiry_time && db.now_real_s > msg->store->message_expiry_time){ + if(msg->qos > 0){ + util__increment_receive_quota(context); + } + db__message_remove_from_inflight(&context->msgs_in, msg); + } + } + DL_FOREACH_SAFE(context->msgs_in.queued, msg, tmp){ + if(msg->store->message_expiry_time && db.now_real_s > msg->store->message_expiry_time){ + db__message_remove_from_queued(&context->msgs_in, msg); + } + } +} + + static int db__message_write_inflight_out_single(struct mosquitto *context, struct mosquitto_client_msg *msg) { mosquitto_property *cmsg_props = NULL, *store_props = NULL; diff --git a/src/handle_connect.c b/src/handle_connect.c index 790c88a2b0..21405adff7 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -318,6 +318,7 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1 rc = send__connack(context, connect_ack, CONNACK_ACCEPTED, connack_props); mosquitto_property_free_all(&connack_props); if(rc) return rc; + db__expire_all_messages(context); rc = db__message_write_queued_out(context); if(rc) return rc; rc = db__message_write_inflight_out_all(context); diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 8f2ad0b25c..c28eaa2a57 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -670,6 +670,7 @@ int db__message_write_queued_out(struct mosquitto *context); int db__message_write_queued_in(struct mosquitto *context); void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg); void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg); +void db__expire_all_messages(struct mosquitto *context); /* ============================================================ * Subscription functions