Skip to content

Commit

Permalink
Tests and implementation for maximum packet size.
Browse files Browse the repository at this point in the history
This is for broker outgoing connack and publish packets only.
  • Loading branch information
ralight committed Feb 18, 2019
1 parent 8db1659 commit 1877f8a
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 10 deletions.
1 change: 1 addition & 0 deletions lib/mosquitto.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ enum mosq_err_t {
MOSQ_ERR_DUPLICATE_PROPERTY = 22,
MOSQ_ERR_TLS_HANDSHAKE = 23,
MOSQ_ERR_QOS_NOT_SUPPORTED = 24,
MOSQ_ERR_OVERSIZE_PACKET = 25,
};

/* Error values */
Expand Down
1 change: 1 addition & 0 deletions lib/mosquitto_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ struct mosquitto {
struct mosquitto__packet *out_packet;
struct mosquitto_message_all *will;
struct mosquitto__alias *aliases;
uint32_t maximum_packet_size;
int alias_count;
#ifdef WITH_TLS
SSL *ssl;
Expand Down
15 changes: 15 additions & 0 deletions lib/packet_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
}


int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length)
{
int len;

if(mosq->maximum_packet_size == 0) return MOSQ_ERR_SUCCESS;

len = remaining_length + packet__varint_bytes(remaining_length);
if(len > mosq->maximum_packet_size){
return MOSQ_ERR_OVERSIZE_PACKET;
}else{
return MOSQ_ERR_SUCCESS;
}
}


int packet__write(struct mosquitto *mosq)
{
ssize_t write_length;
Expand Down
2 changes: 2 additions & 0 deletions lib/packet_mosq.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ int packet__alloc(struct mosquitto__packet *packet);
void packet__cleanup(struct mosquitto__packet *packet);
int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet);

int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length);

int packet__read_byte(struct mosquitto__packet *packet, uint8_t *byte);
int packet__read_bytes(struct mosquitto__packet *packet, void *bytes, uint32_t count);
int packet__read_binary(struct mosquitto__packet *packet, uint8_t **data, int *length);
Expand Down
9 changes: 9 additions & 0 deletions lib/send_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
packetlen += proplen + varbytes;
}
}
if(packet__check_oversize(mosq, packetlen)){
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_NOTICE, "Dropping too large outgoing PUBLISH for %s (%d bytes)", mosq->id, packetlen);
#else
log__printf(NULL, MOSQ_LOG_NOTICE, "Dropping too large outgoing PUBLISH (%d bytes)", packetlen);
#endif
return MOSQ_ERR_OVERSIZE_PACKET;
}

packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet));
if(!packet) return MOSQ_ERR_NOMEM;

Expand Down
10 changes: 7 additions & 3 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
switch(tail->state){
case mosq_ms_publish_qos0:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(!rc){
if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(db, context, &tail, last);
}else{
return rc;
Expand All @@ -955,10 +955,12 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)

case mosq_ms_publish_qos1:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(!rc){
if(rc == MOSQ_ERR_SUCCESS){
tail->timestamp = mosquitto_time();
tail->dup = 1; /* Any retry attempts are a duplicate. */
tail->state = mosq_ms_wait_for_puback;
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(db, context, &tail, last);
}else{
return rc;
}
Expand All @@ -968,10 +970,12 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)

case mosq_ms_publish_qos2:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(!rc){
if(rc == MOSQ_ERR_SUCCESS){
tail->timestamp = mosquitto_time();
tail->dup = 1; /* Any retry attempts are a duplicate. */
tail->state = mosq_ms_wait_for_pubrec;
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(db, context, &tail, last);
}else{
return rc;
}
Expand Down
5 changes: 5 additions & 0 deletions src/property_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ int property__process_connect(struct mosquitto *context, mosquitto_property *pro

context->send_maximum = p->value.i16;
context->send_quota = context->send_maximum;
}else if(p->identifier == MQTT_PROP_MAXIMUM_PACKET_SIZE){
if(p->value.i32 == 0){
return MOSQ_ERR_PROTOCOL;
}
context->maximum_packet_size = p->value.i32;
}
p = p->next;
}
Expand Down
25 changes: 18 additions & 7 deletions src/send_connack.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ int send__connack(struct mosquitto_db *db, struct mosquitto *context, int ack, i
int rc;
mosquitto_property *connack_props = NULL;
int proplen, varbytes;
uint32_t remaining_length;

rc = mosquitto_property_copy_all(&connack_props, properties);
if(rc){
Expand All @@ -43,30 +44,40 @@ int send__connack(struct mosquitto_db *db, struct mosquitto *context, int ack, i
}
}

packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet));
if(!packet) return MOSQ_ERR_NOMEM;
remaining_length = 2;

