Skip to content

Commit

Permalink
Process receive maximum (as max_inflight_messages).
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Dec 30, 2018
1 parent 00c4fd1 commit 16e83bf
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 21 deletions.
2 changes: 1 addition & 1 deletion lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,11 @@ struct mosquitto {
char threaded;
struct mosquitto__packet *out_packet_last;
int inflight_messages;
int max_inflight_messages;
# ifdef WITH_SRV
ares_channel achan;
# endif
#endif
int max_inflight_messages;

#ifdef WITH_BROKER
UT_hash_handle hh_id;
Expand Down
18 changes: 9 additions & 9 deletions src/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ struct config_recurse {
int log_type_set;
unsigned long max_inflight_bytes;
unsigned long max_queued_bytes;
int max_inflight_messages;
int max_queued_messages;
};

Expand Down Expand Up @@ -213,6 +212,7 @@ static void config__init_reload(struct mosquitto_db *db, struct mosquitto__confi
#endif
config->log_timestamp = true;
config->max_keepalive = 65535;
config->max_inflight_messages = 20;
config->persistence = false;
mosquitto__free(config->persistence_location);
config->persistence_location = NULL;
Expand Down Expand Up @@ -598,7 +598,6 @@ int config__read(struct mosquitto_db *db, struct mosquitto__config *config, bool
cr.log_type = MOSQ_LOG_NONE;
cr.log_type_set = 0;
cr.max_inflight_bytes = 0;
cr.max_inflight_messages = 20;
cr.max_queued_bytes = 0;
cr.max_queued_messages = 100;

Expand Down Expand Up @@ -681,7 +680,7 @@ int config__read(struct mosquitto_db *db, struct mosquitto__config *config, bool
config->user = "mosquitto";
}

db__limits_set(cr.max_inflight_messages, cr.max_inflight_bytes, cr.max_queued_messages, cr.max_queued_bytes);
db__limits_set(cr.max_inflight_bytes, cr.max_queued_messages, cr.max_queued_bytes);

#ifdef WITH_BRIDGE
for(i=0; i<config->bridge_count; i++){
Expand Down Expand Up @@ -1511,13 +1510,14 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_bytes value in configuration.");
}
}else if(!strcmp(token, "max_inflight_messages")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
cr->max_inflight_messages = atoi(token);
if(cr->max_inflight_messages < 0) cr->max_inflight_messages = 0;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_messages value in configuration.");
if(conf__parse_int(&token, "max_inflight_messages", &tmp_int, saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int < 0 || tmp_int == 65535){
tmp_int = 0;
}else if(tmp_int > 65535){
log__printf(NULL, MOSQ_LOG_ERR, "Error: max_inflight_messages must be <= 65535.");
return MOSQ_ERR_INVAL;
}
config->max_inflight_messages = tmp_int;
}else if(!strcmp(token, "max_keepalive")){
if(conf__parse_int(&token, "max_keepalive", &tmp_int, saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int < 10 || tmp_int > 65535){
Expand Down
1 change: 1 addition & 0 deletions src/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
context->last_inflight_msg = NULL;
context->queued_msgs = NULL;
context->last_queued_msg = NULL;
context->max_inflight_messages = db->config->max_inflight_messages;
context->msg_bytes = 0;
context->msg_bytes12 = 0;
context->msg_count = 0;
Expand Down
18 changes: 8 additions & 10 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ and the Eclipse Distribution License is available at
#include "sys_tree.h"
#include "time_mosq.h"

static int max_inflight = 20;
static unsigned long max_inflight_bytes = 0;
static int max_queued = 100;
static unsigned long max_queued_bytes = 0;
Expand All @@ -38,14 +37,14 @@ static unsigned long max_queued_bytes = 0;
*/
static bool db__ready_for_flight(struct mosquitto *context, int qos)
{
if(qos == 0 || (max_inflight == 0 && max_inflight_bytes == 0)){
if(qos == 0 || (context->max_inflight_messages == 0 && max_inflight_bytes == 0)){
return true;
}

bool valid_bytes = context->msg_bytes12 < max_inflight_bytes;
bool valid_count = context->msg_count12 < max_inflight;
bool valid_count = context->msg_count12 < context->max_inflight_messages;

if(max_inflight == 0){
if(context->max_inflight_messages == 0){
return valid_bytes;
}
if(max_inflight_bytes == 0){
Expand Down Expand Up @@ -73,7 +72,7 @@ static bool db__ready_for_queue(struct mosquitto *context, int qos)
unsigned long source_bytes = context->msg_bytes12;
int source_count = context->msg_count12;
unsigned long adjust_bytes = max_inflight_bytes;
int adjust_count = max_inflight;
int adjust_count = context->max_inflight_messages;

/* nothing in flight for offline clients */
if(context->sock == INVALID_SOCKET){
Expand Down Expand Up @@ -307,7 +306,7 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1
tail = tail->next;
}
}
while (context->queued_msgs && (max_inflight == 0 || msg_index < max_inflight)){
while (context->queued_msgs && (context->max_inflight_messages == 0 || msg_index < context->max_inflight_messages)){
msg_index++;
tail = context->queued_msgs;
tail->timestamp = mosquitto_time();
Expand Down Expand Up @@ -838,7 +837,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
}
}

while(context->queued_msgs && (max_inflight == 0 || msg_index < max_inflight)){
while(context->queued_msgs && (context->max_inflight_messages == 0 || msg_index < context->max_inflight_messages)){
msg_index++;
tail = context->queued_msgs;
tail->timestamp = mosquitto_time();
Expand Down Expand Up @@ -989,7 +988,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
}
}

while(context->queued_msgs && (max_inflight == 0 || msg_count < max_inflight)){
while(context->queued_msgs && (context->max_inflight_messages == 0 || msg_count < context->max_inflight_messages)){
msg_count++;
tail = context->queued_msgs;
if(tail->direction == mosq_md_out){
Expand Down Expand Up @@ -1022,9 +1021,8 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_SUCCESS;
}

void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes)
void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes)
{
max_inflight = inflight;
max_inflight_bytes = inflight_bytes;
max_queued = queued;
max_queued_bytes = queued_bytes;
Expand Down
3 changes: 2 additions & 1 deletion src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ struct mosquitto__config {
bool log_timestamp;
char *log_file;
FILE *log_fptr;
uint16_t max_inflight_messages;
uint16_t max_keepalive;
uint32_t message_size_limit;
bool persistence;
Expand Down Expand Up @@ -552,7 +553,7 @@ int db__close(struct mosquitto_db *db);
int persist__backup(struct mosquitto_db *db, bool shutdown);
int persist__restore(struct mosquitto_db *db);
#endif
void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes);
void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes);
/* Return the number of in-flight messages in count. */
int db__message_count(int *count);
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
Expand Down
10 changes: 10 additions & 0 deletions src/property_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ int property__process_connect(struct mosquitto *context, mosquitto_property *pro
while(p){
if(p->identifier == MQTT_PROP_SESSION_EXPIRY_INTERVAL){
context->session_expiry_interval = p->value.i32;
}else if(p->identifier == MQTT_PROP_RECEIVE_MAXIMUM){
if(p->value.i16 == 0){
return MOSQ_ERR_PROTOCOL;
}

if(p->value.i16 == 65535){
context->max_inflight_messages = 0;
}else{
context->max_inflight_messages = p->value.i16;
}
}
p = p->next;
}
Expand Down

0 comments on commit 16e83bf

Please sign in to comment.