Skip to content

Commit

Permalink
Add maximum-qos support to broker and client.
Browse files Browse the repository at this point in the history
This comes in the form of:

* Per listener maximum_qos option, which can be in the range 0-2.
* Changes to mosquitto_publish*() to return MOSQ_ERR_QOS_NOT_SUPPORTED
  if attempting to publish with a higher QoS than supported.
* Bridges will downgrade messages to match the maximum QoS.

More tests on the broker side (specifically bridges) are required. This
needs bridge support for MQTT 5 first.
  • Loading branch information
ralight committed Jan 9, 2019
1 parent 930a314 commit 3262926
Show file tree
Hide file tree
Showing 20 changed files with 323 additions and 2 deletions.
3 changes: 3 additions & 0 deletions client/pub_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flag
case MOSQ_ERR_PAYLOAD_SIZE:
fprintf(stderr, "Error: Message payload is too large.\n");
break;
case MOSQ_ERR_QOS_NOT_SUPPORTED:
fprintf(stderr, "Error: Message QoS not supported on broker, try a lower QoS.\n");
break;
}
}
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
Expand Down
1 change: 1 addition & 0 deletions lib/actions.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in

if(!mosq || qos<0 || qos>2) return MOSQ_ERR_INVAL;
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
if(qos > mosq->maximum_qos) return MOSQ_ERR_QOS_NOT_SUPPORTED;

if(properties){
if(properties->client_generated){
Expand Down
4 changes: 3 additions & 1 deletion lib/handle_connack.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ int handle__connack(struct mosquitto *mosq)
}
}

mosquitto_property_read_int16(properties, MQTT_PROP_SERVER_KEEP_ALIVE, &mosq->keepalive, false);
mosquitto_property_read_byte(properties, MQTT_PROP_MAXIMUM_QOS, &mosq->maximum_qos, false);
mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM, &mosq->send_maximum, false);
mosquitto_property_read_int16(properties, MQTT_PROP_SERVER_KEEP_ALIVE, &mosq->keepalive, false);

mosq->send_quota = mosq->send_maximum;

log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received CONNACK (%d)", mosq->id, reason_code);
Expand Down
1 change: 1 addition & 0 deletions lib/mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st
mosq->in_messages_last = NULL;
mosq->out_messages = NULL;
mosq->out_messages_last = NULL;
mosq->maximum_qos = 2;
mosq->receive_maximum = 20;
mosq->send_maximum = 20;
mosq->will = NULL;
Expand Down
5 changes: 5 additions & 0 deletions lib/mosquitto.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ enum mosq_err_t {
MOSQ_ERR_MALFORMED_PACKET = 19,
MOSQ_ERR_DUPLICATE_PROPERTY = 20,
MOSQ_ERR_TLS_HANDSHAKE = 21,
MOSQ_ERR_QOS_NOT_SUPPORTED = 22,
};

