Skip to content

Commit

Permalink
Optimise use of db__message_write()
Browse files Browse the repository at this point in the history
New messages are now queued for clients when old ones are sent, rather than on every iteration of the main loop. This produces good performance improvements.
  • Loading branch information
ralight committed Aug 12, 2020
1 parent 69f84bb commit 58aa41c
Show file tree
Hide file tree
Showing 23 changed files with 142 additions and 104 deletions.
94 changes: 59 additions & 35 deletions src/database.c
Expand Up @@ -346,7 +346,7 @@ int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *conte
return MOSQ_ERR_SUCCESS;
}

int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties)
int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties, bool update)
{
struct mosquitto_client_msg *msg;
struct mosquitto_msg_data *msg_data;
Expand Down Expand Up @@ -522,15 +522,15 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
if(dir == mosq_md_out && msg->qos > 0){
util__decrement_send_quota(context);
}
#ifdef WITH_WEBSOCKETS
if(context->wsi && rc == 0){
return db__message_write(db, context);
}else{
return rc;

if(dir == mosq_md_out && update && context->current_out_packet == NULL){
rc = db__message_write_inflight_out(db, context);
if(rc) return rc;
rc = db__message_write_queued_out(db, context);
if(rc) return rc;
}
#else

return rc;
#endif
}

int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos)
Expand Down Expand Up @@ -926,33 +926,17 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont
}
}

int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
int db__message_write_inflight_in(struct mosquitto_db *db, struct mosquitto *context)
{
int rc;
struct mosquitto_client_msg *tail, *tmp;
uint16_t mid;
int retries;
int retain;
const char *topic;
int qos;
uint32_t payloadlen;
const void *payload;
int msg_count = 0;
mosquitto_property *cmsg_props = NULL, *store_props = NULL;
int rc;
time_t now = 0;
uint32_t expiry_interval;

if(!context || context->sock == INVALID_SOCKET
|| (context->state == mosq_cs_active && !context->id)){
return MOSQ_ERR_INVAL;
}

if(context->state != mosq_cs_active){
return MOSQ_ERR_SUCCESS;
}

DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){
msg_count++;
if(tail->store->message_expiry_time){
if(now == 0){
now = time(NULL);
Expand All @@ -963,11 +947,10 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
continue;
}
}
mid = tail->mid;

switch(tail->state){
case mosq_ms_send_pubrec:
rc = send__pubrec(context, mid, 0, NULL);
rc = send__pubrec(context, tail->mid, 0, NULL);
if(!rc){
tail->state = mosq_ms_wait_for_pubrel;
}else{
Expand All @@ -976,7 +959,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;

case mosq_ms_resend_pubcomp:
rc = send__pubcomp(context, mid, NULL);
rc = send__pubcomp(context, tail->mid, NULL);
if(!rc){
tail->state = mosq_ms_wait_for_pubrel;
}else{
Expand All @@ -997,9 +980,31 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;
}
}
return MOSQ_ERR_SUCCESS;
}


int db__message_write_inflight_out(struct mosquitto_db *db, struct mosquitto *context)
{
struct mosquitto_client_msg *tail, *tmp;
mosquitto_property *cmsg_props = NULL, *store_props = NULL;
int rc;
uint16_t mid;
int retries;
int retain;
const char *topic;
int qos;
uint32_t payloadlen;
const void *payload;
time_t now = 0;
uint32_t expiry_interval;

if(context->state != mosq_cs_active || context->sock == INVALID_SOCKET){
return MOSQ_ERR_SUCCESS;
}

DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){
msg_count++;
expiry_interval = 0;
if(tail->store->message_expiry_time){
if(now == 0){
now = time(NULL);
Expand Down Expand Up @@ -1080,14 +1085,24 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
break;
}
}
return MOSQ_ERR_SUCCESS;
}


int db__message_write_queued_in(struct mosquitto_db *db, struct mosquitto *context)
{
struct mosquitto_client_msg *tail, *tmp;
int rc;

if(context->state != mosq_cs_active){
return MOSQ_ERR_SUCCESS;
}

DL_FOREACH_SAFE(context->msgs_in.queued, tail, tmp){
if(context->msgs_out.inflight_maximum != 0 && context->msgs_in.inflight_quota == 0){
break;
}

msg_count++;

if(tail->qos == 2){
tail->state = mosq_ms_send_pubrec;
db__message_dequeue_first(context, &context->msgs_in);
Expand All @@ -1099,14 +1114,23 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
}
}
}
return MOSQ_ERR_SUCCESS;
}


