Skip to content

Commit

Permalink
Fix memory leak on handling QoS 2 PUBLISH.
Browse files Browse the repository at this point in the history
In some circumstances, Mosquitto could leak memory when handling PUBLISH messages. This is limited to incoming QoS 2 messages, and is related to the combination of the broker having persistence enabled, a clean session=false client, which was connected prior to the broker restarting, then has reconnected and has now sent messages at a sufficiently high rate that the incoming queue at the broker has filled up and hence messages are being dropped. This is more likely to have an effect where max_queued_messages is a small value. This has now been fixed.

Closes #1793.
  • Loading branch information
ralight committed Aug 19, 2020
1 parent f47b565 commit 3190d29
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
13 changes: 13 additions & 0 deletions ChangeLog.txt
@@ -1,3 +1,16 @@
1.5.10 - 2020-08-19
===================

Security:
- In some circumstances, Mosquitto could leak memory when handling PUBLISH
messages. This is limited to incoming QoS 2 messages, and is related
to the combination of the broker having persistence enabled, a clean
session=false client, which was connected prior to the broker restarting,
then has reconnected and has now sent messages at a sufficiently high rate
that the incoming queue at the broker has filled up and hence messages are
being dropped. This is more likely to have an effect where
max_queued_messages is a small value. This has now been fixed. Closes #1793.

1.5.9 - 20190917
================

Expand Down
4 changes: 2 additions & 2 deletions src/database.c
Expand Up @@ -36,7 +36,7 @@ static unsigned long max_queued_bytes = 0;
* @param qos qos for the packet of interest
* @return true if more in flight are allowed.
*/
static bool db__ready_for_flight(struct mosquitto *context, int qos)
bool db__ready_for_flight(struct mosquitto *context, int qos)
{
if(qos == 0 || (max_inflight == 0 && max_inflight_bytes == 0)){
return true;
Expand Down Expand Up @@ -64,7 +64,7 @@ static bool db__ready_for_flight(struct mosquitto *context, int qos)
* @param qos destination qos for the packet of interest
* @return true if queuing is allowed, false if should be dropped
*/
static bool db__ready_for_queue(struct mosquitto *context, int qos)
bool db__ready_for_queue(struct mosquitto *context, int qos)
{
if(max_queued == 0 && max_queued_bytes == 0){
return true;
Expand Down
20 changes: 17 additions & 3 deletions src/handle_publish.c
Expand Up @@ -184,9 +184,23 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
db__message_store_find(context, mid, &stored);
}
if(!stored){
dup = 0;
if(db__message_store(db, context, mid, topic, qos, payloadlen, &payload, retain, &stored, 0)){
return 1;
if(qos == 0
|| db__ready_for_flight(context, qos)
|| db__ready_for_queue(context, qos)){

dup = 0;
if(db__message_store(db, context, mid, topic, qos, payloadlen, &payload, retain, &stored, 0)){
return 1;
}
}else{
/* Client isn't allowed any more incoming messages, so fail early */
mosquitto__free(topic);
UHPA_FREE(payload, payloadlen);
if(qos == 1){
return send__puback(context, mid);
}else{
return send__pubrec(context, mid);
}
}
}else{
mosquitto__free(topic);
Expand Down
2 changes: 2 additions & 0 deletions src/mosquitto_broker_internal.h
Expand Up @@ -567,6 +567,8 @@ void db__msg_store_deref(struct mosquitto_db *db, struct mosquitto_msg_store **s
void db__msg_store_clean(struct mosquitto_db *db);
void db__msg_store_compact(struct mosquitto_db *db);
int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context);
bool db__ready_for_flight(struct mosquitto *context, int qos);
bool db__ready_for_queue(struct mosquitto *context, int qos);
void sys_tree__init(struct mosquitto_db *db);
void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time);

Expand Down

0 comments on commit 3190d29

Please sign in to comment.