/* Error values */
Expand Down Expand Up @@ -739,6 +740,8 @@ libmosq_EXPORT int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_co
* broker.
* MOSQ_ERR_PAYLOAD_SIZE - if payloadlen is too large.
* MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8
* MOSQ_ERR_QOS_NOT_SUPPORTED - if the QoS is greater than that supported by
* the broker.
*
* See Also:
* <mosquitto_max_inflight_messages_set>
Expand Down Expand Up @@ -787,6 +790,8 @@ libmosq_EXPORT int mosquitto_publish(struct mosquitto *mosq, int *mid, const cha
* MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8
* MOSQ_ERR_DUPLICATE_PROPERTY - if a property is duplicated where it is forbidden.
* MOSQ_ERR_PROTOCOL - if any property is invalid for use with PUBLISH.
* MOSQ_ERR_QOS_NOT_SUPPORTED - if the QoS is greater than that supported by
* the broker.
*/
libmosq_EXPORT int mosquitto_publish_v5(
struct mosquitto *mosq,
Expand Down
1 change: 1 addition & 0 deletions lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ struct mosquitto {
int receive_quota;
uint16_t send_maximum;
uint16_t receive_maximum;
uint8_t maximum_qos;

#ifdef WITH_BROKER
UT_hash_handle hh_id;
Expand Down
14 changes: 14 additions & 0 deletions man/mosquitto.conf.5.xml
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,20 @@
<para>Not reloaded on reload signal.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>maximum_qos</option> <replaceable>count</replaceable></term>
<listitem>
<para>Limit the QoS value allowed when using this
listener. Defaults to 2, which means any QoS can be
used. Set to 0 or 1 to limit to those QoS values.
This makes use of an MQTT v5 feature to notify
clients of the limitation. MQTT v3.1.1 clients will
not be aware of the limitation. Clients publshing
to this listener with a too-high QoS will be
disconnected.</para>
<para>Not reloaded on reload signal.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>mount_point</option> <replaceable>topic prefix</replaceable></term>
<listitem>
Expand Down
11 changes: 11 additions & 0 deletions src/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ void config__init(struct mosquitto_db *db, struct mosquitto__config *config)
config->default_listener.max_connections = -1;
config->default_listener.protocol = mp_mqtt;
config->default_listener.security_options.allow_anonymous = -1;
config->default_listener.maximum_qos = 2;
}

void config__cleanup(struct mosquitto__config *config)
Expand Down Expand Up @@ -446,6 +447,7 @@ int config__parse_args(struct mosquitto_db *db, struct mosquitto__config *config
|| config->default_listener.host
|| config->default_listener.port
|| config->default_listener.max_connections != -1
|| config->default_listener.maximum_qos != 2
|| config->default_listener.mount_point
|| config->default_listener.protocol != mp_mqtt
|| config->default_listener.socket_domain
Expand Down Expand Up @@ -485,6 +487,7 @@ int config__parse_args(struct mosquitto_db *db, struct mosquitto__config *config
config->listeners[config->listener_count-1].sock_count = 0;
config->listeners[config->listener_count-1].client_count = 0;
config->listeners[config->listener_count-1].use_username_as_clientid = config->default_listener.use_username_as_clientid;
config->listeners[config->listener_count-1].maximum_qos = config->default_listener.maximum_qos;
#ifdef WITH_TLS
config->listeners[config->listener_count-1].tls_version = config->default_listener.tls_version;
config->listeners[config->listener_count-1].cafile = config->default_listener.cafile;
Expand Down Expand Up @@ -1502,6 +1505,14 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_connections value in configuration.");
}
}else if(!strcmp(token, "maximum_qos")){
if(reload) continue; // Listeners not valid for reloading.
if(conf__parse_int(&token, "maximum_qos", &tmp_int, saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int < 0 || tmp_int > 2){
log__printf(NULL, MOSQ_LOG_ERR, "Error: maximum_qos must be between 0 and 2 inclusive.");
return MOSQ_ERR_INVAL;
}
cur_listener->maximum_qos = tmp_int;
}else if(!strcmp(token, "max_inflight_bytes")){
token = strtok_r(NULL, " ", &saveptr);
if(token){
Expand Down
1 change: 1 addition & 0 deletions src/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
context->last_queued_msg = NULL;
context->receive_maximum = db->config->max_inflight_messages;
context->send_maximum = db->config->max_inflight_messages;
context->maximum_qos = 2;
context->msg_bytes = 0;
context->msg_bytes12 = 0;
context->msg_count = 0;
Expand Down
6 changes: 5 additions & 1 deletion src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,11 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
msg->direction = dir;
msg->state = state;
msg->dup = false;
msg->qos = qos;
if(qos > context->maximum_qos){
msg->qos = context->maximum_qos;
}else{
msg->qos = qos;
}
msg->retain = retain;
msg->properties = properties;

Expand Down
8 changes: 8 additions & 0 deletions src/handle_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context)
goto handle_connect_error;
}

context->maximum_qos = context->listener->maximum_qos;
if(protocol_version == PROTOCOL_VERSION_v5 && context->maximum_qos != 2){
if(mosquitto_property_add_byte(&connack_props, MQTT_PROP_MAXIMUM_QOS, context->maximum_qos)){
rc = MOSQ_ERR_NOMEM;
goto handle_connect_error;
}
}

if(packet__read_byte(&context->in_packet, &connect_flags)){
rc = 1;
goto handle_connect_error;
Expand Down
5 changes: 5 additions & 0 deletions src/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
"Invalid QoS in PUBLISH from %s, disconnecting.", context->id);
return 1;
}
if(qos > context->maximum_qos){
log__printf(NULL, MOSQ_LOG_INFO,
"Too high QoS in PUBLISH from %s, disconnecting.", context->id);
return 1;
}
retain = (header & 0x01);

if(retain && db->config->retain_available == false){
Expand Down
1 change: 1 addition & 0 deletions src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ struct mosquitto__listener {
enum mosquitto_protocol protocol;
int socket_domain;
bool use_username_as_clientid;
uint8_t maximum_qos;
#ifdef WITH_TLS
char *cafile;
char *capath;
Expand Down
68 changes: 68 additions & 0 deletions test/lib/03-publish-c2b-qos2-maximum-qos-0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python

# Test whether a client correctly handles sending a message with QoS > maximum QoS.

from mosq_test_helper import *

port = mosq_test.get_lib_port()

rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("publish-qos2-test", keepalive=keepalive, proto_ver=5)

props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_MAXIMUM_QOS, 0)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=props)

disconnect_packet = mosq_test.gen_disconnect(proto_ver=5)

publish_1_packet = mosq_test.gen_publish("maximum/qos/qos0", qos=0, payload="message", proto_ver=5)

disconnect_packet = mosq_test.gen_disconnect(proto_ver=5)

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', port))
sock.listen(5)


client_args = sys.argv[1:]
env = dict(os.environ)
env['LD_LIBRARY_PATH'] = '../../lib:../../lib/cpp'
try:
pp = env['PYTHONPATH']
except KeyError:
pp = ''
env['PYTHONPATH'] = '../../lib/python:'+pp
client = mosq_test.start_client(filename=sys.argv[1].replace('/', '-'), cmd=client_args, env=env, port=port)


try:
(conn, address) = sock.accept()
conn.settimeout(10)

if mosq_test.expect_packet(conn, "connect", connect_packet):
conn.send(connack_packet)

if mosq_test.expect_packet(conn, "publish 1", publish_1_packet):
if mosq_test.expect_packet(conn, "disconnect", disconnect_packet):
rc = 0

conn.close()
finally:
for i in range(0, 5):
if client.returncode != None:
break
time.sleep(0.1)

try:
client.terminate()
except OSError:
pass

client.wait()
sock.close()
if client.returncode != 0:
exit(1)

exit(rc)
74 changes: 74 additions & 0 deletions test/lib/03-publish-c2b-qos2-maximum-qos-1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env python

# Test whether a client correctly handles sending a message with QoS > maximum QoS.

from mosq_test_helper import *

port = mosq_test.get_lib_port()

rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("publish-qos2-test", keepalive=keepalive, proto_ver=5)

props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_MAXIMUM_QOS, 1)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=props)

