Skip to content

Commit

Permalink
broker: support max_queued_bytes option
Browse files Browse the repository at this point in the history
TODO: inflight bytes?
TODO: queue_qos0_messages still looks flaky...
TODO: review fixmes?

Limiting queued message depth purely based on message count is hard to
control for memory constrained devices.  The size of messages can vary
wildly, from a few bytes, to a few kilobytes.  Support a new
max_queued_bytes option, and drop packets when the first limit is
reached.  Option defaults to 0 (disabled) by default.

Fixes (partof) eclipse#100

Signed-off-by: Karl Palsson <[email protected]>
  • Loading branch information
karlp committed Jun 29, 2016
1 parent af2f93c commit f7ac25e
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 7 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Broker:
- mosquitto_db_dump tool can now output some stats on clients.
- new $SYS/broker/store/messages/count (deprecates $SYS/broker/messages/stored)
- new $SYS/broker/store/messages/bytes
- max_queued_bytes feature to limit queues by real size rather than
than just message count. Closes Eclipse #452919 or Github #100

Client library:
- Outgoing messages with QoS>1 are no longer retried after a timeout period.
Expand Down
2 changes: 2 additions & 0 deletions lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ struct mosquitto {
struct mosquitto_client_msg *last_inflight_msg;
struct mosquitto_client_msg *queued_msgs;
struct mosquitto_client_msg *last_queued_msg;
unsigned long msg_bytes;
unsigned long msg_bytes12;
int msg_count;
int msg_count12;
struct mosquitto__acl_user *acl_list;
Expand Down
17 changes: 16 additions & 1 deletion man/mosquitto.conf.5.xml
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,29 @@
<para>Reloaded on reload signal.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>max_queued_bytes</option> <replaceable>count</replaceable></term>
<listitem>
<para>QoS 1 and 2 messages above those currently in-flight will be
queued (per client) until this limit is exceeded.
Defaults to 0. (No maximum) See also the
<option>max_queued_messages</option> option.
If both max_queued_messages and max_queued_bytes are specified,
packets will be queued until the first limit is reached.
</para>
<para>Reloaded on reload signal.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>max_queued_messages</option> <replaceable>count</replaceable></term>
<listitem>
<para>The maximum number of QoS 1 or 2 messages to hold in the
queue (per client) above those messages that are currently
in flight. Defaults to 100. Set to 0 for no maximum (not
recommended). See also the
<option>queue_qos0_messages</option> option.</para>
<option>queue_qos0_messages</option> and
<option>max_queued_bytes</option> options.
</para>
<para>Reloaded on reload signal.</para>
</listitem>
</varlistentry>
Expand Down
10 changes: 9 additions & 1 deletion mosquitto.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,19 @@
# above those that are currently in-flight. Defaults to 100. Set
# to 0 for no maximum (not recommended).
# See also queue_qos0_messages.
# See also max_queued_bytes.
#max_queued_messages 100

# QoS 1 and 2 messages above those currently in-flight will be queued per
# client until this limit is exceeded. Defaults to 0. (No maximum)
# See also max_queued_messages.
# If both max_queued_messages and max_queued_bytes are specified, packets will
# be queued until the first limit is reached.
#max_queued_bytes 0

# Set to true to queue messages with QoS 0 when a persistent client is
# disconnected. These messages are included in the limit imposed by
# max_queued_messages.
# max_queued_messages and max_queued_bytes
# Defaults to false.
# This is a non-standard option for the MQTT v3.1 spec but is allowed in
# v3.1.1.
Expand Down
20 changes: 19 additions & 1 deletion src/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ struct config_recurse {
int log_dest_set;
int log_type;
int log_type_set;
unsigned long max_inflight_bytes;
int max_inflight_messages;
unsigned long max_queued_bytes;
int max_queued_messages;
};

Expand Down Expand Up @@ -484,7 +486,9 @@ int config__read(struct mosquitto__config *config, bool reload)
cr.log_dest_set = 0;
cr.log_type = MOSQ_LOG_NONE;
cr.log_type_set = 0;
cr.max_inflight_bytes = 0;
cr.max_inflight_messages = 20;
cr.max_queued_bytes = 0;
cr.max_queued_messages = 100;

if(!config->config_file) return 0;
Expand Down Expand Up @@ -524,7 +528,7 @@ int config__read(struct mosquitto__config *config, bool reload)
config->user = "mosquitto";
}

db__limits_set(cr.max_inflight_messages, cr.max_queued_messages);
db__limits_set(cr.max_inflight_messages, cr.max_inflight_bytes, cr.max_queued_messages, cr.max_queued_bytes);