packet->command = CMD_CONNACK;
packet->remaining_length = 2;
if(context->protocol == mosq_p_mqtt5){
if(reason_code < 128 && db->config->retain_available == false){
rc = mosquitto_property_add_byte(&connack_props, MQTT_PROP_RETAIN_AVAILABLE, 0);
if(rc){
mosquitto__free(packet);
mosquitto_property_free_all(&connack_props);
return rc;
}
}
/* FIXME - disable support until available */
rc = mosquitto_property_add_byte(&connack_props, MQTT_PROP_SHARED_SUB_AVAILABLE, 0);
if(rc){
mosquitto__free(packet);
mosquitto_property_free_all(&connack_props);
return rc;
}

proplen = property__get_length_all(connack_props);
varbytes = packet__varint_bytes(proplen);
packet->remaining_length += proplen + varbytes;
remaining_length += proplen + varbytes;
}

if(packet__check_oversize(context, remaining_length)){
mosquitto_property_free_all(&connack_props);
mosquitto__free(packet);
return MOSQ_ERR_OVERSIZE_PACKET;
}

packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet));
if(!packet) return MOSQ_ERR_NOMEM;

packet->command = CMD_CONNACK;
packet->remaining_length = remaining_length;

rc = packet__alloc(packet);
if(rc){
mosquitto__free(packet);
Expand Down
30 changes: 30 additions & 0 deletions test/broker/12-prop-maximum-packet-size-connect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env python

# Test whether setting maximum packet size to smaller than a CONNACK packet
# results in the CONNECT being rejected.
# MQTTv5

from mosq_test_helper import *

rc = 1

keepalive = 10
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MAXIMUM_PACKET_SIZE, 2)
connect_packet = mosq_test.gen_connect("test", proto_ver=5, keepalive=keepalive, properties=props)

port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)

try:
sock = mosq_test.do_client_connect(connect_packet, "", port=port)
# Exception occurs if connack packet returned
rc = 0
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)

exit(rc)

45 changes: 45 additions & 0 deletions test/broker/12-prop-maximum-packet-size-publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env python

# Test whether maximum packet size is honoured on a PUBLISH to a client
# MQTTv5

from mosq_test_helper import *

rc = 1

keepalive = 10
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MAXIMUM_PACKET_SIZE, 20)
connect_packet = mosq_test.gen_connect("test", proto_ver=5, keepalive=keepalive, properties=props)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)

mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "test/topic", 0, proto_ver=5)
suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)

publish1_packet = mosq_test.gen_publish(topic="test/topic", qos=0, payload="12345678901234567890", proto_ver=5)
publish2_packet = mosq_test.gen_publish(topic="test/topic", qos=0, payload="67890", proto_ver=5)

pingreq_packet = mosq_test.gen_pingreq()
pingresp_packet = mosq_test.gen_pingresp()

port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)

try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet)
sock.send(publish1_packet)
# We shouldn't receive the publish here because it is > MAXIMUM_PACKET_SIZE
mosq_test.do_send_receive(sock, pingreq_packet, pingresp_packet)
mosq_test.do_send_receive(sock, publish2_packet, publish2_packet)
mosq_test.do_send_receive(sock, pingreq_packet, pingresp_packet)
rc = 0
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)

exit(rc)

2 changes: 2 additions & 0 deletions test/broker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,5 @@ endif
./12-prop-server-keepalive.py
./12-prop-response-topic.py
./12-prop-response-topic-correlation-data.py
./12-prop-maximum-packet-size-connect.py
./12-prop-maximum-packet-size-publish.py
2 changes: 2 additions & 0 deletions test/broker/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@
(1, './12-prop-server-keepalive.py'),
(1, './12-prop-response-topic.py'),
(1, './12-prop-response-topic-correlation-data.py'),
(1, './12-prop-maximum-packet-size-connect.py'),
(1, './12-prop-maximum-packet-size-publish.py'),
]

ptest.run_tests(tests)

0 comments on commit 1877f8a

Please sign in to comment.