Skip to content

Commit

Permalink
Separate broker message queues into in/out.
Browse files Browse the repository at this point in the history
This allows and includes better flow control handling for QoS>0.
  • Loading branch information
ralight committed Apr 17, 2019
1 parent baf1909 commit 8531cb1
Show file tree
Hide file tree
Showing 28 changed files with 658 additions and 521 deletions.
6 changes: 3 additions & 3 deletions lib/actions.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,19 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in
message->msg.retain = retain;
message->dup = false;

pthread_mutex_lock(&mosq->out_message_mutex);
pthread_mutex_lock(&mosq->msgs_out.mutex);
queue_status = message__queue(mosq, message, mosq_md_out);
if(queue_status == 0){
if(qos == 1){
message->state = mosq_ms_wait_for_puback;
}else if(qos == 2){
message->state = mosq_ms_wait_for_pubrec;
}
pthread_mutex_unlock(&mosq->out_message_mutex);
pthread_mutex_unlock(&mosq->msgs_out.mutex);
return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, outgoing_properties, NULL, 0);
}else{
message->state = mosq_ms_invalid;
pthread_mutex_unlock(&mosq->out_message_mutex);
pthread_mutex_unlock(&mosq->msgs_out.mutex);
return MOSQ_ERR_SUCCESS;
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ static int mosquitto__connect_init(struct mosquitto *mosq, const char *host, int
}

mosq->keepalive = keepalive;
mosq->receive_quota = mosq->receive_maximum;
mosq->send_quota = mosq->send_maximum;
mosq->msgs_in.inflight_quota = mosq->msgs_in.inflight_maximum;
mosq->msgs_out.inflight_quota = mosq->msgs_out.inflight_maximum;

if(mosq->sockpairR != INVALID_SOCKET){
COMPAT_CLOSE(mosq->sockpairR);
Expand Down
4 changes: 2 additions & 2 deletions lib/handle_connack.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ int handle__connack(struct mosquitto *mosq)
}

mosquitto_property_read_byte(properties, MQTT_PROP_MAXIMUM_QOS, &mosq->maximum_qos, false);
mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM, &mosq->send_maximum, false);
mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM, &mosq->msgs_out.inflight_maximum, false);
mosquitto_property_read_int16(properties, MQTT_PROP_SERVER_KEEP_ALIVE, &mosq->keepalive, false);
mosquitto_property_read_int32(properties, MQTT_PROP_MAXIMUM_PACKET_SIZE, &mosq->maximum_packet_size, false);

mosq->send_quota = mosq->send_maximum;
mosq->msgs_out.inflight_quota = mosq->msgs_out.inflight_maximum;

log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received CONNACK (%d)", mosq->id, reason_code);
pthread_mutex_lock(&mosq->callback_mutex);
Expand Down
5 changes: 4 additions & 1 deletion lib/handle_pubackcomp.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
return MOSQ_ERR_PROTOCOL;
}

util__increment_send_quota(mosq);

rc = packet__read_uint16(&mosq->in_packet, &mid);
if(rc) return rc;
qos = type[3] == 'A'?1:2; /* pubAck or pubComp */
Expand All @@ -75,7 +77,7 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
/* Immediately free, we don't do anything with Reason String or User Property at the moment */
mosquitto_property_free_all(&properties);

rc = db__message_delete(db, mosq, mid, mosq_md_out, mosq_ms_wait_for_pubcomp, qos);
rc = db__message_delete_outgoing(db, mosq, mid, mosq_ms_wait_for_pubcomp, qos);
if(rc == MOSQ_ERR_NOT_FOUND){
log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Received %s from %s for an unknown packet identifier %d.", type, mosq->id, mid);
return MOSQ_ERR_SUCCESS;
Expand Down Expand Up @@ -104,6 +106,7 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
pthread_mutex_unlock(&mosq->callback_mutex);
mosquitto_property_free_all(&properties);
}
message__release_to_inflight(mosq, mosq_md_out);

return MOSQ_ERR_SUCCESS;
#endif
Expand Down
9 changes: 5 additions & 4 deletions lib/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and the Eclipse Distribution License is available at
#include "property_mosq.h"
#include "send_mosq.h"
#include "time_mosq.h"
#include "util_mosq.h"


int handle__publish(struct mosquitto *mosq)
Expand Down Expand Up @@ -67,13 +68,13 @@ int handle__publish(struct mosquitto *mosq)