int db__message_write_queued_out(struct mosquitto_db *db, struct mosquitto *context)
{
struct mosquitto_client_msg *tail, *tmp;

if(context->state != mosq_cs_active){
return MOSQ_ERR_SUCCESS;
}

DL_FOREACH_SAFE(context->msgs_out.queued, tail, tmp){
if(context->msgs_out.inflight_maximum != 0 && context->msgs_out.inflight_quota == 0){
break;
}

msg_count++;

switch(tail->qos){
case 0:
tail->state = mosq_ms_publish_qos0;
Expand All @@ -1120,10 +1144,10 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
}
db__message_dequeue_first(context, &context->msgs_out);
}

return MOSQ_ERR_SUCCESS;
}


void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes)
{
max_inflight_bytes = inflight_bytes;
Expand Down
4 changes: 4 additions & 0 deletions src/handle_connect.c
Expand Up @@ -272,6 +272,10 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v
mosquitto__set_state(context, mosq_cs_active);
rc = send__connack(db, context, connect_ack, CONNACK_ACCEPTED, connack_props);
mosquitto_property_free_all(&connack_props);
if(rc) return rc;
rc = db__message_write_inflight_out(db, context);
if(rc) return rc;
rc = db__message_write_queued_out(db, context);
return rc;
error:
free(auth_data_out);
Expand Down
3 changes: 2 additions & 1 deletion src/handle_publish.c
Expand Up @@ -290,7 +290,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
break;
case 2:
if(dup == 0){
res = db__message_insert(db, context, stored->source_mid, mosq_md_in, stored->qos, stored->retain, stored, NULL);
res = db__message_insert(db, context, stored->source_mid, mosq_md_in, stored->qos, stored->retain, stored, NULL, false);
}else{
res = 0;
}
Expand All @@ -304,6 +304,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
break;
}

db__message_write_queued_in(db, context);
return rc;
process_bad_message:
rc = 1;
Expand Down
7 changes: 7 additions & 0 deletions src/handle_subscribe.c
Expand Up @@ -220,6 +220,13 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
db->persistence_changes++;
#endif

if(context->current_out_packet == NULL){
rc = db__message_write_inflight_out(db, context);
if(rc) return rc;
rc = db__message_write_queued_out(db, context);
if(rc) return rc;
}

return rc;
}

Expand Down
45 changes: 15 additions & 30 deletions src/loop.c
Expand Up @@ -166,14 +166,12 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
time_t last_backup = mosquitto_time();
#endif
time_t now = 0;
int time_count;
time_t last_keepalive_check = mosquitto_time();
struct mosquitto *context, *ctxt_tmp;
#ifdef WITH_BRIDGE
int rc;
#endif
#ifdef WITH_WEBSOCKETS
int i;
#endif
int rc;


#if defined(WITH_WEBSOCKETS) && LWS_LIBRARY_VERSION_NUMBER == 3002000
Expand All @@ -197,41 +195,28 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
#endif

time_count = 0;
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
if(time_count > 0){
time_count--;
}else{
time_count = 1000;
now = mosquitto_time();
}
now = mosquitto_time();
if(last_keepalive_check != now){
last_keepalive_check = now;

if(context->sock != INVALID_SOCKET){
/* Local bridges never time out in this fashion. */
if(!(context->keepalive)
|| context->bridge
|| now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
if(context->sock != INVALID_SOCKET){
/* Local bridges never time out in this fashion. */
if(!(context->keepalive)
|| context->bridge
|| now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){

if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
mux__add_out(db, context);
context->ws_want_write = false;
}
else{
mux__remove_out(db, context);
}
}else{
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
/* Client has exceeded keepalive*1.5 */
do_disconnect(db, context, MOSQ_ERR_KEEPALIVE);
}
}else{
/* Client has exceeded keepalive*1.5 */
do_disconnect(db, context, MOSQ_ERR_KEEPALIVE);
}
}
}


#ifdef WITH_BRIDGE
bridge_check(db);
#endif

