Skip to content

Commit

Permalink
Receive maximum support for clients.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralight committed Jan 8, 2019
1 parent 0546e7b commit 67c1d44
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 7 deletions.
3 changes: 2 additions & 1 deletion lib/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ static int mosquitto__connect_init(struct mosquitto *mosq, const char *host, int
}

mosq->keepalive = keepalive;
mosq->receive_quota = mosq->receive_maximum;

if(mosq->sockpairR != INVALID_SOCKET){
COMPAT_CLOSE(mosq->sockpairR);
Expand Down Expand Up @@ -191,7 +192,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos
mosq->ping_t = 0;

packet__cleanup(&mosq->in_packet);

pthread_mutex_lock(&mosq->current_out_packet_mutex);
pthread_mutex_lock(&mosq->out_packet_mutex);

Expand Down
9 changes: 9 additions & 0 deletions lib/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ int handle__publish(struct mosquitto *mosq)
}

if(message->msg.qos > 0){
if(mosq->protocol == mosq_p_mqtt5){
if(mosq->receive_quota == 0){
message__cleanup(&message);
/* FIXME - should send a DISCONNECT here */
return MOSQ_ERR_PROTOCOL;
}
mosq->receive_quota--;
}

rc = packet__read_uint16(&mosq->in_packet, &mid);
if(rc){
message__cleanup(&message);
Expand Down
1 change: 1 addition & 0 deletions lib/mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st
mosq->out_messages = NULL;
mosq->out_messages_last = NULL;
mosq->max_inflight_messages = 20;
mosq->receive_maximum = 20;
mosq->will = NULL;
mosq->on_connect = NULL;
mosq->on_publish = NULL;
Expand Down
11 changes: 11 additions & 0 deletions lib/mosquitto.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ enum mosq_opt_t {
MOSQ_OPT_PROTOCOL_VERSION = 1,
MOSQ_OPT_SSL_CTX = 2,
MOSQ_OPT_SSL_CTX_WITH_DEFAULTS = 3,
MOSQ_OPT_RECEIVE_MAXIMUM = 4,
};

/* MQTT specification restricts client ids to a maximum of 23 characters */
Expand Down Expand Up @@ -1364,6 +1365,16 @@ libmosq_EXPORT int mosquitto_opts_set(struct mosquitto *mosq, enum mosq_opt_t op
* MQTT_PROTOCOL_V311, or MQTT_PROTOCOL_V5. Must be set before the
* client connects. Defaults to MQTT_PROTOCOL_V311.
*
* MOSQ_OPT_RECEIVE_MAXIMUM
* Value can be set between 1 and 65535 inclusive, and represents
* the maximum number of incoming QoS 1 and QoS 2 messages that this
* client wants to process at once. Defaults to 20. This option is
* not valid for MQTT v3.1 or v3.1.1 clients.
* Note that if the MQTT_PROP_RECEIVE_MAXIMUM property is in the
* proplist passed to mosquitto_connect_v5(), then that property
* will override this option. Using this option is the recommended
* method however.
*
* MOSQ_OPT_SSL_CTX_WITH_DEFAULTS
* If value is set to a non zero value, then the user specified
* SSL_CTX passed in using MOSQ_OPT_SSL_CTX will have the default
Expand Down
2 changes: 2 additions & 0 deletions lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ struct mosquitto {
ares_channel achan;
# endif
#endif
int receive_quota;
int receive_maximum;
int max_inflight_messages;

#ifdef WITH_BROKER
Expand Down
7 changes: 7 additions & 0 deletions lib/options.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ int mosquitto_int_option(struct mosquitto *mosq, enum mosq_opt_t option, int val
}
break;

case MOSQ_OPT_RECEIVE_MAXIMUM:
if(value < 0 || value > 65535){
return MOSQ_ERR_INVAL;
}
mosq->receive_maximum = value;
break;

case MOSQ_OPT_SSL_CTX_WITH_DEFAULTS:
#if defined(WITH_TLS) && OPENSSL_VERSION_NUMBER >= 0x10100000L
if(value){
Expand Down
27 changes: 21 additions & 6 deletions lib/send_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session
uint8_t version;
char *clientid, *username, *password;
int headerlen;
int proplen, varbytes;
int proplen = 0, will_proplen, varbytes;
mosquitto_property *local_props = NULL;
uint16_t receive_maximum;

assert(mosq);

Expand All @@ -64,9 +66,20 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session
#endif

if(mosq->protocol == mosq_p_mqtt5){
/* Generate properties from options */
if(!mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM, &receive_maximum, false)){
rc = mosquitto_property_add_int16(&local_props, MQTT_PROP_RECEIVE_MAXIMUM, mosq->receive_maximum);
if(rc) return rc;
}else{
mosq->receive_maximum = receive_maximum;
mosq->receive_quota = receive_maximum;
}

version = MQTT_PROTOCOL_V5;
headerlen = 10;
proplen = property__get_length_all(properties);
proplen = 0;
proplen += property__get_length_all(properties);
proplen += property__get_length_all(local_props);
varbytes = packet__varint_bytes(proplen);
headerlen += proplen + varbytes;
}else if(mosq->protocol == mosq_p_mqtt311){
Expand All @@ -93,9 +106,9 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session

payloadlen += 2+strlen(mosq->will->msg.topic) + 2+mosq->will->msg.payloadlen;
if(mosq->protocol == mosq_p_mqtt5){
proplen = property__get_length_all(mosq->will->properties);
varbytes = packet__varint_bytes(proplen);
payloadlen += proplen + varbytes;
will_proplen = property__get_length_all(mosq->will->properties);
varbytes = packet__varint_bytes(will_proplen);
payloadlen += will_proplen + varbytes;
}
}
if(username){
Expand Down Expand Up @@ -141,7 +154,9 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session

if(mosq->protocol == mosq_p_mqtt5){
/* Write properties */
property__write_all(packet, properties, true);
packet__write_varint(packet, proplen);
property__write_all(packet, properties, false);
property__write_all(packet, local_props, false);
}

/* Payload */
Expand Down
7 changes: 7 additions & 0 deletions lib/send_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ int send__puback(struct mosquitto *mosq, uint16_t mid)
#else
if(mosq) log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBACK (Mid: %d)", mosq->id, mid);
#endif
util__increment_receive_quota(mosq);
/* We don't use Reason String or User Property yet. */
return send__command_with_mid(mosq, CMD_PUBACK, mid, false, 0, NULL);
}
Expand All @@ -83,6 +84,7 @@ int send__pubcomp(struct mosquitto *mosq, uint16_t mid)
#else
if(mosq) log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBCOMP (Mid: %d)", mosq->id, mid);
#endif
util__increment_receive_quota(mosq);
/* We don't use Reason String or User Property yet. */
return send__command_with_mid(mosq, CMD_PUBCOMP, mid, false, 0, NULL);
}
Expand All @@ -95,6 +97,11 @@ int send__pubrec(struct mosquitto *mosq, uint16_t mid)
#else
if(mosq) log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREC (Mid: %d)", mosq->id, mid);
#endif
/* FIXME - if rc >= 0x80 quota needs incrementing
if(rc >= 0x80){
util__increment_receive_quota(mosq);
}
*/
/* We don't use Reason String or User Property yet. */
return send__command_with_mid(mosq, CMD_PUBREC, mid, false, 0, NULL);
}
Expand Down
9 changes: 9 additions & 0 deletions lib/util_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,12 @@ FILE *mosquitto__fopen(const char *path, const char *mode, bool restrict_read)
}
#endif
}

void util__increment_receive_quota(struct mosquitto *mosq)
{
if(mosq->protocol == mosq_p_mqtt5){
if(mosq->receive_quota < mosq->receive_maximum){
mosq->receive_quota++;
}
}
}
1 change: 1 addition & 0 deletions lib/util_mosq.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ FILE *mosquitto__fopen(const char *path, const char *mode, bool restrict_read);
int mosquitto__hex2bin(const char *hex, unsigned char *bin, int bin_max_len);
#endif

void util__increment_receive_quota(struct mosquitto *mosq);
#endif
2 changes: 2 additions & 0 deletions test/mosq_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ def gen_connect(client_id, clean_session=True, keepalive=60, username=None, pass
connect_flags = connect_flags | 0x02

if proto_ver == 5:
properties += mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20)
properties = mqtt5_props.prop_finalise(properties)
if properties == "":
properties = struct.pack("B", 0)
remaining_length += len(properties)
Expand Down

0 comments on commit 67c1d44

Please sign in to comment.