Skip to content

Commit

Permalink
broker: support max_queued_bytes option
Browse files Browse the repository at this point in the history
Limiting queued message depth purely based on message count is hard to
control for memory constrained devices.  The size of messages can vary
wildly, from a few bytes, to a few kilobytes.  Support a new
max_queued_bytes option, and drop packets when the first limit is
reached.  Option defaults to 0 (disabled) by default.

Fixes (partof) eclipse#100

This pulls up some helper routines for calculating whether to allow
inflight or queuing, resolving some inconsistences in connection
resumption.

Signed-off-by: Karl Palsson <[email protected]>
  • Loading branch information
karlp committed Jul 1, 2016
1 parent c4ed6fa commit afc323e
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 15 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Broker:
- mosquitto_db_dump tool can now output some stats on clients.
- new $SYS/broker/store/messages/count (deprecates $SYS/broker/messages/stored)
- new $SYS/broker/store/messages/bytes
- max_queued_bytes feature to limit queues by real size rather than
than just message count. Closes Eclipse #452919 or Github #100

Client library:
- Outgoing messages with QoS>1 are no longer retried after a timeout period.
Expand Down
2 changes: 2 additions & 0 deletions lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ struct mosquitto {
struct mosquitto_client_msg *last_inflight_msg;
struct mosquitto_client_msg *queued_msgs;
struct mosquitto_client_msg *last_queued_msg;
unsigned long msg_bytes;
unsigned long msg_bytes12;
int msg_count;
int msg_count12;
struct mosquitto__acl_user *acl_list;
Expand Down
17 changes: 16 additions & 1 deletion man/mosquitto.conf.5.xml
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,29 @@
<para>Reloaded on reload signal.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>max_queued_bytes</option> <replaceable>count</replaceable></term>
<listitem>
<para>QoS 1 and 2 messages above those currently in-flight will be
queued (per client) until this limit is exceeded.
Defaults to 0. (No maximum) See also the
<option>max_queued_messages</option> option.
If both max_queued_messages and max_queued_bytes are specified,
packets will be queued until the first limit is reached.
</para>
<para>Reloaded on reload signal.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>max_queued_messages</option> <replaceable>count</replaceable></term>
<listitem>
<para>The maximum number of QoS 1 or 2 messages to hold in the
queue (per client) above those messages that are currently
in flight. Defaults to 100. Set to 0 for no maximum (not
recommended). See also the
<option>queue_qos0_messages</option> option.</para>
<option>queue_qos0_messages</option> and
<option>max_queued_bytes</option> options.
</para>
<para>Reloaded on reload signal.</para>
</listitem>
</varlistentry>
Expand Down
10 changes: 9 additions & 1 deletion mosquitto.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,19 @@
# above those that are currently in-flight. Defaults to 100. Set
# to 0 for no maximum (not recommended).
# See also queue_qos0_messages.
# See also max_queued_bytes.
#max_queued_messages 100

# QoS 1 and 2 messages above those currently in-flight will be queued per
# client until this limit is exceeded. Defaults to 0. (No maximum)
# See also max_queued_messages.
# If both max_queued_messages and max_queued_bytes are specified, packets will
# be queued until the first limit is reached.
#max_queued_bytes 0

# Set to true to queue messages with QoS 0 when a persistent client is
# disconnected. These messages are included in the limit imposed by
# max_queued_messages.
# max_queued_messages and max_queued_bytes
# Defaults to false.
# This is a non-standard option for the MQTT v3.1 spec but is allowed in
# v3.1.1.
Expand Down
20 changes: 19 additions & 1 deletion src/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ struct config_recurse {
int log_dest_set;
int log_type;
int log_type_set;
unsigned long max_inflight_bytes;
int max_inflight_messages;
unsigned long max_queued_bytes;
int max_queued_messages;
};

Expand Down Expand Up @@ -484,7 +486,9 @@ int config__read(struct mosquitto__config *config, bool reload)
cr.log_dest_set = 0;
cr.log_type = MOSQ_LOG_NONE;
cr.log_type_set = 0;
cr.max_inflight_bytes = 0;
cr.max_inflight_messages = 20;
cr.max_queued_bytes = 0;
cr.max_queued_messages = 100;

if(!config->config_file) return 0;
Expand Down Expand Up @@ -524,7 +528,7 @@ int config__read(struct mosquitto__config *config, bool reload)
config->user = "mosquitto";
}