disconnect_packet = mosq_test.gen_disconnect(proto_ver=5)

mid = 1
publish_1_packet = mosq_test.gen_publish("maximum/qos/qos1", qos=1, mid=mid, payload="message", proto_ver=5)
puback_1_packet = mosq_test.gen_puback(mid, proto_ver=5)

publish_2_packet = mosq_test.gen_publish("maximum/qos/qos0", qos=0, payload="message", proto_ver=5)

disconnect_packet = mosq_test.gen_disconnect(proto_ver=5)

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', port))
sock.listen(5)


client_args = sys.argv[1:]
env = dict(os.environ)
env['LD_LIBRARY_PATH'] = '../../lib:../../lib/cpp'
try:
pp = env['PYTHONPATH']
except KeyError:
pp = ''
env['PYTHONPATH'] = '../../lib/python:'+pp
client = mosq_test.start_client(filename=sys.argv[1].replace('/', '-'), cmd=client_args, env=env, port=port)


try:
(conn, address) = sock.accept()
conn.settimeout(10)

if mosq_test.expect_packet(conn, "connect", connect_packet):
conn.send(connack_packet)

if mosq_test.expect_packet(conn, "publish 1", publish_1_packet):
conn.send(puback_1_packet)
if mosq_test.expect_packet(conn, "publish 2", publish_2_packet):
if mosq_test.expect_packet(conn, "disconnect", disconnect_packet):
rc = 0

conn.close()
finally:
for i in range(0, 5):
if client.returncode != None:
break
time.sleep(0.1)

try:
client.terminate()
except OSError:
pass

client.wait()
sock.close()
if client.returncode != 0:
exit(1)

exit(rc)
2 changes: 2 additions & 0 deletions test/lib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ c : test-compile
./03-publish-c2b-qos2-receive-maximum-1.py $@/03-publish-c2b-qos2-receive-maximum-1.test
./03-publish-c2b-qos2-receive-maximum-2.py $@/03-publish-c2b-qos2-receive-maximum-2.test
./03-publish-c2b-qos2-pubrec-error.py $@/03-publish-c2b-qos2-pubrec-error.test
./03-publish-c2b-qos2-maximum-qos-0.py $@/03-publish-c2b-qos2-maximum-qos-0.test
./03-publish-c2b-qos2-maximum-qos-1.py $@/03-publish-c2b-qos2-maximum-qos-1.test
./03-publish-b2c-qos1.py $@/03-publish-b2c-qos1.test
./03-publish-b2c-qos2.py $@/03-publish-b2c-qos2.test
./03-request-response.py $@/03-request-response.test
Expand Down
58 changes: 58 additions & 0 deletions test/lib/c/03-publish-c2b-qos2-maximum-qos-0.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>

static int run = -1;

void on_connect(struct mosquitto *mosq, void *obj, int rc)
{
if(rc){
exit(1);
}else{
rc = mosquitto_publish(mosq, NULL, "maximum/qos/qos2", strlen("message"), "message", 2, false);
if(rc != MOSQ_ERR_QOS_NOT_SUPPORTED) run = 1;
rc = mosquitto_publish(mosq, NULL, "maximum/qos/qos1", strlen("message"), "message", 1, false);
if(rc != MOSQ_ERR_QOS_NOT_SUPPORTED) run = 1;
rc = mosquitto_publish(mosq, NULL, "maximum/qos/qos0", strlen("message"), "message", 0, false);
if(rc != MOSQ_ERR_SUCCESS) run = 1;
}
}

void on_publish(struct mosquitto *mosq, void *obj, int mid)
{
if(mid == 1){
mosquitto_disconnect(mosq);
}
}

void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
run = 0;
}

int main(int argc, char *argv[])
{
int rc;
struct mosquitto *mosq;

int port = atoi(argv[1]);

mosquitto_lib_init();

mosq = mosquitto_new("publish-qos2-test", true, NULL);
mosquitto_int_option(mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_publish_callback_set(mosq, on_publish);

rc = mosquitto_connect(mosq, "localhost", port, 60);

while(run == -1){
mosquitto_loop(mosq, 50, 1);
}

mosquitto_lib_cleanup();
return run;
}
Loading

0 comments on commit 3262926

Please sign in to comment.