rc = mux__handle(db, listensock, listensock_count);
if(rc) return rc;
Expand Down
6 changes: 4 additions & 2 deletions src/mosquitto_broker_internal.h
Expand Up @@ -653,10 +653,9 @@ void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queu
/* Return the number of in-flight messages in count. */
int db__message_count(int *count);
int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos);
int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties);
int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties, bool update);
int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid);
int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos);
int db__message_write(struct mosquitto_db *db, struct mosquitto *context);
void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data);
int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context, bool force_free);
int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties);
Expand All @@ -672,6 +671,9 @@ void db__msg_store_free(struct mosquitto_msg_store *store);
int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context);
void sys_tree__init(struct mosquitto_db *db);
void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time);
int db__message_write_inflight_out(struct mosquitto_db *db, struct mosquitto *context);
int db__message_write_queued_out(struct mosquitto_db *db, struct mosquitto *context);
int db__message_write_queued_in(struct mosquitto_db *db, struct mosquitto *context);

/* ============================================================
* Subscription functions
Expand Down
2 changes: 1 addition & 1 deletion src/retain.c
Expand Up @@ -184,7 +184,7 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto__retainhier
if(subscription_identifier > 0){
mosquitto_property_add_varint(&properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, subscription_identifier);
}
return db__message_insert(db, context, mid, mosq_md_out, qos, true, retained, properties);
return db__message_insert(db, context, mid, mosq_md_out, qos, true, retained, properties, false);
}


Expand Down
2 changes: 1 addition & 1 deletion src/subs.c
Expand Up @@ -95,7 +95,7 @@ static int subs__send(struct mosquitto_db *db, struct mosquitto__subleaf *leaf,
if(leaf->identifier){
mosquitto_property_add_varint(&properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, leaf->identifier);
}
if(db__message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored, properties) == 1){
if(db__message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored, properties, true) == 1){
return 1;
}
}else{
Expand Down
12 changes: 9 additions & 3 deletions src/websockets.c
Expand Up @@ -242,6 +242,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
}
mosq->sock = libwebsocket_get_socket_fd(wsi);
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(mosq->sock), mosq);
mux__add_in(db, mosq);
break;

case LWS_CALLBACK_CLOSED:
Expand Down Expand Up @@ -273,7 +274,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
return -1;
}

db__message_write(db, mosq);
db__message_write_inflight_out(db, mosq);

if(mosq->out_packet && !mosq->current_out_packet){
mosq->current_out_packet = mosq->out_packet;
Expand Down Expand Up @@ -672,8 +673,13 @@ static int callback_http(struct libwebsocket_context *context,
case LWS_CALLBACK_DEL_POLL_FD:
case LWS_CALLBACK_CHANGE_MODE_POLL_FD:
HASH_FIND(hh_sock, db->contexts_by_sock, &pollargs->fd, sizeof(pollargs->fd), mosq);
if(mosq && (pollargs->events & POLLOUT)){
mosq->ws_want_write = true;
if(mosq){
if(pollargs->events & POLLOUT){
mux__add_out(db, mosq);
mosq->ws_want_write = true;
}else{
mux__remove_out(db, mosq);
}
}
break;

Expand Down
4 changes: 2 additions & 2 deletions test/broker/02-subpub-qos1-nolocal.py
Expand Up @@ -42,9 +42,9 @@ def do_test():
mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback2")

mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback")
mosq_test.do_send_receive(sock, publish2_packet, puback2_packet, "puback2")
sock.send(publish2_packet)

mosq_test.expect_packet(sock, "publish3", publish3_packet)
mosq_test.receive_unordered(sock, puback2_packet, publish3_packet, "puback2/publish3")
rc = 0

sock.close()
Expand Down
5 changes: 2 additions & 3 deletions test/broker/02-subpub-qos1.py
Expand Up @@ -30,9 +30,8 @@ def do_test(proto_ver):

mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")

mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback")

mosq_test.expect_packet(sock, "publish2", publish_packet2)
sock.send(publish_packet)
mosq_test.receive_unordered(sock, puback_packet, publish_packet2, "puback/publish2")
rc = 0

sock.close()
Expand Down

0 comments on commit 58aa41c

Please sign in to comment.