db__limits_set(cr.max_inflight_messages, cr.max_queued_messages);
db__limits_set(cr.max_inflight_messages, cr.max_inflight_bytes, cr.max_queued_messages, cr.max_queued_bytes);

#ifdef WITH_BRIDGE
for(i=0; i<config->bridge_count; i++){
Expand Down Expand Up @@ -1442,6 +1446,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_connections value in configuration.");
}
}else if(!strcmp(token, "max_inflight_bytes")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
cr->max_inflight_bytes = atol(token);
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_bytes value in configuration.");
}
}else if(!strcmp(token, "max_inflight_messages")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
Expand All @@ -1450,6 +1461,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_messages value in configuration.");
}
}else if(!strcmp(token, "max_queued_bytes")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
cr->max_queued_bytes = atol(token); /* 63 bits is ok right? */
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_queued_bytes value in configuration.");
}
}else if(!strcmp(token, "max_queued_messages")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
Expand Down
2 changes: 2 additions & 0 deletions src/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
context->last_inflight_msg = NULL;
context->queued_msgs = NULL;
context->last_queued_msg = NULL;
context->msg_bytes = 0;
context->msg_bytes12 = 0;
context->msg_count = 0;
context->msg_count12 = 0;
#ifdef WITH_TLS
Expand Down
98 changes: 87 additions & 11 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,68 @@ and the Eclipse Distribution License is available at
#include "time_mosq.h"

static int max_inflight = 20;
static unsigned long max_inflight_bytes = 0;
static int max_queued = 100;
static unsigned long max_queued_bytes = 0;

/**
* Is this context ready to take more in flight messages right now?
* @param context the client context of interest
* @param qos qos for the packet of interest
* @return true if more in flight are allowed.
*/
static bool db__ready_for_flight(struct mosquitto *context, int qos)
{
if(qos == 0 || (max_inflight == 0 && max_inflight_bytes == 0)){
return true;
}

bool valid_bytes = context->msg_bytes12 < max_inflight_bytes;
bool valid_count = context->msg_count12 < max_inflight;

if(max_inflight == 0){
return valid_bytes;
}
if(max_inflight_bytes == 0){
return valid_count;
}

return valid_bytes && valid_count;
}


/**
* For a given client context, are more messages allowed to be queued?
* @param context client of interest
* @return true if queuing is allowed, false if should be dropped
*/
static bool db__ready_for_queue(struct mosquitto *context)
{
if(max_queued == 0 && max_queued_bytes == 0){
return true;
}

unsigned long adjust_bytes = max_inflight_bytes;
int adjust_count = max_inflight;
/* nothing in flight for offline clients */
if(context->sock == INVALID_SOCKET){
adjust_bytes = 0;
adjust_count = 0;
}

bool valid_bytes = context->msg_bytes12 - adjust_bytes < max_queued_bytes;
bool valid_count = context->msg_count12 - adjust_count < max_queued;

if(max_queued_bytes == 0){
return valid_count;
}
if(max_queued == 0){
return valid_bytes;
}

return valid_bytes && valid_count;
}


int db__open(struct mosquitto__config *config, struct mosquitto_db *db)
{
Expand Down Expand Up @@ -214,6 +275,12 @@ static void db__message_remove(struct mosquitto_db *db, struct mosquitto *contex
}

if((*msg)->store){
context->msg_count--;
context->msg_bytes -= (*msg)->store->payloadlen;
if((*msg)->qos > 0){
context->msg_count12--;
context->msg_bytes12 -= (*msg)->store->payloadlen;
}
db__msg_store_deref(db, &(*msg)->store);
}
if(last){
Expand All @@ -227,10 +294,6 @@ static void db__message_remove(struct mosquitto_db *db, struct mosquitto *contex
context->last_inflight_msg = NULL;
}
}
context->msg_count--;
if((*msg)->qos > 0){
context->msg_count12--;
}
mosquitto__free(*msg);
if(last){
*msg = last->next;
Expand Down Expand Up @@ -350,7 +413,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
}

if(context->sock != INVALID_SOCKET){
if(qos == 0 || max_inflight == 0 || context->msg_count12 < max_inflight){
if(db__ready_for_flight(context, qos)){
if(dir == mosq_md_out){
switch(qos){
case 0:
Expand All @@ -370,7 +433,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
return 1;
}
}
}else if(max_queued == 0 || context->msg_count12-max_inflight < max_queued){
}else if(db__ready_for_queue(context)){
state = mosq_ms_queued;
rc = 2;
}else{
Expand All @@ -385,7 +448,10 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
return 2;
}
}else{
if(max_queued > 0 && context->msg_count12 >= max_queued){
if (db__ready_for_queue(context)){
log__printf(NULL, MOSQ_LOG_DEBUG, " off: queueing");
state = mosq_ms_queued;
}else{
G_MSGS_DROPPED_INC();
if(context->is_dropping == false){
context->is_dropping = true;
Expand All @@ -394,8 +460,6 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
context->id);
}
return 2;
}else{
state = mosq_ms_queued;
}
}
assert(state != mosq_ms_invalid);
Expand Down Expand Up @@ -434,8 +498,10 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
*last_msg = msg;
}
context->msg_count++;
context->msg_bytes += msg->store->payloadlen;
if(qos > 0){
context->msg_count12++;
context->msg_bytes12 += msg->store->payloadlen;
}

if(db->config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){
Expand Down Expand Up @@ -519,6 +585,8 @@ int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context)
}
context->queued_msgs = NULL;
context->last_queued_msg = NULL;
context->msg_bytes = 0;
context->msg_bytes12 = 0;
context->msg_count = 0;
context->msg_count12 = 0;

Expand Down Expand Up @@ -651,14 +719,18 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
struct mosquitto_client_msg *prev = NULL;

msg = context->inflight_msgs;
context->msg_bytes = 0;
context->msg_bytes12 = 0;
context->msg_count = 0;
context->msg_count12 = 0;
while(msg){
context->last_inflight_msg = msg;

context->msg_count++;
context->msg_bytes += msg->store->payloadlen;
if(msg->qos > 0){
context->msg_count12++;
context->msg_bytes12 += msg->store->payloadlen;
}

if(msg->direction == mosq_md_out){
Expand Down Expand Up @@ -702,10 +774,12 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
context->last_queued_msg = msg;

context->msg_count++;
context->msg_bytes += msg->store->payloadlen;
if(msg->qos > 0){
context->msg_count12++;
context->msg_bytes12 += msg->store->payloadlen;
}
if (max_inflight == 0 || context->msg_count <= max_inflight){
if (db__ready_for_flight(context, msg->qos)) {
switch(msg->qos){
case 0:
msg->state = mosq_ms_publish_qos0;
Expand Down Expand Up @@ -940,10 +1014,12 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_SUCCESS;
}

void db__limits_set(int inflight, int queued)
void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes)
{
max_inflight = inflight;
max_inflight_bytes = inflight_bytes;
max_queued = queued;
max_queued_bytes = queued_bytes;
}

void db__vacuum(void)
Expand Down
2 changes: 1 addition & 1 deletion src/mosquitto_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ int db__close(struct mosquitto_db *db);
int persist__backup(struct mosquitto_db *db, bool shutdown);
int persist__restore(struct mosquitto_db *db);
#endif
void db__limits_set(int inflight, int queued);
void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes);
/* Return the number of in-flight messages in count. */
int db__message_count(int *count);
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
Expand Down
4 changes: 4 additions & 0 deletions test/broker/03-publish-qos1-queued-bytes.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
sys_interval 1
max_queued_messages 0
max_queued_bytes 400
port 1888

0 comments on commit afc323e

Please sign in to comment.