if(message->msg.qos > 0){
if(mosq->protocol == mosq_p_mqtt5){
if(mosq->receive_quota == 0){
if(mosq->msgs_in.inflight_quota == 0){
message__cleanup(&message);
/* FIXME - should send a DISCONNECT here */
return MOSQ_ERR_PROTOCOL;
}
mosq->receive_quota--;
}
util__decrement_receive_quota(mosq);

rc = packet__read_uint16(&mosq->in_packet, &mid);
if(rc){
Expand Down Expand Up @@ -150,10 +151,10 @@ int handle__publish(struct mosquitto *mosq)
return rc;
case 2:
rc = send__pubrec(mosq, message->msg.mid, 0);
pthread_mutex_lock(&mosq->in_message_mutex);
pthread_mutex_lock(&mosq->msgs_in.mutex);
message->state = mosq_ms_wait_for_pubrel;
message__queue(mosq, message, mosq_md_in);
pthread_mutex_unlock(&mosq->in_message_mutex);
pthread_mutex_unlock(&mosq->msgs_in.mutex);
mosquitto_property_free_all(&properties);
return rc;
default:
Expand Down
6 changes: 4 additions & 2 deletions lib/handle_pubrec.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq)
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", mosq->id, mid);

if(reason_code < 0x80){
rc = db__message_update(mosq, mid, mosq_md_out, mosq_ms_wait_for_pubcomp, 2);
rc = db__message_update_outgoing(mosq, mid, mosq_ms_wait_for_pubcomp, 2);
}else{
return db__message_delete(db, mosq, mid, mosq_md_out, mosq_ms_wait_for_pubrec, 2);
return db__message_delete_outgoing(db, mosq, mid, mosq_ms_wait_for_pubrec, 2);
}
#else
UNUSED(db);
Expand All @@ -90,6 +90,8 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq)
}
pthread_mutex_unlock(&mosq->callback_mutex);
}
util__increment_send_quota(mosq);
message__release_to_inflight(mosq, mosq_md_out);
return MOSQ_ERR_SUCCESS;
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion lib/handle_pubrel.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
/* Immediately free, we don't do anything with Reason String or User Property at the moment */
mosquitto_property_free_all(&properties);

rc = db__message_release(db, mosq, mid, mosq_md_in);
rc = db__message_release_incoming(db, mosq, mid);
if(rc == MOSQ_ERR_PROTOCOL){
return rc;
}else if(rc != MOSQ_ERR_SUCCESS){
Expand Down
24 changes: 12 additions & 12 deletions lib/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,13 @@ int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
}
#endif

pthread_mutex_lock(&mosq->out_message_mutex);
max_packets = mosq->out_queue_len;
pthread_mutex_unlock(&mosq->out_message_mutex);
pthread_mutex_lock(&mosq->msgs_out.mutex);
max_packets = mosq->msgs_out.queue_len;
pthread_mutex_unlock(&mosq->msgs_out.mutex);

pthread_mutex_lock(&mosq->in_message_mutex);
max_packets += mosq->in_queue_len;
pthread_mutex_unlock(&mosq->in_message_mutex);
pthread_mutex_lock(&mosq->msgs_in.mutex);
max_packets += mosq->msgs_in.queue_len;
pthread_mutex_unlock(&mosq->msgs_in.mutex);

if(max_packets < 1) max_packets = 1;
/* Queue len here tells us how many messages are awaiting processing and
Expand Down Expand Up @@ -367,13 +367,13 @@ int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
int i;
if(max_packets < 1) return MOSQ_ERR_INVAL;

pthread_mutex_lock(&mosq->out_message_mutex);
max_packets = mosq->out_queue_len;
pthread_mutex_unlock(&mosq->out_message_mutex);
pthread_mutex_lock(&mosq->msgs_out.mutex);
max_packets = mosq->msgs_out.queue_len;
pthread_mutex_unlock(&mosq->msgs_out.mutex);

pthread_mutex_lock(&mosq->in_message_mutex);
max_packets += mosq->in_queue_len;
pthread_mutex_unlock(&mosq->in_message_mutex);
pthread_mutex_lock(&mosq->msgs_in.mutex);
max_packets += mosq->msgs_in.queue_len;
pthread_mutex_unlock(&mosq->msgs_in.mutex);

if(max_packets < 1) max_packets = 1;
/* Queue len here tells us how many messages are awaiting processing and
Expand Down
Loading

0 comments on commit 8531cb1

Please sign in to comment.