#ifdef WITH_BRIDGE
for(i=0; i<config->bridge_count; i++){
Expand Down Expand Up @@ -1442,6 +1446,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_connections value in configuration.");
}
}else if(!strcmp(token, "max_inflight_bytes")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
cr->max_inflight_bytes = atol(token);
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_bytes value in configuration.");
}
}else if(!strcmp(token, "max_inflight_messages")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
Expand All @@ -1450,6 +1461,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_messages value in configuration.");
}
}else if(!strcmp(token, "max_queued_bytes")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
cr->max_queued_bytes = atol(token); /* 63 bits is ok right? */
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_queued_bytes value in configuration.");
}
}else if(!strcmp(token, "max_queued_messages")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
Expand Down
2 changes: 2 additions & 0 deletions src/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
context->last_inflight_msg = NULL;
context->queued_msgs = NULL;
context->last_queued_msg = NULL;
context->msg_bytes = 0;
context->msg_bytes12 = 0;
context->msg_count = 0;
context->msg_count12 = 0;
#ifdef WITH_TLS
Expand Down
32 changes: 29 additions & 3 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ and the Eclipse Distribution License is available at
#include "time_mosq.h"

static int max_inflight = 20;
static unsigned long max_inflight_bytes = 0;
static int max_queued = 100;
static unsigned long max_queued_bytes = 0;

int db__open(struct mosquitto__config *config, struct mosquitto_db *db)
{
Expand Down Expand Up @@ -228,8 +230,10 @@ static void db__message_remove(struct mosquitto_db *db, struct mosquitto *contex
}
}
context->msg_count--;
context->msg_bytes -= (long unsigned int)&(*msg)->store->payloadlen;
if((*msg)->qos > 0){
context->msg_count12--;
context->msg_bytes12 -= (long unsigned int)&(*msg)->store->payloadlen;
}
mosquitto__free(*msg);
if(last){
Expand Down Expand Up @@ -351,6 +355,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1

if(context->sock != INVALID_SOCKET){
if(qos == 0 || max_inflight == 0 || context->msg_count12 < max_inflight){
// FIXME - probably need a new bytes in flight?
if(dir == mosq_md_out){
switch(qos){
case 0:
Expand All @@ -370,7 +375,14 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
return 1;
}
}
}else if(max_queued == 0 || context->msg_count12-max_inflight < max_queued){
}else if( (max_queued == 0 && max_queued_bytes == 0) ||
(max_queued_bytes == 0 &&
(context->msg_count12-max_inflight < max_queued)) ||
(max_queued == 0 &&
(context->msg_bytes12 < max_queued_bytes)) ||
((context->msg_count12-max_inflight < max_queued) &&
(context->msg_bytes12 < max_queued_bytes))){
// FIXME - bytes inflight? fixme - how does queue qos 0 work here?!
state = mosq_ms_queued;
rc = 2;
}else{
Expand All @@ -385,7 +397,8 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
return 2;
}
}else{
if(max_queued > 0 && context->msg_count12 >= max_queued){
if ((max_queued_bytes == 0 && max_queued > 0 && context->msg_count12 >= max_queued) ||
(max_queued == 0 && max_queued_bytes > 0 && context->msg_bytes12 >= max_queued_bytes)){
G_MSGS_DROPPED_INC();
if(context->is_dropping == false){
context->is_dropping = true;
Expand Down Expand Up @@ -434,8 +447,10 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
*last_msg = msg;
}
context->msg_count++;
context->msg_bytes += msg->store->payloadlen;
if(qos > 0){
context->msg_count12++;
context->msg_bytes12 += msg->store->payloadlen;
}

if(db->config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){
Expand Down Expand Up @@ -519,6 +534,8 @@ int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context)
}
context->queued_msgs = NULL;
context->last_queued_msg = NULL;
context->msg_bytes = 0;
context->msg_bytes12 = 0;
context->msg_count = 0;
context->msg_count12 = 0;

Expand Down Expand Up @@ -651,14 +668,18 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
struct mosquitto_client_msg *prev = NULL;

msg = context->inflight_msgs;
context->msg_bytes = 0;
context->msg_bytes12 = 0;
context->msg_count = 0;
context->msg_count12 = 0;
while(msg){
context->last_inflight_msg = msg;

context->msg_count++;
context->msg_bytes += msg->store->payloadlen;
if(msg->qos > 0){
context->msg_count12++;
context->msg_bytes12 += msg->store->payloadlen;
}

if(msg->direction == mosq_md_out){
Expand Down Expand Up @@ -702,10 +723,13 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
context->last_queued_msg = msg;

context->msg_count++;
context->msg_bytes += msg->store->payloadlen;
if(msg->qos > 0){
context->msg_count12++;
context->msg_bytes12 += msg->store->payloadlen;
}
if (max_inflight == 0 || context->msg_count <= max_inflight){
// FIXME - here too?
switch(msg->qos){
case 0:
msg->state = mosq_ms_publish_qos0;
Expand Down Expand Up @@ -940,10 +964,12 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_SUCCESS;
}

void db__limits_set(int inflight, int queued)
void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes)
{
max_inflight = inflight;
max_inflight_bytes = inflight_bytes;
max_queued = queued;
max_queued_bytes = queued_bytes;
}

void db__vacuum(void)
Expand Down
2 changes: 1 addition & 1 deletion src/mosquitto_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ int db__close(struct mosquitto_db *db);
int persist__backup(struct mosquitto_db *db, bool shutdown);
int persist__restore(struct mosquitto_db *db);
#endif
void db__limits_set(int inflight, int queued);
void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes);
/* Return the number of in-flight messages in count. */
int db__message_count(int *count);
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
Expand Down
4 changes: 4 additions & 0 deletions test/broker/03-publish-qos1-queued-bytes.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
sys_interval 1
max_queued_messages 0
max_queued_bytes 400
port 1888

0 comments on commit f7ac25e

Please sign in to comment.